Worker: Continuous Event-Processing Service

The PyStator Worker is a long-running process that consumes events from a durable queue (the worker_events table by default), routes each event to the appropriate FSM Orchestrator, and persists state. Multiple worker replicas can run safely; the database-backed event source uses atomic claims so each event is processed by only one worker.

Use the Worker when you want:

  • Decoupled event processing — API or other services enqueue events; the worker processes them asynchronously.
  • Durable processing — Events survive process restarts; failed events can be retried.
  • Scaling — Run multiple worker replicas; they share the same state store and event queue.

Overview

┌──────────────┐     submit_event()      ┌────────────────────┐
│  API / App   │ ──────────────────────▶ │   worker_events    │
└──────────────┘     (or direct insert)  │      (table)       │
                                         └─────────┬──────────┘
                                                   │ claim_next()
                                         ┌─────────▼──────────┐
                                         │  PyStator Worker   │
                                         │ (pystator worker)  │
                                         │  N poller tasks    │
                                         └─────────┬──────────┘
                                                   │ load state → process → persist → act
                                         ┌─────────▼──────────┐
                                         │    State store     │
                                         │  (entity_states)   │
                                         └────────────────────┘
  1. Producers (API, scripts, other services) enqueue events via submit_event() (or by inserting into worker_events).
  2. Worker polls the event source, claims one event at a time (atomic per worker), loads the machine and entity state, runs the Orchestrator (transition + persist + actions), then marks the event complete or failed.
  3. State is stored in the same state store (e.g. entity_states table) used by the Orchestrator, so API and worker share the same entity state.
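Step 2's atomic claim is what makes multiple replicas safe. A minimal sketch of the compare-and-set pattern it relies on, using an in-memory SQLite stand-in (the columns and the claim_next name here are illustrative assumptions, not PyStator's actual schema):

```python
import sqlite3

# In-memory stand-in for the worker_events queue; the real table has
# more columns (trigger, context, fires_at, ...) -- these are illustrative.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE worker_events (id INTEGER PRIMARY KEY, status TEXT, claimed_by TEXT)")
db.executemany("INSERT INTO worker_events (status) VALUES (?)", [("pending",), ("pending",)])
db.commit()

def claim_next(worker_id: str):
    """Atomically claim the oldest pending event; return its id, or None."""
    while True:
        row = db.execute(
            "SELECT id FROM worker_events WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None  # queue drained
        cur = db.execute(
            "UPDATE worker_events SET status = 'claimed', claimed_by = ? "
            "WHERE id = ? AND status = 'pending'",  # compare-and-set: only one claimer wins
            (worker_id, row[0]),
        )
        db.commit()
        if cur.rowcount == 1:
            return row[0]  # we won; a losing replica simply retries the loop

print(claim_next("worker-a"), claim_next("worker-b"), claim_next("worker-c"))
# → 1 2 None
```

Because the UPDATE re-checks status = 'pending', two replicas racing for the same row cannot both succeed; the loser's rowcount is 0 and it moves on to the next event.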

Quick start

1. Database and schema

The Worker uses the same database as the API. Ensure migrations are applied so worker_events and entity_states (and optionally machines) exist:

pystator db upgrade

2. Enqueue an event

From any process with database access (e.g. your API after handling a request):

from pystator.worker import submit_event

# Async
event_id = await submit_event(
    "order_management",   # machine_name
    "order-123",         # entity_id
    "fill",               # trigger
    context={"fill_qty": 100, "order_qty": 100},
)

# Or sync (e.g. in a Flask/Django view)
from pystator.worker import submit_event_sync
event_id = submit_event_sync("order_management", "order-123", "fill", context={...})

Events are written to the worker_events table. The Worker does not need to be running for submit_event to succeed; events will be processed when a worker starts and polls.

3. Run the worker

pystator worker

The worker starts, connects to the database, loads machine definitions (from the machines table by default), and runs several concurrent poller tasks. Each poller repeatedly claims an event, processes it through the Orchestrator, then marks it complete or failed. The process runs until you stop it (e.g. Ctrl+C); it handles SIGTERM/SIGINT and drains in-flight events before exiting.

Configuration

Worker configuration is resolved in this order: constructor arguments → environment variables → pystator.cfg → defaults.

| Setting | Env var | Default | Description |
|---|---|---|---|
| Database URL | PYSTATOR_DATABASE_URL or PYSTATOR_WORKER_DB_URL | (from pystator config) | Database for worker_events and the state store. |
| Poll interval | PYSTATOR_WORKER_POLL_INTERVAL_MS | 500 | Milliseconds between event-source polls. |
| Concurrency | PYSTATOR_WORKER_CONCURRENCY | 5 | Number of concurrent poller tasks. |
| Drain timeout | PYSTATOR_WORKER_DRAIN_TIMEOUT_S | 30 | Seconds to wait for in-flight events on shutdown. |
| Max attempts | PYSTATOR_WORKER_MAX_ATTEMPTS | 5 | Default max processing attempts per event. |
| Machine source | PYSTATOR_WORKER_MACHINE_SOURCE | db | Where to load machines: db (machines table) or yaml (files). |
| Machine dir | PYSTATOR_WORKER_MACHINE_DIR | | Directory of YAML machine configs (when machine_source=yaml). |
| Worker ID | PYSTATOR_WORKER_ID | auto | Unique ID for this replica (default: hostname-pid-uuid). |

You can also use a [worker] section in pystator.cfg:

[worker]
concurrency = 10
poll_interval_ms = 250
drain_timeout_s = 60

CLI overrides (when using pystator worker):

  • --concurrency N — number of poller tasks.
  • --poll-interval N — poll interval in milliseconds.
  • --app MODULE:FACTORY — use a custom Worker instance (see Custom worker).
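The resolution order (constructor arguments → environment variables → pystator.cfg → defaults) amounts to a chained lookup. A sketch of that chain, with the resolver function itself invented for illustration:

```python
import configparser
import os

def resolve(name, env_var, cfg, ctor_args, default):
    """Constructor args -> environment -> [worker] section -> default."""
    if name in ctor_args:                           # highest precedence
        return ctor_args[name]
    if env_var in os.environ:                       # env vars are strings; coerce
        return type(default)(os.environ[env_var])
    if cfg.has_option("worker", name):              # pystator.cfg [worker] section
        return type(default)(cfg.get("worker", name))
    return default

cfg = configparser.ConfigParser()
cfg.read_string("[worker]\nconcurrency = 10\npoll_interval_ms = 250\n")

print(resolve("concurrency", "PYSTATOR_WORKER_CONCURRENCY", cfg, {}, 5))                  # → 10
print(resolve("concurrency", "PYSTATOR_WORKER_CONCURRENCY", cfg, {"concurrency": 3}, 5))  # → 3
print(resolve("drain_timeout_s", "PYSTATOR_WORKER_DRAIN_TIMEOUT_S", cfg, {}, 30))         # → 30
```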

Submitting events

submit_event (async)

from pystator.worker import submit_event

event_id = await submit_event(
    machine_name="order_management",
    entity_id="order-123",
    trigger="fill",
    context={"fill_qty": 100, "order_qty": 100},
    fires_at=None,           # when the event becomes eligible (None = now)
    idempotency_key=None,   # optional dedup key
    max_attempts=5,
    db_url=None,            # optional; otherwise uses config
)

Returns the event_id of the enqueued event. A database URL is required, either passed as db_url or configured via PYSTATOR_DATABASE_URL.

submit_event_sync

For synchronous code (scripts, Django/Flask views):

from pystator.worker import submit_event_sync

event_id = submit_event_sync("order_management", "order-123", "fill", context={...})
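A sync variant like this typically just drives the async function to completion on a private event loop. A sketch of that pattern, with a stub standing in for the real submit_event (the stub's return value is invented for the example):

```python
import asyncio

async def submit_event(machine_name, entity_id, trigger, **kwargs):
    # Stub in place of the real async submit_event; the "event id"
    # format returned here is made up for illustration.
    await asyncio.sleep(0)
    return f"{machine_name}/{entity_id}/{trigger}"

def submit_event_sync(machine_name, entity_id, trigger, **kwargs):
    # Safe from ordinary sync code (a Flask/Django view) because no
    # event loop is running there; asyncio.run creates and closes one.
    return asyncio.run(submit_event(machine_name, entity_id, trigger, **kwargs))

print(submit_event_sync("order_management", "order-123", "fill"))
# → order_management/order-123/fill
```

Note that asyncio.run raises if called from code that is already inside a running event loop; in async code, call submit_event directly instead.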

Delayed events (fires_at)

To make an event eligible only after a certain time (e.g. for delayed transitions or scheduled work), pass fires_at:

from datetime import datetime, timezone, timedelta

event_id = await submit_event(
    "order_management", "order-123", "remind",
    fires_at=datetime.now(timezone.utc) + timedelta(minutes=30),
    context={},
)

The Worker’s event source only claims events whose fires_at <= now, so the event will not be processed until that time.
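The claim-time check can be pictured as a simple timestamp predicate (a sketch; the real event source filters in SQL):

```python
from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)
queue = [
    {"id": 1, "trigger": "fill",   "fires_at": None},                         # immediate
    {"id": 2, "trigger": "remind", "fires_at": now + timedelta(minutes=30)},  # future
    {"id": 3, "trigger": "expire", "fires_at": now - timedelta(seconds=1)},   # overdue
]

def eligible(event, now):
    # fires_at of None means "fire as soon as possible"; otherwise the
    # event stays invisible to claimers until fires_at <= now.
    return event["fires_at"] is None or event["fires_at"] <= now

print([e["id"] for e in queue if eligible(e, now)])  # → [1, 3]
```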

Idempotency

Pass idempotency_key to avoid duplicate processing when the same logical event might be submitted more than once:

event_id = await submit_event(
    "order_management", "order-123", "fill",
    idempotency_key="order-123-fill-abc",
    context={"fill_qty": 50},
)

The default event source (database) enforces uniqueness on idempotency_key; duplicate keys result in a single stored event.
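That uniqueness guarantee is the usual shape of a UNIQUE constraint plus an insert-or-ignore; a sketch of the pattern in SQLite (column names are assumptions, not PyStator's schema):

```python
import sqlite3

db = sqlite3.connect(":memory:")
# Minimal stand-in for worker_events; the UNIQUE constraint is the
# mechanism behind deduplication (columns are illustrative).
db.execute("""
    CREATE TABLE worker_events (
        id INTEGER PRIMARY KEY,
        trigger_name TEXT NOT NULL,
        idempotency_key TEXT UNIQUE
    )
""")

def submit(trigger_name, idempotency_key):
    # INSERT OR IGNORE is a no-op on a duplicate key, so resubmitting
    # the same logical event leaves exactly one stored row.
    db.execute(
        "INSERT OR IGNORE INTO worker_events (trigger_name, idempotency_key) VALUES (?, ?)",
        (trigger_name, idempotency_key),
    )
    db.commit()
    return db.execute(
        "SELECT id FROM worker_events WHERE idempotency_key = ?", (idempotency_key,)
    ).fetchone()[0]

first = submit("fill", "order-123-fill-abc")
second = submit("fill", "order-123-fill-abc")  # duplicate submission
print(first == second, db.execute("SELECT COUNT(*) FROM worker_events").fetchone()[0])
# → True 1
```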

Custom worker (guards, actions, custom config)

To use your own guards and actions (or a custom event source), create a Worker in code and expose it via a factory; then run the worker with --app.

1. Define a factory (e.g. in myapp/worker.py):

from pystator import GuardRegistry, ActionRegistry
from pystator.worker import Worker
from pystator.worker.config import WorkerConfig

def create_worker() -> Worker:
    guards = GuardRegistry()
    guards.register("is_full_fill", lambda ctx: ctx.get("fill_qty", 0) >= ctx.get("order_qty", 1))

    actions = ActionRegistry()
    @actions.register("notify_filled")
    def notify_filled(ctx):
        print(f"Order {ctx.get('entity_id')} filled")

    config = WorkerConfig(concurrency=10, poll_interval_ms=200)
    return Worker(config=config, guards=guards, actions=actions)

2. Run with --app:

pystator worker --app myapp.worker:create_worker --concurrency 10

The CLI loads the factory, gets the Worker instance, applies any CLI overrides (e.g. --concurrency), then runs it.
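Resolving a MODULE:FACTORY string is conventionally done with importlib; a sketch of that mechanism (PyStator's actual loader may differ), demonstrated with a stdlib function in place of myapp.worker:create_worker:

```python
import importlib

def load_factory(spec: str):
    """Resolve 'package.module:callable_name' to the callable itself."""
    module_name, _, attr = spec.partition(":")
    module = importlib.import_module(module_name)
    return getattr(module, attr)

# Demonstrated with a stdlib function rather than myapp.worker:create_worker:
join = load_factory("posixpath:join")
print(join("orders", "123"))  # → orders/123
```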

State store used by the worker

The Worker uses a StateStore to load and persist entity state when processing each event. By default it:

  1. Detects the database type from the URL and creates the appropriate store (SQLiteStateStore, PostgresStateStore, or MongoDBStateStore). It connects and, if the schema is present, uses the entity_states table.
  2. If that fails (e.g. schema not initialized), it falls back to InMemoryStateStore. In that case, entity state is not persisted across worker restarts.

For production, run migrations so the Worker uses the database state store. Then the API and the Worker share the same entity state.
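The detection in step 1 usually comes down to inspecting the URL scheme; a sketch of that dispatch (the store class names come from the list above, the logic itself is an assumption):

```python
def detect_store(db_url: str) -> str:
    """Map a database URL scheme to the implied state-store class name."""
    scheme = db_url.split("://", 1)[0]
    if scheme.startswith("sqlite"):
        return "SQLiteStateStore"
    if scheme.startswith("postgres"):     # matches postgres:// and postgresql://
        return "PostgresStateStore"
    if scheme.startswith("mongodb"):      # matches mongodb:// and mongodb+srv://
        return "MongoDBStateStore"
    raise ValueError(f"unsupported database URL: {db_url!r}")

print(detect_store("postgresql://app:secret@db:5432/pystator"))  # → PostgresStateStore
print(detect_store("sqlite:///pystator.db"))                     # → SQLiteStateStore
```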

Graceful shutdown

On SIGTERM or SIGINT, the Worker:

  1. Stops claiming new events.
  2. Cancels poller tasks (they exit after the current iteration).
  3. Waits up to drain_timeout_s for in-flight events to complete.
  4. Closes the event source and exits.

Use a process manager (systemd, Kubernetes, Docker) that sends SIGTERM and allows the drain period before SIGKILL.
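The shutdown sequence maps naturally onto asyncio primitives: a stop event halts claiming, pollers exit, and the runner waits out the drain window. A compressed sketch of the pattern (an illustration, not PyStator internals):

```python
import asyncio

async def poller(stop: asyncio.Event, in_flight: set):
    while not stop.is_set():
        # Stand-in for "claim an event and process it".
        task = asyncio.create_task(asyncio.sleep(0.01))
        in_flight.add(task)
        task.add_done_callback(in_flight.discard)
        await asyncio.sleep(0.01)  # poll interval; re-checks the stop flag next pass

async def run_worker():
    stop, in_flight = asyncio.Event(), set()
    pollers = [asyncio.create_task(poller(stop, in_flight)) for _ in range(3)]
    await asyncio.sleep(0.05)          # ... worker runs until a signal arrives ...
    stop.set()                         # 1. stop claiming new events
    await asyncio.gather(*pollers)     # 2. pollers exit after their current iteration
    if in_flight:                      # 3. wait up to drain_timeout_s for in-flight work
        await asyncio.wait(in_flight, timeout=30)
    return "drained"                   # 4. close the event source and exit

print(asyncio.run(run_worker()))  # → drained
```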

Relation to other “workers”

  • PyStator Worker (this guide): The built-in long-running service that polls worker_events, runs the Orchestrator, and persists state. No extra infrastructure beyond the database.
  • Celery / Redis “worker patterns” (see Deployment): Used for delayed transitions (scheduler) or for running actions in a separate task queue. You can use the PyStator Worker alone for event processing and add Redis/Celery when you need durable delayed transitions or offloaded action execution.

See also

  • State stores — How entity state is stored and shared between API and Worker.
  • Deployment — Running the Worker in Docker and Kubernetes.
  • API reference — Worker, WorkerConfig, submit_event, submit_event_sync.