Skip to content

Policy Engine

Implement governance, compliance, and access control with StrataRouter's policy engine.

Overview

The policy engine provides declarative policy management for routing decisions, enabling:

  • Access control and authentication
  • Cost limits and budgets
  • Compliance requirements
  • Governance workflows
  • Audit trails

Policy Structure

Basic Policy

from stratarouter.policy import Policy, PolicyEngine

policy = Policy(
    name="billing_access",
    description="Control access to billing routes",
    rules=[
        {
            "condition": "route.id == 'billing'",
            "require": ["authenticated", "billing_permission"],
            "deny_reason": "Billing access requires authentication"
        }
    ]
)

Policy Engine

engine = PolicyEngine()
engine.add_policy(policy)

# Evaluate policy
context = {
    "user_id": "user-123",
    "authenticated": True,
    "permissions": ["billing_permission"]
}

decision = engine.evaluate(route="billing", context=context)
if decision.allowed:
    # Execute route
    pass
else:
    # Return denial reason
    print(decision.reason)

Policy Types

1. Access Control

access_policy = Policy(
    name="rbac",
    rules=[
        {
            "condition": "route.category == 'admin'",
            "require": ["role:admin"],
            "deny_reason": "Admin access required"
        },
        {
            "condition": "route.category == 'financial'",
            "require": ["role:finance", "mfa_verified"],
            "deny_reason": "Finance role and MFA required"
        }
    ]
)

2. Cost Control

cost_policy = Policy(
    name="budget_limits",
    rules=[
        {
            "condition": "user.monthly_spend > 1000",
            "action": "deny",
            "deny_reason": "Monthly budget exceeded"
        },
        {
            "condition": "route.cost_per_request > 0.10",
            "action": "approve_required",
            "approver": "finance_team"
        }
    ]
)

3. Compliance

compliance_policy = Policy(
    name="gdpr_compliance",
    rules=[
        {
            "condition": "data.contains_pii == true",
            "require": ["gdpr_consent", "encryption_enabled"],
            "deny_reason": "GDPR compliance requirements not met"
        },
        {
            "condition": "user.region == 'EU'",
            "enforce": ["data_residency_eu", "audit_logging"],
        }
    ]
)

4. Governance

governance_policy = Policy(
    name="production_changes",
    rules=[
        {
            "condition": "environment == 'production'",
            "require": ["code_review", "security_scan", "approval:2"],
            "deny_reason": "Production changes require approvals"
        }
    ]
)

Policy Language

Conditions

Policies support rich conditional logic:

# Equality
"route.id == 'billing'"

# Comparison
"user.monthly_spend > 1000"

# Logical operators
"authenticated == true AND role == 'admin'"

# List membership
"'billing_permission' in user.permissions"

# Pattern matching
"route.id matches 'finance_.*'"

# Complex expressions
"(user.role == 'admin' OR user.role == 'owner') AND environment != 'production'"

Actions

Available policy actions:

  • allow - Explicitly allow
  • deny - Explicitly deny
  • approve_required - Require manual approval
  • log_only - Log violation but allow
  • rate_limit - Apply rate limiting
  • redirect - Redirect to different route

Implementation Examples

Complete Access Control

from stratarouter.policy import PolicyEngine, Policy

class RouterWithPolicies:
    def __init__(self, router, policy_engine):
        self.router = router
        self.policies = policy_engine

    def route_with_policy(self, query, embedding, context):
        """Route with policy enforcement."""
        # Get routing decision
        result = self.router.route(query, embedding)

        # Evaluate policies
        policy_decision = self.policies.evaluate(
            route=result.route_id,
            context=context
        )

        if not policy_decision.allowed:
            return {
                "status": "denied",
                "reason": policy_decision.reason,
                "required": policy_decision.requirements
            }

        return {
            "status": "allowed",
            "route": result.route_id,
            "confidence": result.confidence
        }

# Setup
engine = PolicyEngine()
engine.add_policies([
    access_policy,
    cost_policy,
    compliance_policy
])

router_with_policies = RouterWithPolicies(router, engine)

# Use
context = {
    "user_id": "user-123",
    "authenticated": True,
    "permissions": ["billing_permission"],
    "monthly_spend": 500.00
}

result = router_with_policies.route_with_policy(
    query="Show my invoice",
    embedding=embedding,
    context=context
)

Budget Tracking

class BudgetPolicy:
    def __init__(self, monthly_limit: float):
        self.monthly_limit = monthly_limit
        self.usage = {}  # user_id -> spend

    def evaluate(self, user_id: str, cost: float) -> dict:
        """Evaluate budget policy."""
        current_spend = self.usage.get(user_id, 0)

        if current_spend + cost > self.monthly_limit:
            return {
                "allowed": False,
                "reason": f"Budget limit ${self.monthly_limit} exceeded",
                "current_spend": current_spend,
                "remaining": self.monthly_limit - current_spend
            }

        return {"allowed": True}

    def record_usage(self, user_id: str, cost: float):
        """Record usage for tracking."""
        self.usage[user_id] = self.usage.get(user_id, 0) + cost

# Usage
budget = BudgetPolicy(monthly_limit=1000.00)

decision = budget.evaluate(user_id="user-123", cost=10.50)
if decision["allowed"]:
    # Execute
    budget.record_usage("user-123", 10.50)
else:
    # Deny
    print(decision["reason"])

Approval Workflows

class ApprovalPolicy:
    def __init__(self):
        self.pending = {}
        self.approved = set()

    def requires_approval(self, route_id: str, context: dict) -> bool:
        """Check if route requires approval."""
        if context.get("environment") == "production":
            if route_id not in self.approved:
                return True
        return False

    def request_approval(self, request_id: str, route_id: str, requester: str):
        """Request approval for route execution."""
        self.pending[request_id] = {
            "route": route_id,
            "requester": requester,
            "timestamp": time.time(),
            "approvers": []
        }

        # Notify approvers
        self.notify_approvers(request_id)

    def approve(self, request_id: str, approver: str):
        """Approve pending request."""
        if request_id in self.pending:
            self.pending[request_id]["approvers"].append(approver)

            # Check if sufficient approvals
            if len(self.pending[request_id]["approvers"]) >= 2:
                route_id = self.pending[request_id]["route"]
                self.approved.add(route_id)
                del self.pending[request_id]
                return {"status": "approved"}

        return {"status": "pending"}

Audit Logging

Enable Audit Trail

from stratarouter.policy import AuditLogger

logger = AuditLogger(
    backend="postgresql",
    connection_string="postgresql://localhost/audit"
)

# Log policy decisions
logger.log(
    event="policy_evaluation",
    user_id="user-123",
    route_id="billing",
    decision="allowed",
    policies_evaluated=["access_policy", "cost_policy"],
    context={...},
    timestamp=time.time()
)

# Query audit log
events = logger.query(
    user_id="user-123",
    start_date="2026-01-01",
    end_date="2026-01-31"
)

Immutable Audit Log

# Use blockchain-based audit log (Enterprise)
from stratarouter.enterprise.audit import BlockchainAuditLog

audit_log = BlockchainAuditLog(
    chain="ethereum",
    contract_address="0x..."
)

# Log events (immutable)
audit_log.log_event({
    "event": "high_value_transaction",
    "user_id": "user-123",
    "amount": 10000.00,
    "timestamp": time.time()
})

# Verify integrity
is_valid = audit_log.verify_chain()

Policy Templates

HIPAA Compliance

hipaa_policy = Policy(
    name="hipaa_compliance",
    rules=[
        {
            "condition": "data.contains_phi == true",
            "require": [
                "encryption_at_rest",
                "encryption_in_transit",
                "audit_logging",
                "access_controls"
            ],
            "deny_reason": "HIPAA requirements not met"
        },
        {
            "condition": "user.role != 'healthcare_provider'",
            "action": "deny",
            "deny_reason": "PHI access restricted to healthcare providers"
        }
    ]
)

GDPR Compliance

gdpr_policy = Policy(
    name="gdpr_compliance",
    rules=[
        {
            "condition": "data.contains_pii == true",
            "require": ["explicit_consent", "purpose_limitation"],
            "deny_reason": "GDPR consent required for PII"
        },
        {
            "condition": "user.region == 'EU'",
            "enforce": [
                "data_residency_eu",
                "right_to_erasure",
                "data_portability"
            ]
        }
    ]
)

SOC 2 Compliance

soc2_policy = Policy(
    name="soc2_compliance",
    rules=[
        {
            "condition": "operation.type == 'admin'",
            "require": [
                "mfa_enabled",
                "session_timeout",
                "audit_logging",
                "change_approval"
            ]
        },
        {
            "condition": "data.classification == 'confidential'",
            "enforce": [
                "encryption",
                "access_controls",
                "data_retention_policy"
            ]
        }
    ]
)

Best Practices

Policy Design

  1. Start simple - Begin with basic access control
  2. Layer policies - Combine multiple policies for defense-in-depth
  3. Test thoroughly - Verify policies in staging before production
  4. Monitor violations - Track and alert on policy violations
  5. Regular audits - Review and update policies quarterly

Performance

  • Cache policy decisions - Reduce evaluation overhead
  • Optimize conditions - Place fastest checks first
  • Batch evaluations - Evaluate multiple policies together
  • Async evaluation - Don't block on audit logging

Security

  • Principle of least privilege - Deny by default
  • Defense in depth - Multiple policy layers
  • Separation of duties - Require multiple approvals
  • Audit everything - Comprehensive logging

Troubleshooting

Policy Not Enforced

Issue: Policy rules not being applied.

Solutions: - Verify policy is registered with engine - Check condition syntax - Enable debug logging - Test condition in isolation

Performance Impact

Issue: Policy evaluation slowing down routing.

Solutions: - Cache policy decisions (5-10s TTL) - Optimize condition logic - Use async audit logging - Profile policy evaluation

Next Steps


Enforce governance with confidence. 🛡️