Policy Engine¶
Implement governance, compliance, and access control with StrataRouter's policy engine.
Overview¶
The policy engine provides declarative policy management for routing decisions, enabling:
- Access control and authentication
- Cost limits and budgets
- Compliance requirements
- Governance workflows
- Audit trails
Policy Structure¶
Basic Policy¶
from stratarouter.policy import Policy, PolicyEngine
policy = Policy(
name="billing_access",
description="Control access to billing routes",
rules=[
{
"condition": "route.id == 'billing'",
"require": ["authenticated", "billing_permission"],
"deny_reason": "Billing access requires authentication"
}
]
)
Policy Engine¶
engine = PolicyEngine()
engine.add_policy(policy)
# Evaluate policy
context = {
"user_id": "user-123",
"authenticated": True,
"permissions": ["billing_permission"]
}
decision = engine.evaluate(route="billing", context=context)
if decision.allowed:
# Execute route
pass
else:
# Return denial reason
print(decision.reason)
Policy Types¶
1. Access Control¶
access_policy = Policy(
name="rbac",
rules=[
{
"condition": "route.category == 'admin'",
"require": ["role:admin"],
"deny_reason": "Admin access required"
},
{
"condition": "route.category == 'financial'",
"require": ["role:finance", "mfa_verified"],
"deny_reason": "Finance role and MFA required"
}
]
)
2. Cost Control¶
cost_policy = Policy(
name="budget_limits",
rules=[
{
"condition": "user.monthly_spend > 1000",
"action": "deny",
"deny_reason": "Monthly budget exceeded"
},
{
"condition": "route.cost_per_request > 0.10",
"action": "approve_required",
"approver": "finance_team"
}
]
)
3. Compliance¶
compliance_policy = Policy(
name="gdpr_compliance",
rules=[
{
"condition": "data.contains_pii == true",
"require": ["gdpr_consent", "encryption_enabled"],
"deny_reason": "GDPR compliance requirements not met"
},
{
"condition": "user.region == 'EU'",
"enforce": ["data_residency_eu", "audit_logging"],
}
]
)
4. Governance¶
governance_policy = Policy(
name="production_changes",
rules=[
{
"condition": "environment == 'production'",
"require": ["code_review", "security_scan", "approval:2"],
"deny_reason": "Production changes require approvals"
}
]
)
Policy Language¶
Conditions¶
Policies support rich conditional logic:
# Equality
"route.id == 'billing'"
# Comparison
"user.monthly_spend > 1000"
# Logical operators
"authenticated == true AND role == 'admin'"
# List membership
"'billing_permission' in user.permissions"
# Pattern matching
"route.id matches 'finance_.*'"
# Complex expressions
"(user.role == 'admin' OR user.role == 'owner') AND environment != 'production'"
Actions¶
Available policy actions:
allow- Explicitly allowdeny- Explicitly denyapprove_required- Require manual approvallog_only- Log violation but allowrate_limit- Apply rate limitingredirect- Redirect to different route
Implementation Examples¶
Complete Access Control¶
from stratarouter.policy import PolicyEngine, Policy
class RouterWithPolicies:
def __init__(self, router, policy_engine):
self.router = router
self.policies = policy_engine
def route_with_policy(self, query, embedding, context):
"""Route with policy enforcement."""
# Get routing decision
result = self.router.route(query, embedding)
# Evaluate policies
policy_decision = self.policies.evaluate(
route=result.route_id,
context=context
)
if not policy_decision.allowed:
return {
"status": "denied",
"reason": policy_decision.reason,
"required": policy_decision.requirements
}
return {
"status": "allowed",
"route": result.route_id,
"confidence": result.confidence
}
# Setup
engine = PolicyEngine()
engine.add_policies([
access_policy,
cost_policy,
compliance_policy
])
router_with_policies = RouterWithPolicies(router, engine)
# Use
context = {
"user_id": "user-123",
"authenticated": True,
"permissions": ["billing_permission"],
"monthly_spend": 500.00
}
result = router_with_policies.route_with_policy(
query="Show my invoice",
embedding=embedding,
context=context
)
Budget Tracking¶
class BudgetPolicy:
def __init__(self, monthly_limit: float):
self.monthly_limit = monthly_limit
self.usage = {} # user_id -> spend
def evaluate(self, user_id: str, cost: float) -> dict:
"""Evaluate budget policy."""
current_spend = self.usage.get(user_id, 0)
if current_spend + cost > self.monthly_limit:
return {
"allowed": False,
"reason": f"Budget limit ${self.monthly_limit} exceeded",
"current_spend": current_spend,
"remaining": self.monthly_limit - current_spend
}
return {"allowed": True}
def record_usage(self, user_id: str, cost: float):
"""Record usage for tracking."""
self.usage[user_id] = self.usage.get(user_id, 0) + cost
# Usage
budget = BudgetPolicy(monthly_limit=1000.00)
decision = budget.evaluate(user_id="user-123", cost=10.50)
if decision["allowed"]:
# Execute
budget.record_usage("user-123", 10.50)
else:
# Deny
print(decision["reason"])
Approval Workflows¶
class ApprovalPolicy:
def __init__(self):
self.pending = {}
self.approved = set()
def requires_approval(self, route_id: str, context: dict) -> bool:
"""Check if route requires approval."""
if context.get("environment") == "production":
if route_id not in self.approved:
return True
return False
def request_approval(self, request_id: str, route_id: str, requester: str):
"""Request approval for route execution."""
self.pending[request_id] = {
"route": route_id,
"requester": requester,
"timestamp": time.time(),
"approvers": []
}
# Notify approvers
self.notify_approvers(request_id)
def approve(self, request_id: str, approver: str):
"""Approve pending request."""
if request_id in self.pending:
self.pending[request_id]["approvers"].append(approver)
# Check if sufficient approvals
if len(self.pending[request_id]["approvers"]) >= 2:
route_id = self.pending[request_id]["route"]
self.approved.add(route_id)
del self.pending[request_id]
return {"status": "approved"}
return {"status": "pending"}
Audit Logging¶
Enable Audit Trail¶
from stratarouter.policy import AuditLogger
logger = AuditLogger(
backend="postgresql",
connection_string="postgresql://localhost/audit"
)
# Log policy decisions
logger.log(
event="policy_evaluation",
user_id="user-123",
route_id="billing",
decision="allowed",
policies_evaluated=["access_policy", "cost_policy"],
context={...},
timestamp=time.time()
)
# Query audit log
events = logger.query(
user_id="user-123",
start_date="2026-01-01",
end_date="2026-01-31"
)
Immutable Audit Log¶
# Use blockchain-based audit log (Enterprise)
from stratarouter.enterprise.audit import BlockchainAuditLog
audit_log = BlockchainAuditLog(
chain="ethereum",
contract_address="0x..."
)
# Log events (immutable)
audit_log.log_event({
"event": "high_value_transaction",
"user_id": "user-123",
"amount": 10000.00,
"timestamp": time.time()
})
# Verify integrity
is_valid = audit_log.verify_chain()
Policy Templates¶
HIPAA Compliance¶
hipaa_policy = Policy(
name="hipaa_compliance",
rules=[
{
"condition": "data.contains_phi == true",
"require": [
"encryption_at_rest",
"encryption_in_transit",
"audit_logging",
"access_controls"
],
"deny_reason": "HIPAA requirements not met"
},
{
"condition": "user.role != 'healthcare_provider'",
"action": "deny",
"deny_reason": "PHI access restricted to healthcare providers"
}
]
)
GDPR Compliance¶
gdpr_policy = Policy(
name="gdpr_compliance",
rules=[
{
"condition": "data.contains_pii == true",
"require": ["explicit_consent", "purpose_limitation"],
"deny_reason": "GDPR consent required for PII"
},
{
"condition": "user.region == 'EU'",
"enforce": [
"data_residency_eu",
"right_to_erasure",
"data_portability"
]
}
]
)
SOC 2 Compliance¶
soc2_policy = Policy(
name="soc2_compliance",
rules=[
{
"condition": "operation.type == 'admin'",
"require": [
"mfa_enabled",
"session_timeout",
"audit_logging",
"change_approval"
]
},
{
"condition": "data.classification == 'confidential'",
"enforce": [
"encryption",
"access_controls",
"data_retention_policy"
]
}
]
)
Best Practices¶
Policy Design¶
- Start simple - Begin with basic access control
- Layer policies - Combine multiple policies for defense-in-depth
- Test thoroughly - Verify policies in staging before production
- Monitor violations - Track and alert on policy violations
- Regular audits - Review and update policies quarterly
Performance¶
- Cache policy decisions - Reduce evaluation overhead
- Optimize conditions - Place fastest checks first
- Batch evaluations - Evaluate multiple policies together
- Async evaluation - Don't block on audit logging
Security¶
- Principle of least privilege - Deny by default
- Defense in depth - Multiple policy layers
- Separation of duties - Require multiple approvals
- Audit everything - Comprehensive logging
Troubleshooting¶
Policy Not Enforced¶
Issue: Policy rules not being applied.
Solutions: - Verify policy is registered with engine - Check condition syntax - Enable debug logging - Test condition in isolation
Performance Impact¶
Issue: Policy evaluation slowing down routing.
Solutions: - Cache policy decisions (5-10s TTL) - Optimize condition logic - Use async audit logging - Profile policy evaluation
Next Steps¶
- Multi-Model - Route to multiple models
- Cost Optimization - Reduce costs
- Enterprise Security - Advanced security
Enforce governance with confidence. 🛡️