Async (AIO) Examples

Comprehensive examples for async utilities including retry mechanisms, rate limiting, circuit breakers, and concurrency control.

Overview

The AIO module provides production-ready async utilities for building resilient applications with proper error handling, resource management, and fault tolerance.

Examples

1. Retry Patterns

File: examples/aio/01_retry_patterns.py

Retry mechanisms for transient failures.

Topics:
- Basic exponential backoff retry
- Exception filtering (retry only specific errors)
- Retry callbacks for logging/monitoring
- Dynamic retry configuration
- API client with built-in retries
- Graceful degradation with cache fallback

Use Cases:
- API calls that may fail temporarily
- Database connection retries
- Network request resilience
- Distributed system communication

Run:

uv run python examples/aio/01_retry_patterns.py
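
The first two topics above (exponential backoff and exception filtering) can be sketched with plain asyncio. This is a minimal illustration, not the dspu.aio implementation: the helper name `retry_with_backoff` and its parameters are assumptions chosen for this example.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    *,
    max_attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Call `operation` until it succeeds or `max_attempts` is exhausted.

    Hypothetical helper for illustration; only exceptions listed in
    `retry_on` are retried, anything else propagates immediately.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles each attempt (base, 2*base, 4*base, ...), capped.
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            await asyncio.sleep(delay)
    raise RuntimeError("max_attempts must be at least 1")
```

Passing a zero-argument callable rather than a coroutine object matters here: a coroutine can only be awaited once, so each attempt needs a fresh one.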

2. Rate Limiting

File: examples/aio/02_rate_limiting.py

Rate limiting for resource management.

Topics:
- Basic rate limiting (requests per second)
- Burst capacity for bursty workloads
- Rate limit decorator
- Weighted rate limiting (different token costs)
- Conditional rate limiting
- Multiple rate limiters for different resources
- Try-acquire for optional operations
- Dynamic rate adjustment

Use Cases:
- API rate limit compliance
- Database connection pooling
- Resource throttling
- Load management
- Cost control for metered APIs

Run:

uv run python examples/aio/02_rate_limiting.py
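
Burst capacity, weighted costs, and try-acquire all fall out naturally of a token bucket. The sketch below assumes that model; the class name `TokenBucket` is hypothetical, and whether dspu.aio's RateLimiter works this way internally is not confirmed by this page.

```python
import asyncio
import time


class TokenBucket:
    """Illustrative token bucket: refills at `rate` tokens/second up to `capacity`."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate = rate
        self.capacity = capacity  # maximum burst size
        self._tokens = capacity   # start full so an initial burst is allowed
        self._updated = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self._updated
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._updated = now

    def try_acquire(self, tokens: float = 1.0) -> bool:
        """Take `tokens` if available right now; never waits."""
        self._refill()
        if tokens <= self._tokens:
            self._tokens -= tokens
            return True
        return False

    async def acquire(self, tokens: float = 1.0) -> None:
        """Wait until `tokens` are available, then take them."""
        if tokens > self.capacity:
            raise ValueError("request exceeds bucket capacity")
        while not self.try_acquire(tokens):
            # Sleep roughly as long as the shortfall takes to refill.
            await asyncio.sleep((tokens - self._tokens) / self.rate)
```

A weighted request is just `acquire(tokens=n)` with a larger cost, and an optional operation can call `try_acquire()` and skip its work when it returns False.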

3. Circuit Breaker

File: examples/aio/03_circuit_breaker.py

Circuit breaker pattern for fault tolerance.

Topics:
- Basic circuit breaker (CLOSED → OPEN → HALF_OPEN)
- State change callbacks
- Circuit breaker decorator
- Context manager usage
- Exception filtering
- Service dependency management
- Manual circuit reset
- Cache fallback pattern

Use Cases:
- Microservices communication
- External API calls
- Database connection failures
- Preventing cascading failures
- Service degradation
- System recovery

Run:

uv run python examples/aio/03_circuit_breaker.py
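
The CLOSED → OPEN → HALF_OPEN cycle listed above can be sketched as a small state machine. This is an illustration under assumed semantics (a failure count opens the circuit, a timeout moves it to half-open, one success closes it); `SimpleBreaker`, `State`, and the injectable `clock` are hypothetical names, not the dspu.aio API.

```python
import time
from enum import Enum
from typing import Callable


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SimpleBreaker:
    """Illustrative circuit breaker state machine (not the library class)."""

    def __init__(
        self,
        failure_threshold: int = 3,
        timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self._clock = clock  # injectable so tests need not sleep
        self._failures = 0
        self._opened_at = 0.0
        self.state = State.CLOSED

    def allow_request(self) -> bool:
        """Return True if a call may proceed in the current state."""
        if self.state is State.OPEN and self._clock() - self._opened_at >= self.timeout:
            self.state = State.HALF_OPEN  # timeout elapsed: allow trial calls
        return self.state is not State.OPEN

    def record_success(self) -> None:
        self._failures = 0
        self.state = State.CLOSED

    def record_failure(self) -> None:
        self._failures += 1
        # A failed trial call re-opens at once; otherwise wait for the threshold.
        if self.state is State.HALF_OPEN or self._failures >= self.failure_threshold:
            self.state = State.OPEN
            self._opened_at = self._clock()
```

A caller checks `allow_request()` before the protected call and reports the outcome with `record_success()` or `record_failure()`; a decorator or context manager would wrap exactly that sequence.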

4. Concurrency and Bridge

File: examples/aio/04_concurrency_and_bridge.py

Concurrency control and sync/async bridging.

Topics:
- Concurrent execution with limits (gather_with_limit)
- Timeout management
- Running sync code in async context
- Running async code from sync context
- Sync-to-async conversion
- Async-to-sync conversion
- Mixed sync/async pipelines
- Parallel processing
- Cache-aside pattern
- Error handling in concurrent operations

Use Cases:
- Controlling concurrency for rate-limited APIs
- Mixing sync libraries with async code
- Legacy code integration
- CPU-bound operations in async context
- Timeout enforcement
- Parallel data processing

Run:

uv run python examples/aio/04_concurrency_and_bridge.py
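
Limited concurrency and running sync code from async code can both be sketched with the standard library alone. The helper `gather_limited` below is a hypothetical stand-in that shows one common way to build such a limit (a semaphore around each awaitable); it is not claimed to match how gather_with_limit is implemented.

```python
import asyncio
import time
from typing import Any, Awaitable, Iterable


async def gather_limited(awaitables: Iterable[Awaitable[Any]], limit: int) -> list[Any]:
    """Await all items, with at most `limit` running at once; results keep input order."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(aw: Awaitable[Any]) -> Any:
        async with semaphore:
            return await aw

    return await asyncio.gather(*(guarded(aw) for aw in awaitables))


def blocking_square(n: int) -> int:
    """Stand-in for a blocking call from a sync library."""
    time.sleep(0.01)
    return n * n


async def main() -> list[int]:
    # asyncio.to_thread returns a coroutine; the worker thread only starts
    # once the coroutine is awaited inside the semaphore.
    jobs = (asyncio.to_thread(blocking_square, n) for n in range(5))
    return await gather_limited(jobs, limit=2)
```

`asyncio.run(main())` returns `[0, 1, 4, 9, 16]`. Threads help with blocking I/O; for genuinely CPU-bound work a process pool is usually the better fit because of the GIL.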

Quick Start

import httpx

from dspu.aio import retry, RateLimiter, CircuitBreaker

# Retry with exponential backoff
@retry(max_attempts=3, backoff="exponential")
async def fetch_data(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

# Rate limiting
async def process_with_rate_limit():
    limiter = RateLimiter(rate=10, per=1.0)
    async with limiter:
        await process_item()

# Circuit breaker
circuit_breaker = CircuitBreaker(failure_threshold=5)

@circuit_breaker
async def call_api():
    ...

Common Patterns

Pattern 1: Resilient API Client

from dspu.aio import retry, RateLimiter, CircuitBreaker, BackoffStrategy

# Create utilities
limiter = RateLimiter(rate=10.0)  # 10 req/s
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60.0)

@retry(max_attempts=3, strategy=BackoffStrategy.EXPONENTIAL)
async def call_api(endpoint: str):
    # Rate limit
    async with limiter.limit():
        # Circuit breaker protection
        async with circuit_breaker.protect():
            # Make API call
            return await http_client.get(endpoint)

Pattern 2: Controlled Parallel Processing

from dspu.aio import gather_with_limit, sync_to_async

async def process_items(items: list):
    """Process items in parallel with concurrency limit."""

    def cpu_intensive_work(item):
        # CPU-bound work
        return expensive_calculation(item)

    # Convert to async and process with limit
    async_work = sync_to_async(cpu_intensive_work)

    results = await gather_with_limit(
        *[async_work(item) for item in items],
        limit=4  # Max 4 concurrent
    )

    return results

Pattern 3: Graceful Degradation

from dspu.aio import CircuitBreaker, CircuitBreakerError

circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=30.0)
cache = {}
default_value = None  # Returned when the key is not in the cache

async def get_data(key: str):
    """Get data with cache fallback."""
    try:
        # Try primary source with circuit breaker
        async with circuit_breaker.protect():
            return await fetch_from_primary(key)
    except CircuitBreakerError:
        # Circuit open - use cache immediately
        return cache.get(key, default_value)
    except Exception:
        # Other errors - still try cache
        return cache.get(key, default_value)

Pattern 4: Timeout with Fallback

from dspu.aio import run_with_timeout_or_default

async def fetch_with_timeout(url: str):
    """Fetch with timeout, return cached data on timeout."""
    return await run_with_timeout_or_default(
        fetch_from_api(url),
        timeout=2.0,
        default=get_from_cache(url)
    )
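
One detail of the pattern above is worth noting: `default=get_from_cache(url)` is evaluated before the call starts, so the cache lookup runs on every request, even when the fetch succeeds in time. A lazy variant can be sketched with `asyncio.wait_for`; the helper name `with_timeout_or` and its `fallback` callable are assumptions made for this example, not part of dspu.aio.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_timeout_or(
    aw: Awaitable[T],
    timeout: float,
    fallback: Callable[[], T],
) -> T:
    """Await `aw`; if it exceeds `timeout` seconds, return `fallback()` instead.

    Hypothetical helper: `fallback` is only called when the timeout fires.
    """
    try:
        return await asyncio.wait_for(aw, timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the slow operation at this point.
        return fallback()
```

Usage would look like `await with_timeout_or(fetch_from_api(url), 2.0, lambda: get_from_cache(url))`, assuming `get_from_cache` is a plain synchronous function.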

Feature Matrix

Feature Retry Rate Limiter Circuit Breaker Concurrency Bridge
Decorator
Context Manager
Functional API
Callbacks
Exception Filtering
Backoff Strategies
State Management

Best Practices

1. Combining Utilities

Stack utilities for comprehensive resilience:

@retry(max_attempts=3)
@rate_limit(api_limiter)
@circuit_breaker(api_breaker)
async def robust_api_call(endpoint: str):
    return await http_client.get(endpoint)
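
The order of the stack matters. Python applies decorators bottom-up, so the top one ends up outermost and runs first. The sketch below uses a hypothetical tracing decorator, `traced`, in place of the real utilities to make that order visible.

```python
import asyncio
import functools


def traced(name: str, log: list):
    """Illustrative decorator that records when its wrapper is entered and left."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            log.append(f"enter {name}")
            try:
                return await func(*args, **kwargs)
            finally:
                log.append(f"exit {name}")

        return wrapper

    return decorator


events: list = []


@traced("retry", events)            # outermost: runs first
@traced("rate_limit", events)
@traced("circuit_breaker", events)  # innermost: closest to the call
async def call() -> str:
    events.append("call")
    return "ok"
```

After `asyncio.run(call())`, `events` reads enter retry, enter rate_limit, enter circuit_breaker, call, then the exits in reverse. With retry outermost, every retry attempt passes through the rate limiter and the circuit breaker again, which is usually what you want: retries consume rate budget and stop as soon as the circuit opens.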

2. Resource-Specific Configuration

Use different configurations for different resources:

# Aggressive retries for cache (fast)
@retry(max_attempts=5, base_delay=0.1)
async def get_from_cache(key):
    ...

# Conservative retries for external API (slow)
@retry(max_attempts=3, base_delay=2.0)
async def call_external_api(endpoint):
    ...

3. Monitoring and Observability

Use callbacks for monitoring:

async def log_retry(attempt: int, exception: Exception):
    logger.warning(f"Retry {attempt}: {exception}")
    metrics.increment("retries", tags={"exception": type(exception).__name__})

async def on_circuit_state_change(old: CircuitState, new: CircuitState):
    logger.info(f"Circuit state: {old} → {new}")
    metrics.gauge("circuit_state", new.value)

@retry(on_retry=log_retry)
async def monitored_operation():
    ...

4. Graceful Degradation

Always have fallback mechanisms:

async def get_user_data(user_id: int):
    try:
        return await fetch_from_db(user_id)
    except CircuitBreakerError:
        return get_from_cache(user_id)
    except Exception:
        return get_default_user()

Troubleshooting

High Retry Latency

Problem: Operations take too long due to retries

Solution:
- Reduce max_attempts
- Use a shorter base_delay
- Use BackoffStrategy.CONSTANT instead of EXPONENTIAL
- Set an aggressive max_delay cap
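
To see how much each knob helps, it can be useful to compute the worst-case time spent sleeping between attempts. The sketch below assumes a schedule of base_delay multiplied by a growth factor per attempt, with no jitter; the function `backoff_delays` is hypothetical, and the library's exact schedule may differ.

```python
from typing import Optional


def backoff_delays(
    max_attempts: int,
    base_delay: float,
    factor: float = 2.0,
    max_delay: Optional[float] = None,
) -> list[float]:
    """Return the sleep before each retry under an assumed backoff schedule.

    There are max_attempts - 1 sleeps: none is needed after the final attempt.
    Use factor=1.0 to model a constant delay.
    """
    delays = []
    for attempt in range(max_attempts - 1):
        delay = base_delay * factor ** attempt
        if max_delay is not None:
            delay = min(delay, max_delay)
        delays.append(delay)
    return delays


exponential = backoff_delays(5, 2.0)               # [2.0, 4.0, 8.0, 16.0]
capped = backoff_delays(5, 2.0, max_delay=5.0)     # [2.0, 4.0, 5.0, 5.0]
constant = backoff_delays(5, 2.0, factor=1.0)      # [2.0, 2.0, 2.0, 2.0]
```

Under these assumptions, five attempts with a 2-second base delay sleep for up to 30 seconds in total with uncapped exponential backoff, 16 seconds with a 5-second cap, and 8 seconds with a constant delay. The time spent in the attempts themselves comes on top of that.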

Circuit Breaker Stuck Open

Problem: Circuit breaker stays open even after service recovers

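Solution:
- Reduce the timeout duration so the circuit moves to HALF_OPEN sooner
- Raise failure_threshold if the circuit re-opens after only a few failures
- Use a manual reset() after confirming service health
- Check that success_threshold isn't too high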

Rate Limiter Too Restrictive

Problem: Operations are being throttled too much

Solution:
- Increase the rate parameter
- Increase capacity for burst allowance
- Use try_acquire() for optional operations
- Consider per-user/per-resource limiters

See Also