
Two related surfaces, both about how a runbook reads and writes data inside Ntropii Tenant. They share a page because they’re easy to confuse.
Surface → when to use it:
  • ntro.data.get_data_plane() — Direct access to the customer’s data warehouse — Postgres tables, Databricks SQL, Snowflake. Raw SQL, structured queries, tenant-specific schema.
  • ntro.capabilities.storage — Runbook-managed files and tables. JSON-backed for now; not coupled to a specific warehouse. Use for audit-trail artifacts, intermediate state, anything the runbook itself wants to persist.

Install

pip install 'ntro[workflow]'
Both are bundled with the workflow extra.

ntro.data — direct platform access

ntro.data.get_data_plane() returns a connection to the customer’s data platform under the tenant’s credentials. It’s an async object with methods that mirror the underlying driver — fetchrow, fetch, execute for Postgres / asyncpg; equivalent SQL surfaces for Databricks and Snowflake.
from ntro.data import get_data_plane

db = await get_data_plane(tenant_slug)

# fetchrow — single row, returns None if not found
row = await db.fetchrow(
    "SELECT data_bytes FROM submitted_documents WHERE id = $1",
    document_ref,
)

# fetch — list of rows
rows = await db.fetch(
    "SELECT * FROM journal_proposals WHERE entity_slug = $1 AND period = $2",
    entity_slug,
    period,
)

# execute — INSERT / UPDATE / DELETE
await db.execute(
    "UPDATE journal_proposals SET status = $1 WHERE id = $2",
    "approved",
    proposal_id,
)

Canonical example

Lifted from nav-monthly — the runbook reads from the tenant’s data plane to check that a starting trial balance is in place:
from temporalio import activity
from temporalio.exceptions import ApplicationError

from ntro.data import get_data_plane

# NavMonthlyContext and StartingTBVerified are runbook-local models,
# imported elsewhere in the runbook.


@activity.defn(name="nav_monthly.check_starting_tb")
async def check_starting_tb(ctx: NavMonthlyContext) -> StartingTBVerified:
    db = await get_data_plane(ctx.tenant_slug)
    row = await db.fetchrow(
        "SELECT id, source FROM submitted_documents "
        "WHERE entity_slug = $1 AND source = $2 "
        "ORDER BY uploaded_at DESC LIMIT 1",
        ctx.entity_slug,
        ctx.tb_source,
    )
    if row is None:
        raise ApplicationError("Starting TB not found", non_retryable=True)
    return StartingTBVerified(document_ref=row["id"], source=row["source"])
Two things to notice:
  • get_data_plane(tenant_slug) scopes the connection to the tenant. The credentials are resolved from the tenant’s data platform binding — you never see the secret in the runbook.
  • asyncpg-style positional parameters for Postgres. Driver dialect varies for Databricks / Snowflake; the SDK normalises where it can.
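To make the dialect point concrete: asyncpg’s `$1, $2` placeholders are one of several SQL parameter styles, and drivers that expect qmark-style `?` need a rewrite. A minimal illustration of that kind of normalisation — not the SDK’s actual implementation:

```python
import re

def to_qmark(sql: str) -> str:
    # Rewrite asyncpg-style positional placeholders ($1, $2, ...) to the
    # qmark style (?) some drivers expect. Illustrative only: this regex
    # ignores placeholders inside string literals and repeated $n references,
    # both of which a real normaliser has to handle.
    return re.sub(r"\$\d+", "?", sql)
```

For example, `to_qmark("SELECT * FROM t WHERE a = $1")` yields `"SELECT * FROM t WHERE a = ?"`.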

What lives here

ntro.data is the right tool for things that belong to the customer’s warehouse: their COA, their journal entries, their submitted documents, their tenant-specific tables. The runbook reads from and writes to the same place the customer’s accountants and their other systems read from and write to.
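A hedged sketch of the kind of warehouse-side work this means — an aggregation over tenant-owned journal tables. The schema here (`journal_entries`, `account_code`, `amount`) is illustrative, and the connection is passed in rather than created, so the only SDK surface assumed is `db.fetch` from above:

```python
# Illustrative aggregation against tenant-owned tables. `db` is the
# connection returned by ntro.data.get_data_plane(tenant_slug); the
# table and column names are stand-ins, not part of the SDK.
async def period_totals(db, entity_slug: str, period: str):
    return await db.fetch(
        "SELECT account_code, SUM(amount) AS total "
        "FROM journal_entries "
        "WHERE entity_slug = $1 AND period = $2 "
        "GROUP BY account_code",
        entity_slug,
        period,
    )
```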

ntro.capabilities.storage — runbook artifacts

Different surface, different purpose. storage is for runbook-internal artifacts — audit trails, intermediate caches, structured state the runbook wants to persist but isn’t part of the customer’s authoritative warehouse.
from ntro.capabilities import storage

# Files (bytes)
await storage.write(
    path="periods/acme-spv1/2026-03/extractions/rent-roll/123.json",
    content=payload.model_dump_json(indent=2).encode(),
    content_type="application/json",
)

content = await storage.read(path="periods/acme-spv1/2026-03/extractions/rent-roll/123.json")

# Tables (structured rows; JSON-backed for the PoC)
await storage.write_table(
    name="extraction_audit",
    rows=[
        {"period": "2026-03", "source": "rent-roll", "confidence": 0.94, "task_id": "task_abc"},
    ],
)

rows = await storage.read_table(name="extraction_audit")
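Since `read_table` returns plain JSON-backed rows, downstream filtering is ordinary Python. A minimal sketch, assuming rows shaped like the `extraction_audit` example above — the helper is illustrative, not an SDK call:

```python
def high_confidence(rows, threshold: float = 0.9):
    # Keep audit rows at or above the confidence threshold. Rows are
    # assumed to be plain dicts like those written by write_table above.
    return [r for r in rows if r.get("confidence", 0.0) >= threshold]
```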

Canonical example

Lifted from document-ingest — after a document’s extracted payload is committed to the tenant data plane, the runbook also writes a copy to storage as a cheap audit trail:
from ntro.capabilities import storage

# db is the connection from ntro.data.get_data_plane(...), as shown earlier.
# Persist to tenant data plane (authoritative)
record = await db.fetchrow(
    "INSERT INTO extracted_payloads (...) VALUES ($1, $2, ...) RETURNING id",
    ...,
)

# Also write to storage as an audit trail (replayable, cheap)
audit_path = (
    f"periods/{input.entity_slug}/{input.period}/extractions/"
    f"{input.source}/{record['id']}.json"
)
await storage.write(
    path=audit_path,
    content=final_payload.model_dump_json(indent=2).encode(),
    content_type="application/json",
)
The data plane is the source of truth; storage is the audit copy. Two writes, two distinct purposes.
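The path convention in the example can be captured in a small helper so writes and later reads agree on layout. A sketch that mirrors the layout above; the function itself is not part of the SDK:

```python
def audit_path(entity_slug: str, period: str, source: str, record_id: str) -> str:
    # Storage layout used for audit copies in the example above:
    #   periods/<entity>/<period>/extractions/<source>/<id>.json
    return f"periods/{entity_slug}/{period}/extractions/{source}/{record_id}.json"
```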

Choosing between them

You want to… → use:
  • Read/write the customer’s authoritative tables → ntro.data
  • Run a complex SQL aggregation against the warehouse → ntro.data
  • Persist an intermediate result the runbook produced → storage
  • Keep an audit trail of what the runbook did → storage
  • Cache a parsed file so a re-run doesn’t reparse → storage
  • Look up a document by document_ref (the canonical PoC pattern) → ntro.data (SELECT data_bytes FROM submitted_documents)

A common pattern: read raw bytes from ntro.data, parse + extract, write the typed result to ntro.data (authoritative) AND write a JSON copy to storage (audit). That’s exactly what document-ingest does.
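That dual-write pattern can be sketched end to end. Everything below is illustrative: the `extracted_payloads` columns, the JSON “parse” step, and the function itself are stand-ins, and `db`/`storage` are passed in rather than imported so the shape stays self-contained:

```python
import json

async def ingest_document(db, storage, entity_slug: str, period: str,
                          source: str, document_ref: str) -> str:
    # 1. Read raw bytes from the tenant data plane (the canonical lookup).
    row = await db.fetchrow(
        "SELECT data_bytes FROM submitted_documents WHERE id = $1",
        document_ref,
    )
    payload = json.loads(row["data_bytes"])  # stand-in for real parsing

    # 2. Write the typed result to the data plane (authoritative).
    record = await db.fetchrow(
        "INSERT INTO extracted_payloads (entity_slug, period, payload) "
        "VALUES ($1, $2, $3) RETURNING id",
        entity_slug,
        period,
        json.dumps(payload),
    )

    # 3. Write a JSON copy to storage (audit trail).
    path = (
        f"periods/{entity_slug}/{period}/extractions/"
        f"{source}/{record['id']}.json"
    )
    await storage.write(
        path=path,
        content=json.dumps(payload, indent=2).encode(),
        content_type="application/json",
    )
    return path
```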

Collect files

Often pairs with ntro.data — read bytes, parse them.

Tenant architecture

Why the data plane is the customer’s, and Ntropii Tenant accesses it under the customer’s credentials.