# Batch Processing

Automatic batching and deduplication for maximum throughput.
## Overview

Batch processing provides:

- 3-5x throughput gain for bursty workloads
- 40-60% cost reduction from deduplication
- Automatic: no code changes needed
## Quick Start

```python
from stratarouter_runtime import BatchProcessor

batch = BatchProcessor(
    window_ms=50,          # Collection window
    max_size=32,           # Max batch size
    dedup_threshold=0.98,  # Similarity threshold
)

# Process requests
results = await batch.process([
    {"query": "What's the weather?"},
    {"query": "How's the weather today?"},  # Similar - deduped!
    {"query": "Show me my invoice"},
])
```
## Configuration

```python
batch = BatchProcessor(
    # Collection
    window_ms=50,                # Wait 50ms to collect requests
    max_size=32,                 # Process at most 32 at once
    min_size=2,                  # Require at least 2 before processing
    # Deduplication
    enable_dedup=True,
    dedup_threshold=0.98,        # 98% similarity = duplicate
    dedup_strategy="embedding",  # "embedding" or "exact"
    # Timeouts
    max_wait_ms=100,             # Max wait time
    processing_timeout=30,       # Max processing time
)
```
## How It Works

### 1. Request Collection

```
Time:  0ms   Request 1 arrives → start collection window
Time: 10ms   Request 2 arrives → add to batch
Time: 25ms   Request 3 arrives → add to batch
Time: 50ms   Window closes     → process batch of 3
```
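The timeline above amounts to a windowed collection loop: the first request opens the window, later arrivals join the batch until the window closes or the batch fills. BatchProcessor's internals are not shown in these docs, so the sketch below is illustrative only, assuming an `asyncio.Queue` of incoming requests:

```python
import asyncio

async def collect_batch(queue, window_ms=50, max_size=32):
    """Collect requests until the window closes or the batch is full.

    Illustrative model of windowed collection, not the real internals.
    """
    batch = [await queue.get()]  # first request opens the window
    loop = asyncio.get_running_loop()
    deadline = loop.time() + window_ms / 1000
    while len(batch) < max_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break  # window closed: process what we have
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break  # no more arrivals before the deadline
    return batch
```

With `max_size` reached early, the loop exits before the window closes, which is why large bursts don't pay the full window latency.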
### 2. Deduplication

```python
# Input: 4 requests
requests = [
    "What's the weather?",
    "How's the weather today?",   # ~98% similar to #1
    "Show me my invoice",
    "What is the weather like?",  # ~97% similar to #1
]

# After deduplication: 2 unique requests
unique = [
    "What's the weather?",
    "Show me my invoice",
]

# Process 2 requests, return 4 results
```
### 3. Result Distribution

```python
# Execute only the unique requests
results = await execute_batch(unique)

# Map results back to the original requests
return [
    results[0],  # Request 1
    results[0],  # Request 2 (deduped, same as 1)
    results[1],  # Request 3
    results[0],  # Request 4 (deduped, same as 1)
]
```
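Steps 2 and 3 together are a collapse-then-fan-out: deduplicate, execute only the unique set, then map results back by index. A self-contained sketch using exact matching, with `execute` standing in for the real batch executor:

```python
def dedup_and_process(requests, execute):
    """Collapse duplicate requests, process unique ones, fan results back out."""
    unique = []
    index_of = {}   # dedup key -> position in `unique`
    positions = []  # for each original request, its index into `unique`
    for req in requests:
        key = req.strip().lower()  # exact-match key (normalization is a choice)
        if key not in index_of:
            index_of[key] = len(unique)
            unique.append(req)
        positions.append(index_of[key])
    results = execute(unique)  # process only the unique requests
    return [results[i] for i in positions]
```

Every original request gets a result, but only the unique ones cost anything to execute.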
## Deduplication Strategies

### Embedding-Based (Recommended)

```python
batch = BatchProcessor(
    dedup_strategy="embedding",
    dedup_threshold=0.98,
)

# Uses cosine similarity on embeddings
similarity = cosine_similarity(emb1, emb2)
if similarity >= 0.98:
    ...  # treat as duplicate
```
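For reference, cosine similarity is just a normalized dot product; a dependency-free version:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # degenerate zero vector: treat as dissimilar
    return dot / (norm_a * norm_b)
```

Identical directions score 1.0, orthogonal vectors 0.0; the 0.98 threshold sits deliberately close to the top of that range.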
### Exact Match

```python
batch = BatchProcessor(
    dedup_strategy="exact",
)

# Uses hash-based exact matching
if hash(query1) == hash(query2):
    ...  # treat as duplicate
```
### Hybrid
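A common hybrid design, not necessarily how BatchProcessor implements it, runs cheap exact-hash matching first and falls back to embedding similarity only on a miss. A minimal sketch, where the `embed` callback and helper names are assumptions for illustration:

```python
import math

def _cos(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

def is_duplicate(query, seen_hashes, seen_embs, embed, threshold=0.98):
    """Hybrid dedup: exact hash first (cheap), embedding similarity second."""
    key = hash(query.strip().lower())
    if key in seen_hashes:  # fast path: exact match
        return True
    emb = embed(query)
    for other in seen_embs:  # slow path: semantic match
        if _cos(emb, other) >= threshold:
            return True
    seen_hashes.add(key)  # new unique query: register it
    seen_embs.append(emb)
    return False
```

The appeal is that frequent verbatim repeats never pay for an embedding call.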
## Performance Optimization

### Batch Size Tuning

```python
# Small batches: low latency
batch = BatchProcessor(
    window_ms=20,
    max_size=8,
)

# Large batches: high throughput
batch = BatchProcessor(
    window_ms=100,
    max_size=64,
)

# Balanced
batch = BatchProcessor(
    window_ms=50,
    max_size=32,
)
```
### Adaptive Batching

```python
batch = BatchProcessor(
    adaptive=True,
    target_latency_ms=50,  # Target latency
    min_window_ms=20,
    max_window_ms=200,
)

# Automatically adjusts the window based on load
```
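The docs don't describe the adjustment algorithm; one plausible model is a feedback controller that widens the window while observed latency stays under target (batching more per call) and narrows it when latency overshoots. A hedged sketch, not the library's actual logic:

```python
def adjust_window(window_ms, observed_latency_ms, target_latency_ms=50,
                  min_window_ms=20, max_window_ms=200, step=1.2):
    """Nudge the collection window toward the latency target."""
    if observed_latency_ms > target_latency_ms:
        window_ms /= step  # over target: shrink window to cut latency
    else:
        window_ms *= step  # under target: grow window to batch more
    # clamp to the configured bounds
    return max(min_window_ms, min(max_window_ms, window_ms))
```

Multiplicative steps with clamping keep the window responsive to bursts without oscillating outside its configured range.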
## Monitoring

```python
stats = batch.get_stats()

print(f"Requests processed: {stats.total_requests}")
print(f"Batches created: {stats.total_batches}")
print(f"Avg batch size: {stats.avg_batch_size}")
print(f"Dedup rate: {stats.dedup_rate:.2%}")
print(f"Throughput: {stats.requests_per_second}")
print(f"Cost savings: ${stats.cost_saved_usd}")
```
## Use Cases

### High-Traffic API

```python
# Handle burst traffic; batch.process takes a list of requests
@app.post("/api/query")
async def query(request):
    results = await batch.process([request])
    return results[0]
```

### Parallel Processing

```python
# Process multiple users simultaneously
async def process_users(users):
    requests = [{"query": u.query} for u in users]
    results = await batch.process(requests)
    return results
```
### Cost Optimization

```python
# Deduplicate common queries
batch = BatchProcessor(
    enable_dedup=True,
    dedup_threshold=0.95,
)

# Typical dedup rate: 40-60%, so cost savings of roughly 40-60%
```
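The savings figure follows directly from the dedup rate: only the non-duplicated fraction of requests is actually executed. A quick sanity check of the arithmetic (the prices here are made-up):

```python
def cost_after_dedup(n_requests, cost_per_request, dedup_rate):
    """Cost when only the non-duplicated fraction of requests is executed."""
    executed = n_requests * (1 - dedup_rate)
    return executed * cost_per_request

baseline = 1000 * 0.002                        # 1,000 requests at $0.002 each
with_dedup = cost_after_dedup(1000, 0.002, 0.5)  # 50% dedup rate
savings = 1 - with_dedup / baseline            # fraction of cost saved
```

A 50% dedup rate halves the executed requests, so cost savings track the dedup rate one-for-one.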
## Best Practices

### 1. Tune Window Size for Your Workload

```python
# Bursty traffic: larger window collects more per batch
batch = BatchProcessor(window_ms=100)

# Steady traffic: smaller window keeps latency low
batch = BatchProcessor(window_ms=20)
```

### 2. Monitor the Dedup Rate

```python
# If the dedup rate is too low, consider lowering the threshold
stats = batch.get_stats()
if stats.dedup_rate < 0.3:
    batch.dedup_threshold = 0.95
```
### 3. Use Adaptive Batching

When load varies, enable `adaptive=True` so the collection window tunes itself toward the latency target instead of being hand-tuned (see Adaptive Batching above).
### 4. Handle Timeouts Gracefully

```python
try:
    results = await batch.process(requests, timeout=30)
except BatchTimeoutError:
    # Fall back to individual processing
    # (`process` here is your own per-request handler)
    results = [await process(r) for r in requests]
```
## Advanced Features

### Priority Batching

```python
batch = BatchProcessor(enable_priorities=True)

# Higher-priority requests are processed first
await batch.process([request], priority=10)
```
### Partial Results

```python
# Yield results as they complete instead of waiting for the whole batch
async for result in batch.process_streaming(requests):
    yield result
```
### Custom Deduplication

```python
def custom_dedup(req1, req2):
    # Your custom logic: return a similarity score in [0, 1];
    # scores at or above dedup_threshold are treated as duplicates
    return 1.0 if req1 == req2 else 0.0

batch = BatchProcessor(
    dedup_function=custom_dedup,
)
```