Skip to content

Orchestrator

Orchestrator coordinates the machine, state store, guards, and actions for stateful event processing (load state, process, persist, run actions, schedule delayed transitions).

Orchestrator

Orchestrator(
    machine: StateMachine,
    state_store: StateStore | AsyncStateStore,
    guards: GuardRegistry | None = None,
    actions: ActionRegistry | None = None,
    *,
    context_validator: ContextValidatorFn | None = None,
    executor: ActionExecutor | None = None,
    scheduler: "SchedulerAdapter | None" = None,
    use_initial_state_when_missing: bool = True,
    invoke_adapter: InvokeAdapter | None = None
)

Runs the full FSM sandwich loop: load -> process -> persist -> execute.

Parameters:

Name Type Description Default
machine StateMachine

The StateMachine definition.

required
state_store StateStore | AsyncStateStore

Sync or async state store.

required
guards GuardRegistry | None

Guard registry (bound to machine automatically).

None
actions ActionRegistry | None

Action registry.

None
context_validator ContextValidatorFn | None

Optional callable for pre-transition context validation.

None
executor ActionExecutor | None

Optional ActionExecutor (created automatically if omitted).

None
scheduler 'SchedulerAdapter | None'

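Optional scheduler for delayed transitions. In the code shown on this page it is only used by async_process_event (to cancel pending and schedule new delayed transitions when the state changes) and by close; the synchronous process_event does not consult it.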

None
use_initial_state_when_missing bool

Use machine's initial state for new entities.

True
invoke_adapter InvokeAdapter | None

Optional adapter to start/stop invoked services on state enter/exit.

None

Context validation options (choose one or combine):

1. ``context_validator`` (recommended for external validation, e.g. PyCharter):
   Callable with signature ``(entity_id, state, trigger, context) -> (ok, errors)``.
   Runs BEFORE guards. Use for schema or contract validation.

2. ``machine.meta["validate_context"]``: Config-driven, set in YAML/dict.
   Validates that required keys exist in context. Lightweight, no external deps.
   Runs BEFORE ``context_validator``.

3. Guards: Use for business-rule validation that depends on state + context.
   Runs AFTER context_validator. Best for domain logic (e.g., "is_cancellable").
Source code in src/pystator/orchestrator.py
def __init__(
    self,
    machine: StateMachine,
    state_store: StateStore | AsyncStateStore,
    guards: GuardRegistry | None = None,
    actions: ActionRegistry | None = None,
    *,
    context_validator: ContextValidatorFn | None = None,
    executor: ActionExecutor | None = None,
    scheduler: "SchedulerAdapter | None" = None,
    use_initial_state_when_missing: bool = True,
    invoke_adapter: InvokeAdapter | None = None,
) -> None:
    """Wire the machine, the state store and the optional collaborators.

    Args:
        machine: State machine definition driven by this orchestrator.
        state_store: Sync or async store used to load and persist state.
        guards: Guard registry. When given it is bound to ``machine`` via
            ``machine.bind_guards``; otherwise ``machine.guard_registry``
            is used.
        actions: Action registry; defaults to ``machine.action_registry``.
        context_validator: Optional callable invoked as
            ``(entity_id, state, trigger, context) -> (ok, errors)``.
        executor: Action executor; defaults to
            ``ActionExecutor(actions, log_execution=True)``.
        scheduler: Optional scheduler adapter for delayed transitions.
        use_initial_state_when_missing: Whether an entity without stored
            state starts from the machine's initial state.
        invoke_adapter: Optional adapter handed to the invoke start/stop
            helpers around a transition.
    """
    self._machine = machine
    self._store = state_store

    # Bind guards if provided
    if guards is not None:
        machine.bind_guards(guards)

    # Fall back to the machine's registries only when nothing was passed.
    # Compare against None rather than relying on truthiness: a collaborator
    # that happens to be falsy (e.g. an empty registry that defines __len__)
    # was still supplied explicitly and must not be swapped out.
    self._guards = guards if guards is not None else machine.guard_registry
    self._actions = actions if actions is not None else machine.action_registry
    if executor is not None:
        self._executor = executor
    else:
        self._executor = ActionExecutor(self._actions, log_execution=True)
    self._context_validator = context_validator
    self._scheduler = scheduler
    self._use_initial = use_initial_state_when_missing
    self._invoke_adapter = invoke_adapter

process_event

process_event(
    entity_id: str,
    event: str | Event,
    context: dict[str, Any] | None = None,
) -> TransitionResult

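Run the sandwich loop synchronously: load the entity's state and context, validate the context, process the event, persist a changed state, then run the transition's actions. Requires a sync StateStore (raises TypeError otherwise) and raises ValueError when the entity has no stored state and use_initial_state_when_missing is False.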

Source code in src/pystator/orchestrator.py
def process_event(
    self,
    entity_id: str,
    event: str | Event,
    context: dict[str, Any] | None = None,
) -> TransitionResult:
    """Run the sandwich loop synchronously.

    Loads the entity's stored state, builds the transition context,
    validates it, lets the machine process the event, persists a changed
    state and finally runs invoke-stop, the transition's actions and
    invoke-start. The scheduler is not consulted on this sync path.

    Args:
        entity_id: Key used for every state-store read and write.
        event: Trigger name or ``Event``. A bare string is wrapped in
            ``Event(trigger=...)``; an ``Event``'s payload is merged into
            the context.
        context: Extra context entries. They are applied after the stored
            context and the event payload, so they win on key clashes.

    Returns:
        The machine's ``TransitionResult``, or a failure result carrying
        ``metadata={"reason": "context_validation"}`` when either context
        validation step rejects the context. A stored state that parses to
        a ``ParallelStateConfig`` is delegated to
        ``_process_parallel_event``.

    Raises:
        TypeError: If the store is not a sync ``StateStore``, or if the
            configured ``context_validator`` returns an awaitable (async
            validators need ``async_process_event``).
        ValueError: If the entity has no stored state and
            ``use_initial_state_when_missing`` is False.
    """
    store = self._store
    if not isinstance(store, StateStore):
        raise TypeError(
            "process_event requires a sync StateStore "
            "(get_state, set_state, get_context)"
        )

    current_state = store.get_state(entity_id)
    if current_state is None:
        if not self._use_initial:
            raise ValueError(
                f"Entity {entity_id!r} has no state and "
                "use_initial_state_when_missing is False"
            )
        current_state = self._machine.get_initial_state().name

    parsed = parse_stored_state(current_state, self._machine.is_parallel_state)
    if isinstance(parsed, ParallelStateConfig):
        return self._process_parallel_event(
            entity_id, parsed, event, context, store
        )

    current_state = parsed
    # Layering order: stored context < event payload < caller context.
    ctx = dict(store.get_context(entity_id))
    if isinstance(event, Event):
        ctx.update(event.payload)
    else:
        event = Event(trigger=event)
    if context:
        ctx.update(context)
    # Reserved keys are written last so nothing above can shadow them.
    ctx["_event"] = event
    ctx["_entity_id"] = entity_id

    # Validate context if configured (inline machine validation)
    if self._machine.meta.get("validate_context"):
        valid, errors = self._machine.validate_context(ctx)
        if not valid:
            return TransitionResult.failure_result(
                source_state=current_state,
                trigger=event.trigger,
                error=FSMError(
                    "Context validation failed",
                    context={"validation_errors": errors},
                ),
                metadata={"reason": "context_validation"},
            )

    # External context validator (e.g. PyCharter data contract validation)
    if self._context_validator is not None:
        import inspect

        outcome = self._context_validator(
            entity_id, current_state, event.trigger, ctx
        )
        if inspect.isawaitable(outcome):
            # An async validator cannot be awaited here. Close the pending
            # awaitable (avoids a "never awaited" warning) and fail with an
            # explicit message instead of an opaque tuple-unpacking error.
            closer = getattr(outcome, "close", None)
            if callable(closer):
                closer()
            raise TypeError(
                "process_event cannot await an async context_validator; "
                "use async_process_event instead"
            )
        ok, val_errors = outcome
        if not ok:
            # Stringify each item so non-string (or missing) errors cannot
            # crash the failure path itself; the raw errors are still
            # attached to the FSMError context unchanged.
            joined = "; ".join(str(err) for err in (val_errors or ()))
            return TransitionResult.failure_result(
                source_state=current_state,
                trigger=event.trigger,
                error=FSMError(
                    f"Validation failed: {joined}",
                    context={"validation_errors": val_errors},
                ),
                metadata={"reason": "context_validation"},
            )

    result = self._machine.process(current_state, event, ctx)

    # Nothing to persist or execute unless the machine produced a target.
    if not (result.success and result.target_state is not None):
        return result

    self._machine._engine.apply_history(result)
    if result.target_state != current_state:
        metadata = {
            "trigger": result.trigger,
            "source_state": result.source_state,
            "target_state": result.target_state,
            "is_terminal": result.target_state in self._machine.terminal_states,
        }
        if self._machine.is_parallel_state(result.target_state):
            # Parallel states are stored in their serialized config form.
            config = self._machine.enter_parallel_state(result.target_state)
            store.set_state(entity_id, config.to_string(), metadata=metadata)
        else:
            store.set_state(entity_id, result.target_state, metadata=metadata)

    def get_invoke(state_name: str) -> Any:
        """Look up the ``invoke`` attribute of the named state."""
        return self._machine.get_state(state_name).invoke

    # Order matters: stop invokes of the source state, run the actions,
    # then start invokes of the target state.
    if self._invoke_adapter is not None:
        run_invoke_stop(
            self._invoke_adapter,
            result.source_state,
            get_invoke,
            ctx,
        )
    if result.all_action_specs:
        ctx["new_status"] = result.target_state
        self._executor.execute(result, ctx)
    if self._invoke_adapter is not None:
        run_invoke_start(
            self._invoke_adapter,
            result.target_state,
            result.source_state,
            get_invoke,
            ctx,
        )

    return result

async_process_event async

async_process_event(
    entity_id: str,
    event: str | Event,
    context: dict[str, Any] | None = None,
) -> TransitionResult

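Run the sandwich loop asynchronously: load the entity's state and context, validate the context, process the event, persist a changed state (cancelling and rescheduling delayed transitions when a scheduler is configured), then run the transition's actions. Requires an AsyncStateStore (raises TypeError otherwise) and raises ValueError when the entity has no stored state and use_initial_state_when_missing is False.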

Source code in src/pystator/orchestrator.py
async def async_process_event(
    self,
    entity_id: str,
    event: str | Event,
    context: dict[str, Any] | None = None,
) -> TransitionResult:
    """Run the sandwich loop asynchronously.

    Loads the entity's stored state, builds the transition context,
    validates it, lets the machine process the event, persists a changed
    state (cancelling and rescheduling delayed transitions when a scheduler
    is configured) and finally runs invoke-stop, the transition's actions
    and invoke-start.

    Args:
        entity_id: Key used for every state-store read and write.
        event: Trigger name or ``Event``. A bare string is wrapped in
            ``Event(trigger=...)``; an ``Event``'s payload is merged into
            the context.
        context: Extra context entries. They are applied after the stored
            context and the event payload, so they win on key clashes.

    Returns:
        The machine's ``TransitionResult``, or a failure result carrying
        ``metadata={"reason": "context_validation"}`` when either context
        validation step rejects the context. A stored state that parses to
        a ``ParallelStateConfig`` is delegated to
        ``_process_parallel_event_async``.

    Raises:
        TypeError: If the store is not an ``AsyncStateStore``.
        ValueError: If the entity has no stored state and
            ``use_initial_state_when_missing`` is False.
    """
    store = self._store
    if not isinstance(store, AsyncStateStore):
        raise TypeError(
            "async_process_event requires an AsyncStateStore "
            "(aget_state, aset_state, aget_context)"
        )

    current_state = await store.aget_state(entity_id)
    if current_state is None:
        if not self._use_initial:
            raise ValueError(
                f"Entity {entity_id!r} has no state and "
                "use_initial_state_when_missing is False"
            )
        current_state = self._machine.get_initial_state().name

    parsed = parse_stored_state(current_state, self._machine.is_parallel_state)
    if isinstance(parsed, ParallelStateConfig):
        return await self._process_parallel_event_async(
            entity_id, parsed, event, context, store
        )

    current_state = parsed
    # Layering order: stored context < event payload < caller context.
    ctx = dict(await store.aget_context(entity_id))
    if isinstance(event, Event):
        ctx.update(event.payload)
    else:
        event = Event(trigger=event)
    if context:
        ctx.update(context)
    # Reserved keys are written last so nothing above can shadow them.
    ctx["_event"] = event
    ctx["_entity_id"] = entity_id

    # Validate context if configured (inline machine validation)
    if self._machine.meta.get("validate_context"):
        valid, errors = self._machine.validate_context(ctx)
        if not valid:
            return TransitionResult.failure_result(
                source_state=current_state,
                trigger=event.trigger,
                error=FSMError(
                    "Context validation failed",
                    context={"validation_errors": errors},
                ),
                metadata={"reason": "context_validation"},
            )

    # External context validator (e.g. PyCharter data contract validation)
    if self._context_validator is not None:
        outcome = self._context_validator(
            entity_id, current_state, event.trigger, ctx
        )
        # Inspect the returned value rather than the callable: this also
        # covers async validators that are not plain coroutine functions
        # (objects with an async __call__, sync wrappers returning a
        # coroutine), which would otherwise be unpacked without awaiting.
        if inspect.isawaitable(outcome):
            outcome = await outcome
        ok, val_errors = outcome
        if not ok:
            # Stringify each item so non-string (or missing) errors cannot
            # crash the failure path itself; the raw errors are still
            # attached to the FSMError context unchanged.
            joined = "; ".join(str(err) for err in (val_errors or ()))
            return TransitionResult.failure_result(
                source_state=current_state,
                trigger=event.trigger,
                error=FSMError(
                    f"Validation failed: {joined}",
                    context={"validation_errors": val_errors},
                ),
                metadata={"reason": "context_validation"},
            )

    result = await self._machine.aprocess(current_state, event, ctx)

    # Nothing to persist or execute unless the machine produced a target.
    if not (result.success and result.target_state is not None):
        return result

    self._machine._engine.apply_history(result)
    state_changed = result.target_state != current_state

    if state_changed:
        # Cancel pending delayed transitions
        if self._scheduler:
            await self._scheduler.cancel_for_entity(entity_id)

        metadata = {
            "trigger": result.trigger,
            "source_state": result.source_state,
            "target_state": result.target_state,
            "is_terminal": result.target_state in self._machine.terminal_states,
        }
        if self._machine.is_parallel_state(result.target_state):
            # Parallel states are stored in their serialized config form.
            config = self._machine.enter_parallel_state(result.target_state)
            await store.aset_state(
                entity_id, config.to_string(), metadata=metadata
            )
        else:
            await store.aset_state(
                entity_id, result.target_state, metadata=metadata
            )

        # Schedule delayed transitions from new state
        if self._scheduler:
            await self._schedule_delayed(entity_id, result.target_state, ctx)

    def get_invoke(state_name: str) -> Any:
        """Look up the ``invoke`` attribute of the named state."""
        return self._machine.get_state(state_name).invoke

    # Order matters: stop invokes of the source state, run the actions,
    # then start invokes of the target state.
    if self._invoke_adapter is not None:
        run_invoke_stop(
            self._invoke_adapter,
            result.source_state,
            get_invoke,
            ctx,
        )
    if result.all_action_specs:
        ctx["new_status"] = result.target_state
        await self._executor.async_execute(result, ctx)
    if self._invoke_adapter is not None:
        run_invoke_start(
            self._invoke_adapter,
            result.target_state,
            result.source_state,
            get_invoke,
            ctx,
        )

    return result

close async

close() -> None

Clean up resources (scheduler, etc.).

Source code in src/pystator/orchestrator.py
async def close(self) -> None:
    """Release held resources; currently that means closing the scheduler.

    Does nothing when no scheduler is configured.
    """
    scheduler = self._scheduler
    if not scheduler:
        return
    await scheduler.close()