ntro.workspace.Client is how you talk to Ntropii from Python. It wraps the same surface the CLI and MCP server use, organised into resource accessors.
from ntro.workspace import Client
client = Client.from_config()
Construction
# Default connection from ~/.ntro/config.toml
client = Client.from_config()
# Named connection
client = Client.from_config(connection="production")
Resolution order: connection= param > env vars (NTRO_HOST, NTRO_API_KEY) > the connection named here > default_connection_name in the config file.
client = Client(
    host="https://api.ntropii.com/v1",
    api_key="ntro_prod_xxx",
    timeout=30.0,  # Optional; default 30s
)
Explicit args bypass the config file entirely. Use this form in tests, in notebooks where you want no dependency on the environment, or when the credential lives in a secrets manager.
Resource accessors
Each property on Client returns a resource client. Every method comes in two flavours: an async coroutine (for example list) and a blocking wrapper with a _sync suffix (for example list_sync).

| Accessor | What it manages |
| --- | --- |
| client.identity | Current user identity (whoami) |
| client.integrations | Data platform configurations |
| client.tenants | Tenants (clients) |
| client.entities | Entities (SPVs / funds) within tenants |
| client.workflows | Workflow definitions |
| client.deployments | Deploying workflow versions to Ntropii Tenant |
| client.tasks | Workflow runs (create + poll) |
Identity
profile = client.identity.whoami_sync()
print(profile.email, profile.orgId)
Integrations
# Register a Databricks data platform
integration = client.integrations.create_data_platform_sync(
    name="Acme Databricks UK",
    provider="DATABRICKS",
    region="UK-South",
    config={
        "workspaceUrl": "https://adb-1234567890.12.azuredatabricks.net",
        "catalog": "fund_ops",
        "authType": "service-principal",
        "clientId": "...",
        "clientSecret": "...",
    },
)
# Test the connection
result = client.integrations.test_connection_sync(integration.id)
print(result.success, result.latencyMs)
# List + discover
platforms = client.integrations.list_data_platforms_sync()
schemas = client.integrations.discover_schemas_sync(integration.id)
Tenants & entities
# Managed Postgres: no external config needed
tenant = client.tenants.create_sync(
    name="Acme Fund",
    slug="acme-fund",
    dataPlatform="managed-postgres",
)
# BYO: bind to a registered config. The config's provider must match dataPlatform,
# so integration.id here must point at a Snowflake config, not the Databricks one registered above.
tenant = client.tenants.create_sync(
    name="Acme Fund",
    slug="acme-fund",
    dataPlatform="snowflake",
    dataPlatformConfigId=integration.id,
)
entity = client.entities.create_sync(
    tenant_id="acme-fund",
    name="Acme Commercial SPV 1",
    slug="acme-commercial-spv1",
    type="real-estate-spv",
    jurisdiction="Jersey",
    currency="GBP",
    schema_="nav_pipeline_spv1",
)
entities = client.entities.list_sync(tenant_id="acme-fund")
Workflows & deployments
workflow = client.workflows.create_sync(
    name="nav-monthly",
    description="Monthly NAV pipeline",
    tenantId="acme-fund-admin",
    schedule="0 8 5 * *",
    timezone="Europe/London",
)
# `version` is a workflow version object obtained elsewhere (not shown on this page)
deployment = client.deployments.create_sync(
    workflowId=workflow.id,
    workflowVersionId=version.id,
    tenantId="acme-fund-admin",
)
status = client.deployments.get_sync(deployment.id)
Tasks (workflow runs)
# Trigger
task = client.tasks.create_sync(
    tenantId="acme-fund-admin",
    entityId="acme-commercial-spv1",
    workflowId="nav-monthly",
    context={"period": "2026-03", "priority": "HIGH"},
)
# Poll
import time
while True:
    task = client.tasks.get_sync(task.id)
    print(task.status, [s.name for s in task.steps if s.status == "IN_PROGRESS"])
    if task.status in ("COMPLETED", "FAILED", "CANCELLED"):
        break
    time.sleep(5)
# History for an entity
history = client.tasks.history_sync("acme-fund-admin", "acme-commercial-spv1")
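The polling loop above runs until the task reaches a terminal status, with no upper bound on how long it waits. A minimal sketch of a bounded variant follows. The helper name wait_for_terminal and its parameters are hypothetical and not part of the SDK; it only assumes that the fetch callable returns an object with a status attribute, as client.tasks.get_sync does in the example above. A fake fetcher stands in for the SDK so the block runs on its own.

```python
import time
from types import SimpleNamespace

# Terminal statuses taken from the polling example above.
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED"}


def wait_for_terminal(fetch, interval=5.0, timeout=600.0, sleep=time.sleep):
    """Call fetch() until it returns an object with a terminal .status.

    Raises TimeoutError if no terminal status is seen within `timeout` seconds.
    (Hypothetical helper, not an SDK function.)
    """
    deadline = time.monotonic() + timeout
    while True:
        task = fetch()
        if task.status in TERMINAL_STATUSES:
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {task.status} after {timeout}s")
        sleep(interval)


# Demo with a fake fetcher so the sketch runs without the SDK.
fake_statuses = iter(["IN_PROGRESS", "IN_PROGRESS", "COMPLETED"])
naps = []  # records each sleep instead of actually sleeping
finished = wait_for_terminal(
    lambda: SimpleNamespace(status=next(fake_statuses)),
    interval=0.0,
    sleep=naps.append,
)
print(finished.status, len(naps))
```

With the real client, the fetch callable would presumably be something like lambda: client.tasks.get_sync(task_id), with task_id captured before the loop starts.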
Async usage
Every method is async-first. Use the async form when you need concurrency or when you're already inside an async context (Temporal activities, FastAPI handlers):
import asyncio
from ntro.workspace import Client
async def main():
    client = Client.from_config()
    tenants, workflows = await asyncio.gather(
        client.tenants.list(),
        client.workflows.list(),
    )
    await client.close()

asyncio.run(main())
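The _sync wrappers are a convenience for scripts and notebooks. They call asyncio.run() internally, and asyncio.run() cannot be called from a running event loop, so they will fail if used inside an async function. Await the async method there instead.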
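To make the failure mode concrete, here is a small sketch that uses stand-in functions rather than the SDK. list_items and list_items_sync are hypothetical; the second mimics a wrapper that delegates to asyncio.run(), which is how the _sync methods are described here. Calling it from inside a coroutine raises RuntimeError, while awaiting the async version works.

```python
import asyncio


async def list_items():
    """Stand-in for an async SDK method such as client.tenants.list()."""
    await asyncio.sleep(0)
    return ["acme-fund"]


def list_items_sync():
    """Stand-in for a _sync wrapper that delegates to asyncio.run()."""
    coro = list_items()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise


async def handler():
    """Inside a running loop the blocking wrapper is refused; awaiting works."""
    try:
        list_items_sync()
        sync_refused = False
    except RuntimeError:
        sync_refused = True
    return sync_refused, await list_items()


outside = list_items_sync()      # fine: no event loop is running here
inside = asyncio.run(handler())  # the wrapper fails inside the coroutine
print(outside, inside)
```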
Exception types
from ntro.workspace.exceptions import (
    AuthenticationError,
    AuthorizationError,
    NotFoundError,
    ConflictError,
    ValidationError,
    NtroConnectionError,
)

try:
    tenant = client.tenants.get_sync("unknown-slug")
except NotFoundError:
    print("Tenant not found")
except AuthenticationError:
    print("Invalid API key")
except NtroConnectionError as e:
    print(f"Could not reach Ntropii: {e}")
| Exception | HTTP status | When |
| --- | --- | --- |
| AuthenticationError | 401 | Invalid or missing API key |
| AuthorizationError | 403 | API key lacks permissions for this tenant |
| NotFoundError | 404 | Resource doesn’t exist |
| ConflictError | 409 | Resource already exists (e.g. duplicate slug) |
| ValidationError | 422 | Request payload is invalid |
| NtroConnectionError | n/a | Network or timeout error |
All inherit from a common NtroError base if you want a catch-all.
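One way the hierarchy might be used is to retry only on NtroConnectionError, since it covers network and timeout errors, while letting everything else surface through the NtroError catch-all. The sketch below is an illustration under assumptions. The exception classes are local stand-ins so the block runs without the SDK (in real code they would be imported from ntro.workspace.exceptions, where NtroError is presumably also available), and call_with_retry is a hypothetical helper, not an SDK function.

```python
import time


# Stand-ins mirroring the documented hierarchy (assumption: real code imports
# these from ntro.workspace.exceptions instead of defining them).
class NtroError(Exception): ...
class NtroConnectionError(NtroError): ...
class ValidationError(NtroError): ...


def call_with_retry(fn, attempts=3, base_delay=0.5, sleep=time.sleep):
    """Run fn(), retrying only on NtroConnectionError with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except NtroConnectionError:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            sleep(base_delay * 2 ** (attempt - 1))


# Demo: a call that fails twice with a connection error, then succeeds.
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise NtroConnectionError("timeout")
    return "ok"

delays = []  # records backoff delays instead of sleeping
result = call_with_retry(flaky, sleep=delays.append)

# Demo: a non-connection error is not retried and reaches the catch-all.
def bad_payload():
    raise ValidationError("bad payload")

try:
    call_with_retry(bad_payload, sleep=delays.append)
    caught = None
except NtroError as exc:
    caught = type(exc).__name__

print(result, delays, caught)
```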
What’s next
Workflow capability: The SDK primitives runbooks use to define orchestration.
MCP server: Same surface, exposed to coding agents.