Async Patterns

Resilient async operations with retry, rate limiting, circuit breakers, and concurrency control.

Overview

The async module provides production-ready patterns for building resilient asynchronous applications:

  • Retry: Handle transient failures with backoff strategies
  • Rate Limiting: Control request rates and prevent overload
  • Circuit Breakers: Prevent cascading failures
  • Concurrency Control: Manage parallel execution
  • Sync/Async Bridge: Mix sync and async code

Retry Patterns

Why Retry?

Transient failures are common in distributed systems:

  • Network timeouts
  • Temporary service unavailability
  • Database connection issues
  • Rate limit errors

Basic Retry

import httpx

from dspu.aio import retry

@retry(max_attempts=3)
async def fetch_data(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)
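The decorator's mechanics can be sketched with plain asyncio. This is a minimal illustration of the pattern, not the dspu.aio implementation; `simple_retry` and `flaky` are hypothetical names:

```python
import asyncio
import functools

def simple_retry(max_attempts=3, base_delay=0.01):
    """Illustrative retry decorator: re-invoke the coroutine on failure,
    sleeping between attempts, and re-raise after the final attempt."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
                    await asyncio.sleep(base_delay)
        return wrapper
    return decorator

calls = 0

@simple_retry(max_attempts=3)
async def flaky():
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = asyncio.run(flaky())
print(result, calls)  # ok 3
```

The function fails twice and succeeds on the third attempt, so the decorator absorbs the two transient errors.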

Backoff Strategies

Exponential Backoff

from dspu.aio import retry, BackoffStrategy

@retry(
    max_attempts=5,
    strategy=BackoffStrategy.EXPONENTIAL,
    base_delay=1.0,      # Start with 1 second
    max_delay=60.0,      # Cap at 60 seconds
    jitter=True          # Add randomness
)
async def api_call():
    ...

Delays: 1s, 2s, 4s, 8s, 16s (with jitter)

Linear Backoff

@retry(
    max_attempts=5,
    strategy=BackoffStrategy.LINEAR,
    base_delay=2.0
)
async def api_call():
    ...

Delays: 2s, 4s, 6s, 8s, 10s

Constant Backoff

@retry(
    max_attempts=5,
    strategy=BackoffStrategy.CONSTANT,
    base_delay=1.0
)
async def api_call():
    ...

Delays: 1s, 1s, 1s, 1s, 1s
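The three schedules differ only in how the delay grows with the attempt number. A rough sketch of the arithmetic, with jitter omitted (`delay` is an illustrative helper, not part of dspu.aio):

```python
def delay(strategy, attempt, base=1.0, max_delay=60.0):
    """Compute the sleep before retry number `attempt` (1-based) for each
    strategy; real implementations typically add jitter on top."""
    if strategy == "exponential":
        d = base * 2 ** (attempt - 1)
    elif strategy == "linear":
        d = base * attempt
    else:  # constant
        d = base
    return min(d, max_delay)

print([delay("exponential", a) for a in range(1, 6)])           # [1.0, 2.0, 4.0, 8.0, 16.0]
print([delay("linear", a, base=2.0) for a in range(1, 6)])      # [2.0, 4.0, 6.0, 8.0, 10.0]
print([delay("constant", a) for a in range(1, 6)])              # [1.0, 1.0, 1.0, 1.0, 1.0]
```

The `max_delay` cap matters for exponential backoff: by the tenth attempt the raw delay would be 512s, which the cap trims to 60s.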

Exception Filtering

Only retry specific errors:

@retry(
    max_attempts=3,
    exceptions=(httpx.TimeoutException, httpx.NetworkError)
)
async def fetch_data():
    ...  # Only retries on timeout/network errors

Retry Callbacks

Monitor retry attempts:

async def on_retry(attempt: int, exception: Exception):
    logger.warning(f"Retry {attempt}: {exception}")
    metrics.increment("retries")

@retry(max_attempts=3, on_retry=on_retry)
async def api_call():
    ...

Rate Limiting

Why Rate Limiting?

  • Comply with API rate limits
  • Protect backend services
  • Control costs
  • Prevent overload

Basic Rate Limiting

from dspu.aio import RateLimiter

# 10 requests per second
limiter = RateLimiter(rate=10.0, per=1.0)

async def make_request():
    async with limiter:
        return await api_client.get("/data")

Token Bucket Algorithm

# 100 requests per second with burst of 200
limiter = RateLimiter(
    rate=100.0,      # 100 tokens/second
    capacity=200.0   # Burst up to 200 tokens
)
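A token bucket refills continuously at `rate` tokens per second up to `capacity`, and a request proceeds only if enough tokens remain. A minimal sketch of the algorithm (the `TokenBucket` class here is illustrative, not the dspu.aio implementation; the fake clock makes the demo deterministic):

```python
import time

class TokenBucket:
    """Illustrative token bucket: tokens refill continuously at `rate`
    per second, up to `capacity`; a request spends tokens if available."""
    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = capacity
        self.updated = clock()

    def _refill(self):
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    def try_acquire(self, tokens=1):
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

# Deterministic demo with a fake clock
t = [0.0]
bucket = TokenBucket(rate=100.0, capacity=200.0, clock=lambda: t[0])
print(bucket.try_acquire(200))  # burst drains the full capacity -> True
print(bucket.try_acquire(1))    # bucket empty -> False
t[0] = 0.5                      # half a second refills 50 tokens
print(bucket.try_acquire(50))   # -> True
```

Note how the burst capacity lets a quiet client spend up to 200 tokens at once while the long-run rate stays at 100/s.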

Weighted Rate Limiting

Different operations consume different tokens:

limiter = RateLimiter(rate=10.0)

# Light operation (1 token)
async with limiter.acquire(tokens=1):
    await api.get_user()

# Heavy operation (5 tokens)
async with limiter.acquire(tokens=5):
    await api.bulk_upload()

Try Acquire

Non-blocking acquire for optional operations:

limiter = RateLimiter(rate=10.0)

if await limiter.try_acquire():
    # Got tokens, proceed
    await api.refresh_cache()
else:
    # No tokens available, skip
    logger.info("Rate limit reached, skipping cache refresh")

Circuit Breakers

Why Circuit Breakers?

Prevent cascading failures when services are down:

  • Stop calling failing services
  • Give services time to recover
  • Fail fast instead of waiting for timeouts

Circuit States

CLOSED → Service healthy, requests go through
  ↓ (failures exceed threshold)
OPEN → Service down, requests fail immediately
  ↓ (timeout expires)
HALF_OPEN → Testing recovery, limited requests
  ↓ (success → CLOSED)     ↓ (failure → OPEN)
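The transitions above can be sketched as a small state machine. This is a synchronous illustration with hypothetical names (`MiniBreaker`, `allow`, `record_failure`), not the dspu.aio internals:

```python
import time

class MiniBreaker:
    """Illustrative circuit-breaker state machine:
    CLOSED -> OPEN after `failure_threshold` consecutive failures,
    OPEN -> HALF_OPEN once `timeout` seconds have passed,
    HALF_OPEN -> CLOSED after `success_threshold` successes
    (or straight back to OPEN on any failure)."""
    def __init__(self, failure_threshold=5, timeout=60.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.success_threshold = success_threshold
        self.clock = clock
        self.state = "CLOSED"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow(self):
        if self.state == "OPEN" and self.clock() - self.opened_at >= self.timeout:
            self.state, self.successes = "HALF_OPEN", 0
        return self.state != "OPEN"

    def record_failure(self):
        self.failures += 1
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state, self.opened_at = "OPEN", self.clock()

    def record_success(self):
        self.failures = 0
        if self.state == "HALF_OPEN":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = "CLOSED"

# Deterministic walk through the states with a fake clock
t = [0.0]
b = MiniBreaker(failure_threshold=2, timeout=30.0, clock=lambda: t[0])
b.record_failure(); b.record_failure()
print(b.state)              # OPEN
t[0] = 31.0                 # timeout expires
print(b.allow(), b.state)   # True HALF_OPEN
b.record_success(); b.record_success()
print(b.state)              # CLOSED
```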

Basic Circuit Breaker

from dspu.aio import CircuitBreaker

# Trip after 5 failures, recover after 60 seconds
breaker = CircuitBreaker(
    failure_threshold=5,
    timeout=60.0,
    success_threshold=2  # Need 2 successes to close
)

async def call_service():
    async with breaker.protect():
        return await external_api.call()

Exception Filtering

Only count specific errors as failures:

breaker = CircuitBreaker(
    failure_threshold=5,
    exceptions=(httpx.TimeoutException, httpx.NetworkError)
)

# HTTP 404 won't trip the circuit
async with breaker.protect():
    return await api.get_user(user_id)

State Callbacks

Monitor circuit state changes:

async def on_state_change(old_state, new_state):
    logger.warning(f"Circuit: {old_state} → {new_state}")
    metrics.gauge("circuit_state", new_state.value)

breaker = CircuitBreaker(
    failure_threshold=5,
    on_state_change=on_state_change
)

Manual Reset

from dspu.aio import CircuitState

# Force circuit to close
await breaker.reset()

# Check current state
if breaker.state == CircuitState.OPEN:
    logger.error("Circuit is open!")

Combining Patterns

Resilient API Client

from dspu.aio import retry, BackoffStrategy, RateLimiter, CircuitBreaker

# Setup
limiter = RateLimiter(rate=10.0)
breaker = CircuitBreaker(failure_threshold=5)

@retry(max_attempts=3, strategy=BackoffStrategy.EXPONENTIAL)
async def fetch_user(user_id: int):
    # Rate limit
    async with limiter:
        # Circuit breaker
        async with breaker.protect():
            # API call
            return await api.get(f"/users/{user_id}")

Graceful Degradation

from dspu.aio import CircuitBreaker, CircuitBreakerError

breaker = CircuitBreaker(failure_threshold=3, timeout=30.0)
cache = {}

async def get_user(user_id: int):
    try:
        # Try primary source
        async with breaker.protect():
            return await api.get_user(user_id)
    except CircuitBreakerError:
        # Circuit open, use cache
        logger.warning("Circuit open, using cache")
        return cache.get(user_id, default_user)
    except Exception as e:
        # Other errors, still try cache
        logger.error(f"API error: {e}, using cache")
        return cache.get(user_id, default_user)

Concurrency Control

Gather with Limit

Process many tasks with concurrency limit:

from dspu.aio import gather_with_limit

# Fetch 1000 users, max 10 concurrent
user_ids = range(1000)
tasks = [fetch_user(uid) for uid in user_ids]

results = await gather_with_limit(*tasks, limit=10)
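A bounded gather can be approximated with a plain `asyncio.Semaphore`. The sketch below (`gather_limited` is a hypothetical name, not the dspu.aio function) also measures peak concurrency to show the cap holds:

```python
import asyncio

async def gather_limited(coros, limit):
    """Illustrative bounded gather: a semaphore caps how many coroutines
    run at once; results keep input order."""
    sem = asyncio.Semaphore(limit)
    async def run(coro):
        async with sem:
            return await coro
    return await asyncio.gather(*(run(c) for c in coros))

async def main():
    peak = current = 0
    async def job(i):
        nonlocal peak, current
        current += 1
        peak = max(peak, current)       # track how many jobs overlap
        await asyncio.sleep(0.01)
        current -= 1
        return i * 2
    results = await gather_limited([job(i) for i in range(20)], limit=5)
    return results, peak

results, peak = asyncio.run(main())
print(results[:3], peak)  # [0, 2, 4] 5
```

All 20 coroutines are created up front, but the semaphore ensures at most 5 are ever past the `await` at the same time.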

Timeout Management

from dspu.aio import run_with_timeout, Timeout

# Functional API
try:
    result = await run_with_timeout(slow_operation(), timeout=5.0)
except TimeoutError:
    logger.error("Operation timed out")

# Context manager
async with Timeout(5.0):
    await slow_operation()
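The standard library offers the same behavior via `asyncio.wait_for`, which cancels the awaited task and raises `TimeoutError` when the deadline passes:

```python
import asyncio

async def main():
    # Fast operation completes inside the deadline
    fast = await asyncio.wait_for(asyncio.sleep(0.01, result="ok"), timeout=1.0)

    # Slow operation is cancelled and raises TimeoutError
    try:
        await asyncio.wait_for(asyncio.sleep(10), timeout=0.01)
        return fast, False
    except asyncio.TimeoutError:
        return fast, True

fast, timed_out = asyncio.run(main())
print(fast, timed_out)  # ok True
```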

Shield from Cancellation

from dspu.aio import run_with_shield

# Critical operation that shouldn't be cancelled
async with Timeout(10.0):
    # Timeout will cancel this task
    await cancellable_operation()

    # But not this one (shielded)
    await run_with_shield(critical_cleanup())
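The same effect is available in the standard library as `asyncio.shield`: cancellation (triggered here by a deliberately short timeout) hits the shield wrapper, while the inner task keeps running to completion:

```python
import asyncio

results = []

async def critical_cleanup():
    # Simulates a cleanup step that must run to completion
    await asyncio.sleep(0.02)
    results.append("cleaned")

async def main():
    task = asyncio.ensure_future(critical_cleanup())
    try:
        # The timeout cancels the shield wrapper, not the inner task
        await asyncio.wait_for(asyncio.shield(task), timeout=0.001)
    except asyncio.TimeoutError:
        pass
    await task  # the shielded task still finishes normally

asyncio.run(main())
print(results)  # ['cleaned']
```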

Sync/Async Bridge

Running Sync in Async

from dspu.aio import sync_to_async, run_in_executor

# Convert sync function to async
def expensive_calculation(n):
    return sum(i**2 for i in range(n))

# Method 1: Decorator
@sync_to_async
def calc(n):
    return expensive_calculation(n)

result = await calc(1000000)

# Method 2: Direct execution
result = await run_in_executor(expensive_calculation, 1000000)
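For comparison, the standard-library equivalent is `asyncio.to_thread` (Python 3.9+), which runs the blocking function in a worker thread so the event loop stays responsive:

```python
import asyncio

def expensive_calculation(n):
    # CPU-bound sync work that would block the event loop if run inline
    return sum(i ** 2 for i in range(n))

async def main():
    # Hand the sync function off to a worker thread and await the result
    return await asyncio.to_thread(expensive_calculation, 1000)

result = asyncio.run(main())
print(result)  # 332833500
```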

Running Async in Sync

from dspu.aio import async_to_sync, run_async

# Convert async function to sync
async def fetch_data():
    async with httpx.AsyncClient() as client:
        return await client.get("https://api.example.com/data")

# Method 1: Decorator
@async_to_sync
async def fetch():
    return await fetch_data()

result = fetch()  # Call synchronously

# Method 2: Direct execution
result = run_async(fetch_data())

Common Patterns

Pattern 1: API Client with Full Protection

class ResilientAPIClient:
    def __init__(self):
        self.limiter = RateLimiter(rate=10.0)
        self.breaker = CircuitBreaker(failure_threshold=5)

    @retry(max_attempts=3)
    async def get(self, endpoint: str):
        async with self.limiter:
            async with self.breaker.protect():
                return await self._http_get(endpoint)

Pattern 2: Batch Processing

rate_limiter = RateLimiter(rate=10.0)

async def process_batch(items: list):
    # Rate-limit each item, max 10 running concurrently
    async def process_item(item):
        async with rate_limiter:
            return await expensive_operation(item)

    results = await gather_with_limit(
        *[process_item(item) for item in items],
        limit=10
    )
    return results

Pattern 3: Timeout with Fallback

async def fetch_with_fallback(url: str):
    try:
        return await run_with_timeout(
            fetch_from_api(url),
            timeout=2.0
        )
    except TimeoutError:
        logger.warning("API timeout, using cache")
        return fetch_from_cache(url)

Best Practices

Retry

DO:

  • Use exponential backoff for external APIs
  • Add jitter to prevent thundering herd
  • Set a max_delay cap
  • Filter exceptions (only retry transient errors)
  • Log retry attempts

DON'T:

  • Retry indefinitely
  • Retry non-transient errors (400, 403, 404)
  • Use constant backoff for everything
  • Ignore retry costs

Rate Limiting

DO:

  • Match API provider limits
  • Add burst capacity for spikes
  • Use weighted tokens for different operations
  • Monitor rate limit usage
  • Use try_acquire for optional operations

DON'T:

  • Set the rate too high (respect provider limits)
  • Set the rate too low (waste capacity)
  • Skip rate limiting in development
  • Share rate limiters across services

Circuit Breakers

DO:

  • Set an appropriate failure threshold (3-5)
  • Set a reasonable timeout (30-60s)
  • Add state change callbacks
  • Implement fallbacks
  • Monitor circuit state

DON'T:

  • Set the threshold too low (false trips)
  • Set the timeout too short (no recovery time)
  • Forget fallback logic
  • Ignore circuit state

Installation

# Async utilities included in base installation
pip install dspu

Next Steps