# Caching System

Intelligent semantic caching for cost reduction and performance.
## Overview

StrataRouter's caching system provides:

- 85%+ hit rate in production
- 70-80% cost savings from reduced LLM calls
- <1ms exact match latency
- <5ms semantic match latency
## Quick Start

```python
from stratarouter_runtime import CacheManager

cache = CacheManager(
    backend="redis",
    ttl=3600,
    similarity_threshold=0.95,
)

async def get_response(query: str, embedding: list) -> str:
    cached = await cache.get_semantic(query, embedding)
    if cached:
        return cached.response  # cache hit: no LLM call
    response = await llm_call(query)  # llm_call: your own LLM client function
    await cache.set(query, embedding, response)
    return response
```
## Cache Types

### Exact Match Cache

Hash-based exact matching:

```python
# Ultra-fast exact match
cached = await cache.get_exact(query)
if cached:
    return cached  # <1ms latency
```
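Under the hood, exact matching amounts to hashing a normalized form of the query and doing a dictionary lookup. The sketch below illustrates the idea; the normalization rules and key scheme are assumptions, not StrataRouter's actual internals.

```python
import hashlib

class ExactCache:
    """Minimal sketch of hash-based exact matching."""

    def __init__(self):
        self._store = {}

    def _key(self, query: str) -> str:
        # Normalize case and whitespace so trivially different
        # strings map to the same cache key.
        normalized = " ".join(query.lower().split())
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def get(self, query: str):
        return self._store.get(self._key(query))

    def set(self, query: str, response: str) -> None:
        self._store[self._key(query)] = response

cache = ExactCache()
cache.set("What is my balance?", "Your balance is $42.")
print(cache.get("what is  my balance?"))  # normalized lookup still hits
```

Because the lookup is a single hash plus a dictionary access, latency stays well under a millisecond regardless of cache size.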
### Semantic Cache

Embedding-based similarity matching:

```python
# Find similar cached queries
cached = await cache.get_semantic(
    query=query,
    embedding=embedding,
    threshold=0.95,
)
if cached and cached.similarity > 0.95:
    return cached.response  # <5ms latency
```
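Conceptually, a semantic lookup compares the query's embedding against cached embeddings and returns the best entry at or above the threshold. The sketch below uses plain cosine similarity over Python lists; StrataRouter's actual index structure is not documented here, so treat this as the idea, not the implementation.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb)

def semantic_lookup(embedding, entries, threshold=0.95):
    """entries: list of (cached_embedding, response) pairs.
    Returns the most similar response at or above threshold, else None."""
    best, best_sim = None, threshold
    for cached_emb, response in entries:
        sim = cosine(embedding, cached_emb)
        if sim >= best_sim:
            best, best_sim = response, sim
    return best

entries = [([1.0, 0.0], "cached answer")]
print(semantic_lookup([0.99, 0.01], entries))  # near-identical vector: hit
print(semantic_lookup([0.0, 1.0], entries))    # orthogonal vector: miss (None)
```

In production, the linear scan is replaced by an approximate nearest-neighbor index, which is what keeps semantic lookups in the low-millisecond range.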
## Configuration

```python
cache = CacheManager(
    # Backend
    backend="redis",              # "redis" or "memory"
    redis_url="redis://localhost:6379",

    # TTL
    ttl=3600,                     # default: 1 hour
    max_entries=10000,            # max cached entries

    # Semantic matching
    enable_semantic=True,
    similarity_threshold=0.95,    # min similarity for a match
    embedding_dimension=384,

    # Eviction policy
    eviction_policy="lru",        # "lru", "lfu", or "ttl"
)
```
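Of the eviction policies, `"lru"` drops the entry that has gone unread the longest once `max_entries` is reached. A minimal sketch of that behavior, using `OrderedDict` (an assumption about the semantics, not StrataRouter's code):

```python
from collections import OrderedDict

class LRUCache:
    """Sketch of least-recently-used eviction."""

    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._store = OrderedDict()

    def get(self, key):
        if key not in self._store:
            return None
        self._store.move_to_end(key)  # mark as recently used
        return self._store[key]

    def set(self, key, value):
        if key in self._store:
            self._store.move_to_end(key)
        self._store[key] = value
        if len(self._store) > self.max_entries:
            self._store.popitem(last=False)  # evict the stalest entry

lru = LRUCache(max_entries=2)
lru.set("a", 1)
lru.set("b", 2)
lru.get("a")         # "a" becomes most recently used
lru.set("c", 3)      # capacity exceeded: "b" is evicted
print(lru.get("b"))  # None
```

`"lfu"` would instead track access counts and evict the least frequently used entry, and `"ttl"` evicts purely by age.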
## Backend Options

### Redis (Production)

```python
cache = CacheManager(
    backend="redis",
    redis_url="redis://localhost:6379",
    redis_pool_size=10,
    redis_timeout=1.0,
)
```

**Pros:**

- Distributed caching
- Persistence
- High capacity

**Cons:**

- Network latency (2-5ms)
- External dependency
### Memory (Development)

**Pros:**

- Ultra-fast (<1ms)
- No dependencies

**Cons:**

- Limited capacity
- Not shared across instances
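For local development, the same constructor can run fully in-process. A plausible configuration, assuming the same parameters shown in the Configuration section (entries live only as long as the process):

```python
from stratarouter_runtime import CacheManager

# In-process cache for development; nothing is shared or persisted.
cache = CacheManager(
    backend="memory",
    ttl=600,           # a shorter TTL is usually fine locally
    max_entries=1000,  # keep the memory footprint small
)
```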
## Cache Patterns

### Cache-Aside

```python
async def get_with_cache(query):
    # Check the cache first
    cached = await cache.get(query)
    if cached:
        return cached

    # On a miss, execute the query
    response = await execute(query)

    # Update the cache for next time
    await cache.set(query, response)
    return response
```
### Write-Through

```python
async def execute_and_cache(query):
    # Execute the query
    response = await execute(query)

    # Always write the result to the cache
    await cache.set(query, response)
    return response
```
### Read-Through

```python
cache = CacheManager(
    read_through=True,
    executor=execute_query,
)

# Automatically fetches on a miss
response = await cache.get(query)
```
## Cache Invalidation

### Time-Based (TTL)
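Entries expire automatically once their TTL elapses, so stale data ages out without any explicit invalidation call. A minimal sketch of the mechanism, assuming expiry is checked lazily on read (StrataRouter may also sweep in the background):

```python
import time

class TTLCache:
    """Sketch of TTL-based expiry: each entry stores its expiry time."""

    def __init__(self, default_ttl: float = 3600):
        self.default_ttl = default_ttl
        self._store = {}  # key -> (expires_at, value)

    def set(self, key, value, ttl=None):
        ttl = self.default_ttl if ttl is None else ttl
        self._store[key] = (time.monotonic() + ttl, value)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # lazily evict on read
            return None
        return value

c = TTLCache(default_ttl=0.05)
c.set("q", "r")
print(c.get("q"))  # "r" while fresh
time.sleep(0.06)
print(c.get("q"))  # None after expiry
```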
### Manual Invalidation

```python
# Invalidate a single entry
await cache.delete(query)

# Invalidate by pattern
await cache.delete_pattern("billing:*")

# Clear everything
await cache.clear()
```
### Event-Based

```python
# Invalidate when the underlying data changes
@app.on_event("data_updated")
async def invalidate_cache():
    await cache.clear()
```
## Performance Optimization

### Batch Caching

```python
# Batch get
queries = ["q1", "q2", "q3"]
cached = await cache.get_many(queries)

# Batch set
await cache.set_many({
    "q1": "r1",
    "q2": "r2",
    "q3": "r3",
})
```
### Compression
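Large cached responses can be compressed before storage to reduce memory and network cost. A sketch using the standard library's `zlib` (the codec and level are assumptions; compression only pays off for larger payloads, and tiny strings may even grow slightly):

```python
import zlib

def pack(response: str) -> bytes:
    """Compress a response before caching it."""
    return zlib.compress(response.encode("utf-8"), level=6)

def unpack(blob: bytes) -> str:
    """Decompress a cached response on read."""
    return zlib.decompress(blob).decode("utf-8")

response = "a long LLM response " * 200
blob = pack(response)
print(f"{len(response)} chars -> {len(blob)} compressed bytes")
assert unpack(blob) == response  # round-trips losslessly
```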
### Prefetching
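Prefetching populates the cache before users ask: after serving one query, likely follow-ups are fetched in the background so they become hits. The sketch below is illustrative; `predict_followups` and `fetch` are hypothetical stand-ins, and in production you would fire-and-forget the prefetch task rather than await it.

```python
import asyncio

cache = {}

def predict_followups(query: str) -> list:
    # Hypothetical predictor of likely next queries.
    return [f"{query} (details)", f"{query} (pricing)"]

async def fetch(query: str) -> str:
    await asyncio.sleep(0.01)  # stands in for an LLM call
    return f"response to {query}"

async def prefetch(queries) -> None:
    # Fetch and cache predicted queries that are not already cached.
    for q in queries:
        if q not in cache:
            cache[q] = await fetch(q)

async def serve(query: str) -> str:
    if query in cache:
        return cache[query]
    response = await fetch(query)
    cache[query] = response
    task = asyncio.create_task(prefetch(predict_followups(query)))
    await task  # awaited here only so the sketch finishes deterministically
    return response

asyncio.run(serve("plan comparison"))
print(sorted(cache))  # original query plus its prefetched follow-ups
```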
## Monitoring

```python
stats = await cache.get_stats()
print(f"Hit rate: {stats.hit_rate:.2%}")
print(f"Total entries: {stats.total_entries}")
print(f"Memory used: {stats.memory_mb}MB")
print(f"Avg get latency: {stats.avg_get_latency_ms}ms")
```
### Cost Savings

```python
# Track savings
savings = await cache.get_cost_savings()
print(f"Total requests: {savings.total_requests}")
print(f"Cache hits: {savings.cache_hits}")
print(f"LLM calls avoided: {savings.llm_calls_avoided}")
print(f"Cost saved: ${savings.cost_saved_usd}")
```
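The savings figure is simple arithmetic over hit counts. A back-of-the-envelope check with illustrative numbers (the per-call cost is an assumption and depends on your model and token counts):

```python
total_requests = 100_000
cache_hits = 85_000            # ~85% hit rate, matching the Overview figure
avg_cost_per_call = 0.002      # assumed USD cost of one avoided LLM call

llm_calls_avoided = cache_hits           # every hit skips one LLM call
cost_saved_usd = llm_calls_avoided * avg_cost_per_call
hit_rate = cache_hits / total_requests

print(f"Hit rate: {hit_rate:.2%}")           # 85.00%
print(f"Cost saved: ${cost_saved_usd:.2f}")  # $170.00
```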
## Best Practices

### 1. Use Semantic Caching for Similar Queries

Paraphrased queries ("reset my password" vs. "how do I reset my password?") can share one cached response. Enable this with `enable_semantic=True` and tune `similarity_threshold` to control how loose a match may be.
### 2. Set Appropriate TTLs

```python
# Short TTL for dynamic data
await cache.set(query, response, ttl=300)    # 5 minutes

# Long TTL for static data
await cache.set(query, response, ttl=86400)  # 24 hours
```
### 3. Monitor Hit Rates

Review `cache.get_stats()` regularly: a falling hit rate can mean the similarity threshold is too strict or TTLs are too short for your traffic.
### 4. Use Warm-Up

```python
# Warm the cache on startup with known-common queries
async def warm_cache():
    common_queries = load_common_queries()
    for query in common_queries:
        response = await execute(query)
        await cache.set(query, response)
```