Core-Runtime Bridge¶
The Core-Runtime Bridge is the critical integration layer that connects StrataRouter Core's routing decisions to the Runtime's execution engine. It's the production-grade glue that makes semantic routing actually work in real systems.
Why the Bridge Exists¶
StrataRouter Core is blazing fast at making routing decisions (<1ms), but production systems need more:
- ✅ Validation - Ensure routing decisions are safe to execute
- ✅ Context Enrichment - Add user context, metadata, policies
- ✅ Execution Planning - Translate routes into executable actions
- ✅ Feedback Loop - Learn from execution outcomes to improve routing
- ✅ Observability - Track the full journey from route to result
The Bridge handles all of this while adding minimal latency (typically 0.4-0.7ms; see Performance).
Architecture¶
graph LR
A[Core Router] --> B[Bridge]
B --> C[Executor]
C --> D[Provider]
D --> E[LLM/Agent]
E --> F[Result]
F --> G[Feedback]
G --> A
H[Policy Engine] -.-> B
I[Metrics] -.-> B
J[State Manager] -.-> B
Components¶
1. Routing Decision Translator¶
Converts Core's RouteResult into Runtime's ExecutionPlan.
Input (from Core):
pub struct RouteResult {
pub route_id: String,
pub confidence: f32,
pub scores: HashMap<String, f32>,
pub alternatives: Vec<(String, f32)>,
pub latency_ms: f32,
}
Output (for Runtime):
pub struct ExecutionPlan {
pub target: ExecutionTarget,
pub method: ExecutionMethod,
pub context: ExecutionContext,
pub policies: Vec<Policy>,
pub fallback: Option<FallbackStrategy>,
}
pub enum ExecutionTarget {
Provider { name: String, model: String },
Agent { id: String, config: AgentConfig },
Workflow { id: String, steps: Vec<Step> },
}
Translation Logic:
def translate_routing_decision(
    route_result: RouteResult,
    user_context: Dict
) -> ExecutionPlan:
    # Resolve route to execution target
    target = resolve_target(route_result.route_id)

    # Determine execution method
    method = determine_method(
        confidence=route_result.confidence,
        alternatives=route_result.alternatives
    )

    # Build execution context (tenant_id is carried through for
    # policy loading and enrichment downstream)
    context = ExecutionContext(
        user_id=user_context["user_id"],
        session_id=user_context["session_id"],
        tenant_id=user_context["tenant_id"],
        metadata={
            "confidence": route_result.confidence,
            "route_scores": route_result.scores,
        }
    )

    # Apply policies
    policies = load_policies(user_context["tenant_id"])

    # Configure fallback
    fallback = configure_fallback(route_result.alternatives)

    return ExecutionPlan(
        target=target,
        method=method,
        context=context,
        policies=policies,
        fallback=fallback
    )
2. Validation Layer¶
Validates routing decisions before execution to prevent:
- Invalid route configurations
- Policy violations
- Resource limit breaches
- Security issues
Validation Pipeline:
class BridgeValidator:
async def validate(
self,
plan: ExecutionPlan
) -> ValidationResult:
# 1. Route exists and is configured
await self.validate_target(plan.target)
# 2. User has permission
await self.validate_permissions(plan.context)
# 3. Within resource limits
await self.validate_limits(plan.context)
# 4. Policies allow execution
await self.validate_policies(plan.policies)
# 5. Provider is available
await self.validate_provider(plan.target)
return ValidationResult(valid=True)
Example Validations:
# Route configuration validation
def validate_target(target: ExecutionTarget):
    if isinstance(target, Provider):
        if target.name not in registry.providers:
            raise ValidationError(f"Unknown provider: {target.name}")
        if target.model not in registry.models[target.name]:
            raise ValidationError(f"Unknown model: {target.name}/{target.model}")
        if registry.providers[target.name].api_key is None:
            raise ValidationError(f"No API key configured for {target.name}")

# Policy validation
def validate_policies(policies: List[Policy], input_text: str, user_id: str):
    for policy in policies:
        if policy.type == "content_filter":
            if contains_prohibited_content(input_text):
                raise PolicyViolation(f"Content filter: {policy.name}")
        elif policy.type == "rate_limit":
            if exceeded_rate_limit(user_id):
                raise PolicyViolation(f"Rate limit exceeded: {policy.name}")

# Resource limit validation
def validate_limits(context: ExecutionContext):
    usage = get_current_usage(context.user_id)
    if usage.tokens_today >= limits.daily_tokens:
        raise LimitExceeded("Daily token limit")
    if usage.concurrent_requests >= limits.max_concurrent:
        raise LimitExceeded("Too many concurrent requests")
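The latency table later in this page notes that validation results are cached after the first check, and `BridgeConfig` exposes `cache_validation_results` and `cache_ttl_sec`. A minimal TTL cache of the kind that could back this (the key shape is an assumption):

```python
import time
from typing import Dict, Optional, Tuple

class ValidationCache:
    """TTL cache for validation outcomes, keyed by (route_id, tenant_id)."""

    def __init__(self, ttl_sec: float = 300.0):
        self.ttl_sec = ttl_sec
        self._entries: Dict[Tuple[str, str], Tuple[float, bool]] = {}

    def get(self, route_id: str, tenant_id: str) -> Optional[bool]:
        entry = self._entries.get((route_id, tenant_id))
        if entry is None:
            return None  # miss
        stored_at, valid = entry
        if time.monotonic() - stored_at > self.ttl_sec:
            del self._entries[(route_id, tenant_id)]
            return None  # expired
        return valid

    def put(self, route_id: str, tenant_id: str, valid: bool) -> None:
        self._entries[(route_id, tenant_id)] = (time.monotonic(), valid)
```

On a hit, the full validation pipeline is skipped; a miss or expiry falls through to the real checks, whose result is then stored with `put`.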
3. Context Enrichment¶
Adds runtime context to execution plans:
class ContextEnricher:
async def enrich(
self,
plan: ExecutionPlan,
request: Request
) -> ExecutionPlan:
# Add user profile
user = await self.user_service.get_user(plan.context.user_id)
plan.context.user_profile = user.profile
# Add conversation history
history = await self.history_service.get_history(
session_id=plan.context.session_id,
limit=10
)
plan.context.conversation_history = history
# Add tenant config
tenant = await self.tenant_service.get_tenant(
plan.context.tenant_id
)
plan.context.tenant_config = tenant.config
# Add cost budget
budget = await self.billing_service.get_budget(
user_id=plan.context.user_id
)
plan.context.budget_remaining = budget.remaining
return plan
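The enricher above performs its four lookups one after another. Since they are independent, they can run concurrently with `asyncio.gather`, which is what the "Async Enrichment" optimization tip below refers to. A self-contained demonstration (the `fetch` stub stands in for the user, history, tenant, and budget service calls):

```python
import asyncio
import time

async def fetch(name: str, delay: float) -> str:
    """Stand-in for one enrichment lookup (user, history, tenant, budget)."""
    await asyncio.sleep(delay)
    return name

async def enrich_concurrently():
    # All four lookups run concurrently: total latency tracks the slowest
    # lookup, not the sum of all four.
    return await asyncio.gather(
        fetch("user_profile", 0.05),
        fetch("history", 0.05),
        fetch("tenant_config", 0.05),
        fetch("budget", 0.05),
    )

start = time.monotonic()
results = asyncio.run(enrich_concurrently())
elapsed = time.monotonic() - start
```

Sequentially these four 50ms calls would take about 200ms; gathered, the whole enrichment step finishes in roughly the time of one call.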
4. Feedback Loop¶
Collects execution outcomes and feeds them back to improve routing:
class FeedbackCollector:
    async def collect_feedback(
        self,
        route_decision: RouteResult,
        execution_result: ExecutionResult
    ):
        feedback = RoutingFeedback(
            route_id=route_decision.route_id,
            confidence=route_decision.confidence,
            success=execution_result.success,
            latency=execution_result.latency,
            cost=execution_result.cost,
            user_rating=execution_result.user_rating,
            timestamp=datetime.now(timezone.utc)
        )

        # Store feedback
        await self.feedback_store.save(feedback)

        # Update routing statistics
        await self.stats_updater.update(
            route_id=route_decision.route_id,
            feedback=feedback
        )

        # Trigger calibration update if needed
        if self.should_recalibrate():
            await self.calibrator.update(self.feedback_store.recent())
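`BridgeConfig` exposes `feedback_batch_size` and `feedback_flush_interval_sec`, and the optimization tips below recommend batching feedback writes. A minimal batcher of the kind the store write could sit behind (the `save_batch` store method is an assumed interface, not the documented API):

```python
from typing import Any, List

class FeedbackBatcher:
    """Buffer feedback items and flush in batches to cut write amplification."""

    def __init__(self, store, batch_size: int = 100):
        self.store = store
        self.batch_size = batch_size
        self._buffer: List[Any] = []

    def add(self, feedback) -> bool:
        """Buffer one item; returns True if this add triggered a flush."""
        self._buffer.append(feedback)
        if len(self._buffer) >= self.batch_size:
            self.flush()
            return True
        return False

    def flush(self) -> None:
        if self._buffer:
            self.store.save_batch(self._buffer)
            self._buffer = []
```

A production version would also flush on a timer (`feedback_flush_interval_sec`) so that a quiet period does not strand buffered items.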
Feedback Metrics:
@dataclass
class RoutingFeedback:
route_id: str
confidence: float
# Execution outcome
success: bool
latency_ms: float
cost_usd: float
# User feedback (if available)
user_rating: Optional[float] # 1-5 stars
user_correction: Optional[str] # Should have routed to X
# Context
timestamp: datetime
metadata: Dict
Usage¶
Basic Usage¶
from stratarouter import Router
from stratarouter_runtime import CoreRuntimeBridge, BridgeConfig
# Initialize Core
core = Router(config=...)
core.add_routes(routes)
core.build_index()
# Initialize Bridge
bridge = CoreRuntimeBridge(
config=BridgeConfig(
validation_enabled=True,
feedback_enabled=True,
metrics_enabled=True
)
)
# Route and execute
async def handle_request(query: str, user_id: str):
# Step 1: Core routing decision
embedding = await get_embedding(query)
route_result = core.route(query, embedding)
# Step 2: Bridge translation and validation
plan = await bridge.translate(
route_result=route_result,
user_context={"user_id": user_id}
)
# Step 3: Execute
result = await bridge.execute(plan)
# Step 4: Collect feedback
await bridge.collect_feedback(route_result, result)
return result
Advanced Usage with Fallback¶
# Configure fallback strategy
bridge = CoreRuntimeBridge(
config=BridgeConfig(
fallback_strategy=FallbackStrategy(
enabled=True,
confidence_threshold=0.7, # Fallback if < 0.7
alternatives_limit=3, # Try top 3 alternatives
retry_delay_ms=100,
)
)
)
# Execute with automatic fallback
result = await bridge.execute_with_fallback(
route_result=route_result,
user_context=context
)
# Result includes fallback information
if result.fallback_used:
print(f"Primary route failed, used: {result.fallback_route}")
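Internally, `execute_with_fallback` amounts to walking the Core's ranked alternatives when the primary route is weak or fails. A hedged sketch of that loop (the `translate_route` helper is hypothetical; the real bridge also restricts retries to retryable `ExecutionError`s rather than catching everything):

```python
async def execute_with_fallback(bridge, route_result, user_context,
                                confidence_threshold=0.7,
                                alternatives_limit=3):
    """Try the primary route; on low confidence, fall through the alternatives."""
    candidates = [(route_result.route_id, route_result.confidence)]
    if route_result.confidence < confidence_threshold:
        candidates += route_result.alternatives[:alternatives_limit]

    last_error = None
    for route_id, confidence in candidates:
        try:
            plan = await bridge.translate_route(route_id, user_context)
            return await bridge.execute(plan)
        except Exception as e:  # sketch only: real code checks e.retryable
            last_error = e
    raise last_error
```

When every candidate fails, the last execution error propagates to the caller, which matches the `ExecutionError` handling shown in the Error Handling section.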
Policy Enforcement¶
# Define policies
policies = [
ContentFilterPolicy(
name="profanity-filter",
prohibited_terms=["badword1", "badword2"]
),
RateLimitPolicy(
name="user-rate-limit",
limit=100,
window_seconds=60
),
CostLimitPolicy(
name="daily-budget",
max_cost_usd=10.0,
window="daily"
)
]
# Bridge automatically enforces
bridge = CoreRuntimeBridge(
config=BridgeConfig(policies=policies)
)
# Execution fails if policy violated
try:
result = await bridge.execute(plan)
except PolicyViolation as e:
print(f"Policy violation: {e.policy_name}")
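To make the `RateLimitPolicy(limit=100, window_seconds=60)` example concrete, here is one way such a policy could be implemented as a sliding window over per-user timestamps. The class body and the local `PolicyViolation` stand-in are illustrative sketches, not the library's internals:

```python
import time
from collections import defaultdict, deque

class PolicyViolation(Exception):
    """Local stand-in for the runtime's PolicyViolation."""

class RateLimitPolicy:
    """Sliding window: at most `limit` requests per `window_seconds` per user."""

    def __init__(self, name: str, limit: int, window_seconds: float):
        self.name = name
        self.limit = limit
        self.window_seconds = window_seconds
        self._hits = defaultdict(deque)  # user_id -> request timestamps

    def check(self, user_id: str, now: float = None) -> None:
        now = time.monotonic() if now is None else now
        hits = self._hits[user_id]
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] > self.window_seconds:
            hits.popleft()
        if len(hits) >= self.limit:
            raise PolicyViolation(
                f"{self.name}: {self.limit}/{self.window_seconds}s exceeded"
            )
        hits.append(now)
```

A rejected request raises before its timestamp is recorded, so being throttled does not itself consume quota.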
Configuration¶
Bridge Config¶
@dataclass
class BridgeConfig:
# Validation
validation_enabled: bool = True
validation_timeout_ms: int = 100
# Feedback
feedback_enabled: bool = True
feedback_batch_size: int = 100
feedback_flush_interval_sec: int = 60
# Metrics
metrics_enabled: bool = True
metrics_prefix: str = "stratarouter.bridge"
# Fallback
fallback_strategy: Optional[FallbackStrategy] = None
# Policies
policies: List[Policy] = field(default_factory=list)
# Cache
cache_validation_results: bool = True
cache_ttl_sec: int = 300
# Timeouts
translation_timeout_ms: int = 50
enrichment_timeout_ms: int = 100
execution_timeout_sec: int = 60
Environment Variables¶
# Bridge settings
export BRIDGE_VALIDATION_ENABLED=true
export BRIDGE_FEEDBACK_ENABLED=true
# Policies
export BRIDGE_POLICIES_DIR=/etc/stratarouter/policies
# Timeouts
export BRIDGE_TRANSLATION_TIMEOUT_MS=50
export BRIDGE_EXECUTION_TIMEOUT_SEC=60
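A sketch of how these variables might be read into configuration values, assuming the names and defaults documented above (the returned dict mirrors a subset of `BridgeConfig`; the helper names are illustrative):

```python
import os

def bridge_config_from_env() -> dict:
    """Read bridge settings from the environment, with documented defaults."""
    def env_bool(name: str, default: bool) -> bool:
        return os.environ.get(name, str(default)).lower() in ("1", "true", "yes")

    def env_int(name: str, default: int) -> int:
        return int(os.environ.get(name, default))

    return {
        "validation_enabled": env_bool("BRIDGE_VALIDATION_ENABLED", True),
        "feedback_enabled": env_bool("BRIDGE_FEEDBACK_ENABLED", True),
        "policies_dir": os.environ.get(
            "BRIDGE_POLICIES_DIR", "/etc/stratarouter/policies"
        ),
        "translation_timeout_ms": env_int("BRIDGE_TRANSLATION_TIMEOUT_MS", 50),
        "execution_timeout_sec": env_int("BRIDGE_EXECUTION_TIMEOUT_SEC", 60),
    }
```

Unset variables fall back to the defaults shown in the `BridgeConfig` dataclass, so the environment only needs to override what differs per deployment.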
Performance¶
Latency Breakdown¶
| Operation | Latency | Notes |
|---|---|---|
| Translation | 0.1-0.2ms | Route ID → Execution Plan |
| Validation | 0.2-0.3ms | Cached after first check |
| Enrichment | 0.1-0.2ms | Context lookup |
| Total Bridge Overhead | 0.4-0.7ms | Negligible |
Throughput¶
- Bridge throughput: 50K operations/sec
- Validation cache hit rate: 95%+
- Feedback batching: 100 items/batch
Optimization Tips¶
- Enable Validation Caching - Set cache_validation_results=True so repeated checks for the same route and tenant hit the cache instead of the full validation pipeline.
- Batch Feedback - Buffer feedback items and flush them in batches (feedback_batch_size) rather than writing once per request.
- Async Enrichment - Run the user, history, tenant, and budget lookups concurrently instead of sequentially.
Monitoring¶
Metrics¶
# Translation metrics
stratarouter_bridge_translations_total{status}
stratarouter_bridge_translation_duration_seconds
# Validation metrics
stratarouter_bridge_validations_total{status, reason}
stratarouter_bridge_validation_duration_seconds
stratarouter_bridge_validation_cache_hits_total
# Execution metrics
stratarouter_bridge_executions_total{route_id, status}
stratarouter_bridge_execution_duration_seconds{route_id}
# Feedback metrics
stratarouter_bridge_feedback_collected_total
stratarouter_bridge_feedback_batch_size
Tracing¶
OpenTelemetry spans:
Bridge Execution (50ms)
├─ Translation (0.2ms)
├─ Validation (0.3ms)
│ ├─ Cache Lookup (0.1ms)
│ └─ Policy Check (0.2ms)
├─ Enrichment (0.2ms)
├─ Runtime Execution (45ms)
│ └─ Provider Call (43ms)
└─ Feedback Collection (4ms)
└─ Batch Write (3ms)
Error Handling¶
Common Errors¶
try:
    result = await bridge.execute(plan)
except ValidationError as e:
    # Validation failures
    if e.reason == "policy_violation":
        return error_response("Content not allowed")
    elif e.reason == "limit_exceeded":
        return error_response("Rate limit exceeded")
except ExecutionError as e:
    # Execution failures
    if e.retryable:
        # Retry with backoff
        result = await retry_with_backoff(bridge.execute, plan)
    else:
        # Use fallback
        fallback_plan = bridge.get_fallback(plan)
        result = await bridge.execute(fallback_plan)
Retry Strategy¶
@dataclass
class RetryConfig:
max_attempts: int = 3
initial_delay_ms: int = 100
max_delay_ms: int = 5000
backoff_factor: float = 2.0
jitter: bool = True
retryable_errors = [
"timeout",
"rate_limit",
"service_unavailable"
]
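The error-handling example above calls `retry_with_backoff(bridge.execute, plan)` without showing it. One way to implement it against the `RetryConfig` fields (shown here with the defaults inlined as keyword arguments; treat the function itself as a sketch, not the shipped helper):

```python
import asyncio
import random

async def retry_with_backoff(fn, *args, max_attempts=3, initial_delay_ms=100,
                             max_delay_ms=5000, backoff_factor=2.0, jitter=True):
    """Call `fn` until it succeeds, sleeping with exponential backoff between tries."""
    delay_ms = initial_delay_ms
    for attempt in range(1, max_attempts + 1):
        try:
            return await fn(*args)
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
        # Full jitter on the current delay avoids synchronized retry storms.
        sleep_ms = delay_ms * (random.uniform(0.5, 1.0) if jitter else 1.0)
        await asyncio.sleep(sleep_ms / 1000)
        delay_ms = min(delay_ms * backoff_factor, max_delay_ms)
```

A production version would also filter on `retryable_errors` (timeout, rate_limit, service_unavailable) instead of retrying every exception.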
Best Practices¶
1. Always Enable Validation¶
# ✅ Good
bridge = CoreRuntimeBridge(
config=BridgeConfig(validation_enabled=True)
)
# ❌ Bad (skip validation in production)
bridge = CoreRuntimeBridge(
config=BridgeConfig(validation_enabled=False)
)
2. Use Feedback Loop¶
# ✅ Good - Learn from outcomes
await bridge.collect_feedback(route_result, execution_result)
# ❌ Bad - No learning
# (just execute and forget)
3. Configure Fallback¶
# ✅ Good
config = BridgeConfig(
fallback_strategy=FallbackStrategy(
confidence_threshold=0.7,
alternatives_limit=3
)
)
# ❌ Bad - No fallback
config = BridgeConfig(fallback_strategy=None)
4. Monitor Bridge Health¶
# Track key metrics
metrics = await bridge.get_health_metrics()
if metrics.validation_cache_hit_rate < 0.8:
logger.warning("Low validation cache hit rate")
if metrics.avg_translation_time_ms > 1.0:
logger.warning("High translation latency")
Troubleshooting¶
High Latency¶
Symptom: Bridge overhead > 1ms
Diagnose:
metrics = await bridge.get_detailed_metrics()
print(f"Translation: {metrics.translation_p99_ms}ms")
print(f"Validation: {metrics.validation_p99_ms}ms")
print(f"Enrichment: {metrics.enrichment_p99_ms}ms")
Fix:
- Enable validation caching
- Reduce enrichment data
- Use async enrichment
Low Cache Hit Rate¶
Symptom: Validation cache hit rate < 80%
Diagnose:
stats = await bridge.get_cache_stats()
print(f"Hit rate: {stats.hit_rate:.1%}")
print(f"Evictions: {stats.evictions}")
Fix:
- Increase cache TTL
- Increase cache size
- Check for high cardinality in cache keys
Next Steps¶
- Execution Engine - Runtime execution details
- Provider Clients - LLM provider integration
- Caching System - Semantic caching
- API Reference - Complete API docs