CrewAI Integration¶
Dynamic agent selection for multi-agent crews using StrataRouter.
Installation¶
Overview¶
CrewAI orchestrates teams of specialized AI agents. StrataRouter acts as the crew dispatcher — intelligently routing each task to the agent best equipped to handle it, based on semantic understanding of the request.
Quick Start¶
from crewai import Agent, Crew, Task
from stratarouter import Router, Route
from stratarouter.integrations.crewai import StrataRouterDispatcher
# Define your specialized agents
billing_agent = Agent(
role="Billing Specialist",
goal="Resolve billing queries and payment issues",
backstory="Expert in invoice management and payment processing.",
allow_delegation=False
)
support_agent = Agent(
role="Technical Support Engineer",
goal="Diagnose and resolve technical issues",
backstory="Expert in debugging and technical problem-solving.",
allow_delegation=False
)
# Build the router
router = Router(dimension=384)
billing_route = Route("billing")
billing_route.description = "Billing, invoices, payments, refunds"
billing_route.examples = ["Where's my invoice?", "I need a refund", "Payment failed"]
billing_route.keywords = ["invoice", "payment", "refund", "charge"]
router.add_route(billing_route)
support_route = Route("support")
support_route.description = "Technical issues, bugs, errors, crashes"
support_route.examples = ["App keeps crashing", "Can't login", "Error 404"]
support_route.keywords = ["crash", "bug", "error", "broken", "login"]
router.add_route(support_route)
router.build_index()
# Create dispatcher
dispatcher = StrataRouterDispatcher(
router=router,
agent_map={
"billing": billing_agent,
"support": support_agent,
}
)
# Route and dispatch
query = "My payment method keeps getting declined"
agent = dispatcher.select_agent(query)
print(f"Selected: {agent.role}") # "Billing Specialist"
Dynamic Crew Assembly¶
Build crews dynamically based on the incoming request:
from crewai import Crew, Task
def handle_request(query: str) -> str:
# Select agent via semantic routing
agent = dispatcher.select_agent(query)
# Create task for that agent
task = Task(
description=f"Handle this request: {query}",
agent=agent,
expected_output="A clear, helpful resolution to the user's request."
)
# Run crew with selected agent
crew = Crew(agents=[agent], tasks=[task], verbose=False)
return crew.kickoff()
print(handle_request("I haven't received my invoice for last month"))
# → Billing Specialist handles this
Multi-Agent Crew with Routing¶
Route to a specialist, then escalate to a manager for complex issues:
manager_agent = Agent(
role="Customer Success Manager",
goal="Handle escalations and complex issues",
backstory="Senior manager with authority to resolve edge cases."
)
def handle_with_escalation(query: str) -> str:
route_result = router.route(query, embed(query))
# Confidence-based escalation
if route_result.confidence < 0.6:
primary_agent = manager_agent # escalate to manager
else:
primary_agent = dispatcher.agent_map[route_result.route_id]
task = Task(description=query, agent=primary_agent, expected_output="Resolution")
crew = Crew(agents=[primary_agent], tasks=[task])
return crew.kickoff()
Role-Based Agent Selection¶
Map semantic routes to specialized agent roles:
agents = {
"billing": Agent(role="Billing Specialist", ...),
"support": Agent(role="Technical Engineer", ...),
"sales": Agent(role="Sales Representative", ...),
"compliance": Agent(role="Compliance Officer", ...),
}
routes = {
"billing": ["invoice", "payment", "refund", "subscription"],
"support": ["bug", "crash", "error", "outage"],
"sales": ["pricing", "upgrade", "demo", "enterprise"],
"compliance": ["GDPR", "data deletion", "audit", "security breach"],
}
router = Router(dimension=384)
for name, keywords in routes.items():
r = Route(name)
r.keywords = keywords
r.examples = [f"Question about {k}" for k in keywords[:3]]
router.add_route(r)
router.build_index()
dispatcher = StrataRouterDispatcher(router=router, agent_map=agents)