# Async Patterns

Resilient async operations with retry, rate limiting, circuit breakers, and concurrency control.

## Overview
The async module provides production-ready patterns for building resilient asynchronous applications:
- Retry: Handle transient failures with backoff strategies
- Rate Limiting: Control request rates and prevent overload
- Circuit Breakers: Prevent cascading failures
- Concurrency Control: Manage parallel execution
- Sync/Async Bridge: Mix sync and async code
## Retry Patterns

### Why Retry?

Transient failures are common in distributed systems:

- Network timeouts
- Temporary service unavailability
- Database connection issues
- Rate limit errors
### Basic Retry

```python
import httpx

from dspu.aio import retry

@retry(max_attempts=3)
async def fetch_data(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)
```
### Backoff Strategies

#### Exponential Backoff

```python
from dspu.aio import retry, BackoffStrategy

@retry(
    max_attempts=5,
    strategy=BackoffStrategy.EXPONENTIAL,
    base_delay=1.0,   # Start with 1 second
    max_delay=60.0,   # Cap at 60 seconds
    jitter=True       # Add randomness
)
async def api_call():
    ...
```

Delays: 1s, 2s, 4s, 8s, 16s (with jitter)
#### Linear Backoff

Delays: 2s, 4s, 6s, 8s, 10s
#### Constant Backoff

```python
@retry(
    max_attempts=5,
    strategy=BackoffStrategy.CONSTANT,
    base_delay=1.0
)
async def api_call():
    ...
```

Delays: 1s, 1s, 1s, 1s, 1s
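The three delay schedules above reduce to simple formulas. A minimal sketch of the arithmetic (the exact jitter distribution used by `dspu.aio` is an implementation detail; this assumes a multiplicative jitter for illustration):

```python
import random

def backoff_delays(strategy: str, base: float, attempts: int,
                   max_delay: float = 60.0, jitter: bool = False):
    """Compute the wait before each retry attempt for a given strategy."""
    delays = []
    for attempt in range(1, attempts + 1):
        if strategy == "exponential":
            delay = base * 2 ** (attempt - 1)   # doubles each attempt
        elif strategy == "linear":
            delay = base * attempt              # grows by a fixed increment
        else:                                   # constant
            delay = base
        delay = min(delay, max_delay)           # respect the cap
        if jitter:
            delay *= random.uniform(0.5, 1.5)   # spread out synchronized retries
        delays.append(delay)
    return delays

backoff_delays("exponential", 1.0, 5)  # [1.0, 2.0, 4.0, 8.0, 16.0]
backoff_delays("linear", 2.0, 5)       # [2.0, 4.0, 6.0, 8.0, 10.0]
```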
### Exception Filtering

Only retry specific errors:

```python
@retry(
    max_attempts=3,
    exceptions=(httpx.TimeoutException, httpx.NetworkError)
)
async def fetch_data():
    ...  # Only retries on timeout/network errors
```
### Retry Callbacks

Monitor retry attempts:

```python
async def on_retry(attempt: int, exception: Exception):
    logger.warning(f"Retry {attempt}: {exception}")
    metrics.increment("retries")

@retry(max_attempts=3, on_retry=on_retry)
async def api_call():
    ...
```
## Rate Limiting

### Why Rate Limiting?
- Comply with API rate limits
- Protect backend services
- Control costs
- Prevent overload
### Basic Rate Limiting

```python
from dspu.aio import RateLimiter

# 10 requests per second
limiter = RateLimiter(rate=10.0, per=1.0)

async def make_request():
    async with limiter:
        return await api_client.get("/data")
```
### Token Bucket Algorithm

```python
# 100 requests per second with bursts of up to 200
limiter = RateLimiter(
    rate=100.0,      # 100 tokens/second refill rate
    capacity=200.0   # Burst up to 200 tokens
)
```
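The token bucket's accounting is simple: tokens refill at a constant rate up to a fixed capacity, and each request spends tokens. A minimal synchronous sketch (the `dspu.aio` limiter is async, but the bookkeeping is the same):

```python
import time

class TokenBucket:
    """Illustrative token bucket: refill at `rate` tokens/second, hold at most `capacity`."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum stored tokens (burst size)
        self.tokens = capacity      # start full, so bursts succeed immediately
        self.updated = time.monotonic()

    def _refill(self):
        now = time.monotonic()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    def try_acquire(self, tokens: float = 1.0) -> bool:
        """Spend tokens if available; never blocks."""
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

bucket = TokenBucket(rate=100.0, capacity=200.0)
bucket.try_acquire(200)  # the full burst succeeds: the bucket starts full
bucket.try_acquire(50)   # fails until enough tokens refill
```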
### Weighted Rate Limiting

Different operations consume different numbers of tokens:

```python
limiter = RateLimiter(rate=10.0)

# Light operation (1 token)
async with limiter.acquire(tokens=1):
    await api.get_user()

# Heavy operation (5 tokens)
async with limiter.acquire(tokens=5):
    await api.bulk_upload()
```
### Try Acquire

Non-blocking acquisition for optional operations:

```python
limiter = RateLimiter(rate=10.0)

if await limiter.try_acquire():
    # Got tokens, proceed
    await api.refresh_cache()
else:
    # No tokens available, skip
    logger.info("Rate limit reached, skipping cache refresh")
```
## Circuit Breakers

### Why Circuit Breakers?

Prevent cascading failures when services are down:

- Stop calling failing services
- Give services time to recover
- Fail fast instead of waiting for timeouts
### Circuit States

```text
CLOSED    → Service healthy, requests go through
   ↓ (failures exceed threshold)
OPEN      → Service down, requests fail immediately
   ↓ (timeout expires)
HALF_OPEN → Testing recovery, limited requests
   ↓ (success → CLOSED)    ↓ (failure → OPEN)
```
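The state machine above can be sketched in a few lines. This is a simplified, synchronous illustration with hypothetical names, not the `dspu.aio` implementation (which is async and also honors a `success_threshold` in HALF_OPEN):

```python
import time

class SimpleBreaker:
    """Minimal circuit breaker: CLOSED → OPEN after repeated failures,
    OPEN → HALF_OPEN after a timeout, then back depending on the probe."""

    def __init__(self, failure_threshold: int, timeout: float):
        self.failure_threshold = failure_threshold
        self.timeout = timeout      # seconds to stay OPEN before probing
        self.failures = 0
        self.state = "CLOSED"
        self.opened_at = 0.0

    def call(self, fn):
        if self.state == "OPEN":
            if time.monotonic() - self.opened_at >= self.timeout:
                self.state = "HALF_OPEN"            # allow a probe request
            else:
                raise RuntimeError("circuit open")  # fail fast, no real call
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
                self.state = "OPEN"
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        self.state = "CLOSED"       # success closes the circuit
        return result
```

After `failure_threshold` consecutive failures the breaker rejects calls immediately instead of letting them hit the failing dependency.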
### Basic Circuit Breaker

```python
from dspu.aio import CircuitBreaker

# Trip after 5 failures, attempt recovery after 60 seconds
breaker = CircuitBreaker(
    failure_threshold=5,
    timeout=60.0,
    success_threshold=2  # Need 2 successes to close
)

async def call_service():
    async with breaker.protect():
        return await external_api.call()
```
### Exception Filtering

Only count specific errors as failures:

```python
breaker = CircuitBreaker(
    failure_threshold=5,
    exceptions=(httpx.TimeoutException, httpx.NetworkError)
)

# An HTTP 404 won't trip the circuit
async with breaker.protect():
    return await api.get_user(user_id)
```
### State Callbacks

Monitor circuit state changes:

```python
async def on_state_change(old_state, new_state):
    logger.warning(f"Circuit: {old_state} → {new_state}")
    metrics.gauge("circuit_state", new_state.value)

breaker = CircuitBreaker(
    failure_threshold=5,
    on_state_change=on_state_change
)
```
### Manual Reset

```python
from dspu.aio import CircuitState

# Force the circuit to close
await breaker.reset()

# Check the current state
if breaker.state == CircuitState.OPEN:
    logger.error("Circuit is open!")
```
## Combining Patterns

### Resilient API Client

```python
from dspu.aio import retry, BackoffStrategy, RateLimiter, CircuitBreaker

# Setup
limiter = RateLimiter(rate=10.0)
breaker = CircuitBreaker(failure_threshold=5)

@retry(max_attempts=3, strategy=BackoffStrategy.EXPONENTIAL)
async def fetch_user(user_id: int):
    # Rate limit first, so retries also respect the limit
    async with limiter:
        # Then the circuit breaker
        async with breaker.protect():
            # Finally the API call
            return await api.get(f"/users/{user_id}")
```
### Graceful Degradation

```python
from dspu.aio import CircuitBreaker, CircuitBreakerError

breaker = CircuitBreaker(failure_threshold=3, timeout=30.0)
cache = {}

async def get_user(user_id: int):
    try:
        # Try the primary source
        async with breaker.protect():
            return await api.get_user(user_id)
    except CircuitBreakerError:
        # Circuit open, use the cache
        logger.warning("Circuit open, using cache")
        return cache.get(user_id, default_user)
    except Exception as e:
        # Other errors, still fall back to the cache
        logger.error(f"API error: {e}, using cache")
        return cache.get(user_id, default_user)
```
## Concurrency Control

### Gather with Limit

Process many tasks with a concurrency limit:

```python
from dspu.aio import gather_with_limit

# Fetch 1000 users, at most 10 concurrently
user_ids = range(1000)
tasks = [fetch_user(uid) for uid in user_ids]
results = await gather_with_limit(*tasks, limit=10)
```
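Under the hood this pattern is typically a semaphore wrapped around `asyncio.gather`. A minimal sketch of the equivalent (not the `dspu.aio` source):

```python
import asyncio

async def gather_limited(*coros, limit: int):
    """Run coroutines concurrently, at most `limit` at a time; results keep input order."""
    sem = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with sem:              # wait for a free slot before starting
            return await coro

    return await asyncio.gather(*(bounded(c) for c in coros))

async def main():
    async def double(n):
        await asyncio.sleep(0)       # stand-in for real I/O
        return n * 2
    return await gather_limited(*(double(i) for i in range(5)), limit=2)

asyncio.run(main())  # [0, 2, 4, 6, 8]
```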
### Timeout Management

```python
from dspu.aio import run_with_timeout, Timeout

# Functional API
try:
    result = await run_with_timeout(slow_operation(), timeout=5.0)
except TimeoutError:
    logger.error("Operation timed out")

# Context manager
async with Timeout(5.0):
    await slow_operation()
```
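With the standard library alone, the functional form corresponds closely to `asyncio.wait_for`. A sketch of the equivalent behavior (not `run_with_timeout` itself):

```python
import asyncio

async def slow():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        # Cancels slow() and raises if it doesn't finish in time
        return await asyncio.wait_for(slow(), timeout=0.01)
    except asyncio.TimeoutError:   # an alias of builtins.TimeoutError since 3.11
        return "timed out"

asyncio.run(main())  # 'timed out'
```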
### Shield from Cancellation

```python
from dspu.aio import run_with_shield

# Critical operation that shouldn't be cancelled
async with Timeout(10.0):
    # The timeout may cancel this task...
    await cancellable_operation()
    # ...but not this one (shielded)
    await run_with_shield(critical_cleanup())
```
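The standard-library primitive behind this is `asyncio.shield`: cancelling the outer await does not propagate into the shielded task. A minimal sketch demonstrating that behavior:

```python
import asyncio

async def cleanup(log):
    await asyncio.sleep(0.01)
    log.append("cleaned up")   # must run even if the caller gives up waiting

async def main():
    log = []
    inner = asyncio.ensure_future(cleanup(log))
    outer = asyncio.ensure_future(asyncio.shield(inner))
    await asyncio.sleep(0)     # let both tasks start
    outer.cancel()             # cancels only the shield wrapper
    try:
        await outer
    except asyncio.CancelledError:
        pass
    await inner                # the shielded task still runs to completion
    return log

asyncio.run(main())  # ['cleaned up']
```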
## Sync/Async Bridge

### Running Sync in Async

```python
from dspu.aio import sync_to_async, run_in_executor

# A blocking, CPU-bound function
def expensive_calculation(n):
    return sum(i**2 for i in range(n))

# Method 1: Decorator
@sync_to_async
def calc(n):
    return expensive_calculation(n)

result = await calc(1000000)

# Method 2: Direct execution
result = await run_in_executor(expensive_calculation, 1000000)
```
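The standard-library equivalent is `loop.run_in_executor` (or `asyncio.to_thread` on Python 3.9+), which hands the function to a thread pool so it doesn't block the event loop. A sketch:

```python
import asyncio

def expensive_calculation(n):
    return sum(i**2 for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, expensive_calculation, 1000)

asyncio.run(main())  # 332833500
```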
### Running Async in Sync

```python
from dspu.aio import async_to_sync, run_async

# An async function we want to call from sync code
async def fetch_data():
    async with httpx.AsyncClient() as client:
        return await client.get("https://api.example.com/data")

# Method 1: Decorator
@async_to_sync
async def fetch():
    return await fetch_data()

result = fetch()  # Call synchronously

# Method 2: Direct execution
result = run_async(fetch_data())
```
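When no event loop is already running, `asyncio.run` is the standard-library way to drive a coroutine from sync code. A minimal decorator sketch (hypothetical, not the `dspu.aio` implementation, which may also handle the case of an already-running loop):

```python
import asyncio
import functools

def to_sync(async_fn):
    """Wrap an async function so it can be called from plain sync code."""
    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # Starts a fresh event loop, runs the coroutine, and tears the loop down
        return asyncio.run(async_fn(*args, **kwargs))
    return wrapper

@to_sync
async def greet(name):
    await asyncio.sleep(0)
    return f"hello {name}"

greet("world")  # 'hello world'
```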
## Common Patterns

### Pattern 1: API Client with Full Protection

```python
class ResilientAPIClient:
    def __init__(self):
        self.limiter = RateLimiter(rate=10.0)
        self.breaker = CircuitBreaker(failure_threshold=5)

    @retry(max_attempts=3)
    async def get(self, endpoint: str):
        async with self.limiter:
            async with self.breaker.protect():
                return await self._http_get(endpoint)
```
### Pattern 2: Batch Processing

```python
rate_limiter = RateLimiter(rate=100.0)

async def process_batch(items: list):
    # Process all items, at most 10 concurrently
    async def process_item(item):
        async with rate_limiter:
            return await expensive_operation(item)

    results = await gather_with_limit(
        *[process_item(item) for item in items],
        limit=10
    )
    return results
```
### Pattern 3: Timeout with Fallback

```python
async def fetch_with_fallback(url: str):
    try:
        return await run_with_timeout(
            fetch_from_api(url),
            timeout=2.0
        )
    except TimeoutError:
        logger.warning("API timeout, using cache")
        return fetch_from_cache(url)
```
## Best Practices

### Retry

✅ DO:

- Use exponential backoff for external APIs
- Add jitter to prevent the thundering herd
- Set a max_delay cap
- Filter exceptions (only retry transient errors)
- Log retry attempts

❌ DON'T:

- Retry indefinitely
- Retry non-transient errors (400, 403, 404)
- Use constant backoff for everything
- Ignore the cost of retries

### Rate Limiting

✅ DO:

- Match the API provider's limits
- Add burst capacity for spikes
- Use weighted tokens for different operations
- Monitor rate limit usage
- Use try_acquire for optional operations

❌ DON'T:

- Set the rate too high (respect limits)
- Set the rate too low (waste capacity)
- Skip rate limiting in development
- Share rate limiters across services

### Circuit Breakers

✅ DO:

- Set an appropriate failure threshold (3-5)
- Set a reasonable timeout (30-60s)
- Add state change callbacks
- Implement fallbacks
- Monitor circuit state

❌ DON'T:

- Set the threshold too low (false trips)
- Set the timeout too short (no recovery time)
- Forget fallback logic
- Ignore circuit state