Schedulers and Delayed Transitions¶
This guide explains when you need a scheduler, how to choose one, and how to run delayed transitions end-to-end with the Orchestrator.
When you need a scheduler¶
You only need a scheduler if your FSM uses delayed transitions: a transition with an after field in YAML (e.g. after: "5s" or after: 5000). The scheduler is responsible for calling the Orchestrator when the delay expires so the transition can be applied.
One process (or Redis/Celery) can track many delayed transitions for many entities and FSM types—there is no “one scheduler per entity.”
Choosing a scheduler¶
| Need | Scheduler | Extra infra | Notes |
|---|---|---|---|
| Delayed transitions, single process or dev | AsyncioScheduler | None | In-memory; delays lost if process exits |
| Multiple replicas, HA, or survive restarts | RedisScheduler | Redis | Delays stored in Redis; any worker can fire them |
| Task queue already in use | CeleryScheduler | Celery + broker | Delays become Celery tasks |
- AsyncioScheduler: One pod can track all delays for all entities; no extra services. Best for development or single-instance deployments. Restarting the process loses pending delays.
- RedisScheduler / CeleryScheduler: Delays are stored externally, so multiple replicas can run and restarts do not lose pending delays. Use for production when you need HA or scale. See Deployment for setup.
Minimal delayed-transitions example¶
End-to-end: YAML with after, Orchestrator, in-memory state store, and AsyncioScheduler.
1. FSM with a delayed transition (my_fsm.yaml):
meta:
version: "1.0.0"
machine_name: "demo"
strict_mode: true
states:
- name: IDLE
type: initial
- name: PENDING
type: stable
- name: DONE
type: terminal
transitions:
- trigger: start
source: IDLE
dest: PENDING
- trigger: tick
source: PENDING
dest: DONE
after: "2s"
2. Run with Orchestrator + AsyncioScheduler:
import asyncio
from pystator import StateMachine, Orchestrator, GuardRegistry, ActionRegistry
from pystator import InMemoryStateStore
from pystator.scheduler import AsyncioScheduler
machine = StateMachine.from_yaml("my_fsm.yaml")
store = InMemoryStateStore()
guards = GuardRegistry()
actions = ActionRegistry()
scheduler = AsyncioScheduler()
orchestrator = Orchestrator(
machine=machine,
state_store=store,
guards=guards,
actions=actions,
scheduler=scheduler,
)
async def main():
# First event: IDLE -> PENDING and schedule "tick" in 2s
await orchestrator.async_process_event("entity-1", "start", {})
await asyncio.sleep(2.5) # wait for delayed transition
await orchestrator.close()
asyncio.run(main())
Flow: async_process_event("entity-1", "start", {}) transitions IDLE → PENDING and schedules the tick transition in 2 seconds. When the delay fires, the scheduler calls the orchestrator again with trigger="tick" for entity-1, which moves PENDING → DONE.
For production or multiple replicas, swap AsyncioScheduler for RedisScheduler or CeleryScheduler and use a persistent state store (e.g. PostgresStateStore or RedisStateStore). See Deployment for Redis/Celery setup.