Schedulers and Delayed Transitions

This guide explains when you need a scheduler, how to choose one, and how to run delayed transitions end-to-end with the Orchestrator.

When you need a scheduler

You only need a scheduler if your FSM uses delayed transitions: a transition with an after field in YAML (e.g. after: "5s" or after: 5000). The scheduler is responsible for calling the Orchestrator when the delay expires so the transition can be applied.

A single scheduler (one process, or a Redis/Celery backend) can track delayed transitions for many entities and FSM types; you do not need one scheduler per entity.
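To make the "one scheduler, many entities" point concrete, here is a minimal standard-library sketch of an in-process delay tracker. It is not pystator's implementation: the class TinyDelayScheduler, its schedule method, and the on_fire callback are hypothetical names chosen for illustration, and the only assumption is that a scheduler keeps one timer per (entity, trigger) pair and calls back when it expires.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

# Hypothetical callback type: (entity_id, trigger) -> awaitable.
FireCallback = Callable[[str, str], Awaitable[None]]


class TinyDelayScheduler:
    """Illustrative only: one in-process scheduler tracking many delays."""

    def __init__(self, on_fire: FireCallback) -> None:
        self._on_fire = on_fire
        # One pending asyncio task per (entity_id, trigger) pair.
        self._pending: Dict[Tuple[str, str], asyncio.Task] = {}

    def schedule(self, entity_id: str, trigger: str, delay_s: float) -> None:
        key = (entity_id, trigger)
        # Scheduling the same key again replaces the earlier timer.
        old = self._pending.pop(key, None)
        if old is not None:
            old.cancel()
        self._pending[key] = asyncio.create_task(self._run(key, delay_s))

    async def _run(self, key: Tuple[str, str], delay_s: float) -> None:
        await asyncio.sleep(delay_s)
        self._pending.pop(key, None)
        await self._on_fire(*key)

    def pending_count(self) -> int:
        return len(self._pending)


async def demo() -> List[Tuple[str, str]]:
    fired: List[Tuple[str, str]] = []

    async def on_fire(entity_id: str, trigger: str) -> None:
        # A real scheduler would call the Orchestrator here.
        fired.append((entity_id, trigger))

    scheduler = TinyDelayScheduler(on_fire)
    # Three entities of different kinds share the same scheduler instance.
    scheduler.schedule("order-1", "tick", 0.06)
    scheduler.schedule("order-2", "tick", 0.02)
    scheduler.schedule("invoice-7", "expire", 0.04)
    await asyncio.sleep(0.2)  # long enough for all three timers to fire
    return fired


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The timers fire in order of their delays, regardless of the order in which they were scheduled or which entity they belong to. Because all state lives in the process, everything in _pending disappears if the process exits.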

Choosing a scheduler

| Need | Scheduler | Extra infra | Notes |
|---|---|---|---|
| Delayed transitions, single process or dev | AsyncioScheduler | None | In-memory; delays lost if process exits |
| Multiple replicas, HA, or survive restarts | RedisScheduler | Redis | Delays stored in Redis; any worker can fire them |
| Task queue already in use | CeleryScheduler | Celery + broker | Delays become Celery tasks |

  • AsyncioScheduler: One pod can track all delays for all entities; no extra services. Best for development or single-instance deployments. Restarting the process loses pending delays.
  • RedisScheduler / CeleryScheduler: Delays are stored externally, so multiple replicas can run and restarts do not lose pending delays. Use for production when you need HA or scale. See Deployment for setup.
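The difference between in-memory and externally stored delays can be sketched without any infrastructure. The example below is a conceptual model only, not how RedisScheduler or CeleryScheduler work internally: SharedDelayStore is a hypothetical stand-in for an external store (a plain heap in memory here), Worker is a hypothetical replica, and time is passed in explicitly so the behavior is deterministic.

```python
import heapq
from typing import Dict, List, Tuple

# (due_time, entity_id, trigger); hypothetical record layout for illustration.
DelayEntry = Tuple[float, str, str]


class SharedDelayStore:
    """Stand-in for an external store; it outlives any single worker."""

    def __init__(self) -> None:
        self._heap: List[DelayEntry] = []

    def add(self, due_time: float, entity_id: str, trigger: str) -> None:
        heapq.heappush(self._heap, (due_time, entity_id, trigger))

    def pop_due(self, now: float) -> List[DelayEntry]:
        # Remove and return every entry whose due time has passed.
        due: List[DelayEntry] = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap))
        return due


class Worker:
    """Hypothetical replica that polls the shared store for due delays."""

    def __init__(self, name: str, store: SharedDelayStore) -> None:
        self.name = name
        self.store = store

    def schedule(self, now: float, delay_s: float, entity_id: str, trigger: str) -> None:
        self.store.add(now + delay_s, entity_id, trigger)

    def poll(self, now: float) -> List[Tuple[str, str]]:
        # A real worker would hand each due item to the Orchestrator.
        return [(entity_id, trigger) for _, entity_id, trigger in self.store.pop_due(now)]


def demo() -> Dict[str, List[Tuple[str, str]]]:
    store = SharedDelayStore()
    worker_a = Worker("a", store)
    worker_a.schedule(now=0.0, delay_s=5.0, entity_id="entity-1", trigger="tick")
    del worker_a  # simulate the scheduling replica going away

    worker_b = Worker("b", store)  # a different replica, same store
    return {"early": worker_b.poll(now=3.0), "late": worker_b.poll(now=6.0)}


if __name__ == "__main__":
    print(demo())
```

The delay scheduled by the first worker is still fired by the second, because the pending entry lives in the shared store rather than in either worker. A real multi-replica setup also has to ensure that two workers do not fire the same delay twice; this sketch sidesteps that by running a single poller.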

Minimal delayed-transitions example

End-to-end: YAML with after, Orchestrator, in-memory state store, and AsyncioScheduler.

1. FSM with a delayed transition (my_fsm.yaml):

meta:
  version: "1.0.0"
  machine_name: "demo"
  strict_mode: true

states:
  - name: idle
    type: initial
  - name: pending
    type: stable
  - name: done
    type: terminal

transitions:
  - trigger: start
    source: idle
    dest: pending
  - trigger: tick
    source: pending
    dest: done
    after: "2s"

2. Run with Orchestrator + AsyncioScheduler:

import asyncio
from pystator import StateMachine, Orchestrator, GuardRegistry, ActionRegistry
from pystator import InMemoryStateStore
from pystator.scheduler import AsyncioScheduler

machine = StateMachine.from_yaml("my_fsm.yaml")
store = InMemoryStateStore()
guards = GuardRegistry()
actions = ActionRegistry()
scheduler = AsyncioScheduler()

orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    guards=guards,
    actions=actions,
    scheduler=scheduler,
)

async def main():
    # First event: idle -> pending and schedule "tick" in 2s
    await orchestrator.async_process_event("entity-1", "start", {})
    await asyncio.sleep(2.5)  # wait for delayed transition
    await orchestrator.close()

asyncio.run(main())

Flow: async_process_event("entity-1", "start", {}) transitions idle → pending and schedules the tick transition to fire in 2 seconds. When the delay fires, the scheduler calls the orchestrator again with trigger="tick" for entity-1, which moves pending → done.

For production or multiple replicas, swap AsyncioScheduler for RedisScheduler or CeleryScheduler and use a persistent state store (e.g. PostgresStateStore or RedisStateStore). See Deployment for Redis/Celery setup.