
Execution Engine

Secure, isolated execution for routing decisions.

Overview

The Execution Engine runs routing decisions inside a sandbox and applies retries, circuit breaking, timeouts, and resource limits to each execution.

Features

  • Process Isolation - Sandboxed execution
  • Retry Logic - Exponential backoff
  • Circuit Breakers - Prevent cascading failures
  • Timeout Management - Per-operation limits
  • Resource Limits - Memory and CPU caps

Basic Usage

from stratarouter_runtime import ExecutionEngine, ExecutionConfig

config = ExecutionConfig(
    timeout=60,
    max_retries=3,
    retry_delay_ms=100
)

engine = ExecutionEngine(config)

# Run inside an async function; `plan` is the routing decision to execute
result = await engine.execute(plan)

Configuration

config = ExecutionConfig(
    # Timeouts
    timeout=60,                    # Max execution time (seconds)
    provider_timeout=30,           # LLM call timeout

    # Retry logic
    max_retries=3,                 # Max retry attempts
    retry_delay_ms=100,            # Delay before the first retry (milliseconds)
    retry_backoff=2.0,             # Multiplier applied to the delay on each retry
    retry_jitter=0.1,              # Random extra delay, as a fraction of the delay

    # Circuit breaker
    circuit_breaker_threshold=5,   # Failures before the breaker opens
    circuit_breaker_timeout=60,    # Time the breaker stays open before a trial call

    # Resource limits
    max_memory_mb=512,             # Memory limit
    max_cpu_time_ms=5000,          # CPU time limit

    # Isolation
    sandbox_enabled=True,          # Enable sandboxing
)
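
The two timeout settings suggest a nested arrangement: one limit on each provider call and one on the execution as a whole. This page does not describe how the engine enforces them, so the following is only a minimal sketch of one way to nest such limits with `asyncio.wait_for`. The names `call_provider` and `execute_with_timeouts` are hypothetical and not part of `stratarouter_runtime`.

```python
import asyncio


async def call_provider(delay_s):
    # Hypothetical stand-in for an LLM provider call that takes delay_s seconds
    await asyncio.sleep(delay_s)
    return "response"


async def execute_with_timeouts(delay_s, timeout=60, provider_timeout=30):
    async def run_plan():
        # Inner limit: a single provider call
        return await asyncio.wait_for(call_provider(delay_s), timeout=provider_timeout)

    # Outer limit: the whole execution, including the provider call
    return await asyncio.wait_for(run_plan(), timeout=timeout)


async def main():
    print(await execute_with_timeouts(0.01, timeout=1, provider_timeout=0.5))
    try:
        await execute_with_timeouts(0.5, timeout=1, provider_timeout=0.05)
    except asyncio.TimeoutError:
        print("provider call timed out")


asyncio.run(main())
```

When a limit expires, `asyncio.wait_for` cancels the awaited task and raises `asyncio.TimeoutError`, so a slow provider call does not keep running in the background.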

Retry Logic

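Retryable errors are retried with exponential backoff and jitter. The delay before retry n (counting from 0) is retry_delay_ms * retry_backoff ** n, plus a random amount of up to retry_jitter times that delay: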

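import asyncio
import random

# `execute` and `RetryableError` stand in for the engine's internals
async def execute_with_retry(plan, max_retries=3, retry_delay_ms=100,
                             retry_backoff=2.0, retry_jitter=0.1):
    # Makes at most max_retries attempts in total
    for attempt in range(max_retries):
        try:
            return await execute(plan)
        except RetryableError:
            # Out of attempts: propagate the last error to the caller
            if attempt == max_retries - 1:
                raise

            # Exponential backoff: the base delay grows by retry_backoff each attempt
            delay = retry_delay_ms * (retry_backoff ** attempt)
            # Random jitter keeps clients from retrying in lockstep
            delay += random.uniform(0, delay * retry_jitter)
            await asyncio.sleep(delay / 1000)  # sleep() takes seconds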
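
To see what the formula produces, the sketch below computes the lower and upper bound of each sleep for the default settings. The helper name `backoff_bounds` is hypothetical. It mirrors the loop above, where the final attempt re-raises instead of sleeping, so `max_retries=3` yields two sleeps.

```python
def backoff_bounds(max_retries=3, retry_delay_ms=100,
                   retry_backoff=2.0, retry_jitter=0.1):
    """Return a (min_ms, max_ms) pair for each sleep between attempts."""
    bounds = []
    # The last attempt raises rather than sleeping: max_retries - 1 sleeps
    for attempt in range(max_retries - 1):
        base = retry_delay_ms * (retry_backoff ** attempt)
        bounds.append((base, base * (1 + retry_jitter)))
    return bounds


for low, high in backoff_bounds():
    print(f"sleep between {low:.0f} ms and {high:.0f} ms")
```

With the defaults, the first sleep falls between roughly 100 and 110 ms and the second between roughly 200 and 220 ms.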

Circuit Breaker

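The circuit breaker prevents cascading failures. After threshold failures it opens and rejects calls immediately; once timeout seconds have passed, it lets a trial call through and closes again if that call succeeds: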

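import time

class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the breaker is open."""

class CircuitBreaker:
    def __init__(self, threshold=5, timeout=60):
        self.threshold = threshold  # failures before opening
        self.timeout = timeout      # seconds to stay open
        self.failures = 0           # not reset by successes while closed
        self.opened_at = None       # set when the breaker opens
        self.state = "closed"  # closed, open, half_open

    async def call(self, func):
        if self.state == "open":
            # monotonic() is unaffected by wall-clock adjustments
            if time.monotonic() - self.opened_at > self.timeout:
                self.state = "half_open"  # let a trial call through
            else:
                raise CircuitBreakerOpenError()

        try:
            result = await func()
            if self.state == "half_open":
                # Trial call succeeded: close and start counting afresh
                self.state = "closed"
                self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = time.monotonic()
            raise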
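
To make the state transitions concrete, the sketch below replays the same logic with an injectable clock so that the open period can be skipped without waiting. `TraceableBreaker`, its `clock` argument, and `trace_states` are hypothetical names introduced for illustration only.

```python
import asyncio


class CircuitBreakerOpenError(Exception):
    pass


class TraceableBreaker:
    """Same transitions as CircuitBreaker, with a hypothetical clock hook."""

    def __init__(self, threshold, timeout, clock):
        self.threshold, self.timeout, self.clock = threshold, timeout, clock
        self.failures = 0
        self.opened_at = None
        self.state = "closed"

    async def call(self, func):
        if self.state == "open":
            if self.clock() - self.opened_at > self.timeout:
                self.state = "half_open"
            else:
                raise CircuitBreakerOpenError()
        try:
            result = await func()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = self.clock()
            raise
        if self.state == "half_open":
            self.state = "closed"
            self.failures = 0
        return result


async def trace_states():
    now = [0.0]  # fake clock, in seconds
    breaker = TraceableBreaker(threshold=2, timeout=60, clock=lambda: now[0])

    async def fail():
        raise RuntimeError("provider down")

    async def ok():
        return "ok"

    states = []
    # Two failures, a call rejected while open, then a trial call after 61 s
    for func, advance in [(fail, 0), (fail, 0), (ok, 0), (ok, 61)]:
        now[0] += advance
        try:
            await breaker.call(func)
        except (RuntimeError, CircuitBreakerOpenError):
            pass
        states.append(breaker.state)
    return states


print(asyncio.run(trace_states()))
```

The recorded states are closed, open, open, closed: the second failure opens the breaker, the third call is rejected without reaching the provider, and the trial call after the timeout closes it again.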

Resource Management

Memory Limits

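import resource  # Unix only

def set_memory_limit(mb):
    # Cap the process's virtual address space; allocations beyond it fail
    limit = mb * 1024 * 1024
    resource.setrlimit(resource.RLIMIT_AS, (limit, limit))

set_memory_limit(512)  # applies to the current process and is inherited by children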

CPU Time Limits

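import math
import resource

def set_cpu_limit(ms):
    # RLIMIT_CPU takes whole seconds as an integer, so round up
    limit = max(1, math.ceil(ms / 1000))
    resource.setrlimit(resource.RLIMIT_CPU, (limit, limit))

set_cpu_limit(5000)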
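
The helpers above lower the limits of the process that calls them. A common alternative is to apply the limits only to a child process, so the parent stays unrestricted. The following is a minimal sketch of that pattern for Unix systems using `subprocess` with `preexec_fn`; the names `run_limited` and `_capped` are hypothetical, and nothing here is claimed to be how `stratarouter_runtime` does it.

```python
import math
import resource
import subprocess
import sys


def _capped(kind, wanted):
    """Never request more than the current hard limit allows."""
    _, hard = resource.getrlimit(kind)
    return wanted if hard == resource.RLIM_INFINITY else min(wanted, hard)


def run_limited(cmd, max_memory_mb=512, max_cpu_time_ms=5000):
    mem_bytes = max_memory_mb * 1024 * 1024
    cpu_seconds = max(1, math.ceil(max_cpu_time_ms / 1000))  # whole seconds

    def apply_limits():
        # Runs in the child after fork() and before exec()
        for kind, wanted in ((resource.RLIMIT_AS, mem_bytes),
                             (resource.RLIMIT_CPU, cpu_seconds)):
            value = _capped(kind, wanted)
            resource.setrlimit(kind, (value, value))

    return subprocess.run(cmd, preexec_fn=apply_limits,
                          capture_output=True, text=True)


result = run_limited([sys.executable, "-c", "print('ok')"])
print(result.returncode, result.stdout.strip())
```

Note that `preexec_fn` is not safe to use when the parent process runs multiple threads, so this pattern fits a single-threaded launcher best.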

Sandboxing

Isolated execution environment:

from stratarouter_runtime import Sandbox

sandbox = Sandbox(
    max_memory_mb=512,
    max_cpu_time_ms=5000,
    network_enabled=True,
    filesystem_access="readonly"
)

result = await sandbox.execute(plan)

Error Handling

import logging

from stratarouter_runtime import (
    ExecutionTimeout,
    ResourceExhausted,
    CircuitBreakerOpen
)

logger = logging.getLogger(__name__)

try:
    result = await engine.execute(plan)
except ExecutionTimeout:
    logger.error("Execution timed out")
except ResourceExhausted:
    logger.error("Resource limit exceeded")
except CircuitBreakerOpen:
    logger.error("Circuit breaker is open")

Monitoring

metrics = engine.get_metrics()

print(f"Total executions: {metrics.total}")
print(f"Success rate: {metrics.success_rate:.2%}")
print(f"Avg latency: {metrics.avg_latency_ms}ms")
print(f"Retry rate: {metrics.retry_rate:.2%}")
print(f"Circuit breaker state: {metrics.circuit_breaker_state}")
