Core-Runtime Bridge

The Core-Runtime Bridge is the critical integration layer that connects StrataRouter Core's routing decisions to the Runtime's execution engine. It's the production-grade glue that makes semantic routing actually work in real systems.

Why the Bridge Exists

StrataRouter Core is blazing fast at making routing decisions (<1ms), but production systems need more:

  • Validation - Ensure routing decisions are safe to execute
  • Context Enrichment - Add user context, metadata, policies
  • Execution Planning - Translate routes into executable actions
  • Feedback Loop - Learn from execution outcomes to improve routing
  • Observability - Track the full journey from route to result

The Bridge handles all of this while adding minimal latency (<0.5ms).

Architecture

graph LR
    A[Core Router] --> B[Bridge]
    B --> C[Executor]
    C --> D[Provider]
    D --> E[LLM/Agent]
    E --> F[Result]
    F --> G[Feedback]
    G --> A

    H[Policy Engine] -.-> B
    I[Metrics] -.-> B
    J[State Manager] -.-> B

Components

1. Routing Decision Translator

Converts Core's RouteResult into Runtime's ExecutionPlan.

Input (from Core):

pub struct RouteResult {
    pub route_id: String,
    pub confidence: f32,
    pub scores: HashMap<String, f32>,
    pub alternatives: Vec<(String, f32)>,
    pub latency_ms: f32,
}

Output (for Runtime):

pub struct ExecutionPlan {
    pub target: ExecutionTarget,
    pub method: ExecutionMethod,
    pub context: ExecutionContext,
    pub policies: Vec<Policy>,
    pub fallback: Option<FallbackStrategy>,
}

pub enum ExecutionTarget {
    Provider { name: String, model: String },
    Agent { id: String, config: AgentConfig },
    Workflow { id: String, steps: Vec<Step> },
}

Translation Logic:

def translate_routing_decision(
    route_result: RouteResult,
    user_context: Dict[str, Any]
) -> ExecutionPlan:
    # Resolve route to execution target
    target = resolve_target(route_result.route_id)

    # Determine execution method
    method = determine_method(
        confidence=route_result.confidence,
        alternatives=route_result.alternatives
    )

    # Build execution context
    context = ExecutionContext(
        user_id=user_context["user_id"],
        session_id=user_context["session_id"],
        tenant_id=user_context["tenant_id"],
        metadata={
            "confidence": route_result.confidence,
            "route_scores": route_result.scores,
        }
    )

    # Apply policies
    policies = load_policies(user_context["tenant_id"])

    # Configure fallback
    fallback = configure_fallback(route_result.alternatives)

    return ExecutionPlan(
        target=target,
        method=method,
        context=context,
        policies=policies,
        fallback=fallback
    )
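The helpers above (`resolve_target`, `determine_method`, `load_policies`, `configure_fallback`) are left undefined. One plausible shape for the two confidence-driven pieces is sketched below; the thresholds (0.85 / 0.6 / 0.3) and the `ExecutionMethod` variants are illustrative assumptions, not part of the documented API:

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Tuple


class ExecutionMethod(Enum):
    DIRECT = "direct"    # high confidence: execute immediately
    HEDGED = "hedged"    # borderline: execute with fallback armed
    CONFIRM = "confirm"  # low confidence: confirm before executing


@dataclass
class FallbackStrategy:
    alternatives: List[Tuple[str, float]]
    retry_delay_ms: int = 100


def determine_method(
    confidence: float,
    alternatives: List[Tuple[str, float]],
    direct_threshold: float = 0.85,
    hedged_threshold: float = 0.6,
) -> ExecutionMethod:
    # Route straight through when the router is confident enough.
    if confidence >= direct_threshold:
        return ExecutionMethod.DIRECT
    # Hedge when confidence is middling and viable alternatives exist.
    if confidence >= hedged_threshold and alternatives:
        return ExecutionMethod.HEDGED
    return ExecutionMethod.CONFIRM


def configure_fallback(
    alternatives: List[Tuple[str, float]],
    min_score: float = 0.3,
    limit: int = 3,
) -> Optional[FallbackStrategy]:
    # Keep only alternatives worth retrying, best-scoring first.
    viable = sorted(
        (a for a in alternatives if a[1] >= min_score),
        key=lambda a: a[1],
        reverse=True,
    )[:limit]
    return FallbackStrategy(alternatives=viable) if viable else None
```

The thresholds would typically come from `BridgeConfig` rather than being hard-coded.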

2. Validation Layer

Validates routing decisions before execution to prevent:

  • Invalid route configurations
  • Policy violations
  • Resource limit breaches
  • Security issues

Validation Pipeline:

class BridgeValidator:
    async def validate(
        self,
        plan: ExecutionPlan
    ) -> ValidationResult:
        # 1. Route exists and is configured
        await self.validate_target(plan.target)

        # 2. User has permission
        await self.validate_permissions(plan.context)

        # 3. Within resource limits
        await self.validate_limits(plan.context)

        # 4. Policies allow execution
        await self.validate_policies(plan.policies)

        # 5. Provider is available
        await self.validate_provider(plan.target)

        return ValidationResult(valid=True)

Example Validations:

# Route configuration validation
def validate_target(target: ExecutionTarget):
    if isinstance(target, Provider):
        assert target.name in registry.providers
        assert target.model in registry.models[target.name]
        assert registry.api_keys.get(target.name) is not None

# Policy validation
def validate_policies(policies: List[Policy], input_text: str, user_id: str):
    for policy in policies:
        if policy.type == "content_filter":
            if contains_prohibited_content(input_text):
                raise PolicyViolation(f"Content filter: {policy.name}")

        if policy.type == "rate_limit":
            if exceeded_rate_limit(user_id):
                raise PolicyViolation("Rate limit exceeded")

# Resource limit validation
def validate_limits(context: ExecutionContext):
    usage = get_current_usage(context.user_id)
    limits = get_limits(context.user_id)

    if usage.tokens_today >= limits.daily_tokens:
        raise LimitExceeded("Daily token limit")

    if usage.concurrent_requests >= limits.max_concurrent:
        raise LimitExceeded("Too many concurrent requests")

3. Context Enrichment

Adds runtime context to execution plans:

class ContextEnricher:
    async def enrich(
        self,
        plan: ExecutionPlan,
        request: Request
    ) -> ExecutionPlan:
        # Add user profile
        user = await self.user_service.get_user(plan.context.user_id)
        plan.context.user_profile = user.profile

        # Add conversation history
        history = await self.history_service.get_history(
            session_id=plan.context.session_id,
            limit=10
        )
        plan.context.conversation_history = history

        # Add tenant config
        tenant = await self.tenant_service.get_tenant(
            plan.context.tenant_id
        )
        plan.context.tenant_config = tenant.config

        # Add cost budget
        budget = await self.billing_service.get_budget(
            user_id=plan.context.user_id
        )
        plan.context.budget_remaining = budget.remaining

        return plan
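The four lookups in `enrich` are independent, so they can be issued concurrently instead of awaited one after another, cutting enrichment latency to roughly the slowest single lookup. A sketch of the same step with `asyncio.gather` (`enrich_concurrently` is an illustrative helper, not part of the Bridge API):

```python
import asyncio


async def enrich_concurrently(enricher, plan):
    # The four lookups do not depend on each other, so run them in parallel.
    user, history, tenant, budget = await asyncio.gather(
        enricher.user_service.get_user(plan.context.user_id),
        enricher.history_service.get_history(
            session_id=plan.context.session_id, limit=10),
        enricher.tenant_service.get_tenant(plan.context.tenant_id),
        enricher.billing_service.get_budget(user_id=plan.context.user_id),
    )
    plan.context.user_profile = user.profile
    plan.context.conversation_history = history
    plan.context.tenant_config = tenant.config
    plan.context.budget_remaining = budget.remaining
    return plan
```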

4. Feedback Loop

Collects execution outcomes and feeds them back to improve routing:

class FeedbackCollector:
    async def collect_feedback(
        self,
        route_decision: RouteResult,
        execution_result: ExecutionResult
    ):
        feedback = RoutingFeedback(
            route_id=route_decision.route_id,
            confidence=route_decision.confidence,
            success=execution_result.success,
            latency_ms=execution_result.latency_ms,
            cost_usd=execution_result.cost_usd,
            user_rating=execution_result.user_rating,
            timestamp=datetime.utcnow()
        )

        # Store feedback
        await self.feedback_store.save(feedback)

        # Update routing statistics
        await self.stats_updater.update(
            route_id=route_decision.route_id,
            feedback=feedback
        )

        # Trigger calibration update if needed
        if should_recalibrate():
            await self.calibrator.update(self.feedback_store.recent())

Feedback Metrics:

@dataclass
class RoutingFeedback:
    route_id: str
    confidence: float

    # Execution outcome
    success: bool
    latency_ms: float
    cost_usd: float

    # User feedback (if available)
    user_rating: Optional[float]  # 1-5 stars
    user_correction: Optional[str]  # Should have routed to X

    # Context
    timestamp: datetime
    metadata: Dict
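The collector above stores each item as it arrives; the batching knobs in `BridgeConfig` (`feedback_batch_size`, `feedback_flush_interval_sec`) suggest that writes are buffered in practice. A minimal sketch of such a buffer; `FeedbackBatcher` and the `save_batch` store method are hypothetical names, not the documented API:

```python
import time
from typing import Any, List


class FeedbackBatcher:
    """Buffers feedback and flushes when the batch fills or the interval elapses."""

    def __init__(self, store, batch_size: int = 100, flush_interval_sec: int = 60):
        self.store = store
        self.batch_size = batch_size
        self.flush_interval_sec = flush_interval_sec
        self.buffer: List[Any] = []
        self.last_flush = time.monotonic()

    def add(self, feedback: Any) -> None:
        self.buffer.append(feedback)
        interval_due = time.monotonic() - self.last_flush >= self.flush_interval_sec
        if len(self.buffer) >= self.batch_size or interval_due:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.store.save_batch(self.buffer)  # one write per batch
            self.buffer = []
        self.last_flush = time.monotonic()
```

Batching trades a little feedback freshness for far fewer writes, which matters at the 50K ops/sec throughput quoted below.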

Usage

Basic Usage

from stratarouter import Router
from stratarouter_runtime import CoreRuntimeBridge, BridgeConfig

# Initialize Core
core = Router(config=...)
core.add_routes(routes)
core.build_index()

# Initialize Bridge
bridge = CoreRuntimeBridge(
    config=BridgeConfig(
        validation_enabled=True,
        feedback_enabled=True,
        metrics_enabled=True
    )
)

# Route and execute
async def handle_request(query: str, user_id: str):
    # Step 1: Core routing decision
    embedding = await get_embedding(query)
    route_result = core.route(query, embedding)

    # Step 2: Bridge translation and validation
    plan = await bridge.translate(
        route_result=route_result,
        user_context={"user_id": user_id}
    )

    # Step 3: Execute
    result = await bridge.execute(plan)

    # Step 4: Collect feedback
    await bridge.collect_feedback(route_result, result)

    return result

Advanced Usage with Fallback

# Configure fallback strategy
bridge = CoreRuntimeBridge(
    config=BridgeConfig(
        fallback_strategy=FallbackStrategy(
            enabled=True,
            confidence_threshold=0.7,  # Fallback if < 0.7
            alternatives_limit=3,      # Try top 3 alternatives
            retry_delay_ms=100,
        )
    )
)

# Execute with automatic fallback
result = await bridge.execute_with_fallback(
    route_result=route_result,
    user_context=context
)

# Result includes fallback information
if result.fallback_used:
    print(f"Primary route failed, used: {result.fallback_route}")
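One way `execute_with_fallback` might sequence candidates is sketched below: try the primary route, and only walk the alternatives when confidence fell below the threshold. This is a standalone illustration; `translate_route` is a hypothetical helper and the real signatures may differ:

```python
async def execute_with_fallback(bridge, route_result, user_context,
                                confidence_threshold=0.7, alternatives_limit=3):
    # Candidate order: primary route first, then top-scoring alternatives,
    # which are only consulted when confidence is below the threshold.
    candidates = [route_result.route_id]
    if route_result.confidence < confidence_threshold:
        candidates += [rid for rid, _ in route_result.alternatives[:alternatives_limit]]

    last_error = None
    for i, route_id in enumerate(candidates):
        try:
            plan = await bridge.translate_route(route_id, user_context)
            result = await bridge.execute(plan)
            # Record whether a fallback route served the request.
            result.fallback_used = i > 0
            result.fallback_route = route_id if i > 0 else None
            return result
        except Exception as e:  # a real version would only catch retryable errors
            last_error = e
    raise last_error
```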

Policy Enforcement

# Define policies
policies = [
    ContentFilterPolicy(
        name="profanity-filter",
        prohibited_terms=["badword1", "badword2"]
    ),
    RateLimitPolicy(
        name="user-rate-limit",
        limit=100,
        window_seconds=60
    ),
    CostLimitPolicy(
        name="daily-budget",
        max_cost_usd=10.0,
        window="daily"
    )
]

# Bridge automatically enforces
bridge = CoreRuntimeBridge(
    config=BridgeConfig(policies=policies)
)

# Execution fails if policy violated
try:
    result = await bridge.execute(plan)
except PolicyViolation as e:
    print(f"Policy violation: {e.policy_name}")
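A sliding-window sketch of what a `RateLimitPolicy` check could look like internally; the `check` method, its bookkeeping, and the injectable clock are assumptions for illustration, not the library's implementation:

```python
import time
from collections import defaultdict, deque


class RateLimitPolicy:
    """Sliding-window rate limit: at most `limit` requests per `window_seconds` per user."""

    def __init__(self, name: str, limit: int, window_seconds: int):
        self.name = name
        self.limit = limit
        self.window_seconds = window_seconds
        self._events = defaultdict(deque)  # user_id -> recent request timestamps

    def check(self, user_id: str, now: float = None) -> bool:
        now = time.monotonic() if now is None else now
        q = self._events[user_id]
        # Drop timestamps that have aged out of the window.
        while q and now - q[0] >= self.window_seconds:
            q.popleft()
        if len(q) >= self.limit:
            return False  # caller raises PolicyViolation
        q.append(now)
        return True
```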

Configuration

Bridge Config

@dataclass
class BridgeConfig:
    # Validation
    validation_enabled: bool = True
    validation_timeout_ms: int = 100

    # Feedback
    feedback_enabled: bool = True
    feedback_batch_size: int = 100
    feedback_flush_interval_sec: int = 60

    # Metrics
    metrics_enabled: bool = True
    metrics_prefix: str = "stratarouter.bridge"

    # Fallback
    fallback_strategy: Optional[FallbackStrategy] = None

    # Policies
    policies: List[Policy] = field(default_factory=list)

    # Cache
    cache_validation_results: bool = True
    cache_ttl_sec: int = 300

    # Timeouts
    translation_timeout_ms: int = 50
    enrichment_timeout_ms: int = 100
    execution_timeout_sec: int = 60

    # Enrichment
    enrichment_async: bool = False

Environment Variables

# Bridge settings
export BRIDGE_VALIDATION_ENABLED=true
export BRIDGE_FEEDBACK_ENABLED=true

# Policies
export BRIDGE_POLICIES_DIR=/etc/stratarouter/policies

# Timeouts
export BRIDGE_TRANSLATION_TIMEOUT_MS=50
export BRIDGE_EXECUTION_TIMEOUT_SEC=60
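A sketch of mapping these variables onto a config object. `BridgeEnvConfig` is a trimmed-down stand-in for `BridgeConfig`, and the parsing rules (truthy strings, integer fallbacks) are assumptions about how the loader behaves:

```python
import os
from dataclasses import dataclass


@dataclass
class BridgeEnvConfig:
    # Trimmed stand-in for BridgeConfig, covering the variables above.
    validation_enabled: bool = True
    feedback_enabled: bool = True
    translation_timeout_ms: int = 50
    execution_timeout_sec: int = 60


def _flag(name: str, default: bool) -> bool:
    # Treat "1"/"true"/"yes" (any case) as enabled.
    return os.environ.get(name, str(default)).lower() in ("1", "true", "yes")


def config_from_env() -> BridgeEnvConfig:
    return BridgeEnvConfig(
        validation_enabled=_flag("BRIDGE_VALIDATION_ENABLED", True),
        feedback_enabled=_flag("BRIDGE_FEEDBACK_ENABLED", True),
        translation_timeout_ms=int(os.environ.get("BRIDGE_TRANSLATION_TIMEOUT_MS", "50")),
        execution_timeout_sec=int(os.environ.get("BRIDGE_EXECUTION_TIMEOUT_SEC", "60")),
    )
```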

Performance

Latency Breakdown

Operation               Latency      Notes
Translation             0.1-0.2ms    Route ID → Execution Plan
Validation              0.2-0.3ms    Cached after first check
Enrichment              0.1-0.2ms    Context lookup
Total Bridge Overhead   0.4-0.7ms    Negligible

Throughput

  • Bridge throughput: 50K operations/sec
  • Validation cache hit rate: 95%+
  • Feedback batching: 100 items/batch

Optimization Tips

  1. Enable Validation Caching

    config = BridgeConfig(
        cache_validation_results=True,
        cache_ttl_sec=300  # 5 minutes
    )
    

  2. Batch Feedback

    config = BridgeConfig(
        feedback_batch_size=100,
        feedback_flush_interval_sec=60
    )
    

  3. Async Enrichment

    # Don't block on enrichment
    config = BridgeConfig(
        enrichment_async=True
    )
    
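The cached validation results from tip 1 could be backed by a small TTL cache along these lines; `TTLCache` and its `(route_id, user_id)` key shape are illustrative, since the Bridge's actual cache implementation is not documented here:

```python
import time
from typing import Any, Dict, Hashable, Optional, Tuple


class TTLCache:
    """Minimal TTL cache, e.g. for validation results keyed by (route_id, user_id)."""

    def __init__(self, ttl_sec: float = 300.0):
        self.ttl_sec = ttl_sec
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._entries[key]  # expired: evict lazily on read
            return None
        return value

    def put(self, key: Hashable, value: Any) -> None:
        self._entries[key] = (time.monotonic() + self.ttl_sec, value)
```

A production cache would also bound its size (the low-hit-rate troubleshooting entry below mentions evictions), but the TTL mechanics are the core idea.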

Monitoring

Metrics

# Translation metrics
stratarouter_bridge_translations_total{status}
stratarouter_bridge_translation_duration_seconds

# Validation metrics
stratarouter_bridge_validations_total{status, reason}
stratarouter_bridge_validation_duration_seconds
stratarouter_bridge_validation_cache_hits_total

# Execution metrics
stratarouter_bridge_executions_total{route_id, status}
stratarouter_bridge_execution_duration_seconds{route_id}

# Feedback metrics
stratarouter_bridge_feedback_collected_total
stratarouter_bridge_feedback_batch_size

Tracing

OpenTelemetry spans:

Bridge Execution (50ms)
├─ Translation (0.2ms)
├─ Validation (0.3ms)
│  ├─ Cache Lookup (0.1ms)
│  └─ Policy Check (0.2ms)
├─ Enrichment (0.2ms)
├─ Runtime Execution (45ms)
│  └─ Provider Call (43ms)
└─ Feedback Collection (4ms)
   └─ Batch Write (3ms)

Error Handling

Common Errors

# Validation and execution failures
try:
    result = await bridge.execute(plan)
except ValidationError as e:
    # Handle validation failure
    if e.reason == "policy_violation":
        return error_response("Content not allowed")
    elif e.reason == "limit_exceeded":
        return error_response("Rate limit exceeded")
except ExecutionError as e:
    # Handle execution failure
    if e.retryable:
        # Retry with backoff
        result = await retry_with_backoff(bridge.execute, plan)
    else:
        # Use fallback
        fallback_plan = bridge.get_fallback(plan)
        result = await bridge.execute(fallback_plan)

Retry Strategy

@dataclass
class RetryConfig:
    max_attempts: int = 3
    initial_delay_ms: int = 100
    max_delay_ms: int = 5000
    backoff_factor: float = 2.0
    jitter: bool = True

    retryable_errors: List[str] = field(default_factory=lambda: [
        "timeout",
        "rate_limit",
        "service_unavailable"
    ])
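The error-handling example above calls `retry_with_backoff`; a self-contained sketch consistent with `RetryConfig`'s defaults is shown below. The function itself is illustrative (a real version would consult `retryable_errors` before sleeping):

```python
import asyncio
import random


async def retry_with_backoff(fn, *args, max_attempts=3, initial_delay_ms=100,
                             max_delay_ms=5000, backoff_factor=2.0, jitter=True):
    delay_ms = initial_delay_ms
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn(*args)
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Jitter spreads retries out so failing clients don't retry in lockstep.
            sleep_ms = delay_ms * random.uniform(0.5, 1.0) if jitter else delay_ms
            await asyncio.sleep(sleep_ms / 1000.0)
            delay_ms = min(delay_ms * backoff_factor, max_delay_ms)
```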

Best Practices

1. Always Enable Validation

# ✅ Good
bridge = CoreRuntimeBridge(
    config=BridgeConfig(validation_enabled=True)
)

# ❌ Bad: skips validation in production
bridge = CoreRuntimeBridge(
    config=BridgeConfig(validation_enabled=False)
)

2. Use Feedback Loop

# ✅ Good - Learn from outcomes
await bridge.collect_feedback(route_result, execution_result)

# ❌ Bad - No learning
# (just execute and forget)

3. Configure Fallback

# ✅ Good
config = BridgeConfig(
    fallback_strategy=FallbackStrategy(
        confidence_threshold=0.7,
        alternatives_limit=3
    )
)

# ❌ Bad - No fallback
config = BridgeConfig(fallback_strategy=None)

4. Monitor Bridge Health

# Track key metrics
metrics = await bridge.get_health_metrics()

if metrics.validation_cache_hit_rate < 0.8:
    logger.warning("Low validation cache hit rate")

if metrics.avg_translation_time_ms > 1.0:
    logger.warning("High translation latency")

Troubleshooting

High Latency

Symptom: Bridge overhead > 1ms

Diagnose:

metrics = await bridge.get_detailed_metrics()
print(f"Translation: {metrics.translation_p99_ms}ms")
print(f"Validation: {metrics.validation_p99_ms}ms")
print(f"Enrichment: {metrics.enrichment_p99_ms}ms")

Fix:

  • Enable validation caching
  • Reduce enrichment data
  • Use async enrichment

Low Cache Hit Rate

Symptom: Validation cache hit rate < 80%

Diagnose:

stats = await bridge.get_cache_stats()
print(f"Hit rate: {stats.hit_rate:.1%}")
print(f"Evictions: {stats.evictions}")

Fix:

  • Increase cache TTL
  • Increase cache size
  • Check for high cardinality in cache keys

Next Steps