Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.ntropii.com/llms.txt

Use this file to discover all available pages before exploring further.

ntro.workspace.Client is how you talk to Ntropii from Python. It wraps the same surface the CLI and MCP server use, organised into resource accessors.
from ntro.workspace import Client

client = Client.from_config()

Construction

# Default connection from ~/.ntro/config.toml
client = Client.from_config()

# Named connection
client = Client.from_config(connection="production")
Resolution order: connection= param > env vars (NTRO_HOST, NTRO_API_KEY) > the connection named here > default_connection_name in the config file.

Resource accessors

Each property on Client returns a resource client. All methods come in async + _sync flavours.
AccessorWhat it manages
client.identityCurrent user identity (whoami)
client.integrationsData platform configurations
client.tenantsTenants (clients)
client.entitiesEntities (SPVs / funds) within tenants
client.workflowsWorkflow definitions
client.deploymentsDeploying workflow versions to Ntropii Tenant
client.tasksWorkflow runs (create + poll)

Identity

profile = client.identity.whoami_sync()
print(profile.email, profile.orgId)

Integrations

# Register a Databricks data platform
integration = client.integrations.create_data_platform_sync(
    name="Acme Databricks UK",
    provider="DATABRICKS",
    region="UK-South",
    config={
        "workspaceUrl": "https://adb-1234567890.12.azuredatabricks.net",
        "catalog": "fund_ops",
        "authType": "service-principal",
        "clientId": "...",
        "clientSecret": "...",
    },
)

# Test the connection
result = client.integrations.test_connection_sync(integration.id)
print(result.success, result.latencyMs)

# List + discover
platforms = client.integrations.list_data_platforms_sync()
schemas = client.integrations.discover_schemas_sync(integration.id)

Tenants & entities

# Managed Postgres — no external config needed
tenant = client.tenants.create_sync(
    name="Acme Fund",
    slug="acme-fund",
    dataPlatform="managed-postgres",
)

# BYO — bind to a registered config; provider must match dataPlatform
tenant = client.tenants.create_sync(
    name="Acme Fund",
    slug="acme-fund",
    dataPlatform="snowflake",
    dataPlatformConfigId=integration.id,
)

entity = client.entities.create_sync(
    tenant_id="acme-fund",
    name="Acme Commercial SPV 1",
    slug="acme-commercial-spv1",
    type="real-estate-spv",
    jurisdiction="Jersey",
    currency="GBP",
    schema_="nav_pipeline_spv1",
)

entities = client.entities.list_sync(tenant_id="acme-fund")

Workflows & deployments

workflow = client.workflows.create_sync(
    name="nav-monthly",
    description="Monthly NAV pipeline",
    tenantId="acme-fund-admin",
    schedule="0 8 5 * *",
    timezone="Europe/London",
)

deployment = client.deployments.create_sync(
    workflowId=workflow.id,
    workflowVersionId=version.id,
    tenantId="acme-fund-admin",
)

status = client.deployments.get_sync(deployment.id)

Tasks (workflow runs)

# Trigger
task = client.tasks.create_sync(
    tenantId="acme-fund-admin",
    entityId="acme-commercial-spv1",
    workflowId="nav-monthly",
    context={"period": "2026-03", "priority": "HIGH"},
)

# Poll
import time

while True:
    task = client.tasks.get_sync(task.id)
    print(task.status, [s.name for s in task.steps if s.status == "IN_PROGRESS"])
    if task.status in ("COMPLETED", "FAILED", "CANCELLED"):
        break
    time.sleep(5)

# History for an entity
history = client.tasks.history_sync("acme-fund-admin", "acme-commercial-spv1")

Async usage

Every method is async-first. Use it when you need concurrency or when you’re inside an async context (Temporal activities, FastAPI handlers):
import asyncio
from ntro.workspace import Client

async def main():
    client = Client.from_config()

    tenants, workflows = await asyncio.gather(
        client.tenants.list(),
        client.workflows.list(),
    )

    await client.close()

asyncio.run(main())
The _sync wrappers are a convenience for scripts and notebooks — they call asyncio.run() internally and will fail if used inside an async function.

Exception types

from ntro.workspace.exceptions import (
    AuthenticationError,
    AuthorizationError,
    NotFoundError,
    ConflictError,
    ValidationError,
    NtroConnectionError,
)

try:
    tenant = client.tenants.get_sync("unknown-slug")
except NotFoundError:
    print("Tenant not found")
except AuthenticationError:
    print("Invalid API key")
except NtroConnectionError as e:
    print(f"Could not reach Ntropii: {e}")
ExceptionHTTP statusWhen
AuthenticationError401Invalid or missing API key
AuthorizationError403API key lacks permissions for this tenant
NotFoundError404Resource doesn’t exist
ConflictError409Resource already exists (e.g. duplicate slug)
ValidationError422Request payload is invalid
NtroConnectionErrorNetwork or timeout error
All inherit from a common NtroError base if you want a catch-all.

What’s next

Workflow capability

The SDK primitives runbooks use to define orchestration.

MCP server

Same surface, exposed to coding agents.