Execution Engine¶
Secure, isolated execution for routing decisions.
Overview¶
The Execution Engine runs routing decisions in an isolated environment and guards each execution with retries, circuit breakers, timeouts, and resource limits.
Features¶
- Process Isolation - Sandboxed execution
- Retry Logic - Exponential backoff
- Circuit Breakers - Prevent cascading failures
- Timeout Management - Per-operation limits
- Resource Limits - Memory and CPU caps
Basic Usage¶
from stratarouter_runtime import ExecutionEngine, ExecutionConfig

config = ExecutionConfig(
    timeout=60,
    max_retries=3,
    retry_delay_ms=100,
)
engine = ExecutionEngine(config)

# Must run inside a coroutine (or an async REPL); `plan` is created elsewhere
result = await engine.execute(plan)
Configuration¶
config = ExecutionConfig(
    # Timeouts
    timeout=60,                    # Max execution time (seconds)
    provider_timeout=30,           # LLM call timeout

    # Retry logic
    max_retries=3,                 # Max retry attempts
    retry_delay_ms=100,            # Initial delay
    retry_backoff=2.0,             # Exponential multiplier
    retry_jitter=0.1,              # Jitter factor

    # Circuit breaker
    circuit_breaker_threshold=5,   # Failures before opening
    circuit_breaker_timeout=60,    # Reset timeout

    # Resource limits
    max_memory_mb=512,             # Memory limit
    max_cpu_time_ms=5000,          # CPU time limit

    # Isolation
    sandbox_enabled=True,          # Enable sandboxing
)
Retry Logic¶
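Retryable errors are retried with exponential backoff and jitter. Before retry n (counting from 0), the wait is retry_delay_ms * retry_backoff ** n milliseconds, plus a random extra of up to retry_jitter times that delay: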
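import asyncio
import random

# execute() and RetryableError are placeholders not defined in this snippet
async def execute_with_retry(plan, max_retries=3, retry_delay_ms=100,
                             retry_backoff=2.0, retry_jitter=0.1):
    # In this loop max_retries counts total attempts, including the first
    for attempt in range(max_retries):
        try:
            return await execute(plan)
        except RetryableError:
            if attempt == max_retries - 1:
                raise  # Out of attempts: propagate the last error
            # Exponential backoff: the delay is multiplied by retry_backoff each attempt
            delay = retry_delay_ms * (retry_backoff ** attempt)
            # Jitter: add up to retry_jitter * delay so callers do not retry in lockstep
            delay += random.uniform(0, delay * retry_jitter)
            await asyncio.sleep(delay / 1000)  # Milliseconds to seconds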
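The delay arithmetic in the loop above can be tabulated without sleeping. The following is a minimal sketch, not part of stratarouter_runtime: `backoff_schedule` is a hypothetical helper that reuses the configuration's parameter names and returns, for each sleep the loop can perform, the base delay and the largest delay that jitter can produce.

```python
def backoff_schedule(max_retries=3, retry_delay_ms=100,
                     retry_backoff=2.0, retry_jitter=0.1):
    """Return (base_ms, max_ms) for every sleep the retry loop can perform.

    Hypothetical helper for illustration; it mirrors the delay arithmetic
    of the retry loop but does not sleep or call anything.
    """
    schedule = []
    # The loop sleeps after every failed attempt except the last one.
    for attempt in range(max_retries - 1):
        base = retry_delay_ms * (retry_backoff ** attempt)
        schedule.append((base, base + base * retry_jitter))
    return schedule


if __name__ == "__main__":
    for attempt, (base, worst) in enumerate(backoff_schedule(max_retries=5)):
        print(f"after attempt {attempt}: {base:.0f} ms to {worst:.0f} ms")
```

With an initial delay of 100 ms, a multiplier of 2.0, and five attempts, the base delays are 100, 200, 400, and 800 ms; jitter of 0.1 adds at most 10% to each.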
Circuit Breaker¶
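The circuit breaker prevents cascading failures. After threshold failures it opens and rejects calls immediately; once timeout seconds have passed, it moves to half-open and lets a trial call through, closing again if that call succeeds: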
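import time

from stratarouter_runtime import CircuitBreakerOpen

class CircuitBreaker:
    def __init__(self, threshold=5, timeout=60):
        self.threshold = threshold  # Failures before opening
        self.timeout = timeout      # Seconds to wait before a trial call
        self.failures = 0
        self.state = "closed"       # closed, open, half_open
        self.opened_at = None       # Set each time the breaker opens

    async def call(self, func):
        if self.state == "open":
            # A monotonic clock is unaffected by wall-clock adjustments
            if time.monotonic() - self.opened_at > self.timeout:
                self.state = "half_open"  # Let a trial call through
            else:
                raise CircuitBreakerOpen()
        try:
            result = await func()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = time.monotonic()
            raise
        if self.state == "half_open":
            # The trial call succeeded: close the breaker and reset the count
            self.state = "closed"
            self.failures = 0
        return result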
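The state transitions are easier to follow when traced with a controllable clock. The sketch below is a simplified, synchronous stand-in written only for this purpose; `TinyBreaker`, `BreakerOpen`, and `trace` are hypothetical names and not part of stratarouter_runtime.

```python
class BreakerOpen(Exception):
    """Raised when a call is rejected because the breaker is open."""


class TinyBreaker:
    # Hypothetical synchronous breaker, used only to trace state changes.
    def __init__(self, threshold, timeout, clock):
        self.threshold = threshold
        self.timeout = timeout
        self.clock = clock  # Callable returning the current time in seconds
        self.failures = 0
        self.state = "closed"
        self.opened_at = None

    def call(self, func):
        if self.state == "open":
            if self.clock() - self.opened_at > self.timeout:
                self.state = "half_open"  # Allow a trial call
            else:
                raise BreakerOpen()
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        if self.state == "half_open":
            self.state = "closed"
            self.failures = 0
        return result


def trace():
    now = [0.0]  # Fake clock value, advanced by hand
    breaker = TinyBreaker(threshold=2, timeout=60, clock=lambda: now[0])
    states = []

    def fail():
        raise RuntimeError("provider down")

    for _ in range(2):  # Two failures reach the threshold
        try:
            breaker.call(fail)
        except RuntimeError:
            pass
    states.append(breaker.state)

    try:  # Still inside the timeout window: rejected without calling
        breaker.call(lambda: "ok")
    except BreakerOpen:
        states.append("rejected")

    now[0] = 61.0  # Past the timeout: the trial call runs and succeeds
    breaker.call(lambda: "ok")
    states.append(breaker.state)
    return states


if __name__ == "__main__":
    print(trace())  # ['open', 'rejected', 'closed']
```

Injecting the clock keeps the trace deterministic, which is also a convenient way to unit-test breaker behavior without waiting for real timeouts.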
Resource Management¶
Memory Limits¶
import resource  # Unix only

def set_memory_limit(mb):
    # RLIMIT_AS caps the process's total virtual address space, in bytes
    limit = mb * 1024 * 1024
    resource.setrlimit(resource.RLIMIT_AS, (limit, limit))

set_memory_limit(512)
CPU Time Limits¶
import math
import resource

def set_cpu_limit(ms):
    # RLIMIT_CPU takes whole seconds, so round up to an integer
    limit = math.ceil(ms / 1000)
    resource.setrlimit(resource.RLIMIT_CPU, (limit, limit))

set_cpu_limit(5000)
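`resource.setrlimit` applies to the calling process, and an unprivileged process cannot raise a hard limit again after lowering it, so limits like these are usually applied in a child process. The sketch below shows one way to do that with the standard library on Unix. `run_limited` is a hypothetical helper, not part of stratarouter_runtime, and it makes no claim about how the library applies its own limits.

```python
import math
import resource
import subprocess
import sys


def run_limited(argv, max_memory_mb=512, max_cpu_time_ms=5000):
    """Run argv in a child process with memory and CPU caps (Unix only).

    Hypothetical helper for illustration.
    """
    mem_bytes = max_memory_mb * 1024 * 1024
    cpu_seconds = math.ceil(max_cpu_time_ms / 1000)  # RLIMIT_CPU takes whole seconds

    def apply_limits():
        # Runs in the child between fork() and exec(), so the parent is unaffected.
        resource.setrlimit(resource.RLIMIT_AS, (mem_bytes, mem_bytes))
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_seconds, cpu_seconds))

    return subprocess.run(argv, preexec_fn=apply_limits, capture_output=True)


if __name__ == "__main__":
    ok = run_limited([sys.executable, "-c", "print('within limits')"])
    print(ok.returncode, ok.stdout.decode().strip())

    # Allocating 1 GiB exceeds the 512 MB address-space cap, so the child fails.
    too_big = run_limited([sys.executable, "-c", "bytearray(1024 ** 3)"])
    print(too_big.returncode)
```

The Python documentation notes that `preexec_fn` is not safe to use when the parent process has multiple threads, so this pattern suits single-threaded launchers best.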
Sandboxing¶
Isolated execution environment:
from stratarouter_runtime import Sandbox

sandbox = Sandbox(
    max_memory_mb=512,
    max_cpu_time_ms=5000,
    network_enabled=True,
    filesystem_access="readonly",
)
result = await sandbox.execute(plan)
Error Handling¶
import logging

from stratarouter_runtime import (
    ExecutionTimeout,
    ResourceExhausted,
    CircuitBreakerOpen,
)

logger = logging.getLogger(__name__)

try:
    result = await engine.execute(plan)
except ExecutionTimeout:
    logger.error("Execution timed out")
except ResourceExhausted:
    logger.error("Resource limit exceeded")
except CircuitBreakerOpen:
    logger.error("Circuit breaker is open")
Monitoring¶
metrics = engine.get_metrics()
print(f"Total executions: {metrics.total}")
print(f"Success rate: {metrics.success_rate:.2%}")
print(f"Avg latency: {metrics.avg_latency_ms}ms")
print(f"Retry rate: {metrics.retry_rate:.2%}")
print(f"Circuit breaker state: {metrics.circuit_breaker_state}")