Skip to content

State Stores

PyStator provides pluggable state storage for entity state persistence. This guide covers the available implementations and how to create custom stores.

Overview

The Orchestrator uses a StateStore to persist entity state between events. This enables:

  • Stateless API: The API server doesn't hold state in memory
  • Distributed processing: Multiple workers can process events for the same entity
  • Crash recovery: State survives process restarts
  • Audit trail: Track state changes over time

Available Implementations

InMemoryStateStore

Simple in-memory store for testing and development. State is lost when the process exits.

from pystator import InMemoryStateStore, Orchestrator

store = InMemoryStateStore()
orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    guards=guards,
    actions=actions,
)

SQLiteStateStore

SQLite-backed store for development and small deployments. Defaults to sqlite:///<cwd>/pystator.db if no connection string is provided.

from pystator import SQLiteStateStore, Orchestrator

store = SQLiteStateStore("sqlite:///pystator.db")
store.connect()  # auto_initialize=True by default

# Or use defaults (resolves from env/config/cwd)
store = SQLiteStateStore()
store.connect()

PostgresStateStore

PostgreSQL-backed store for production deployments. Connection string is required.

from pystator import PostgresStateStore, Orchestrator

store = PostgresStateStore("postgresql://user:pass@localhost/pystator")
store.connect(auto_initialize=True)

Options

store = PostgresStateStore("postgresql://user:pass@localhost/pystator")
store.connect(validate_schema=True, auto_initialize=False)

MongoDBStateStore

MongoDB-backed store using native pymongo. Install with pip install pystator[api] (includes pymongo).

from pystator import MongoDBStateStore

store = MongoDBStateStore("mongodb://localhost:27017", database_name="pystator")
store.connect()

RedisStateStore

High-throughput Redis-backed store for scenarios where database writes are a bottleneck.

from pystator import RedisStateStore

# connection_string optional if configured via env/config
store = RedisStateStore("redis://localhost:6379/0")
store.connect()

Options

store = RedisStateStore(
    "redis://localhost:6379/0",
    key_prefix="pystator",  # Key prefix for entity keys
)
store.connect()

Key Structure

pystator:entity:{entity_id}
  - state: "open"
  - context: "{...json...}"
  - machine_name: "order_management"

Database Setup

Before using PostgresStateStore (or SQLiteStateStore with auto_initialize=False), initialize the database:

# Initialize with SQLite (default)
pystator db init

# Initialize with PostgreSQL
pystator db init postgresql://user:pass@localhost/pystator

# Apply migrations
pystator db upgrade

# Seed example machines
pystator db seed

Initialization behavior: pystator db init uses Alembic migrations as the single source of truth (same pattern as PyCharter). Do not rely on a create_all fallback; ensure migration files exist under db/migrations/versions/. If no migrations are found, init will fail with a clear message.

Truncate: pystator db truncate clears all PyStator tables in dependency order: worker_events, transition_history, entity_states, and machines. Use with care (prompts for confirmation unless --force).

Custom State Stores

Implement the StateStore protocol for sync stores:

from pystator import StateStore

class MyCustomStateStore:
    """Custom state store implementation."""

    def get_state(self, entity_id: str) -> str | None:
        """Return current state or None if entity doesn't exist."""
        ...

    def set_state(
        self,
        entity_id: str,
        state: str,
        *,
        metadata: dict | None = None,
    ) -> None:
        """Persist the new state."""
        ...

    def get_context(self, entity_id: str) -> dict:
        """Return stored context or empty dict."""
        return {}

For async stores, implement AsyncStateStore:

from pystator import AsyncStateStore

class MyAsyncStateStore:
    async def aget_state(self, entity_id: str) -> str | None:
        ...

    async def aset_state(
        self,
        entity_id: str,
        state: str,
        *,
        metadata: dict | None = None,
    ) -> None:
        ...

    async def aget_context(self, entity_id: str) -> dict:
        return {}

Multi-Machine Setup

When running multiple FSMs, use machine_name to partition state:

# Order FSM store — both share the same DB, machine_name differentiates
order_store = SQLiteStateStore("sqlite:///pystator.db")
order_store.connect()

session_store = SQLiteStateStore("sqlite:///pystator.db")
session_store.connect()

Context Persistence

Context is persisted alongside state via the metadata kwarg:

store = SQLiteStateStore("sqlite:///pystator.db")
store.connect()

# Context will be saved with each state change
orchestrator.process_event(
    "order-123",
    "fill",
    context={"fill_qty": 100, "price": 50.25},
)

# Context is loaded with state
ctx = store.get_context("order-123")
# {"fill_qty": 100, "price": 50.25}

Transition History

The TransitionHistoryModel automatically records all transitions when using the entities API:

# Via API
POST /api/v1/entities/order-123/events
{
    "trigger": "fill",
    "context": {"fill_qty": 100}
}

# Query history
GET /api/v1/entities/order-123/history

History includes: - Source and target states - Trigger name - Success/failure - Error messages - Actions executed - Timestamp and duration

Performance Considerations

Store Read Write Consistency Use Case
InMemory O(1) O(1) N/A Testing
SQLite Fast Fast ACID Development, small deployments
PostgreSQL Fast Medium ACID Production
MongoDB Fast Fast Configurable Document-oriented workflows
Redis O(1) O(1) Eventual High-throughput

For high-throughput scenarios: 1. Use RedisStateStore for hot paths 2. Async-replicate to PostgreSQL for durability 3. Use connection pooling for database stores

Next Steps