ntro.workflow is the orchestration surface every runbook subclasses. For the concept-level walkthrough of authoring a runbook, see Build runbooks. This page is the API reference.

Install

pip install 'ntro[workflow]'
The [workflow] extra brings in Temporal, all ntro.capabilities.* modules, ntro.events, ntro.accounting, and ntro.data — everything a runbook needs at runtime.

Surface

from ntro.workflow import (
    NtroWorkflow,
    runbook,          # facade — .defn .run .signal .query .step
                      #          .execute_activity .wait_condition
                      #          .info .now .logger
    activity,         # facade — .defn .info .logger
    WorkflowError,    # raised inside activities; the worker marks it
                      # non-retryable
)
| Symbol | Kind | Purpose |
| --- | --- | --- |
| NtroWorkflow | class | Base class for all runbooks. Subclass + decorate with @runbook.defn. |
| runbook | facade object | Mirrors temporalio.workflow: decorators (.defn, .run, .signal, .query, .step) and runtime helpers (.execute_activity, .wait_condition, .info, .now, .logger). |
| activity | facade object | Mirrors temporalio.activity.defn decorator, .info(), .logger. |
| WorkflowError | exception | Raised inside activities to fail the workflow. non_retryable=True flags deterministic-input failures so retries don’t loop. |
| await_signal_with_action | method on NtroWorkflow | Block until a predicate is satisfied; advertise the pending action via current_pending_action. |
| wait_for_action | method on NtroWorkflow | Block on a HITL approve / reject / correct signal from the Tenant UI. |
| run_child_workflow | method on NtroWorkflow | Dispatch another runbook by slug. Children appear as nested progress trees in the UI. |
| previous | function | Load committed-document context from a prior task. See ntro.workflow.task. |

Runtime backends

runbook and activity dispatch to whichever backend is auto-selected at first attribute access:
| Backend | Selected when | Behaviour |
| --- | --- | --- |
| TemporalRuntime | temporalio is importable AND TEMPORAL_HOST is set (i.e. inside an ntro-worker pod) | Every facade call passes straight through to temporalio.workflow.X / temporalio.activity.X. Production behaviour, unchanged from pre-N-102. |
| LocalRuntime | Anywhere else: local dev, CI smoke jobs, Claude MA sandbox | Activities run as direct await fn(...) coroutines; child workflows run on the same event loop; signals are plain method calls; wait_condition polls at 50ms granularity. No durability, no retries, no replay; exceptions bubble up immediately. |
One INFO log line is emitted on first selection so it’s obvious which backend is active:
ntro.workflow: TemporalRuntime active (TEMPORAL_HOST=…)
ntro.workflow: LocalRuntime active (no Temporal detected; running in-process)
To force a specific runtime in tests, call ntro.workflow.set_runtime(...) before importing any runbook module. Otherwise the auto-selection runs once and is cached.
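
The selection rule from the table can be sketched as a small function. This illustrates the documented rule only and is not ntro's actual implementation; pick_runtime_name and its parameters are hypothetical names, and treating an empty TEMPORAL_HOST as "not set" is an assumption.

```python
import importlib.util
import os
from typing import Mapping, Optional


def pick_runtime_name(
    env: Optional[Mapping[str, str]] = None,
    temporal_importable: Optional[bool] = None,
) -> str:
    """Return the backend name the documented rule would select.

    Hypothetical helper: it mirrors the rule in the table, nothing more.
    """
    if env is None:
        env = os.environ
    if temporal_importable is None:
        # find_spec returns None when the package is not installed.
        temporal_importable = importlib.util.find_spec("temporalio") is not None
    # Both conditions must hold for TemporalRuntime; anything else is local.
    # Assumption: an empty TEMPORAL_HOST counts as "not set".
    if temporal_importable and env.get("TEMPORAL_HOST"):
        return "TemporalRuntime"
    return "LocalRuntime"


# The flags are passed explicitly so the result does not depend on this machine.
print(pick_runtime_name(env={}, temporal_importable=True))
print(pick_runtime_name(env={"TEMPORAL_HOST": "temporal:7233"}, temporal_importable=True))
```

Passing the environment and the importability flag as arguments keeps the rule easy to exercise from a test without touching os.environ.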

NtroWorkflow

from ntro.workflow import NtroWorkflow, runbook

@runbook.defn
class MyWorkflow(NtroWorkflow):
    @runbook.run
    async def run(self, ctx): ...
Subclasses get four auto-wired queries / signals at construction:
| Name | Kind | Returns / accepts | Purpose |
| --- | --- | --- | --- |
| current_pending_action | query | {action, display_hint} | What is the workflow waiting for? Polled by Tenant UI. |
| current_steps | query | list[{id, title, status, icon}] | The @runbook.step ladder + per-step status. |
| current_ui_state | query | combined snapshot | One-shot read for UI rendering. |
| user_action | signal | {action, payload} | UI dispatches approve / reject / correct here. |
Your subclass doesn’t need to register these — the base class does it. Add domain-specific signals (tb_submitted, document_submitted, …) as needed.

@runbook.step

@runbook.step(name="period_open", title="Open period", icon="Calendar")
async def _step_period_open(self, ctx): ...
| Arg | Type | Purpose |
| --- | --- | --- |
| name | str | Stable step id (used in current_steps, snapshots, child-workflow ids). |
| title | str | Display name in the Tenant UI breadcrumb. |
| icon | str | Lucide icon name. |
The decorator wraps the method so step lifecycle (_current_step_id, _steps_completed) tracks automatically. Class-definition order is breadcrumb order in the UI sidebar.
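
To make the lifecycle tracking concrete, here is a minimal sketch of how a step decorator of this kind could work. It is a stand-in written for illustration, not ntro's implementation: the step function, the step_meta attribute, the ladder helper, and the choice to record a step only after it succeeds are all assumptions. Only the _current_step_id and _steps_completed names come from the text above.

```python
import asyncio
import functools


def step(name: str, title: str, icon: str):
    """Hypothetical stand-in for @runbook.step."""

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(self, *args, **kwargs):
            self._current_step_id = name        # the step is now running
            result = await fn(self, *args, **kwargs)
            self._steps_completed.append(name)  # assumed: recorded on success only
            self._current_step_id = None
            return result

        # Metadata a UI ladder would need (assumed attribute name).
        wrapper.step_meta = {"id": name, "title": title, "icon": icon}
        return wrapper

    return decorator


class DemoWorkflow:
    def __init__(self):
        self._current_step_id = None
        self._steps_completed = []

    @step(name="period_open", title="Open period", icon="Calendar")
    async def _step_period_open(self, ctx):
        return f"opened {ctx}"

    @step(name="period_close", title="Close period", icon="Lock")
    async def _step_period_close(self, ctx):
        return f"closed {ctx}"

    @classmethod
    def ladder(cls):
        # A class body preserves definition order, which gives breadcrumb order.
        return [m.step_meta["id"] for m in vars(cls).values() if hasattr(m, "step_meta")]


async def main():
    wf = DemoWorkflow()
    await wf._step_period_open("2024-01")
    await wf._step_period_close("2024-01")
    return wf._steps_completed


completed = asyncio.run(main())
print(DemoWorkflow.ladder())
print(completed)
```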

wait_for_action

Block on a HITL signal. The display_hint tells the Tenant UI which review component to render.
response = await self.wait_for_action(
    payload=extracted,
    display_hint={
        "type": "review_extraction",
        "schema_slug": ctx.schema_slug,
    },
    reason="Awaiting accountant approval of extraction",
)
match response.action:
    case "approved": ...
    case "rejected": ...
    case "corrected": ...
| Arg | Type | Purpose |
| --- | --- | --- |
| payload | Any (pydantic-serialisable) | What the user is reviewing; rendered by the UI component. |
| display_hint | dict | UI component selector + config. |
| reason | str | One-line description shown in the pending-action chip. |
Returns UserAction(action, payload, corrections). corrections is workflow-specific (e.g. row-level edits to an extracted document).
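
As a sketch of what a workflow might do with the returned value, the snippet below resolves an extraction against each of the three actions. The UserAction dataclass is a local stand-in with the three documented fields, resolve_extraction is a hypothetical helper, and the shape of corrections (a mapping of field name to corrected value) is assumed purely for illustration, since the real shape is workflow-specific.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class UserAction:
    """Local stand-in carrying the three documented fields."""
    action: str
    payload: Any = None
    corrections: dict = field(default_factory=dict)


def resolve_extraction(extracted: dict, response: UserAction) -> Optional[dict]:
    """Return the document to commit, or None when the reviewer rejected it."""
    if response.action == "approved":
        return dict(extracted)
    if response.action == "corrected":
        # Assumed shape: corrections maps field name -> corrected value.
        return {**extracted, **response.corrections}
    if response.action == "rejected":
        return None
    raise ValueError(f"unexpected action: {response.action!r}")


extracted = {"period": "2024-01", "total": 100}
print(resolve_extraction(extracted, UserAction("approved")))
print(resolve_extraction(extracted, UserAction("corrected", corrections={"total": 120})))
print(resolve_extraction(extracted, UserAction("rejected")))
```

Raising on an unknown action keeps a new UI action from being silently treated as an approval.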

await_signal_with_action

Block until a predicate is true. Advertises the pending action via current_pending_action so the UI knows what to surface.
await self.await_signal_with_action(
    predicate=lambda: self._submission_received,
    action="awaiting_document_submission",
    display_hint={"source": ctx.source, "filename_pattern": ctx.expected_pattern},
)
Useful when the workflow waits for an upload, an email arrival, or any human-driven event without a fixed schedule. Pair with a @runbook.signal handler that mutates the predicate’s state.
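
The pairing of a predicate with a state-mutating signal handler can be sketched in plain asyncio. Nothing here imports ntro or Temporal: DemoWorkflow, the pending_action attribute, and the poll_seconds parameter are hypothetical, and the polling loop only imitates the 50ms behaviour described for LocalRuntime. Clearing the pending action once the predicate holds is also an assumption.

```python
import asyncio


class DemoWorkflow:
    """Stand-in for a runbook; plain asyncio only."""

    def __init__(self):
        self._submission_received = False
        self.pending_action = None  # what current_pending_action would report

    def document_submitted(self):
        # Plays the role of a @runbook.signal handler: it only mutates state.
        self._submission_received = True

    async def await_signal_with_action(self, predicate, action, poll_seconds=0.05):
        # Advertise the pending action, then poll the predicate until it holds.
        self.pending_action = action
        while not predicate():
            await asyncio.sleep(poll_seconds)
        self.pending_action = None  # assumed: cleared once satisfied


async def main():
    wf = DemoWorkflow()
    waiter = asyncio.create_task(
        wf.await_signal_with_action(
            predicate=lambda: wf._submission_received,
            action="awaiting_document_submission",
        )
    )
    await asyncio.sleep(0.1)       # let the waiter start polling
    seen_while_waiting = wf.pending_action
    wf.document_submitted()        # the "signal" arrives
    await asyncio.wait_for(waiter, timeout=1.0)
    return seen_while_waiting, wf.pending_action


seen, after = asyncio.run(main())
print(seen, after)
```

The handler never wakes the waiter directly; it flips the flag and the predicate is re-evaluated on the next poll.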

run_child_workflow

Dispatch another runbook. Children get their own progress tree under the parent’s step.
result = await self.run_child_workflow(
    slug="document-ingest",
    input=DocumentIngestContext(
        period=ctx.period,
        entity_slug=ctx.entity_slug,
        source=expected.source,
        schema=expected.schema,
    ),
    step_id="ingest_documents",
)
| Arg | Type | Purpose |
| --- | --- | --- |
| slug | str | The child runbook’s slug (registered with the worker). |
| input | pydantic model | Child workflow input. Must match the child’s declared Context model. |
| step_id | str | Parent step id this child belongs to; drives UI nesting. |
For cardinality many (fan-out), call run_child_workflow once per item in a loop; awaiting the calls joins the results.
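
A fan-out of this shape might look like the sketch below. The run_child_workflow function here is a local stand-in that just echoes its input, and ingest_all is a hypothetical name. Joining with asyncio.gather is one option and assumes the children may run concurrently; awaiting each call in turn inside the loop also joins, just sequentially.

```python
import asyncio


async def run_child_workflow(slug: str, input: dict, step_id: str) -> str:
    """Stand-in for self.run_child_workflow; echoes its input."""
    await asyncio.sleep(0.01)  # pretend the child does some work
    return f"{slug}:{input['source']}"


async def ingest_all(sources):
    # Fan-out: one child per expected document, all under the same parent step.
    children = [
        run_child_workflow(
            slug="document-ingest",
            input={"source": source},
            step_id="ingest_documents",
        )
        for source in sources
    ]
    # Join: gather awaits every child and returns results in call order.
    return await asyncio.gather(*children)


results = asyncio.run(ingest_all(["bank", "payroll", "ledger"]))
print(results)
```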

WorkflowError

Raised inside an activity to fail the workflow. It replaces the legacy import of ApplicationError from temporalio.exceptions.
from ntro.workflow import WorkflowError, activity

@activity.defn(name="commit_starting_tb")
async def commit_starting_tb(input):
    if input.period not in input.extracted.fields:
        raise WorkflowError(
            f"Period {input.period} missing from extraction",
            type="EXTRACTION_INPUT_INVALID",
            non_retryable=True,
        )
    ...
| Arg | Type | Purpose |
| --- | --- | --- |
| message | str | Human-readable failure message. |
| type | str \| None | Error type tag. The worker interceptor uses this to discriminate deterministic-input failures (e.g. "VALIDATION") from infra failures. |
| non_retryable | bool | When True, the worker marks the failure as non-retryable so the workflow fails fast rather than re-attempting the activity. Use for deterministic-input failures where retrying can’t change the outcome. |
Under TemporalRuntime, WorkflowError subclasses temporalio.exceptions.ApplicationError, so the ntro-worker’s existing non-retryable interceptor catches it unchanged. Under LocalRuntime it subclasses Exception directly: non_retryable has no effect (there are no retries to suppress) and the exception bubbles up to whoever awaited the activity.
ActivityError and ApplicationError are also re-exported, to support Temporal’s ActivityError wrapping pattern (used in the journal-proposer’s HITL-vs-propagate discrimination):
from ntro.workflow.exceptions import ActivityError, ApplicationError
Under LocalRuntime both names alias WorkflowError, so an except (ActivityError, ApplicationError) clause still works in either runtime. No exception is actually wrapped in ActivityError under LocalRuntime; the clause matches the WorkflowError directly.
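
The aliasing can be illustrated with stand-in classes. This is a sketch under stated assumptions, not the journal-proposer's code: the WorkflowError class below is a local imitation of the documented constructor, classify is a hypothetical helper, and the routing rule (send tagged, non-retryable failures to human review) is invented for the example. Looking at the exception's cause reflects how Temporal chains the original ApplicationError behind an ActivityError.

```python
class WorkflowError(Exception):
    """Local stand-in mirroring the documented constructor arguments."""

    def __init__(self, message, *, type=None, non_retryable=False):
        super().__init__(message)
        self.type = type
        self.non_retryable = non_retryable


# Documented LocalRuntime behaviour: both names alias WorkflowError.
ActivityError = ApplicationError = WorkflowError


def classify(exc: BaseException) -> str:
    """Decide between human review and propagation (assumed rule)."""
    # Under Temporal the original error is chained as the wrapper's cause;
    # under LocalRuntime there is no wrapper, so fall back to exc itself.
    inner = getattr(exc, "__cause__", None) or exc
    if getattr(inner, "non_retryable", False) and getattr(inner, "type", None):
        return "needs_review"
    return "propagate"


def run_activity():
    raise WorkflowError(
        "Period 2024-01 missing from extraction",
        type="EXTRACTION_INPUT_INVALID",
        non_retryable=True,
    )


try:
    run_activity()
except (ActivityError, ApplicationError) as exc:
    outcome = classify(exc)

print(outcome)
```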

Running locally

ntro.workflow.run_local(cls, arg, *, workflow_id=..., task_queue=...) is the entry helper for python -m runbooks.X-style scripts. It instantiates the runbook, sets up the WorkflowInfo contextvar, and awaits cls().run(arg). See Running locally for the full walkthrough: when LocalRuntime is selected, what works and what doesn’t, and how to force a specific runtime from test fixtures.

Build runbooks (concept)

Worked walkthrough of authoring a runbook from scratch.

ntro.workflow.task

Load context from prior task executions.

ntro.workflow.agents

Invoke a registered external agent from inside a @runbook.step.

Running locally

Run a runbook end-to-end in-process under LocalRuntime — no Temporal cluster, sub-second feedback.

UI and Temporal signals

How Tenant UI actions reach workflows end-to-end.