Skip to content

Consensus Engine

Overview

The Consensus Engine provides distributed multi-agent decision making with complete audit trails. It implements a Raft-based consensus protocol suitable for enterprise AI systems.

Features

  • Quorum-Based Voting: Configurable approval thresholds
  • Byzantine Fault Tolerance: Resilient to malicious agents
  • Complete Audit Trail: Every decision is recorded
  • Agent Authorization: Only registered agents can participate
  • Proposal Expiration: Time-bound decision making

Quick Start

use stratarouter_enterprise::consensus::{ConsensusEngine, QuorumConfig};

// Create engine
let config = QuorumConfig {
    min_quorum: 3,
    approval_threshold: 0.66,  // 2/3 majority
    timeout_ms: 5000,
    max_retries: 3,
    require_unanimous: false,
};

let engine = ConsensusEngine::new(config);

Core Concepts

Proposals

A proposal represents an action that requires consensus approval.

use stratarouter_enterprise::consensus::{Proposal, ActionType};

let proposal = Proposal {
    id: uuid::Uuid::new_v4(),
    action_type: ActionType::FinancialTransaction,
    payload: serde_json::json!({
        "from": "account-A",
        "to": "account-B",
        "amount": 50000
    }),
    proposer: "agent-1".to_string(),
    required_confidence: 0.9,
    created_at: SystemTime::now(),
    expires_at: SystemTime::now() + Duration::from_secs(300),
};

Voting

Agents cast votes on proposals:

use stratarouter_enterprise::consensus::{Vote, VoteDecision};

let vote = Vote {
    agent_id: "agent-2".to_string(),
    decision: VoteDecision::Approve,
    confidence: 0.95,
    justification: Some("Transaction verified".to_string()),
    voted_at: SystemTime::now(),
};

engine.vote(decision_id, vote).await?;

Complete Example

use stratarouter_enterprise::consensus::*;
use std::time::{Duration, SystemTime};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. Create engine
    let engine = ConsensusEngine::new(QuorumConfig {
        min_quorum: 3,
        approval_threshold: 0.66,
        ..Default::default()
    });

    // 2. Register agents
    for i in 1..=5 {
        engine.register_agent(format!("agent-{}", i)).await?;
    }

    // 3. Create proposal
    let proposal = Proposal {
        id: uuid::Uuid::new_v4(),
        action_type: ActionType::FinancialTransaction,
        payload: serde_json::json!({"amount": 50000}),
        proposer: "agent-1".to_string(),
        required_confidence: 0.9,
        created_at: SystemTime::now(),
        expires_at: SystemTime::now() + Duration::from_secs(300),
    };

    // 4. Submit for consensus
    let decision_id = engine.propose(proposal).await?;

    // 5. Agents vote
    for i in 1..=4 {
        let vote = Vote {
            agent_id: format!("agent-{}", i),
            decision: if i <= 3 { VoteDecision::Approve } else { VoteDecision::Reject },
            confidence: 0.95,
            justification: None,
            voted_at: SystemTime::now(),
        };
        engine.vote(decision_id, vote).await?;
    }

    // 6. Get result
    let decision = engine.get_decision(decision_id).await?;

    match decision.status {
        DecisionStatus::Approved => println!("✅ Approved"),
        DecisionStatus::Rejected => println!("❌ Rejected"),
        _ => println!("⏳ Pending"),
    }

    // 7. Review audit trail
    println!("Audit events: {}", decision.audit_trail.len());

    Ok(())
}

Configuration

Quorum Config

pub struct QuorumConfig {
    /// Minimum number of votes required
    pub min_quorum: usize,

    /// Timeout in milliseconds
    pub timeout_ms: u64,

    /// Maximum retry attempts
    pub max_retries: u32,

    /// Required approval percentage (0.0 - 1.0)
    pub approval_threshold: f64,

    /// Whether to require unanimous approval
    pub require_unanimous: bool,
}

Action Types

pub enum ActionType {
    FinancialTransaction,  // Financial operations
    PolicyChange,          // Policy modifications
    AgentDeployment,       // Agent deployment
    SystemConfiguration,   // System config changes
    DataModification,      // Data modifications
    SecurityOperation,     // Security operations
}

Decision Status

pub enum DecisionStatus {
    Pending,    // Awaiting votes
    Approved,   // Approved by consensus
    Rejected,   // Rejected by consensus
    Expired,    // Expired before consensus
    Cancelled,  // Cancelled
}

Best Practices

1. Set Appropriate Thresholds

// For critical operations
let config = QuorumConfig {
    min_quorum: 5,
    approval_threshold: 0.8,  // 80% approval
    require_unanimous: false,
    ..Default::default()
};

// For routine operations
let config = QuorumConfig {
    min_quorum: 3,
    approval_threshold: 0.66,  // 2/3 majority
    ..Default::default()
};

2. Use Meaningful Justifications

let vote = Vote {
    agent_id: "risk-analyzer".to_string(),
    decision: VoteDecision::Approve,
    confidence: 0.92,
    justification: Some(
        "Risk score: 0.12 (low), fraud probability: 0.03%, \
         account verified, transaction within limits".to_string()
    ),
    voted_at: SystemTime::now(),
};

3. Handle Expiration

let proposal = Proposal {
    // ... other fields ...
    expires_at: SystemTime::now() + Duration::from_secs(300), // 5 minutes
};

// Check if expired
let decision = engine.get_decision(decision_id).await?;
if decision.status == DecisionStatus::Expired {
    // Handle expiration
}

4. Audit Trail Analysis

let decision = engine.get_decision(decision_id).await?;

for event in &decision.audit_trail {
    match event.event_type {
        AuditEventType::ProposalCreated => {
            println!("Proposed by: {}", event.agent_id);
        },
        AuditEventType::VoteCast => {
            println!("Vote from: {}", event.agent_id);
        },
        AuditEventType::DecisionFinalized => {
            println!("Finalized at: {:?}", event.timestamp);
        },
        _ => {}
    }
}

Performance

Operation Latency Notes
Agent Registration < 0.1ms In-memory operation
Proposal Creation < 1ms Includes validation
Vote Processing < 0.5ms Includes quorum check
Decision Retrieval < 0.1ms Hash map lookup
Audit Trail < 0.1ms Included in decision

Throughput: 10,000+ decisions per second on modern hardware

Error Handling

use stratarouter_enterprise::error::EnterpriseError;

match engine.vote(decision_id, vote).await {
    Ok(()) => println!("Vote recorded"),
    Err(EnterpriseError::UnauthorizedAgent(agent)) => {
        println!("Agent {} not registered", agent);
    },
    Err(EnterpriseError::DecisionNotFound(id)) => {
        println!("Decision {:?} not found", id);
    },
    Err(EnterpriseError::ProposalExpired(id)) => {
        println!("Proposal {:?} expired", id);
    },
    Err(e) => println!("Error: {}", e),
}

Security Considerations

1. Agent Authorization

Only registered agents can vote:

// Register before voting
engine.register_agent("agent-1".to_string()).await?;

// Unauthorized vote will fail
let vote = Vote {
    agent_id: "unregistered-agent".to_string(),
    // ... other fields ...
};

// Returns: EnterpriseError::UnauthorizedAgent
engine.vote(decision_id, vote).await?;

2. Confidence Thresholds

Require high confidence for critical operations:

let proposal = Proposal {
    action_type: ActionType::FinancialTransaction,
    required_confidence: 0.95,  // 95% confidence required
    // ... other fields ...
};

3. Audit Trail Integrity

The audit trail is append-only and immutable:

let decision = engine.get_decision(decision_id).await?;

// Audit trail contains complete history
assert!(decision.audit_trail.iter().any(|e| 
    matches!(e.event_type, AuditEventType::ProposalCreated)
));

Integration Examples

With Transaction Log

use stratarouter_enterprise::{consensus::*, transaction_log::*};

let consensus = ConsensusEngine::new(QuorumConfig::default());
let tx_log = TransactionLog::new(TransactionLogConfig::default()).await?;

// Propose
let decision_id = consensus.propose(proposal).await?;

// Log proposal
tx_log.append(LogEntry::new(
    "proposal_created",
    serde_json::to_value(&proposal)?
)).await?;

// Vote and log each vote
for vote in votes {
    consensus.vote(decision_id, vote.clone()).await?;
    tx_log.append(LogEntry::new("vote_cast", serde_json::to_value(&vote)?)).await?;
}

With Policy Engine

use stratarouter_enterprise::{consensus::*, policies::*};

let consensus = ConsensusEngine::new(QuorumConfig::default());
let policies = PolicyEngine::new();

// Add policy requiring consensus for high-value transactions
let policy = Policy::new("high-value-consensus")
    .when_action(PolicyAction::FinancialTransaction)
    .max_amount(100000.0);

policies.add_policy(policy).await?;

// Evaluate before proposing
let policy_result = policies.evaluate(&context).await?;

if policy_result.allowed && policy_result.requires_approval {
    // Submit for consensus
    let decision_id = consensus.propose(proposal).await?;
}

Testing

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_consensus_approval() {
        let engine = ConsensusEngine::new(QuorumConfig {
            min_quorum: 3,
            approval_threshold: 0.66,
            ..Default::default()
        });

        // Register agents
        for i in 1..=4 {
            engine.register_agent(format!("agent-{}", i)).await.unwrap();
        }

        // Create and propose
        let proposal = create_test_proposal();
        let decision_id = engine.propose(proposal).await.unwrap();

        // 3 approvals
        for i in 1..=3 {
            let vote = Vote {
                agent_id: format!("agent-{}", i),
                decision: VoteDecision::Approve,
                confidence: 0.9,
                justification: None,
                voted_at: SystemTime::now(),
            };
            engine.vote(decision_id, vote).await.unwrap();
        }

        let decision = engine.get_decision(decision_id).await.unwrap();
        assert_eq!(decision.status, DecisionStatus::Approved);
    }
}

Next Steps