State Management¶
Persistent execution state with automatic recovery.
Overview¶
State management provides:
- Automatic checkpointing - Periodic state saves at a configurable step interval
- Crash recovery - Resume from last checkpoint
- Audit trails - Full execution history
- ACID guarantees - Transactional updates
Quick Start¶
from stratarouter_runtime import StateManager
state = StateManager(
backend="postgresql",
db_url="postgresql://localhost/stratarouter"
)
# Save state
await state.save(
execution_id="exec-123",
step=5,
state_data={"status": "processing", "progress": 50}
)
# Load state
state_data = await state.load("exec-123")
Configuration¶
state = StateManager(
# Backend
backend="postgresql", # "postgresql" or "memory"
db_url="postgresql://localhost/stratarouter",
# Checkpointing
checkpoint_interval=10, # Steps between checkpoints
checkpoint_retention=100, # Max checkpoints to keep
# Recovery
enable_auto_recovery=True,
recovery_timeout=300,
# Optimization
enable_compression=True,
batch_writes=True
)
Checkpointing¶
Automatic Checkpoints¶
async def execute_workflow(workflow):
execution_id = generate_id()
for step_num, step in enumerate(workflow.steps):
# Execute step
result = await execute_step(step)
# Auto-checkpoint every 10 steps
if step_num % 10 == 0:
await state.checkpoint(
execution_id=execution_id,
step=step_num,
state_data={
"current_step": step_num,
"results": result,
"status": "in_progress"
}
)
Manual Checkpoints¶
# Checkpoint at critical points
await state.checkpoint(execution_id, step=0, state_data=initial_state)
# Execute critical operation
result = await critical_operation()
# Checkpoint after success
await state.checkpoint(execution_id, step=1, state_data=updated_state)
Crash Recovery¶
Automatic Recovery¶
# On restart, automatically resume
state_manager = StateManager(enable_auto_recovery=True)
# Finds incomplete executions and resumes
await state_manager.recover_all()
Manual Recovery¶
# Get execution state
execution = await state.get_execution("exec-123")
if execution.status == "failed":
# Resume from last checkpoint
await resume_execution(
execution_id="exec-123",
from_step=execution.last_checkpoint_step
)
Audit Trails¶
Full History¶
# Get complete execution history
history = await state.get_history("exec-123")
for checkpoint in history:
print(f"Step {checkpoint.step}: {checkpoint.state_data}")
print(f"Timestamp: {checkpoint.timestamp}")
Query History¶
# Find executions by criteria
executions = await state.query(
status="completed",
start_time=datetime(2026, 1, 1),
end_time=datetime(2026, 1, 31)
)
Transaction Support¶
ACID Guarantees¶
async with state.transaction() as txn:
# All or nothing
await txn.save("exec-1", step=1, state_data={...})
await txn.save("exec-2", step=1, state_data={...})
# Automatically committed
Rollback¶
async with state.transaction() as txn:
await txn.save("exec-123", step=5, data={...})
if error:
await txn.rollback()
State Snapshots¶
Create Snapshot¶
# Save complete state snapshot
await state.create_snapshot(
execution_id="exec-123",
snapshot_name="before_critical_step"
)
Restore Snapshot¶
# Restore from snapshot
await state.restore_snapshot(
execution_id="exec-123",
snapshot_name="before_critical_step"
)
Performance Optimization¶
Batch Writes¶
# Buffer writes and flush periodically
state = StateManager(
batch_writes=True,
batch_size=100,
flush_interval_ms=1000
)
Compression¶
# Compress checkpoint payloads to reduce storage
state = StateManager(
enable_compression=True
)
Partitioning¶
# Partition by date for better performance
state = StateManager(
enable_partitioning=True,
partition_by="month"
)
Monitoring¶
metrics = await state.get_metrics()
print(f"Total executions: {metrics.total_executions}")
print(f"Active executions: {metrics.active_executions}")
print(f"Checkpoints: {metrics.total_checkpoints}")
print(f"Storage used: {metrics.storage_mb}MB")
print(f"Avg checkpoint time: {metrics.avg_checkpoint_ms}ms")
Best Practices¶
1. Checkpoint at Key Points¶
# Checkpoint before expensive operations
await state.checkpoint(exec_id, step, data)
expensive_result = await expensive_operation()
await state.checkpoint(exec_id, step+1, {**data, "result": expensive_result})
2. Clean Up Old State¶
# Delete completed executions older than 30 days
await state.cleanup(
older_than_days=30,
status="completed"
)
3. Use Compression for Large State¶
# Enable compression when state payloads are large to cut storage and I/O
state = StateManager(enable_compression=True)
4. Monitor State Growth¶
# Alert if state storage grows too large
if metrics.storage_mb > 10_000:
alert("State storage exceeds 10GB")