# Async (AIO) Examples
Comprehensive examples for async utilities including retry mechanisms, rate limiting, circuit breakers, and concurrency control.
## Overview
The AIO module provides production-ready async utilities for building resilient applications with proper error handling, resource management, and fault tolerance.
## Examples
### 1. Retry Patterns

File: `examples/aio/01_retry_patterns.py`

Retry mechanisms for transient failures.

Topics:

- Basic exponential backoff retry
- Exception filtering (retry only specific errors)
- Retry callbacks for logging/monitoring
- Dynamic retry configuration
- API client with built-in retries
- Graceful degradation with cache fallback

Use Cases:

- API calls that may fail temporarily
- Database connection retries
- Network request resilience
- Distributed system communication

Run: `python examples/aio/01_retry_patterns.py`
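The core mechanic here, exponential backoff, can be sketched in plain `asyncio`. The decorator below is a hypothetical stand-in for illustration, not the `dspu.aio` `retry` API:

```python
import asyncio
from functools import wraps

def simple_retry(max_attempts=3, base_delay=0.1, exceptions=(Exception,)):
    """Hypothetical sketch of an exponential-backoff retry decorator."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # attempts exhausted: propagate the last error
                    await asyncio.sleep(delay)
                    delay *= 2  # double the delay each time
        return wrapper
    return decorator

calls = 0

@simple_retry(max_attempts=3, base_delay=0.01, exceptions=(ConnectionError,))
async def flaky():
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = asyncio.run(flaky())
print(result)  # "ok" on the third attempt
```

Filtering on `exceptions=(ConnectionError,)` means a non-transient error (say, `ValueError`) would propagate immediately instead of being retried.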
### 2. Rate Limiting

File: `examples/aio/02_rate_limiting.py`

Rate limiting for resource management.

Topics:

- Basic rate limiting (requests per second)
- Burst capacity for bursty workloads
- Rate limit decorator
- Weighted rate limiting (different token costs)
- Conditional rate limiting
- Multiple rate limiters for different resources
- Try-acquire for optional operations
- Dynamic rate adjustment

Use Cases:

- API rate limit compliance
- Database connection pooling
- Resource throttling
- Load management
- Cost control for metered APIs

Run: `python examples/aio/02_rate_limiting.py`
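The token-bucket idea behind rate limiting with burst capacity can be sketched as follows; `TokenBucket` is a hypothetical illustration, not the `dspu.aio` `RateLimiter`:

```python
import asyncio
import time

class TokenBucket:
    """Hypothetical token-bucket sketch: `rate` tokens/s, `capacity` burst."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens refilled per second
        self.capacity = capacity  # burst allowance
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    async def acquire(self, cost: float = 1.0):
        while True:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at capacity.
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= cost:
                self.tokens -= cost
                return
            # Sleep just long enough for the missing tokens to refill.
            await asyncio.sleep((cost - self.tokens) / self.rate)

async def main():
    bucket = TokenBucket(rate=100.0, capacity=5)
    start = time.monotonic()
    for _ in range(10):  # 5 run from the burst, 5 wait for refill
        await bucket.acquire()
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.3f}s")  # roughly 0.05s: 5 tokens had to refill at 100/s
```

Weighted rate limiting falls out of the same design: an expensive operation simply passes a larger `cost`.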
### 3. Circuit Breaker

File: `examples/aio/03_circuit_breaker.py`

Circuit breaker pattern for fault tolerance.

Topics:

- Basic circuit breaker (CLOSED → OPEN → HALF_OPEN)
- State change callbacks
- Circuit breaker decorator
- Context manager usage
- Exception filtering
- Service dependency management
- Manual circuit reset
- Cache fallback pattern

Use Cases:

- Microservices communication
- External API calls
- Database connection failures
- Preventing cascading failures
- Service degradation
- System recovery

Run: `python examples/aio/03_circuit_breaker.py`
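The CLOSED → OPEN → HALF_OPEN state machine can be sketched in a few lines. `SimpleBreaker` below is a simplified, synchronous illustration, not the `dspu.aio` `CircuitBreaker`:

```python
import time

class SimpleBreaker:
    """Hypothetical circuit-breaker sketch with a three-state machine."""

    def __init__(self, failure_threshold=3, timeout=1.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # how long to stay OPEN before probing
        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = 0.0

    def call(self, func):
        if self.state == "OPEN":
            if time.monotonic() - self.opened_at < self.timeout:
                raise RuntimeError("circuit open")  # fail fast, no call made
            self.state = "HALF_OPEN"  # timeout elapsed: allow one probe
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
                self.state = "OPEN"
                self.opened_at = time.monotonic()
            raise
        else:
            self.failures = 0
            self.state = "CLOSED"  # success (or successful probe) closes it
            return result

breaker = SimpleBreaker(failure_threshold=2, timeout=0.05)

def failing():
    raise ConnectionError("service down")

for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
print(breaker.state)  # OPEN after hitting the failure threshold
time.sleep(0.06)      # wait out the OPEN window
print(breaker.call(lambda: "recovered"))  # HALF_OPEN probe succeeds
print(breaker.state)  # back to CLOSED
```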
### 4. Concurrency and Bridge

File: `examples/aio/04_concurrency_and_bridge.py`

Concurrency control and sync/async bridging.
Topics:
- Concurrent execution with limits (gather_with_limit)
- Timeout management
- Running sync code in async context
- Running async code from sync context
- Sync-to-async conversion
- Async-to-sync conversion
- Mixed sync/async pipelines
- Parallel processing
- Cache-aside pattern
- Error handling in concurrent operations
Use Cases:

- Controlling concurrency for rate-limited APIs
- Mixing sync libraries with async code
- Legacy code integration
- CPU-bound operations in async context
- Timeout enforcement
- Parallel data processing

Run: `python examples/aio/04_concurrency_and_bridge.py`
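The idea behind `gather_with_limit` can be sketched with a plain `asyncio.Semaphore`; `gather_limited` below is a hypothetical stand-in for illustration:

```python
import asyncio

async def gather_limited(*coros, limit: int):
    """Run coroutines concurrently, at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:  # at most `limit` tasks enter concurrently
            return await coro

    # gather preserves input order regardless of completion order.
    return await asyncio.gather(*(guarded(c) for c in coros))

active = 0
peak = 0

async def work(i):
    global active, peak
    active += 1
    peak = max(peak, active)   # track how many run at once
    await asyncio.sleep(0.01)  # simulate I/O
    active -= 1
    return i * 2

results = asyncio.run(gather_limited(*(work(i) for i in range(10)), limit=4))
print(results, "peak concurrency:", peak)
```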
## Quick Start

```python
import httpx

from dspu.aio import retry, RateLimiter, CircuitBreaker, BackoffStrategy

# Retry with exponential backoff
@retry(max_attempts=3, strategy=BackoffStrategy.EXPONENTIAL)
async def fetch_data(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

# Rate limiting
async def process_with_rate_limit():
    limiter = RateLimiter(rate=10, per=1.0)
    async with limiter.limit():
        await process_item()

# Circuit breaker
circuit_breaker = CircuitBreaker(failure_threshold=5)

@circuit_breaker
async def call_api():
    ...
```
## Common Patterns
### Pattern 1: Resilient API Client

```python
from dspu.aio import retry, RateLimiter, CircuitBreaker, BackoffStrategy

# Create utilities
limiter = RateLimiter(rate=10.0)  # 10 req/s
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60.0)

@retry(max_attempts=3, strategy=BackoffStrategy.EXPONENTIAL)
async def call_api(endpoint: str):
    # Rate limit
    async with limiter.limit():
        # Circuit breaker protection
        async with circuit_breaker.protect():
            # Make API call
            return await http_client.get(endpoint)
```
### Pattern 2: Controlled Parallel Processing

```python
from dspu.aio import gather_with_limit, sync_to_async

async def process_items(items: list):
    """Process items in parallel with a concurrency limit."""
    def cpu_intensive_work(item):
        # CPU-bound work
        return expensive_calculation(item)

    # Convert to async and process with limit
    async_work = sync_to_async(cpu_intensive_work)
    results = await gather_with_limit(
        *[async_work(item) for item in items],
        limit=4  # Max 4 concurrent
    )
    return results
```
### Pattern 3: Graceful Degradation

```python
from dspu.aio import CircuitBreaker, CircuitBreakerError

circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30.0)
cache = {}

async def get_data(key: str):
    """Get data with cache fallback."""
    try:
        # Try primary source with circuit breaker
        async with circuit_breaker.protect():
            return await fetch_from_primary(key)
    except CircuitBreakerError:
        # Circuit open - use cache immediately
        return cache.get(key, default_value)
    except Exception:
        # Other errors - still try cache
        return cache.get(key, default_value)
```
### Pattern 4: Timeout with Fallback

```python
from dspu.aio import run_with_timeout_or_default

async def fetch_with_timeout(url: str):
    """Fetch with timeout; return cached data on timeout."""
    return await run_with_timeout_or_default(
        fetch_from_api(url),
        timeout=2.0,
        default=get_from_cache(url)
    )
```
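Assuming the helper behaves like `asyncio.wait_for` with a fallback value, the pattern can be sketched in plain asyncio:

```python
import asyncio

async def with_timeout_or_default(coro, timeout: float, default):
    """Hypothetical sketch: await `coro`, return `default` on timeout."""
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        return default  # fall back instead of propagating the timeout

async def slow():
    await asyncio.sleep(1.0)  # simulates a slow upstream call
    return "fresh"

result = asyncio.run(with_timeout_or_default(slow(), timeout=0.01, default="cached"))
print(result)  # cached
```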
## Feature Matrix
| Feature | Retry | Rate Limiter | Circuit Breaker | Concurrency | Bridge |
|---|---|---|---|---|---|
| Decorator | ✅ | ✅ | ✅ | ❌ | ✅ |
| Context Manager | ❌ | ✅ | ✅ | ✅ | ❌ |
| Functional API | ✅ | ✅ | ✅ | ✅ | ✅ |
| Callbacks | ✅ | ❌ | ✅ | ❌ | ❌ |
| Exception Filtering | ✅ | ❌ | ✅ | ❌ | ❌ |
| Backoff Strategies | ✅ | ❌ | ❌ | ❌ | ❌ |
| State Management | ❌ | ✅ | ✅ | ❌ | ❌ |
## Best Practices

### 1. Combining Utilities
Stack utilities for comprehensive resilience. Decorators apply bottom-up, so here each retry attempt passes through the rate limiter and the circuit breaker:

```python
@retry(max_attempts=3)
@rate_limit(api_limiter)
@circuit_breaker(api_breaker)
async def robust_api_call(endpoint: str):
    return await http_client.get(endpoint)
```
### 2. Resource-Specific Configuration

Use different configurations for different resources:

```python
# Aggressive retries for cache (fast)
@retry(max_attempts=5, base_delay=0.1)
async def get_from_cache(key):
    ...

# Conservative retries for external API (slow)
@retry(max_attempts=3, base_delay=2.0)
async def call_external_api(endpoint):
    ...
```
### 3. Monitoring and Observability

Use callbacks for monitoring:

```python
async def log_retry(attempt: int, exception: Exception):
    logger.warning(f"Retry {attempt}: {exception}")
    metrics.increment("retries", tags={"exception": type(exception).__name__})

async def on_circuit_state_change(old: CircuitState, new: CircuitState):
    logger.info(f"Circuit state: {old} → {new}")
    metrics.gauge("circuit_state", new.value)

@retry(on_retry=log_retry)
async def monitored_operation():
    ...
```
### 4. Graceful Degradation

Always have fallback mechanisms:

```python
async def get_user_data(user_id: int):
    try:
        return await fetch_from_db(user_id)
    except CircuitBreakerError:
        return get_from_cache(user_id)
    except Exception:
        return get_default_user()
```
## Troubleshooting

### High Retry Latency

Problem: Operations take too long due to retries.

Solution:

- Reduce `max_attempts`
- Use a shorter `base_delay`
- Use `BackoffStrategy.CONSTANT` instead of `EXPONENTIAL`
- Set an aggressive `max_delay` cap
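The impact of these knobs is easy to estimate. Assuming delays double between attempts, the total worst-case wait before the final attempt is:

```python
def total_backoff(max_attempts, base_delay, max_delay=float("inf")):
    """Sum of inter-attempt delays: base, base*2, base*4, ..., capped."""
    # There is one delay between each pair of attempts, so max_attempts - 1 waits.
    return sum(min(base_delay * 2**i, max_delay) for i in range(max_attempts - 1))

print(total_backoff(5, 2.0))               # 2+4+8+16 = 30s of pure waiting
print(total_backoff(5, 2.0, max_delay=5))  # 2+4+5+5 = 16s with a 5s cap
print(total_backoff(3, 0.5))               # 0.5+1 = 1.5s with fewer attempts
```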
### Circuit Breaker Stuck Open

Problem: Circuit breaker stays open even after the service recovers.

Solution:

- Reduce the `timeout` duration
- Lower `failure_threshold`
- Use a manual `reset()` after confirming service health
- Check that `success_threshold` isn't too high
### Rate Limiter Too Restrictive

Problem: Operations are being throttled too much.

Solution:

- Increase the `rate` parameter
- Increase `capacity` for burst allowance
- Use `try_acquire()` for optional operations
- Consider per-user/per-resource limiters
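The try-acquire idea can be sketched with a non-blocking token check; `TryBucket` below is a hypothetical illustration, not the `dspu.aio` API:

```python
import time

class TryBucket:
    """Hypothetical sketch: non-blocking token check for optional work."""

    def __init__(self, rate: float, capacity: int):
        self.rate, self.capacity = rate, capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    def try_acquire(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False  # caller skips the optional work instead of blocking

bucket = TryBucket(rate=1.0, capacity=2)
done = sum(bucket.try_acquire() for _ in range(5))
print(done, "of 5 optional operations ran")  # 2: burst capacity, rest skipped
```

Skipping instead of waiting suits best-effort work such as prefetching or non-critical telemetry.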