
Batch Processing

Automatic batching and deduplication for maximum throughput.

Overview

Batch processing provides:

  • 3-5x throughput gain for bursty workloads
  • 40-60% cost reduction from deduplication
  • Automatic - No code changes needed

Quick Start

from stratarouter_runtime import BatchProcessor

batch = BatchProcessor(
    window_ms=50,              # Collection window
    max_size=32,               # Max batch size
    dedup_threshold=0.98       # Similarity threshold
)

# Process requests
results = await batch.process([
    {"query": "What's the weather?"},
    {"query": "How's the weather today?"},  # Similar - deduped!
    {"query": "Show me my invoice"},
])

Configuration

batch = BatchProcessor(
    # Collection
    window_ms=50,                    # Wait 50ms to collect requests
    max_size=32,                     # Process max 32 at once
    min_size=2,                      # Min 2 before processing

    # Deduplication
    enable_dedup=True,
    dedup_threshold=0.98,            # 98% similarity = duplicate
    dedup_strategy="embedding",      # "embedding", "exact", or "hybrid"

    # Timeouts
    max_wait_ms=100,                 # Max wait time
    processing_timeout=30,           # Max processing time
)

How It Works

1. Request Collection

Time: 0ms
Request 1 arrives → Start collection window

Time: 10ms
Request 2 arrives → Add to batch

Time: 25ms
Request 3 arrives → Add to batch

Time: 50ms
Window closes → Process batch of 3
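The timeline above can be modeled with plain asyncio. This is an illustrative sketch of a collection window, not the library's implementation; `WindowBatcher` and `demo` are names invented here:

```python
import asyncio

class WindowBatcher:
    """Toy model: requests landing inside one window become one batch."""

    def __init__(self, window_ms=50, max_size=32):
        self.window_ms = window_ms
        self.max_size = max_size
        self._pending = []

    def submit(self, request):
        """Add an arriving request to the current window."""
        self._pending.append(request)

    async def collect(self):
        """Wait out the window, then drain up to max_size requests."""
        await asyncio.sleep(self.window_ms / 1000)
        batch = self._pending[:self.max_size]
        self._pending = self._pending[self.max_size:]
        return batch

async def demo():
    batcher = WindowBatcher(window_ms=50)
    batcher.submit({"query": "request 1"})             # t = 0ms, window opens
    collector = asyncio.create_task(batcher.collect())
    await asyncio.sleep(0.010)
    batcher.submit({"query": "request 2"})             # t = 10ms
    await asyncio.sleep(0.015)
    batcher.submit({"query": "request 3"})             # t = 25ms
    return await collector                             # window closes at 50ms

batch = asyncio.run(demo())   # all three requests land in one batch
```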

2. Deduplication

# Input: 4 requests
requests = [
    "What's the weather?",
    "How's the weather today?",      # 98% similar to #1
    "Show me my invoice",
    "What is the weather like?"      # 97% similar to #1
]

# After deduplication: 2 unique requests
unique = [
    "What's the weather?",
    "Show me my invoice"
]

# Process 2, return 4 results

3. Result Distribution

# Execute unique requests
results = await execute_batch(unique)

# Map results back to original requests
return [
    results[0],  # Request 1
    results[0],  # Request 2 (deduped, same as 1)
    results[1],  # Request 3
    results[0],  # Request 4 (deduped, same as 1)
]
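Steps 2 and 3 together can be sketched with exact-match deduplication (a semantic strategy would swap the dictionary lookup for an embedding comparison; all names here are illustrative):

```python
def dedup_and_map(requests):
    """Collapse duplicates, remembering which unique entry answers each original."""
    unique, mapping, seen = [], [], {}
    for req in requests:
        if req not in seen:
            seen[req] = len(unique)   # first occurrence: assign a slot
            unique.append(req)
        mapping.append(seen[req])     # every request points at its unique slot
    return unique, mapping

def distribute(results, mapping):
    """Fan the unique results back out, one per original request."""
    return [results[i] for i in mapping]

requests = ["weather?", "weather?", "invoice", "weather?"]
unique, mapping = dedup_and_map(requests)        # 2 unique, mapping [0, 0, 1, 0]
answers = distribute(["sunny", "invoice #42"], mapping)
```

Two requests are executed, four answers come back, and the callers cannot tell their request was deduplicated.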

Deduplication Strategies

Embedding

batch = BatchProcessor(
    dedup_strategy="embedding",
    dedup_threshold=0.98
)

# Uses cosine similarity on embeddings
similarity = cosine_similarity(emb1, emb2)
is_duplicate = similarity >= 0.98   # at or above threshold -> duplicate
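The similarity check itself needs nothing beyond the standard library. A minimal sketch, where `emb1` and `emb2` stand in for whatever embeddings the router produces:

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Toy 3-d vectors for two near-paraphrases
emb1 = [0.9, 0.1, 0.4]
emb2 = [0.85, 0.15, 0.45]
is_duplicate = cosine_similarity(emb1, emb2) >= 0.98   # True for these vectors
```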

Exact Match

batch = BatchProcessor(
    dedup_strategy="exact"
)

# Uses hash-based exact matching
is_duplicate = hash(query1) == hash(query2)
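Note that Python's built-in `hash()` for strings is salted per process, so a stable digest over a lightly normalized query is the more robust variant of the exact strategy. A sketch (`query_key` is a name invented here):

```python
import hashlib

def query_key(query: str) -> str:
    """Stable digest of a normalized query, so trivially different copies collide."""
    normalized = " ".join(query.lower().split())   # lowercase, collapse whitespace
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

same = query_key("Show me my invoice") == query_key("  show me   MY invoice ")
```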

Hybrid

batch = BatchProcessor(
    dedup_strategy="hybrid",
    exact_weight=0.3,
    semantic_weight=0.7
)
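The exact combination formula is not documented above; one plausible reading of the weights is a weighted sum of the exact-match signal and the semantic similarity, sketched here as an assumption:

```python
def hybrid_score(exact_match: bool, semantic_sim: float,
                 exact_weight: float = 0.3, semantic_weight: float = 0.7) -> float:
    """Weighted blend of the exact and semantic signals (weights sum to 1)."""
    return exact_weight * float(exact_match) + semantic_weight * semantic_sim

# A paraphrase gets no exact-match credit and leans on semantic similarity
score = hybrid_score(exact_match=False, semantic_sim=0.99)   # 0.7 * 0.99
```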

Performance Optimization

Batch Size Tuning

# Small batches - Low latency
batch = BatchProcessor(
    window_ms=20,
    max_size=8
)

# Large batches - High throughput
batch = BatchProcessor(
    window_ms=100,
    max_size=64
)

# Balanced
batch = BatchProcessor(
    window_ms=50,
    max_size=32
)

Adaptive Batching

batch = BatchProcessor(
    adaptive=True,
    target_latency_ms=50,      # Target latency
    min_window_ms=20,
    max_window_ms=200
)

# Automatically adjusts window based on load
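One common control loop for this kind of adjustment, shown here as an assumed sketch rather than the library's actual policy, is multiplicative: shrink the window when observed latency overshoots the target, grow it when there is headroom, and clamp to the configured bounds:

```python
def adjust_window(window_ms, observed_latency_ms,
                  target_latency_ms=50, min_window_ms=20, max_window_ms=200):
    """Shrink on overshoot, grow on headroom, clamped to [min, max]."""
    factor = 0.8 if observed_latency_ms > target_latency_ms else 1.1
    return max(min_window_ms, min(max_window_ms, window_ms * factor))

window = adjust_window(50, observed_latency_ms=80)       # overshoot: shrink
window = adjust_window(window, observed_latency_ms=30)   # headroom: grow back
```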

Monitoring

stats = batch.get_stats()

print(f"Requests processed: {stats.total_requests}")
print(f"Batches created: {stats.total_batches}")
print(f"Avg batch size: {stats.avg_batch_size:.1f}")
print(f"Dedup rate: {stats.dedup_rate:.2%}")
print(f"Throughput: {stats.requests_per_second:.1f} req/s")
print(f"Cost savings: ${stats.cost_saved_usd:.2f}")
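The derived metrics follow from a handful of running counters. A sketch of how such a stats object might be maintained (`BatchStats` and `record_batch` are names invented here, not the library's API):

```python
from dataclasses import dataclass

@dataclass
class BatchStats:
    """Running counters from which the reported metrics can be derived."""
    total_requests: int = 0
    total_batches: int = 0
    deduped_requests: int = 0

    def record_batch(self, batch_size: int, unique_size: int) -> None:
        self.total_requests += batch_size
        self.total_batches += 1
        self.deduped_requests += batch_size - unique_size

    @property
    def avg_batch_size(self) -> float:
        return self.total_requests / self.total_batches

    @property
    def dedup_rate(self) -> float:
        return self.deduped_requests / self.total_requests

stats = BatchStats()
stats.record_batch(batch_size=4, unique_size=2)   # 2 of 4 deduped
stats.record_batch(batch_size=6, unique_size=4)   # 2 of 6 deduped
# avg batch size 5.0, dedup rate 40%
```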

Use Cases

High-Traffic API

# Handle burst traffic
@app.post("/api/query")
async def query(request):
    result = await batch.process(request)
    return result

Parallel Processing

# Process multiple users simultaneously
async def process_users(users):
    requests = [{"query": u.query} for u in users]
    results = await batch.process(requests)
    return results

Cost Optimization

# Deduplicate common queries
batch = BatchProcessor(
    enable_dedup=True,
    dedup_threshold=0.95
)

# Typical dedup rate: 40-60%
# Cost savings: 40-60%
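The savings arithmetic is direct: every deduplicated request is a call that never happens. A worked example with an assumed per-call price:

```python
def cost_saved_usd(total_requests: int, dedup_rate: float,
                   cost_per_call_usd: float) -> float:
    """Dollars avoided by skipping deduplicated calls."""
    skipped_calls = total_requests * dedup_rate
    return skipped_calls * cost_per_call_usd

# 10,000 requests at an assumed $0.002/call with a 50% dedup rate:
# 5,000 calls skipped, roughly $10 avoided
saved = cost_saved_usd(10_000, dedup_rate=0.5, cost_per_call_usd=0.002)
```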

Best Practices

1. Tune Window Size for Your Workload

# Bursty traffic - larger window
batch = BatchProcessor(window_ms=100)

# Steady traffic - smaller window
batch = BatchProcessor(window_ms=20)

2. Monitor Dedup Rate

# Alert if dedup rate is too low
if stats.dedup_rate < 0.3:
    # Consider lowering threshold
    batch.dedup_threshold = 0.95

3. Use Adaptive Batching

# Let system optimize automatically
batch = BatchProcessor(adaptive=True)

4. Handle Timeouts Gracefully

try:
    results = await batch.process(requests, timeout=30)
except BatchTimeoutError:
    # Fall back to individual processing
    results = [await process(r) for r in requests]

Advanced Features

Priority Batching

batch = BatchProcessor(enable_priorities=True)

# High priority requests processed first
await batch.process(request, priority=10)
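Priority ordering of this kind is typically a heap keyed on negated priority, with an arrival counter as tie-breaker. A standalone sketch of that ordering (not the library's internals):

```python
import heapq
import itertools

counter = itertools.count()   # tie-breaker: preserves arrival order
queue = []

def enqueue(request, priority=0):
    # heapq is a min-heap, so negate priority to pop high priority first
    heapq.heappush(queue, (-priority, next(counter), request))

enqueue("background refresh", priority=1)
enqueue("user-facing query", priority=10)
enqueue("bulk export", priority=1)

order = [heapq.heappop(queue)[2] for _ in range(len(queue))]
# user-facing query first, then the two priority-1 requests in arrival order
```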

Partial Results

# Return results as they complete
async for result in batch.process_streaming(requests):
    yield result

Custom Deduplication

def custom_dedup(req1, req2):
    # Your custom logic
    return similarity_score

batch = BatchProcessor(
    dedup_function=custom_dedup
)
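Assuming `dedup_function` receives two requests and returns a similarity in [0, 1] that is compared against `dedup_threshold` (the contract is not spelled out above, so treat that as an assumption), a cheap embedding-free starting point is token overlap:

```python
def jaccard_dedup(req1: str, req2: str) -> float:
    """Token-overlap (Jaccard) similarity: |A & B| / |A | B|, in [0, 1]."""
    a, b = set(req1.lower().split()), set(req2.lower().split())
    if not a or not b:
        return 0.0
    return len(a & b) / len(a | b)

score = jaccard_dedup("show me my invoice", "show my invoice")   # 3/4 = 0.75
```

Under that assumed contract it would be wired in as `BatchProcessor(dedup_function=jaccard_dedup)`.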
