State Stores¶
PyStator provides pluggable state storage for entity state persistence. This guide covers the available implementations and how to create custom stores.
Overview¶
The Orchestrator uses a StateStore to persist entity state between events. This enables:
- Stateless API: The API server doesn't hold state in memory
- Distributed processing: Multiple workers can process events for the same entity
- Crash recovery: State survives process restarts
- Audit trail: Track state changes over time
Available Implementations¶
InMemoryStateStore¶
Simple in-memory store for testing and development. State is lost when the process exits.
```python
from pystator import InMemoryStateStore, Orchestrator

store = InMemoryStateStore()
orchestrator = Orchestrator(
    machine=machine,
    state_store=store,
    guards=guards,
    actions=actions,
)
```
SQLiteStateStore¶
SQLite-backed store for development and small deployments. Defaults to `sqlite:///<cwd>/pystator.db` if no connection string is provided.
```python
from pystator import SQLiteStateStore

store = SQLiteStateStore("sqlite:///pystator.db")
store.connect()  # auto_initialize=True by default

# Or use defaults (resolves from env/config/cwd)
store = SQLiteStateStore()
store.connect()
```
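To make the storage model concrete, here is an illustrative `sqlite3` sketch of the kind of table a SQL-backed state store needs. The exact column layout here is an assumption for illustration; PyStator's real schema is created by its Alembic migrations, as described under Database Setup below:

```python
import sqlite3

# Illustrative only: one row per entity, with state, JSON context,
# and the owning machine name (PyStator's actual DDL may differ).
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE entity_states (
        entity_id    TEXT PRIMARY KEY,
        state        TEXT NOT NULL,
        context      TEXT NOT NULL DEFAULT '{}',
        machine_name TEXT
    )"""
)
conn.execute(
    "INSERT INTO entity_states (entity_id, state, machine_name) VALUES (?, ?, ?)",
    ("order-123", "open", "order_management"),
)
row = conn.execute(
    "SELECT state FROM entity_states WHERE entity_id = ?", ("order-123",)
).fetchone()
```

A primary key on `entity_id` makes `get_state` a single indexed lookup, which is why SQLite stays fast enough for small deployments.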
PostgresStateStore¶
PostgreSQL-backed store for production deployments. Connection string is required.
```python
from pystator import PostgresStateStore

store = PostgresStateStore("postgresql://user:pass@localhost/pystator")
store.connect(auto_initialize=True)
```
Options¶
```python
store = PostgresStateStore("postgresql://user:pass@localhost/pystator")
store.connect(validate_schema=True, auto_initialize=False)
```
MongoDBStateStore¶
MongoDB-backed store using native pymongo. Install with `pip install pystator[api]` (includes pymongo).
```python
from pystator import MongoDBStateStore

store = MongoDBStateStore("mongodb://localhost:27017", database_name="pystator")
store.connect()
```
RedisStateStore¶
High-throughput Redis-backed store for scenarios where database writes are a bottleneck.
```python
from pystator import RedisStateStore

# connection_string optional if configured via env/config
store = RedisStateStore("redis://localhost:6379/0")
store.connect()
```
Options¶
```python
store = RedisStateStore(
    "redis://localhost:6379/0",
    key_prefix="pystator",  # key prefix for entity keys
)
store.connect()
```
Key Structure¶
```
pystator:entity:{entity_id}
  state: "open"
  context: "{...json...}"
  machine_name: "order_management"
```
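The layout above maps naturally onto Redis hashes (one hash per entity key). This pure-Python sketch simulates that with nested dicts to show how a read and a write resolve; it stands in for `HSET`/`HGETALL` and is not PyStator code:

```python
import json

key_prefix = "pystator"


def entity_key(entity_id: str) -> str:
    """Build the per-entity key, e.g. pystator:entity:order-123."""
    return f"{key_prefix}:entity:{entity_id}"


# Simulated Redis: each key holds a hash of field -> string.
redis_sim: dict[str, dict[str, str]] = {}

# Write path: the rough equivalent of HSET on the entity key.
redis_sim[entity_key("order-123")] = {
    "state": "open",
    "context": json.dumps({"fill_qty": 100}),
    "machine_name": "order_management",
}

# Read path: fetch the hash, then decode the JSON context field.
entry = redis_sim[entity_key("order-123")]
state = entry["state"]
context = json.loads(entry["context"])
```

Storing context as a JSON string in a single hash field keeps each entity readable in one round trip.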
Database Setup¶
Before connecting with auto_initialize=False (whether with PostgresStateStore or SQLiteStateStore), initialize the database manually:
```bash
# Initialize with SQLite (default)
pystator db init

# Initialize with PostgreSQL
pystator db init postgresql://user:pass@localhost/pystator

# Apply migrations
pystator db upgrade

# Seed example machines
pystator db seed
```
Initialization behavior: `pystator db init` uses Alembic migrations as the single source of truth (the same pattern as PyCharter). Do not rely on a `create_all` fallback; ensure migration files exist under `db/migrations/versions/`. If no migrations are found, `init` fails with a clear message.
Discovery (inferred FSM) SQL schema: When discovery is configured with a SQL backend (`sqlite` or `postgresql`) and `PYSTATOR_DISCOVERY_DATABASE_URL` is set, `pystator db init` / `pystator db upgrade` run the same idempotent discovery DDL as the discovery store (tables such as `discovery_observations`). That URL may match `PYSTATOR_DATABASE_URL` or point at a separate database. For SQLite dev setups, the API's automatic migration path also applies the discovery schema after Alembic succeeds. Non-SQL discovery backends (`memory`, `mongodb`, `redis`) skip this step.
Discovery machine journey: Discovery supports selecting an existing machine name or typing a new ephemeral discovery machine name, choosing inference vs. manual build mode, and optionally scoping inference to selected entity IDs. In manual mode, an explicit `edge_selection` includes the chosen pairs even when they do not meet confidence/count thresholds. Candidate detail responses prefer per-build edge snapshots (`metadata.inferred_edges_snapshot`) to avoid ambiguity with machine-wide edge diagnostics.
Truncate: `pystator db truncate` clears all PyStator tables in dependency order: `worker_events`, `transition_history`, `entity_states`, then `machines`. Use with care (it prompts for confirmation unless `--force` is passed).
Custom State Stores¶
Implement the StateStore protocol for sync stores:
```python
from pystator import StateStore


class MyCustomStateStore:
    """Custom state store implementation."""

    def get_state(self, entity_id: str) -> str | None:
        """Return current state or None if entity doesn't exist."""
        ...

    def set_state(
        self,
        entity_id: str,
        state: str,
        *,
        metadata: dict | None = None,
    ) -> None:
        """Persist the new state."""
        ...

    def get_context(self, entity_id: str) -> dict:
        """Return stored context or empty dict."""
        return {}
```
For async stores, implement AsyncStateStore:
```python
from pystator import AsyncStateStore


class MyAsyncStateStore:
    async def aget_state(self, entity_id: str) -> str | None:
        ...

    async def aset_state(
        self,
        entity_id: str,
        state: str,
        *,
        metadata: dict | None = None,
    ) -> None:
        ...

    async def aget_context(self, entity_id: str) -> dict:
        return {}
```
Multi-Machine Setup¶
When running multiple FSMs, use machine_name to partition state:
```python
# Both stores share the same database file; each orchestrator's
# machine_name keeps their entity states separate.
order_store = SQLiteStateStore("sqlite:///pystator.db")
order_store.connect()

session_store = SQLiteStateStore("sqlite:///pystator.db")
session_store.connect()
```
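The partitioning itself can be sketched without any database: if stored state is keyed by (machine name, entity ID), the same entity ID can exist independently in each FSM. This is an illustration of the idea, not PyStator's storage code:

```python
# Illustrative: state keyed by (machine_name, entity_id), so two FSMs
# sharing one database never collide, even on identical entity IDs.
states: dict[tuple[str, str], str] = {}

states[("order_management", "abc-1")] = "open"
states[("session_tracking", "abc-1")] = "active"
```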
Context Persistence¶
Context is persisted alongside state via the metadata kwarg:
```python
store = SQLiteStateStore("sqlite:///pystator.db")
store.connect()

# Context is saved with each state change
orchestrator.process_event(
    "order-123",
    "fill",
    context={"fill_qty": 100, "price": 50.25},
)

# Context is loaded with state
ctx = store.get_context("order-123")
# {"fill_qty": 100, "price": 50.25}
```
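Since context is stored as JSON (see the Redis key structure earlier in this guide), it must round-trip cleanly through serialization, which means values should be JSON-compatible types. A quick sketch of what a store does on write and read:

```python
import json

context = {"fill_qty": 100, "price": 50.25}

# What a store does on write ...
serialized = json.dumps(context)

# ... and on read.
restored = json.loads(serialized)
```

Objects that `json` cannot encode (datetimes, Decimals, custom classes) would need converting before being placed in context.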
Transition History¶
The TransitionHistoryModel automatically records all transitions when using the entities API:
```
# Via API
POST /api/v1/entities/order-123/events
{
  "trigger": "fill",
  "context": {"fill_qty": 100}
}

# Query history
GET /api/v1/entities/order-123/history
```
History includes:
- Source and target states
- Trigger name
- Success/failure
- Error messages
- Actions executed
- Timestamp and duration
Performance Considerations¶
| Store | Read | Write | Consistency | Use Case |
|---|---|---|---|---|
| InMemory | O(1) | O(1) | N/A | Testing |
| SQLite | Fast | Fast | ACID | Development, small deployments |
| PostgreSQL | Fast | Medium | ACID | Production |
| MongoDB | Fast | Fast | Configurable | Document-oriented workflows |
| Redis | O(1) | O(1) | Eventual | High-throughput |
For high-throughput scenarios:
1. Use RedisStateStore for hot paths
2. Async-replicate to PostgreSQL for durability
3. Use connection pooling for database stores
Next Steps¶
- Deployment Guide - Kubernetes and production patterns
- Configuration Guide - Environment variables and config files
- API Reference - Full API documentation