Skip to content

Actions

Register and execute actions (on_enter, on_exit, transition actions).

ActionRegistry

ActionRegistry()

Registry for action functions (sync and async).

Example

registry = ActionRegistry() registry.register("notify", lambda ctx: print("notified")) registry.execute("notify", {"user": "alice"})

Source code in src/pystator/actions.py
def __init__(self) -> None:
    self._actions: dict[str, AnyActionFunc] = {}
    self._metadata: dict[str, dict[str, Any]] = {}
    self._async_actions: set[str] = set()
    self._lock = threading.Lock()

register

register(
    name: str,
    func: AnyActionFunc,
    metadata: dict[str, Any] | None = None,
) -> None

Register an action function. Thread-safe.

Source code in src/pystator/actions.py
def register(
    self,
    name: str,
    func: AnyActionFunc,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Register an action function. Thread-safe."""
    if not name:
        raise ValueError("Action name cannot be empty")
    with self._lock:
        if name in self._actions:
            raise ValueError(f"Action '{name}' is already registered")
        self._actions[name] = func
        self._metadata[name] = metadata or {}
        if asyncio.iscoroutinefunction(func) or (
            callable(func)
            and asyncio.iscoroutinefunction(getattr(func, "__call__", None))  # noqa: B004
        ):
            self._async_actions.add(name)

unregister

unregister(name: str) -> None

Unregister an action function. Thread-safe.

Source code in src/pystator/actions.py
def unregister(self, name: str) -> None:
    """Unregister an action function. Thread-safe."""
    with self._lock:
        if name not in self._actions:
            raise ActionNotFoundError(
                f"Action '{name}' not found", action_name=name
            )
        del self._actions[name]
        del self._metadata[name]
        self._async_actions.discard(name)

get

get(name: str) -> AnyActionFunc

Look up a registered action callable by name.

Parameters:

Name Type Description Default
name str

The registered action name.

required

Returns:

Type Description
AnyActionFunc

The action callable.

Raises:

Type Description
ActionNotFoundError

If no action is registered under name.

Source code in src/pystator/actions.py
def get(self, name: str) -> AnyActionFunc:
    """Look up a registered action callable by name.

    Args:
        name: The registered action name.

    Returns:
        The action callable.

    Raises:
        ActionNotFoundError: If no action is registered under *name*.
    """
    if name not in self._actions:
        raise ActionNotFoundError(f"Action '{name}' not found", action_name=name)
    return self._actions[name]

has

has(name: str) -> bool

Check whether an action is registered.

Source code in src/pystator/actions.py
def has(self, name: str) -> bool:
    """Check whether an action is registered."""
    return name in self._actions

get_metadata

get_metadata(name: str) -> dict[str, Any]

Get metadata for a registered action.

Source code in src/pystator/actions.py
def get_metadata(self, name: str) -> dict[str, Any]:
    """Get metadata for a registered action."""
    return dict(self._metadata.get(name, {}))

is_async

is_async(name: str) -> bool

Return True if the named action is an async function.

Source code in src/pystator/actions.py
def is_async(self, name: str) -> bool:
    """Return True if the named action is an async function."""
    return name in self._async_actions

execute

execute(
    name: str,
    context: dict[str, Any],
    raise_on_error: bool = False,
) -> ActionResult

Execute a single named action synchronously.

Parameters:

Name Type Description Default
name str

Registered action name.

required
context dict[str, Any]

Context dict passed to the action callable.

required
raise_on_error bool

If True, re-raise exceptions instead of wrapping.

False

Returns:

Type Description
ActionResult

ActionResult indicating success or failure.

Source code in src/pystator/actions.py
def execute(
    self, name: str, context: dict[str, Any], raise_on_error: bool = False
) -> ActionResult:
    """Execute a single named action synchronously.

    Args:
        name: Registered action name.
        context: Context dict passed to the action callable.
        raise_on_error: If True, re-raise exceptions instead of wrapping.

    Returns:
        ActionResult indicating success or failure.
    """
    func = self.get(name)
    try:
        result = func(context)
        return ActionResult.ok(name, result)
    except Exception as e:
        if raise_on_error:
            raise
        return ActionResult.fail(name, e)

execute_all

execute_all(
    actions: tuple[str, ...] | list[str],
    context: dict[str, Any],
    stop_on_error: bool = False,
) -> list[ActionResult]

Execute multiple actions with stop-on-error control.

Parameters:

Name Type Description Default
actions tuple[str, ...] | list[str]

Sequence of action names to execute.

required
context dict[str, Any]

Context dict passed to each action.

required
stop_on_error bool

If True, halt after the first failure.

False

Returns:

Type Description
list[ActionResult]

List of ActionResult in execution order.

Source code in src/pystator/actions.py
def execute_all(
    self,
    actions: tuple[str, ...] | list[str],
    context: dict[str, Any],
    stop_on_error: bool = False,
) -> list[ActionResult]:
    """Execute multiple actions with stop-on-error control.

    Args:
        actions: Sequence of action names to execute.
        context: Context dict passed to each action.
        stop_on_error: If True, halt after the first failure.

    Returns:
        List of ActionResult in execution order.
    """
    results: list[ActionResult] = []
    for action_name in actions:
        try:
            result = self.execute(action_name, context)
            results.append(result)
            if not result.success and stop_on_error:
                break
        except ActionNotFoundError as e:
            results.append(ActionResult.fail(action_name, e))
            if stop_on_error:
                break
    return results

async_execute async

async_execute(
    name: str,
    context: dict[str, Any],
    raise_on_error: bool = False,
) -> ActionResult

Execute a single named action asynchronously.

Falls back to synchronous execution if the action is not async.

Parameters:

Name Type Description Default
name str

Registered action name.

required
context dict[str, Any]

Context dict passed to the action callable.

required
raise_on_error bool

If True, re-raise exceptions instead of wrapping.

False

Returns:

Type Description
ActionResult

ActionResult indicating success or failure.

Source code in src/pystator/actions.py
async def async_execute(
    self, name: str, context: dict[str, Any], raise_on_error: bool = False
) -> ActionResult:
    """Execute a single named action asynchronously.

    Falls back to synchronous execution if the action is not async.

    Args:
        name: Registered action name.
        context: Context dict passed to the action callable.
        raise_on_error: If True, re-raise exceptions instead of wrapping.

    Returns:
        ActionResult indicating success or failure.
    """
    func = self.get(name)
    try:
        if self.is_async(name):
            result = await func(context)  # type: ignore[misc]
        else:
            result = func(context)
        return ActionResult.ok(name, result)
    except Exception as e:
        if raise_on_error:
            raise
        return ActionResult.fail(name, e)

async_execute_all async

async_execute_all(
    actions: tuple[str, ...] | list[str],
    context: dict[str, Any],
    stop_on_error: bool = False,
) -> list[ActionResult]

Execute multiple actions asynchronously.

Parameters:

Name Type Description Default
actions tuple[str, ...] | list[str]

Sequence of action names to execute.

required
context dict[str, Any]

Context dict passed to each action.

required
stop_on_error bool

If True, halt after the first failure.

False

Returns:

Type Description
list[ActionResult]

List of ActionResult in execution order.

Source code in src/pystator/actions.py
async def async_execute_all(
    self,
    actions: tuple[str, ...] | list[str],
    context: dict[str, Any],
    stop_on_error: bool = False,
) -> list[ActionResult]:
    """Execute multiple actions asynchronously.

    Args:
        actions: Sequence of action names to execute.
        context: Context dict passed to each action.
        stop_on_error: If True, halt after the first failure.

    Returns:
        List of ActionResult in execution order.
    """
    results: list[ActionResult] = []
    for action_name in actions:
        try:
            result = await self.async_execute(action_name, context)
            results.append(result)
            if not result.success and stop_on_error:
                break
        except ActionNotFoundError as e:
            results.append(ActionResult.fail(action_name, e))
            if stop_on_error:
                break
    return results

list_actions

list_actions() -> list[str]

Return all registered action names.

Source code in src/pystator/actions.py
def list_actions(self) -> list[str]:
    """Return all registered action names."""
    return list(self._actions.keys())

clear

clear() -> None

Remove all registered actions.

Source code in src/pystator/actions.py
def clear(self) -> None:
    """Remove all registered actions."""
    self._actions.clear()
    self._metadata.clear()
    self._async_actions.clear()

decorator

decorator(
    name: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Callable[[AnyActionFunc], AnyActionFunc]

Decorator to register an action function.

Example

@registry.decorator() ... def notify_user(ctx: dict) -> None: ... send_email(ctx["user_email"], "Order updated")

Source code in src/pystator/actions.py
def decorator(
    self,
    name: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> Callable[[AnyActionFunc], AnyActionFunc]:
    """Decorator to register an action function.

    Example:
        >>> @registry.decorator()
        ... def notify_user(ctx: dict) -> None:
        ...     send_email(ctx["user_email"], "Order updated")
    """

    def inner(func: AnyActionFunc) -> AnyActionFunc:
        self.register(name or func.__name__, func, metadata)
        return func

    return inner

ActionExecutor

ActionExecutor(
    registry: ActionRegistry,
    stop_on_error: bool = False,
    log_execution: bool = True,
    default_mode: ExecutionMode = ExecutionMode.SEQUENTIAL,
    strict: bool = False,
)

Executes actions from transition results.

Supports sequential, parallel, and phased execution modes.

Example

executor = ActionExecutor(action_registry) execution = executor.execute(transition_result, context)

Parameters:

Name Type Description Default
registry ActionRegistry

ActionRegistry containing the action callables.

required
stop_on_error bool

If True, halt batch execution on first failure.

False
log_execution bool

If True, log action skips and errors.

True
default_mode ExecutionMode

Default execution mode for batches.

SEQUENTIAL
strict bool

If True, raise on missing actions instead of skipping.

False
Source code in src/pystator/actions.py
def __init__(
    self,
    registry: ActionRegistry,
    stop_on_error: bool = False,
    log_execution: bool = True,
    default_mode: ExecutionMode = ExecutionMode.SEQUENTIAL,
    strict: bool = False,
) -> None:
    """Initialize the action executor.

    Args:
        registry: ActionRegistry containing the action callables.
        stop_on_error: If True, halt batch execution on first failure.
        log_execution: If True, log action skips and errors.
        default_mode: Default execution mode for batches.
        strict: If True, raise on missing actions instead of skipping.
    """
    self.registry = registry
    self.stop_on_error = stop_on_error
    self.log_execution = log_execution
    self.default_mode = default_mode
    self.strict = strict

execute

execute(
    transition_result: TransitionResult,
    context: dict[str, Any],
) -> ExecutionResult

Execute all actions from a transition result sequentially.

Source code in src/pystator/actions.py
def execute(
    self,
    transition_result: TransitionResult,
    context: dict[str, Any],
) -> ExecutionResult:
    """Execute all actions from a transition result sequentially."""
    result = ExecutionResult(
        transition_result=transition_result,
        started_at=datetime.now(UTC),
    )
    if not transition_result.success:
        result.completed_at = datetime.now(UTC)
        return result

    for spec in transition_result.all_action_specs:
        action_result = self._execute_single(
            spec.name,
            context,
            spec.params if spec.has_params else None,
            retry=spec.retry,
            retry_backoff=spec.retry_backoff,
        )
        result.action_results.append(action_result)
        if not action_result.success and self.stop_on_error:
            break

    result.completed_at = datetime.now(UTC)
    return result

execute_specific

execute_specific(
    action_names: list[str] | tuple[str, ...],
    context: dict[str, Any],
) -> list[ActionResult]

Execute specific actions by name sequentially.

Source code in src/pystator/actions.py
def execute_specific(
    self,
    action_names: list[str] | tuple[str, ...],
    context: dict[str, Any],
) -> list[ActionResult]:
    """Execute specific actions by name sequentially."""
    results: list[ActionResult] = []
    for name in action_names:
        results.append(self._execute_single(name, context, None))
    return results

validate_actions_exist

validate_actions_exist(
    transition_result: TransitionResult,
) -> list[str]

Return list of action names that are not registered.

Source code in src/pystator/actions.py
def validate_actions_exist(self, transition_result: TransitionResult) -> list[str]:
    """Return list of action names that are not registered."""
    missing: list[str] = []
    for spec in transition_result.all_action_specs:
        if spec.name.startswith("_effect."):
            continue  # Declarative effects don't need registry
        if not self.registry.has(spec.name):
            missing.append(spec.name)
    return missing

execute_action_spec

execute_action_spec(
    action: ActionSpec, context: dict[str, Any]
) -> ActionResult

Execute a single ActionSpec synchronously.

Source code in src/pystator/actions.py
def execute_action_spec(
    self, action: ActionSpec, context: dict[str, Any]
) -> ActionResult:
    """Execute a single ActionSpec synchronously."""
    return self._execute_single(
        action.name,
        context,
        action.params if action.has_params else None,
        retry=action.retry,
        retry_backoff=action.retry_backoff,
    )

execute_action_specs

execute_action_specs(
    actions: tuple[ActionSpec, ...] | list[ActionSpec],
    context: dict[str, Any],
) -> list[ActionResult]

Execute multiple ActionSpecs sequentially.

Source code in src/pystator/actions.py
def execute_action_specs(
    self,
    actions: tuple[ActionSpec, ...] | list[ActionSpec],
    context: dict[str, Any],
) -> list[ActionResult]:
    """Execute multiple ActionSpecs sequentially."""
    results: list[ActionResult] = []
    for action in actions:
        result = self.execute_action_spec(action, context)
        results.append(result)
        if not result.success and self.stop_on_error:
            break
    return results

async_execute async

async_execute(
    transition_result: TransitionResult,
    context: dict[str, Any],
) -> ExecutionResult

Execute all actions asynchronously in sequential order.

Source code in src/pystator/actions.py
async def async_execute(
    self,
    transition_result: TransitionResult,
    context: dict[str, Any],
) -> ExecutionResult:
    """Execute all actions asynchronously in sequential order."""
    result = ExecutionResult(
        transition_result=transition_result,
        started_at=datetime.now(UTC),
    )
    if not transition_result.success:
        result.completed_at = datetime.now(UTC)
        return result

    for spec in transition_result.all_action_specs:
        ar = await self._async_execute_single(
            spec.name,
            context,
            spec.params if spec.has_params else None,
            timeout=spec.timeout,
            retry=spec.retry,
            retry_backoff=spec.retry_backoff,
        )
        result.action_results.append(ar)
        if not ar.success and self.stop_on_error:
            break

    result.completed_at = datetime.now(UTC)
    return result

async_execute_parallel async

async_execute_parallel(
    transition_result: TransitionResult,
    context: dict[str, Any],
) -> ExecutionResult

Execute all actions concurrently using asyncio.gather.

Source code in src/pystator/actions.py
async def async_execute_parallel(
    self,
    transition_result: TransitionResult,
    context: dict[str, Any],
) -> ExecutionResult:
    """Execute all actions concurrently using asyncio.gather."""
    result = ExecutionResult(
        transition_result=transition_result,
        execution_mode=ExecutionMode.PARALLEL,
        started_at=datetime.now(UTC),
    )
    if not transition_result.success:
        result.completed_at = datetime.now(UTC)
        return result

    specs = transition_result.all_action_specs
    if specs:
        tasks = [
            self._async_execute_single(
                s.name,
                context,
                s.params if s.has_params else None,
                timeout=s.timeout,
                retry=s.retry,
                retry_backoff=s.retry_backoff,
            )
            for s in specs
        ]
        result.action_results = list(await asyncio.gather(*tasks))

    result.completed_at = datetime.now(UTC)
    return result

async_execute_phased async

async_execute_phased(
    transition_result: TransitionResult,
    context: dict[str, Any],
) -> ExecutionResult

Execute actions in phases: exit -> transition -> enter (each parallel).

Source code in src/pystator/actions.py
async def async_execute_phased(
    self,
    transition_result: TransitionResult,
    context: dict[str, Any],
) -> ExecutionResult:
    """Execute actions in phases: exit -> transition -> enter (each parallel)."""
    result = ExecutionResult(
        transition_result=transition_result,
        execution_mode=ExecutionMode.PHASED,
        started_at=datetime.now(UTC),
    )
    if not transition_result.success:
        result.completed_at = datetime.now(UTC)
        return result

    phases = [
        transition_result.on_exit_actions,
        transition_result.actions_to_execute,
        transition_result.on_enter_actions,
    ]
    all_results: list[ActionResult] = []
    for phase_specs in phases:
        if not phase_specs:
            continue
        tasks = [
            self._async_execute_single(
                s.name,
                context,
                s.params if s.has_params else None,
                timeout=s.timeout,
                retry=s.retry,
                retry_backoff=s.retry_backoff,
            )
            for s in phase_specs
        ]
        phase_results = await asyncio.gather(*tasks)
        all_results.extend(phase_results)
        if self.stop_on_error and any(not r.success for r in phase_results):
            break

    result.action_results = all_results
    result.completed_at = datetime.now(UTC)
    return result

async_execute_with_mode async

async_execute_with_mode(
    transition_result: TransitionResult,
    context: dict[str, Any],
    mode: ExecutionMode = ExecutionMode.SEQUENTIAL,
) -> ExecutionResult

Execute with explicit mode (sequential, parallel, or phased).

Source code in src/pystator/actions.py
async def async_execute_with_mode(
    self,
    transition_result: TransitionResult,
    context: dict[str, Any],
    mode: ExecutionMode = ExecutionMode.SEQUENTIAL,
) -> ExecutionResult:
    """Execute with explicit mode (sequential, parallel, or phased)."""
    if mode == ExecutionMode.PARALLEL:
        return await self.async_execute_parallel(transition_result, context)
    if mode == ExecutionMode.PHASED:
        return await self.async_execute_phased(transition_result, context)
    return await self.async_execute(transition_result, context)

async_execute_specific_parallel async

async_execute_specific_parallel(
    action_names: list[str] | tuple[str, ...],
    context: dict[str, Any],
) -> list[ActionResult]

Execute specific actions in parallel by name.

Source code in src/pystator/actions.py
async def async_execute_specific_parallel(
    self,
    action_names: list[str] | tuple[str, ...],
    context: dict[str, Any],
) -> list[ActionResult]:
    """Execute specific actions in parallel by name."""
    if not action_names:
        return []
    tasks = [
        self._async_execute_single(name, context, None) for name in action_names
    ]
    return list(await asyncio.gather(*tasks))

async_execute_action_spec async

async_execute_action_spec(
    action: ActionSpec, context: dict[str, Any]
) -> ActionResult

Async execute a single ActionSpec.

Source code in src/pystator/actions.py
async def async_execute_action_spec(
    self,
    action: ActionSpec,
    context: dict[str, Any],
) -> ActionResult:
    """Async execute a single ActionSpec."""
    return await self._async_execute_single(
        action.name,
        context,
        action.params if action.has_params else None,
        timeout=action.timeout,
        retry=action.retry,
        retry_backoff=action.retry_backoff,
    )

async_execute_action_specs_parallel async

async_execute_action_specs_parallel(
    actions: tuple[ActionSpec, ...] | list[ActionSpec],
    context: dict[str, Any],
) -> list[ActionResult]

Async execute multiple ActionSpecs in parallel.

Source code in src/pystator/actions.py
async def async_execute_action_specs_parallel(
    self,
    actions: tuple[ActionSpec, ...] | list[ActionSpec],
    context: dict[str, Any],
) -> list[ActionResult]:
    """Async execute multiple ActionSpecs in parallel."""
    if not actions:
        return []
    tasks = [self.async_execute_action_spec(a, context) for a in actions]
    return list(await asyncio.gather(*tasks))