Skip to content

CrewAI Integration

Dynamic agent selection for multi-agent crews using StrataRouter.

Installation

pip install "stratarouter[langchain]" crewai

Overview

CrewAI orchestrates teams of specialized AI agents. StrataRouter acts as the crew dispatcher — intelligently routing each task to the agent best equipped to handle it, based on semantic understanding of the request.


Quick Start

from crewai import Agent, Crew, Task
from stratarouter import Router, Route
from stratarouter.integrations.crewai import StrataRouterDispatcher

# Define your specialized agents.
# Each agent is described by a role, a goal and a backstory; both specialists
# are created with allow_delegation=False.
billing_agent = Agent(
    role="Billing Specialist",
    goal="Resolve billing queries and payment issues",
    backstory="Expert in invoice management and payment processing.",
    allow_delegation=False
)

support_agent = Agent(
    role="Technical Support Engineer",
    goal="Diagnose and resolve technical issues",
    backstory="Expert in debugging and technical problem-solving.",
    allow_delegation=False
)

# Build the router.
# NOTE(review): dimension=384 is presumably the embedding vector size and has
# to match the embedding model in use — confirm against the StrataRouter docs.
router = Router(dimension=384)

# Each route is created by name, then given a description, example
# utterances and keywords before being registered with add_route().
billing_route = Route("billing")
billing_route.description = "Billing, invoices, payments, refunds"
billing_route.examples = ["Where's my invoice?", "I need a refund", "Payment failed"]
billing_route.keywords = ["invoice", "payment", "refund", "charge"]
router.add_route(billing_route)

support_route = Route("support")
support_route.description = "Technical issues, bugs, errors, crashes"
support_route.examples = ["App keeps crashing", "Can't login", "Error 404"]
support_route.keywords = ["crash", "bug", "error", "broken", "login"]
router.add_route(support_route)

# Build the index only after every route has been added.
router.build_index()

# Create dispatcher.
# The agent_map keys are the route names used above ("billing", "support");
# the values are the agents that should receive queries for that route.
dispatcher = StrataRouterDispatcher(
    router=router,
    agent_map={
        "billing": billing_agent,
        "support": support_agent,
    }
)

# Route and dispatch: select_agent() takes the raw query text and returns
# one of the agents registered in agent_map.
query = "My payment method keeps getting declined"
agent = dispatcher.select_agent(query)
print(f"Selected: {agent.role}")  # "Billing Specialist"

Dynamic Crew Assembly

Build crews dynamically based on the incoming request:

from crewai import Crew, Task

def handle_request(query: str) -> str:
    """Route *query* to the best-matching agent and run a one-agent crew on it.

    Relies on the module-level ``dispatcher`` built in the Quick Start.

    Args:
        query: The raw user request. It is used both to pick the agent and
            as the text of the task handed to that agent.

    Returns:
        The crew's final output as plain text.
    """
    # Select agent via semantic routing
    agent = dispatcher.select_agent(query)

    # Create task for that agent
    task = Task(
        description=f"Handle this request: {query}",
        agent=agent,
        expected_output="A clear, helpful resolution to the user's request.",
    )

    # Run crew with selected agent
    crew = Crew(agents=[agent], tasks=[task], verbose=False)

    # Recent CrewAI releases return a CrewOutput object from kickoff() rather
    # than a bare string. str() keeps the declared `-> str` contract and is a
    # no-op on versions that already return a string.
    return str(crew.kickoff())

# Example call: the query is routed, a one-agent crew runs, and the result
# returned by handle_request() is printed.
print(handle_request("I haven't received my invoice for last month"))
# → Billing Specialist handles this

Multi-Agent Crew with Routing

Route to a specialist, or escalate to a manager when the routing confidence is low:

# Escalation target: used by handle_with_escalation() below instead of a
# specialist when the routing confidence is too low.
# Unlike the Quick Start specialists, no allow_delegation value is passed
# here, so the Agent default applies.
manager_agent = Agent(
    role="Customer Success Manager",
    goal="Handle escalations and complex issues",
    backstory="Senior manager with authority to resolve edge cases."
)

def handle_with_escalation(query: str, *, min_confidence: float = 0.6) -> str:
    """Route *query* to a specialist, escalating to the manager when unsure.

    Relies on the module-level ``router``, ``dispatcher`` and
    ``manager_agent``.

    Args:
        query: The raw user request; also used verbatim as the task
            description.
        min_confidence: Routing results whose ``confidence`` is below this
            value are escalated to ``manager_agent``. Defaults to 0.6.

    Returns:
        The crew's final output as plain text.
    """
    # NOTE: `embed` is not defined anywhere on this page. Supply your own
    # function that turns text into an embedding vector; presumably its
    # length must match the router's dimension (384 in the Quick Start).
    route_result = router.route(query, embed(query))

    # Confidence-based escalation
    if route_result.confidence < min_confidence:
        primary_agent = manager_agent   # escalate to manager
    else:
        # A route with no agent registered in agent_map is also escalated
        # rather than failing with a KeyError.
        primary_agent = dispatcher.agent_map.get(
            route_result.route_id, manager_agent
        )

    task = Task(description=query, agent=primary_agent, expected_output="Resolution")
    crew = Crew(agents=[primary_agent], tasks=[task])

    # Recent CrewAI releases return a CrewOutput object from kickoff();
    # str() keeps the declared `-> str` contract on every version.
    return str(crew.kickoff())

Role-Based Agent Selection

Map semantic routes to specialized agent roles:

# One agent per route name. The keys here must match the keys of `routes`
# below, because the dispatcher looks agents up by route name.
# Each Agent is given an explicit goal and backstory so the snippet is valid
# Python (a bare `...` after a keyword argument is a SyntaxError); adapt the
# wording to your own domain.
agents = {
    "billing": Agent(
        role="Billing Specialist",
        goal="Resolve billing queries and payment issues",
        backstory="Expert in invoice management and payment processing.",
    ),
    "support": Agent(
        role="Technical Engineer",
        goal="Diagnose and resolve technical issues",
        backstory="Expert in debugging and technical problem-solving.",
    ),
    "sales": Agent(
        role="Sales Representative",
        goal="Answer pricing, upgrade and demo questions",
        backstory="Experienced in plans, pricing and enterprise sales.",
    ),
    "compliance": Agent(
        role="Compliance Officer",
        goal="Handle data-protection, audit and security-incident requests",
        backstory="Specialist in GDPR and regulatory compliance.",
    ),
}

# Keywords that characterize each route.
routes = {
    "billing":    ["invoice", "payment", "refund", "subscription"],
    "support":    ["bug", "crash", "error", "outage"],
    "sales":      ["pricing", "upgrade", "demo", "enterprise"],
    "compliance": ["GDPR", "data deletion", "audit", "security breach"],
}

# Register one Route per entry; the example utterances are generated from
# the first three keywords of each route.
router = Router(dimension=384)
for name, keywords in routes.items():
    route = Route(name)
    route.keywords = keywords
    route.examples = [f"Question about {k}" for k in keywords[:3]]
    router.add_route(route)

# Build the index only after every route has been added.
router.build_index()

dispatcher = StrataRouterDispatcher(router=router, agent_map=agents)

Next Steps