Consensus Engine¶
Overview¶
The Consensus Engine provides distributed multi-agent decision-making with complete audit trails. It implements a Raft-based consensus protocol suitable for enterprise AI systems.
Features¶
- Quorum-Based Voting: Configurable approval thresholds
- Byzantine Fault Tolerance: Resilient to malicious agents
- Complete Audit Trail: Every decision is recorded
- Agent Authorization: Only registered agents can participate
- Proposal Expiration: Time-bound decision making
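The quorum and threshold features can be read as a two-step rule: wait until enough votes have arrived, then compare the share of approvals against the threshold. The sketch below is one possible reading of that rule, not the engine's actual implementation. The names `Tally` and `tally` are hypothetical, and the handling of `require_unanimous` (any rejection fails the proposal) is an assumption.

```rust
/// Outcome of tallying votes against a quorum rule (illustrative type,
/// not part of the documented API).
#[derive(Debug, PartialEq)]
pub enum Tally {
    Pending,
    Approved,
    Rejected,
}

/// Hypothetical tally rule: stay pending until `min_quorum` votes exist,
/// then compare the approval ratio against `approval_threshold`.
pub fn tally(
    approvals: usize,
    rejections: usize,
    min_quorum: usize,
    approval_threshold: f64,
    require_unanimous: bool,
) -> Tally {
    let total = approvals + rejections;
    // Not enough votes yet: no decision can be made.
    if total == 0 || total < min_quorum {
        return Tally::Pending;
    }
    // Assumption: unanimity means no rejection among the votes cast.
    if require_unanimous {
        return if rejections == 0 {
            Tally::Approved
        } else {
            Tally::Rejected
        };
    }
    let ratio = approvals as f64 / total as f64;
    if ratio >= approval_threshold {
        Tally::Approved
    } else {
        Tally::Rejected
    }
}
```

Under these assumptions, three approvals and one rejection with `min_quorum: 3` and `approval_threshold: 0.66` give a ratio of 0.75 and therefore an approval.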
Quick Start¶
use stratarouter_enterprise::consensus::{ConsensusEngine, QuorumConfig};
// Create engine
let config = QuorumConfig {
min_quorum: 3,
approval_threshold: 0.66, // roughly a 2/3 majority
timeout_ms: 5000,
max_retries: 3,
require_unanimous: false,
};
let engine = ConsensusEngine::new(config);
Core Concepts¶
Proposals¶
A proposal represents an action that requires consensus approval.
use std::time::{Duration, SystemTime};
use stratarouter_enterprise::consensus::{Proposal, ActionType};
let proposal = Proposal {
id: uuid::Uuid::new_v4(),
action_type: ActionType::FinancialTransaction,
payload: serde_json::json!({
"from": "account-A",
"to": "account-B",
"amount": 50000
}),
proposer: "agent-1".to_string(),
required_confidence: 0.9,
created_at: SystemTime::now(),
expires_at: SystemTime::now() + Duration::from_secs(300),
};
Voting¶
Agents cast votes on proposals:
use std::time::SystemTime;
use stratarouter_enterprise::consensus::{Vote, VoteDecision};
let vote = Vote {
agent_id: "agent-2".to_string(),
decision: VoteDecision::Approve,
confidence: 0.95,
justification: Some("Transaction verified".to_string()),
voted_at: SystemTime::now(),
};
// `decision_id` is the value returned by `engine.propose(...)`
engine.vote(decision_id, vote).await?;
Complete Example¶
use stratarouter_enterprise::consensus::*;
use std::time::{Duration, SystemTime};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. Create engine
let engine = ConsensusEngine::new(QuorumConfig {
min_quorum: 3,
approval_threshold: 0.66,
..Default::default()
});
// 2. Register agents
for i in 1..=5 {
engine.register_agent(format!("agent-{}", i)).await?;
}
// 3. Create proposal
let proposal = Proposal {
id: uuid::Uuid::new_v4(),
action_type: ActionType::FinancialTransaction,
payload: serde_json::json!({"amount": 50000}),
proposer: "agent-1".to_string(),
required_confidence: 0.9,
created_at: SystemTime::now(),
expires_at: SystemTime::now() + Duration::from_secs(300),
};
// 4. Submit for consensus
let decision_id = engine.propose(proposal).await?;
// 5. Agents vote
for i in 1..=4 {
let vote = Vote {
agent_id: format!("agent-{}", i),
decision: if i <= 3 { VoteDecision::Approve } else { VoteDecision::Reject },
confidence: 0.95,
justification: None,
voted_at: SystemTime::now(),
};
engine.vote(decision_id, vote).await?;
}
// 6. Get result
let decision = engine.get_decision(decision_id).await?;
match decision.status {
DecisionStatus::Approved => println!("✅ Approved"),
DecisionStatus::Rejected => println!("❌ Rejected"),
_ => println!("⏳ Pending"),
}
// 7. Review audit trail
println!("Audit events: {}", decision.audit_trail.len());
Ok(())
}
Configuration¶
Quorum Config¶
pub struct QuorumConfig {
/// Minimum number of votes required
pub min_quorum: usize,
/// Timeout in milliseconds
pub timeout_ms: u64,
/// Maximum retry attempts
pub max_retries: u32,
/// Required approval percentage (0.0 - 1.0)
pub approval_threshold: f64,
/// Whether to require unanimous approval
pub require_unanimous: bool,
}
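Because these fields are plain numbers, a mistyped value (a threshold of 66 instead of 0.66, say) compiles without complaint. A minimal sketch of a sanity check follows. The struct is a local mirror of the documented fields so the example compiles on its own, and `validate` is a hypothetical helper, not a function the crate is known to provide.

```rust
/// Local mirror of the documented `QuorumConfig` fields, so that this
/// sketch is self-contained.
#[derive(Debug, Clone)]
pub struct QuorumConfig {
    pub min_quorum: usize,
    pub timeout_ms: u64,
    pub max_retries: u32,
    pub approval_threshold: f64,
    pub require_unanimous: bool,
}

/// Hypothetical sanity check (not part of the documented API).
/// Returns a description of the first problem found.
pub fn validate(config: &QuorumConfig) -> Result<(), String> {
    if config.min_quorum == 0 {
        return Err("min_quorum must be at least 1".to_string());
    }
    // The documented range for the threshold is 0.0 - 1.0; NaN is rejected too.
    if !(0.0..=1.0).contains(&config.approval_threshold) {
        return Err("approval_threshold must be within 0.0..=1.0".to_string());
    }
    if config.timeout_ms == 0 {
        return Err("timeout_ms must be greater than 0".to_string());
    }
    Ok(())
}
```

Calling such a check before constructing the engine would surface configuration mistakes early rather than as proposals that can never be approved.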
Action Types¶
pub enum ActionType {
FinancialTransaction, // Financial operations
PolicyChange, // Policy modifications
AgentDeployment, // Agent deployment
SystemConfiguration, // System config changes
DataModification, // Data modifications
SecurityOperation, // Security operations
}
Decision Status¶
pub enum DecisionStatus {
Pending, // Awaiting votes
Approved, // Approved by consensus
Rejected, // Rejected by consensus
Expired, // Expired before consensus
Cancelled, // Cancelled
}
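The variant comments suggest a simple lifecycle in which `Pending` is the only non-final state. The sketch below encodes that reading. The enum is a local mirror of the documented variants, while `is_terminal` and `can_transition` are hypothetical helpers; the rule that only `Pending` may change state is an assumption, not documented behavior.

```rust
/// Local mirror of the documented `DecisionStatus` variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecisionStatus {
    Pending,
    Approved,
    Rejected,
    Expired,
    Cancelled,
}

/// Hypothetical helper: every status except `Pending` is treated as final.
pub fn is_terminal(status: DecisionStatus) -> bool {
    !matches!(status, DecisionStatus::Pending)
}

/// Hypothetical transition rule: a decision may only leave `Pending`,
/// and only for one of the final states.
pub fn can_transition(from: DecisionStatus, to: DecisionStatus) -> bool {
    from == DecisionStatus::Pending && to != DecisionStatus::Pending
}
```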
Best Practices¶
1. Set Appropriate Thresholds¶
// For critical operations
let config = QuorumConfig {
min_quorum: 5,
approval_threshold: 0.8, // 80% approval
require_unanimous: false,
..Default::default()
};
// For routine operations
let config = QuorumConfig {
min_quorum: 3,
approval_threshold: 0.66, // roughly a 2/3 majority
..Default::default()
};
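To see what a threshold means in terms of actual votes, it can help to compute the smallest number of approvals that satisfies it. The helper below is a hypothetical sketch. It assumes the approval ratio is approvals divided by votes cast, which this page does not spell out.

```rust
/// Hypothetical helper: the smallest number of approvals `k` such that
/// `k / total_votes >= approval_threshold`.
/// Returns `None` when no count can satisfy the threshold
/// (no votes, or a threshold above 1.0).
pub fn min_approvals(total_votes: usize, approval_threshold: f64) -> Option<usize> {
    if total_votes == 0 {
        return None;
    }
    // Scan upward; the first count whose ratio meets the threshold wins.
    (0..=total_votes).find(|&k| k as f64 / total_votes as f64 >= approval_threshold)
}
```

Under this assumption, 5 votes at a threshold of 0.8 need 4 approvals and 3 votes at 0.66 need 2. The difference between 0.66 and an exact two-thirds shows up with larger groups: 50 votes need 33 approvals at 0.66 but 34 at `2.0 / 3.0`.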
2. Use Meaningful Justifications¶
let vote = Vote {
agent_id: "risk-analyzer".to_string(),
decision: VoteDecision::Approve,
confidence: 0.92,
justification: Some(
"Risk score: 0.12 (low), fraud probability: 0.03%, \
account verified, transaction within limits".to_string()
),
voted_at: SystemTime::now(),
};
3. Handle Expiration¶
let proposal = Proposal {
// ... other fields ...
expires_at: SystemTime::now() + Duration::from_secs(300), // 5 minutes
};
// Check if expired
let decision = engine.get_decision(decision_id).await?;
if decision.status == DecisionStatus::Expired {
// Handle expiration
}
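Callers that want to avoid submitting votes on a proposal that is about to lapse can compare `expires_at` against the clock themselves. The following sketch uses only `std::time`; the helper names `is_expired` and `time_remaining` are hypothetical, and whether the engine treats the exact expiry instant as expired is an assumption.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical helper: a proposal counts as expired once `now`
/// reaches or passes `expires_at`.
pub fn is_expired(expires_at: SystemTime, now: SystemTime) -> bool {
    now >= expires_at
}

/// Hypothetical helper: time left before expiry, or `None` if the
/// deadline has already been reached.
pub fn time_remaining(expires_at: SystemTime, now: SystemTime) -> Option<Duration> {
    // `duration_since` returns Err when `now` is later than `expires_at`.
    expires_at
        .duration_since(now)
        .ok()
        .filter(|remaining| !remaining.is_zero())
}
```

Passing `now` as a parameter, rather than calling `SystemTime::now()` inside the helper, keeps the check deterministic in tests.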
4. Audit Trail Analysis¶
let decision = engine.get_decision(decision_id).await?;
for event in &decision.audit_trail {
match event.event_type {
AuditEventType::ProposalCreated => {
println!("Proposed by: {}", event.agent_id);
},
AuditEventType::VoteCast => {
println!("Vote from: {}", event.agent_id);
},
AuditEventType::DecisionFinalized => {
println!("Finalized at: {:?}", event.timestamp);
},
_ => {}
}
}
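Beyond printing events, the trail can be reduced to a summary, for example the list of agents that voted. The sketch below uses local stand-in types so it compiles on its own: `AuditEvent` and `AuditEventType` mirror only the fields and variants that appear on this page (the real enum appears to have more variants), and `voters` is a hypothetical helper.

```rust
use std::time::SystemTime;

/// Local stand-in for the event types shown on this page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditEventType {
    ProposalCreated,
    VoteCast,
    DecisionFinalized,
}

/// Local stand-in with the three fields used in the example above.
#[derive(Debug, Clone)]
pub struct AuditEvent {
    pub event_type: AuditEventType,
    pub agent_id: String,
    pub timestamp: SystemTime,
}

/// Hypothetical helper: IDs of the agents that cast a vote, in trail order.
pub fn voters(trail: &[AuditEvent]) -> Vec<&str> {
    trail
        .iter()
        .filter(|event| event.event_type == AuditEventType::VoteCast)
        .map(|event| event.agent_id.as_str())
        .collect()
}
```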
Performance¶
| Operation | Latency | Notes |
|---|---|---|
| Agent Registration | < 0.1ms | In-memory operation |
| Proposal Creation | < 1ms | Includes validation |
| Vote Processing | < 0.5ms | Includes quorum check |
| Decision Retrieval | < 0.1ms | Hash map lookup |
| Audit Trail | < 0.1ms | Included in decision |
Throughput: 10,000+ decisions per second on modern hardware
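These figures depend on hardware and workload, so it may be worth measuring them in your own environment. A minimal timing sketch using only `std::time` is shown here; `mean_latency` is a hypothetical helper, and for the async engine calls the closure would need to drive the future on a runtime, which is not shown.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: runs `op` `iterations` times and returns the
/// mean wall-clock time per call.
pub fn mean_latency<F: FnMut()>(iterations: u32, mut op: F) -> Duration {
    assert!(iterations > 0, "iterations must be positive");
    let start = Instant::now();
    for _ in 0..iterations {
        op();
    }
    // Total elapsed time divided by the number of calls.
    start.elapsed() / iterations
}
```

A simple loop like this gives only a rough average; a benchmarking harness would be a better fit for publishing numbers.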
Error Handling¶
use stratarouter_enterprise::error::EnterpriseError;
match engine.vote(decision_id, vote).await {
Ok(()) => println!("Vote recorded"),
Err(EnterpriseError::UnauthorizedAgent(agent)) => {
println!("Agent {} not registered", agent);
},
Err(EnterpriseError::DecisionNotFound(id)) => {
println!("Decision {:?} not found", id);
},
Err(EnterpriseError::ProposalExpired(id)) => {
println!("Proposal {:?} expired", id);
},
Err(e) => println!("Error: {}", e),
}
Security Considerations¶
1. Agent Authorization¶
Only registered agents can vote:
// Register before voting
engine.register_agent("agent-1".to_string()).await?;
// Unauthorized vote will fail
let vote = Vote {
agent_id: "unregistered-agent".to_string(),
// ... other fields ...
};
// Fails with EnterpriseError::UnauthorizedAgent, which `?` propagates to the caller
engine.vote(decision_id, vote).await?;
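Conceptually, this check amounts to a membership test against the set of registered agent IDs. The sketch below illustrates the idea with a `HashSet`; `AgentRegistry` and its methods are hypothetical and say nothing about how the engine stores agents internally.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory registry of agent IDs.
#[derive(Debug, Default)]
pub struct AgentRegistry {
    agents: HashSet<String>,
}

impl AgentRegistry {
    /// Registers an agent. Returns `false` if the ID was already present.
    pub fn register(&mut self, agent_id: &str) -> bool {
        self.agents.insert(agent_id.to_string())
    }

    /// Returns `Ok(())` for a registered agent and an error message otherwise.
    pub fn authorize(&self, agent_id: &str) -> Result<(), String> {
        if self.agents.contains(agent_id) {
            Ok(())
        } else {
            Err(format!("agent {} is not registered", agent_id))
        }
    }
}
```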
2. Confidence Thresholds¶
Require high confidence for critical operations:
let proposal = Proposal {
action_type: ActionType::FinancialTransaction,
required_confidence: 0.95, // 95% confidence required
// ... other fields ...
};
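This page does not state how `required_confidence` is compared against the `confidence` values on individual votes. One plausible reading, shown purely as an assumption, is that the mean confidence of the approving votes must reach the required level. `meets_confidence` is a hypothetical helper.

```rust
/// Hypothetical check (the aggregation rule is an assumption): the mean
/// confidence of approving votes must be at least `required_confidence`.
/// With no approving votes the check fails.
pub fn meets_confidence(approval_confidences: &[f64], required_confidence: f64) -> bool {
    if approval_confidences.is_empty() {
        return false;
    }
    let sum: f64 = approval_confidences.iter().sum();
    let mean = sum / approval_confidences.len() as f64;
    mean >= required_confidence
}
```

Other rules are equally conceivable, such as requiring every approving vote to meet the level individually, so check the API reference before relying on either behavior.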
3. Audit Trail Integrity¶
The audit trail is append-only and immutable:
let decision = engine.get_decision(decision_id).await?;
// Audit trail contains complete history
assert!(decision.audit_trail.iter().any(|e|
matches!(e.event_type, AuditEventType::ProposalCreated)
));
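In Rust, an append-only structure is commonly enforced by keeping the underlying collection private and exposing only an append method and a read-only view. The sketch below illustrates that pattern in general terms; `AppendOnlyLog` is a hypothetical type and is not claimed to be how the engine stores its audit trail.

```rust
/// Hypothetical append-only container: entries can be added and read,
/// but the public API offers no way to modify or remove them.
#[derive(Debug)]
pub struct AppendOnlyLog<T> {
    entries: Vec<T>, // private, so callers outside this module cannot mutate it
}

impl<T> AppendOnlyLog<T> {
    /// Creates an empty log.
    pub fn new() -> Self {
        Self { entries: Vec::new() }
    }

    /// Appends an entry and returns its index in the log.
    pub fn append(&mut self, entry: T) -> usize {
        self.entries.push(entry);
        self.entries.len() - 1
    }

    /// Read-only view of all entries in insertion order.
    pub fn entries(&self) -> &[T] {
        &self.entries
    }
}
```

This guards against accidental mutation through the type system only. Tamper evidence against a malicious process would need additional measures such as hash chaining or external storage.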
Integration Examples¶
With Transaction Log¶
use stratarouter_enterprise::{consensus::*, transaction_log::*};
let consensus = ConsensusEngine::new(QuorumConfig::default());
let tx_log = TransactionLog::new(TransactionLogConfig::default()).await?;
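// Serialize the proposal first, because propose() takes it by value
let proposal_json = serde_json::to_value(&proposal)?;
// Propose
let decision_id = consensus.propose(proposal).await?;
// Log proposal
tx_log.append(LogEntry::new(
"proposal_created",
proposal_json
)).await?;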
// Vote and log each vote
for vote in votes {
consensus.vote(decision_id, vote.clone()).await?;
tx_log.append(LogEntry::new("vote_cast", serde_json::to_value(&vote)?)).await?;
}
With Policy Engine¶
use stratarouter_enterprise::{consensus::*, policies::*};
let consensus = ConsensusEngine::new(QuorumConfig::default());
let policies = PolicyEngine::new();
// Add policy requiring consensus for high-value transactions
let policy = Policy::new("high-value-consensus")
.when_action(PolicyAction::FinancialTransaction)
.max_amount(100000.0);
policies.add_policy(policy).await?;
// Evaluate before proposing
let policy_result = policies.evaluate(&context).await?;
if policy_result.allowed && policy_result.requires_approval {
// Submit for consensus
let decision_id = consensus.propose(proposal).await?;
}
Testing¶
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_consensus_approval() {
let engine = ConsensusEngine::new(QuorumConfig {
min_quorum: 3,
approval_threshold: 0.66,
..Default::default()
});
// Register agents
for i in 1..=4 {
engine.register_agent(format!("agent-{}", i)).await.unwrap();
}
// Create and propose (create_test_proposal is a test helper not shown here)
let proposal = create_test_proposal();
let decision_id = engine.propose(proposal).await.unwrap();
// 3 approvals
for i in 1..=3 {
let vote = Vote {
agent_id: format!("agent-{}", i),
decision: VoteDecision::Approve,
confidence: 0.9,
justification: None,
voted_at: SystemTime::now(),
};
engine.vote(decision_id, vote).await.unwrap();
}
let decision = engine.get_decision(decision_id).await.unwrap();
assert_eq!(decision.status, DecisionStatus::Approved);
}
}
Next Steps¶
- Transaction Log - Audit trail logging
- Policy Engine - Access control
- Multi-Tenancy - Tenant isolation
- API Reference - Complete API docs