Skip to content

Orchestrator

Orchestrator coordinates the machine, state store, guards, and actions for stateful event processing (load state, process, persist, run actions, schedule delayed transitions).

Orchestrator

Orchestrator(
    machine: StateMachine,
    state_store: StateStore | AsyncStateStore,
    guards: GuardRegistry | None = None,
    actions: ActionRegistry | None = None,
    *,
    context_validator: ContextValidatorFn | None = None,
    executor: ActionExecutor | None = None,
    scheduler: SchedulerAdapter | None = None,
    use_initial_state_when_missing: bool = True,
    invoke_adapter: InvokeAdapter | None = None,
    hooks: list[TransitionHook] | None = None,
    audit_store: AuditStore | None = None
)

Runs the full FSM sandwich loop: load -> process -> persist -> execute.

Parameters:

Name Type Description Default
machine StateMachine

The StateMachine definition.

required
state_store StateStore | AsyncStateStore

Sync or async state store.

required
guards GuardRegistry | None

Guard registry (bound to machine automatically).

None
actions ActionRegistry | None

Action registry.

None
context_validator ContextValidatorFn | None

Optional callable for pre-transition context validation.

None
executor ActionExecutor | None

Optional ActionExecutor (created automatically if omitted).

None
scheduler SchedulerAdapter | None

Optional scheduler for delayed transitions.

None
use_initial_state_when_missing bool

Use machine's initial state for new entities.

True
invoke_adapter InvokeAdapter | None

Optional adapter to start/stop invoked services on state enter/exit.

None
hooks list[TransitionHook] | None

Optional list of TransitionHook implementations for lifecycle callbacks.

None
audit_store AuditStore | None

Optional AuditStore for transition audit logging.

None

Context validation options (choose one or combine):

1. ``context_validator`` (recommended for external validation, e.g. PyCharter):
   Callable with signature ``(entity_id, state, trigger, context) -> (ok, errors)``.
   Runs BEFORE guards. Use for schema or contract validation.

2. ``machine.meta["validate_context"]``: Config-driven, set in YAML/dict.
   Validates that required keys exist in context. Lightweight, no external deps.
   Runs first, BEFORE ``context_validator`` and guards.

3. Guards: Use for business-rule validation that depends on state + context.
   Runs AFTER context_validator. Best for domain logic (e.g., "is_cancellable").

Parameters:

Name Type Description Default
machine StateMachine

The StateMachine definition.

required
state_store StateStore | AsyncStateStore

Sync or async state store for entity persistence.

required
guards GuardRegistry | None

Optional guard registry (defaults to machine's registry).

None
actions ActionRegistry | None

Optional action registry (defaults to machine's registry).

None
context_validator ContextValidatorFn | None

Optional pre-transition context validator callable.

None
executor ActionExecutor | None

Optional ActionExecutor (auto-created if omitted).

None
scheduler SchedulerAdapter | None

Optional scheduler for delayed transitions.

None
use_initial_state_when_missing bool

Use machine's initial state for new entities.

True
invoke_adapter InvokeAdapter | None

Optional adapter for invoked services.

None
hooks list[TransitionHook] | None

Optional list of TransitionHook implementations.

None
audit_store AuditStore | None

Optional AuditStore for transition audit logging.

None
Source code in src/pystator/orchestrator.py
def __init__(
    self,
    machine: StateMachine,
    state_store: StateStore | AsyncStateStore,
    guards: GuardRegistry | None = None,
    actions: ActionRegistry | None = None,
    *,
    context_validator: ContextValidatorFn | None = None,
    executor: ActionExecutor | None = None,
    scheduler: SchedulerAdapter | None = None,
    use_initial_state_when_missing: bool = True,
    invoke_adapter: InvokeAdapter | None = None,
    hooks: list[TransitionHook] | None = None,
    audit_store: AuditStore | None = None,
) -> None:
    """Initialize the orchestrator.

    Explicitly supplied collaborators are always honoured, even when they
    evaluate as falsy (for example a registry that is still empty); the
    machine's own registries and an auto-created executor are used only
    when the corresponding argument is ``None``.

    Args:
        machine: The StateMachine definition.
        state_store: Sync or async state store for entity persistence.
        guards: Optional guard registry (defaults to machine's registry).
            When given, it is also bound to ``machine`` via ``bind_guards``.
        actions: Optional action registry (defaults to machine's registry).
        context_validator: Optional pre-transition context validator callable.
        executor: Optional ActionExecutor (auto-created if omitted).
        scheduler: Optional scheduler for delayed transitions.
        use_initial_state_when_missing: Use machine's initial state for new entities.
        invoke_adapter: Optional adapter for invoked services.
        hooks: Optional list of TransitionHook implementations.
        audit_store: Optional AuditStore for transition audit logging.
    """
    self._machine = machine
    self._store = state_store

    # Compare against None rather than relying on truthiness: a registry
    # that is empty at construction time may evaluate as falsy, and the
    # caller's explicit choice must not be swapped for the machine's default.
    if guards is not None:
        machine.bind_guards(guards)
        self._guards = guards
    else:
        self._guards = machine.guard_registry

    self._actions = actions if actions is not None else machine.action_registry

    # Build the default executor only when the caller supplied none.
    if executor is None:
        executor = ActionExecutor(self._actions, log_execution=True)
    self._executor = executor

    self._context_validator = context_validator
    self._scheduler = scheduler
    self._use_initial = use_initial_state_when_missing
    self._invoke_adapter = invoke_adapter
    self._audit_store = audit_store

    # The observer exists only when at least one hook was supplied, so the
    # processing methods can skip notifications with a cheap None check.
    self._observer: TransitionObserver | None = None
    if hooks:
        self._observer = TransitionObserver()
        for hook in hooks:
            self._observer.add_hook(hook)

machine property

machine: StateMachine

The underlying StateMachine definition.

store property

store: StateStore | AsyncStateStore

The state store backing this orchestrator.

process_event

process_event(
    entity_id: str,
    event: str | Event,
    context: dict[str, Any] | None = None,
) -> TransitionResult

Run the sandwich loop synchronously.

Source code in src/pystator/orchestrator.py
def process_event(
    self,
    entity_id: str,
    event: str | Event,
    context: dict[str, Any] | None = None,
) -> TransitionResult:
    """Run the sandwich loop synchronously.

    Loads the entity's state and context from the sync store, validates the
    context, lets the machine process the event and, when the transition
    succeeds with a target state, persists the new state, runs the invoke
    adapter and actions, persists the context, fires machine callbacks and
    records an audit entry.

    Args:
        entity_id: Identifier of the entity whose state is loaded/updated.
        event: Trigger name or an ``Event``. For an ``Event`` the payload is
            merged into the working context.
        context: Optional extra values merged on top of the stored context
            and the event payload.

    Returns:
        The ``TransitionResult`` produced by the machine, or a failure result
        with ``metadata={"reason": "context_validation"}`` when either
        context validation step rejects the context.

    Raises:
        TypeError: If the configured store is not a sync ``StateStore``.
        ValueError: If the entity has no stored state and
            ``use_initial_state_when_missing`` is False.
        Exception: Whatever ``machine.process`` raises is re-raised after the
            hooks' ``on_error`` has been notified.
    """
    store = self._store
    if not isinstance(store, StateStore):
        raise TypeError(
            "process_event requires a sync StateStore "
            "(get_state, set_state, get_context)"
        )

    current_state = store.get_state(entity_id)
    if current_state is None:
        if self._use_initial:
            current_state = self._machine.get_initial_state().name
        else:
            raise ValueError(
                f"Entity {entity_id!r} has no state and "
                "use_initial_state_when_missing is False"
            )

    # Stored parallel-state configurations are handled by a dedicated path.
    parsed = parse_stored_state(current_state, self._machine.is_parallel_state)
    if isinstance(parsed, ParallelStateConfig):
        return self._process_parallel_event(
            entity_id, parsed, event, context, store
        )

    current_state = parsed
    # Tolerate a store that returns None when no context has been saved yet.
    ctx = dict(store.get_context(entity_id) or {})
    if isinstance(event, Event):
        ctx.update(event.payload)
    else:
        event = Event(trigger=event)
    if context:
        ctx.update(context)
    # Internal keys (leading underscore) are stripped again before persisting.
    ctx["_event"] = event
    ctx["_entity_id"] = entity_id

    # Notify hooks: before processing
    if self._observer is not None:
        self._observer.before_process(entity_id, current_state, event.trigger, ctx)

    # Validate context if configured (inline machine validation)
    if self._machine.meta.get("validate_context"):
        valid, errors = self._machine.validate_context(ctx)
        if not valid:
            return TransitionResult.failure_result(
                source_state=current_state,
                trigger=event.trigger,
                error=FSMError(
                    "Context validation failed",
                    context={"validation_errors": errors},
                ),
                metadata={"reason": "context_validation"},
            )

    # External context validator (e.g. PyCharter data contract validation)
    if self._context_validator is not None:
        ok, val_errors = self._context_validator(
            entity_id, current_state, event.trigger, ctx
        )
        if not ok:
            # str() each entry so non-string error objects cannot turn a
            # validation failure into a TypeError from str.join.
            joined_errors = "; ".join(map(str, val_errors))
            return TransitionResult.failure_result(
                source_state=current_state,
                trigger=event.trigger,
                error=FSMError(
                    f"Validation failed: {joined_errors}",
                    context={"validation_errors": val_errors},
                ),
                metadata={"reason": "context_validation"},
            )

    # Notify hooks: transition starting
    if self._observer is not None:
        self._observer.before_transition(current_state, event.trigger, ctx)

    try:
        result = self._machine.process(current_state, event, ctx)
    except Exception as exc:
        if self._observer is not None:
            self._observer.on_error(exc, ctx)
        raise

    # Notify hooks: transition complete
    if self._observer is not None:
        self._observer.after_transition(result, ctx)

    if result.success and result.target_state is not None:
        self._machine._engine.apply_history(result)
        state_changed = result.target_state != current_state

        if state_changed:
            # Cancel pending delayed transitions (sync path)
            if self._scheduler:
                import asyncio

                # Only the loop lookup is guarded, so a RuntimeError raised
                # while creating the task is not mistaken for "no loop".
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    # No loop is running in this thread: run to completion.
                    asyncio.run(self._scheduler.cancel_for_entity(entity_id))
                else:
                    # Called from inside a running loop: the cancellation is
                    # scheduled, not awaited. The event loop keeps only weak
                    # references to tasks, so hold a strong one until done.
                    task = loop.create_task(
                        self._scheduler.cancel_for_entity(entity_id)
                    )
                    pending = getattr(self, "_cancel_tasks", None)
                    if pending is None:
                        pending = self._cancel_tasks = set()
                    pending.add(task)
                    task.add_done_callback(pending.discard)

            metadata = {
                "trigger": result.trigger,
                "source_state": result.source_state,
                "target_state": result.target_state,
                "is_terminal": (
                    result.target_state in self._machine.terminal_states
                ),
            }
            if self._machine.is_parallel_state(result.target_state):
                config = self._machine.enter_parallel_state(result.target_state)
                store.set_state(entity_id, config.to_string(), metadata=metadata)
            else:
                store.set_state(entity_id, result.target_state, metadata=metadata)

        def get_invoke(name):
            """Return the invoke definition of the named state."""
            return self._machine.get_state(name).invoke

        # Order matters: stop services of the source state, run the
        # transition's actions, then start services of the target state.
        if self._invoke_adapter is not None:
            run_invoke_stop(
                self._invoke_adapter,
                result.source_state,
                get_invoke,
                ctx,
            )
        if result.all_action_specs:
            ctx["new_status"] = result.target_state
            self._executor.execute(result, ctx)
        if self._invoke_adapter is not None:
            run_invoke_start(
                self._invoke_adapter,
                result.target_state,
                result.source_state,
                get_invoke,
                ctx,
            )

        # Persist updated context (filter internal keys)
        persist_ctx = {k: v for k, v in ctx.items() if not k.startswith("_")}
        if hasattr(store, "set_context"):
            store.set_context(entity_id, persist_ctx)

        # Fire machine callbacks
        self._machine._fire_callbacks(result, ctx)

        # Record audit trail
        if self._audit_store is not None:
            transition_id = self._audit_store.record_transition(
                entity_id=entity_id,
                source_state=result.source_state,
                trigger=result.trigger,
                target_state=result.target_state,
                success=result.success,
            )
            # Only a string id is attached to the result's metadata.
            if isinstance(transition_id, str):
                result.metadata["transition_id"] = transition_id

    return result

async_process_event async

async_process_event(
    entity_id: str,
    event: str | Event,
    context: dict[str, Any] | None = None,
) -> TransitionResult

Run the sandwich loop asynchronously.

Source code in src/pystator/orchestrator.py
async def async_process_event(
    self,
    entity_id: str,
    event: str | Event,
    context: dict[str, Any] | None = None,
) -> TransitionResult:
    """Run the sandwich loop asynchronously.

    Loads the entity's state and context from the async store, validates the
    context, lets the machine process the event and, when the transition
    succeeds with a target state, persists the new state, reschedules
    delayed transitions, runs the invoke adapter and actions, persists the
    context, fires machine callbacks and records an audit entry.

    Args:
        entity_id: Identifier of the entity whose state is loaded/updated.
        event: Trigger name or an ``Event``. For an ``Event`` the payload is
            merged into the working context.
        context: Optional extra values merged on top of the stored context
            and the event payload.

    Returns:
        The ``TransitionResult`` produced by the machine, or a failure result
        with ``metadata={"reason": "context_validation"}`` when either
        context validation step rejects the context.

    Raises:
        TypeError: If the configured store is not an ``AsyncStateStore``.
        ValueError: If the entity has no stored state and
            ``use_initial_state_when_missing`` is False.
        Exception: Whatever ``machine.aprocess`` raises is re-raised after
            the hooks' ``on_error`` has been notified.
    """
    store = self._store
    if not isinstance(store, AsyncStateStore):
        raise TypeError(
            "async_process_event requires an AsyncStateStore "
            "(aget_state, aset_state, aget_context)"
        )

    current_state = await store.aget_state(entity_id)
    if current_state is None:
        if self._use_initial:
            current_state = self._machine.get_initial_state().name
        else:
            raise ValueError(
                f"Entity {entity_id!r} has no state and "
                "use_initial_state_when_missing is False"
            )

    # Stored parallel-state configurations are handled by a dedicated path.
    parsed = parse_stored_state(current_state, self._machine.is_parallel_state)
    if isinstance(parsed, ParallelStateConfig):
        return await self._process_parallel_event_async(
            entity_id, parsed, event, context, store
        )

    current_state = parsed
    # Tolerate a store that returns None when no context has been saved yet.
    stored_ctx = await store.aget_context(entity_id)
    ctx = dict(stored_ctx or {})
    if isinstance(event, Event):
        ctx.update(event.payload)
    else:
        event = Event(trigger=event)
    if context:
        ctx.update(context)
    # Internal keys (leading underscore) are stripped again before persisting.
    ctx["_event"] = event
    ctx["_entity_id"] = entity_id

    # Notify hooks: before processing
    if self._observer is not None:
        self._observer.before_process(entity_id, current_state, event.trigger, ctx)

    # Validate context if configured (inline machine validation)
    if self._machine.meta.get("validate_context"):
        valid, errors = self._machine.validate_context(ctx)
        if not valid:
            return TransitionResult.failure_result(
                source_state=current_state,
                trigger=event.trigger,
                error=FSMError(
                    "Context validation failed",
                    context={"validation_errors": errors},
                ),
                metadata={"reason": "context_validation"},
            )

    # External context validator (e.g. PyCharter data contract validation)
    if self._context_validator is not None:
        # Inspect the returned value instead of the callable: this also
        # covers callable objects with an async __call__ and sync wrappers
        # that hand back an awaitable, which iscoroutinefunction() misses.
        outcome = self._context_validator(
            entity_id, current_state, event.trigger, ctx
        )
        if inspect.isawaitable(outcome):
            outcome = await outcome
        ok, val_errors = outcome
        if not ok:
            # str() each entry so non-string error objects cannot turn a
            # validation failure into a TypeError from str.join.
            joined_errors = "; ".join(map(str, val_errors))
            return TransitionResult.failure_result(
                source_state=current_state,
                trigger=event.trigger,
                error=FSMError(
                    f"Validation failed: {joined_errors}",
                    context={"validation_errors": val_errors},
                ),
                metadata={"reason": "context_validation"},
            )

    # Notify hooks: transition starting
    if self._observer is not None:
        self._observer.before_transition(current_state, event.trigger, ctx)

    try:
        result = await self._machine.aprocess(current_state, event, ctx)
    except Exception as exc:
        if self._observer is not None:
            self._observer.on_error(exc, ctx)
        raise

    # Notify hooks: transition complete
    if self._observer is not None:
        self._observer.after_transition(result, ctx)

    if result.success and result.target_state is not None:
        self._machine._engine.apply_history(result)
        state_changed = result.target_state != current_state

        if state_changed:
            # Cancel pending delayed transitions
            if self._scheduler:
                await self._scheduler.cancel_for_entity(entity_id)

            metadata = {
                "trigger": result.trigger,
                "source_state": result.source_state,
                "target_state": result.target_state,
                "is_terminal": (
                    result.target_state in self._machine.terminal_states
                ),
            }
            if self._machine.is_parallel_state(result.target_state):
                config = self._machine.enter_parallel_state(result.target_state)
                await store.aset_state(
                    entity_id, config.to_string(), metadata=metadata
                )
            else:
                await store.aset_state(
                    entity_id, result.target_state, metadata=metadata
                )

            # Schedule delayed transitions from new state
            if self._scheduler:
                await self._schedule_delayed(entity_id, result.target_state, ctx)

        def get_invoke(name):
            """Return the invoke definition of the named state."""
            return self._machine.get_state(name).invoke

        # Order matters: stop services of the source state, run the
        # transition's actions, then start services of the target state.
        if self._invoke_adapter is not None:
            run_invoke_stop(
                self._invoke_adapter,
                result.source_state,
                get_invoke,
                ctx,
            )
        if result.all_action_specs:
            ctx["new_status"] = result.target_state
            await self._executor.async_execute(result, ctx)
        if self._invoke_adapter is not None:
            run_invoke_start(
                self._invoke_adapter,
                result.target_state,
                result.source_state,
                get_invoke,
                ctx,
            )

        # Persist updated context (filter internal keys); prefer the async
        # setter and fall back to a sync one if that is all the store has.
        persist_ctx = {k: v for k, v in ctx.items() if not k.startswith("_")}
        if hasattr(store, "aset_context"):
            await store.aset_context(entity_id, persist_ctx)
        elif hasattr(store, "set_context"):
            store.set_context(entity_id, persist_ctx)

        # Fire machine callbacks
        await self._machine._afire_callbacks(result, ctx)

        # Record audit trail
        if self._audit_store is not None:
            transition_id = self._audit_store.record_transition(
                entity_id=entity_id,
                source_state=result.source_state,
                trigger=result.trigger,
                target_state=result.target_state,
                success=result.success,
            )
            # Only a string id is attached to the result's metadata.
            if isinstance(transition_id, str):
                result.metadata["transition_id"] = transition_id

    return result

close async

close() -> None

Clean up resources (scheduler, etc.).

Source code in src/pystator/orchestrator.py
async def close(self) -> None:
    """Release resources held by the orchestrator.

    Currently this awaits the scheduler's ``close()``; it is a no-op when
    no scheduler is configured.
    """
    scheduler = self._scheduler
    if not scheduler:
        return
    await scheduler.close()