Skip to content

State Management

Persistent execution state with automatic recovery.

Overview

State management provides:

  • Automatic checkpointing - State saved at a configurable step interval
  • Crash recovery - Resume from last checkpoint
  • Audit trails - Full execution history
  • ACID guarantees - Transactional updates

Quick Start

import asyncio

from stratarouter_runtime import StateManager

state = StateManager(
    backend="postgresql",
    db_url="postgresql://localhost/stratarouter"
)


async def main():
    """Save state for execution "exec-123", then load it and return it."""
    # Save state
    await state.save(
        execution_id="exec-123",
        step=5,
        state_data={"status": "processing", "progress": 50}
    )

    # Load state
    return await state.load("exec-123")


# `await` is only valid inside a coroutine, so the example is driven by asyncio.run().
state_data = asyncio.run(main())

Configuration

state = StateManager(
    # Backend
    backend="postgresql",               # "postgresql" or "memory"
    db_url="postgresql://localhost/stratarouter",

    # Checkpointing
    checkpoint_interval=10,             # Steps between checkpoints
    checkpoint_retention=100,           # Max checkpoints to keep

    # Recovery
    enable_auto_recovery=True,          # Resume incomplete executions on restart
    recovery_timeout=300,               # NOTE(review): unit not stated; presumably seconds — confirm

    # Optimization
    enable_compression=True,            # Compress state data
    batch_writes=True                   # Buffer writes and flush periodically
)

Checkpointing

Automatic Checkpoints

async def execute_workflow(workflow, checkpoint_interval=10):
    """Run every step of ``workflow`` in order, checkpointing periodically.

    A checkpoint is written after step 0 and after every
    ``checkpoint_interval``-th step that follows (steps 0, N, 2N, ...).

    Args:
        workflow: Object exposing an iterable ``steps`` attribute.
        checkpoint_interval: Number of steps between checkpoints.
            Defaults to 10. Must be at least 1.

    Raises:
        ValueError: If ``checkpoint_interval`` is less than 1.
    """
    # Reject 0/negative up front; 0 would otherwise surface as a
    # ZeroDivisionError from the modulo below.
    if checkpoint_interval < 1:
        raise ValueError("checkpoint_interval must be at least 1")

    execution_id = generate_id()

    for step_num, step in enumerate(workflow.steps):
        # Execute step
        result = await execute_step(step)

        # Checkpoint on step 0 and then every `checkpoint_interval` steps
        if step_num % checkpoint_interval == 0:
            await state.checkpoint(
                execution_id=execution_id,
                step=step_num,
                state_data={
                    "current_step": step_num,
                    # Only the result of the step that just ran is stored
                    "results": result,
                    "status": "in_progress"
                }
            )

Manual Checkpoints

# Checkpoint at critical points
# (execution_id, initial_state and updated_state are not defined in this
# snippet; they come from the surrounding code)
await state.checkpoint(execution_id, step=0, state_data=initial_state)

# Execute critical operation
result = await critical_operation()

# Checkpoint after success
await state.checkpoint(execution_id, step=1, state_data=updated_state)

Crash Recovery

Automatic Recovery

# On restart, automatically resume
# NOTE(review): no backend/db_url is passed here, unlike the Quick Start
# example — confirm the defaults point at the store holding the checkpoints.
state_manager = StateManager(enable_auto_recovery=True)

# Finds incomplete executions and resumes
await state_manager.recover_all()

Manual Recovery

# Get execution state
execution = await state.get_execution("exec-123")

# Only an execution whose status is "failed" is resumed here
if execution.status == "failed":
    # Resume from last checkpoint
    # (resume_execution is not defined in this snippet; presumably an
    # application-level helper — confirm)
    await resume_execution(
        execution_id="exec-123",
        from_step=execution.last_checkpoint_step
    )

Audit Trails

Full History

# Get complete execution history
history = await state.get_history("exec-123")

# Each history entry exposes step, state_data and timestamp
for checkpoint in history:
    print(f"Step {checkpoint.step}: {checkpoint.state_data}")
    print(f"Timestamp: {checkpoint.timestamp}")

Query History

# Find executions by criteria
executions = await state.query(
    status="completed",
    start_time=datetime(2026, 1, 1),
    end_time=datetime(2026, 1, 31)
)

Transaction Support

ACID Guarantees

# NOTE(review): state.save() in Quick Start takes state_data=, while
# txn.save() here takes data= — confirm which keyword the transaction API expects.
async with state.transaction() as txn:
    # All or nothing
    await txn.save("exec-1", step=1, data={...})  # {...} is a placeholder for the state payload
    await txn.save("exec-2", step=1, data={...})

    # Automatically committed

Rollback

async with state.transaction() as txn:
    await txn.save("exec-123", step=5, data={...})  # {...} is a placeholder for the state payload

    # `error` is not defined in this snippet; it stands in for the
    # application's own failure check
    if error:
        # Roll back the transaction explicitly
        await txn.rollback()

State Snapshots

Create Snapshot

# Save complete state snapshot
# (snapshot_name is the label later passed to restore_snapshot())
await state.create_snapshot(
    execution_id="exec-123",
    snapshot_name="before_critical_step"
)

Restore Snapshot

# Restore from snapshot
# (use the same execution_id and snapshot_name given to create_snapshot())
await state.restore_snapshot(
    execution_id="exec-123",
    snapshot_name="before_critical_step"
)

Performance Optimization

Batch Writes

# Buffer writes and flush periodically
state = StateManager(
    batch_writes=True,
    batch_size=100,             # NOTE(review): presumably max buffered writes per flush — confirm
    flush_interval_ms=1000      # milliseconds per the _ms suffix, i.e. 1 second
)

Compression

# Compress state data
state = StateManager(
    enable_compression=True,
    compression_level=6         # NOTE(review): codec and valid range are not stated — confirm
)

Partitioning

# Partition by date for better performance
state = StateManager(
    enable_partitioning=True,
    partition_by="month"        # NOTE(review): other accepted granularities are not listed — confirm
)

Monitoring

# Fetch the current state-store metrics
metrics = await state.get_metrics()

# storage_mb is printed with an "MB" suffix and avg_checkpoint_ms with "ms"
print(f"Total executions: {metrics.total_executions}")
print(f"Active executions: {metrics.active_executions}")
print(f"Checkpoints: {metrics.total_checkpoints}")
print(f"Storage used: {metrics.storage_mb}MB")
print(f"Avg checkpoint time: {metrics.avg_checkpoint_ms}ms")

Best Practices

1. Checkpoint at Key Points

# Checkpoint before expensive operations
await state.checkpoint(exec_id, step, data)
expensive_result = await expensive_operation()
# Checkpoint again at the next step, with the result merged into the saved data
await state.checkpoint(exec_id, step+1, {**data, "result": expensive_result})

2. Clean Up Old State

# Delete completed executions older than 30 days
# NOTE(review): confirm whether cleanup() also removes the checkpoints and
# snapshots belonging to those executions
await state.cleanup(
    older_than_days=30,
    status="completed"
)

3. Use Compression for Large State

# state_data_size is the payload size in bytes; it is not defined in this
# snippet and must be measured by the calling code
if state_data_size > 1_000_000:  # > 1MB
    # Rebinds `state` to a new manager with compression enabled
    state = StateManager(enable_compression=True)

4. Monitor State Growth

# Alert if state storage grows too large
if metrics.storage_mb > 10_000:
    alert("State storage exceeds 10GB")

Runtime Index | Architecture