Async API Reference
Async utilities for resilient operations.
Retry
dspu.aio.retry.retry
```python
retry(
    *,
    max_attempts: int = 3,
    strategy: BackoffStrategy = EXPONENTIAL,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    multiplier: float = 2.0,
    jitter: bool = True,
    exceptions: tuple[type[Exception], ...] = (Exception,),
    on_retry: Callable[[int, Exception], Awaitable[None]] | None = None,
) -> Callable[
    [Callable[..., Awaitable[T]]],
    Callable[..., Awaitable[T]],
]
```
Decorator to retry async functions with configurable backoff strategies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `max_attempts` | `int` | Maximum number of attempts | `3` |
| `strategy` | `BackoffStrategy` | Backoff strategy to use | `EXPONENTIAL` |
| `base_delay` | `float` | Base delay in seconds | `1.0` |
| `max_delay` | `float` | Maximum delay in seconds | `60.0` |
| `multiplier` | `float` | Multiplier for exponential/linear backoff | `2.0` |
| `jitter` | `bool` | Whether to add random jitter | `True` |
| `exceptions` | `tuple[type[Exception], ...]` | Tuple of exception types to retry on | `(Exception,)` |
| `on_retry` | `Callable[[int, Exception], Awaitable[None]] \| None` | Optional async callback called on each retry. Receives the attempt number and the exception as arguments. | `None` |
Returns:
| Type | Description |
|---|---|
| `Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]` | Decorated function with retry logic |
Example
```python
@retry(max_attempts=5, strategy=BackoffStrategy.EXPONENTIAL)
async def fetch_data():
    # May fail transiently
    return await api.get("/data")


@retry(
    max_attempts=3,
    strategy=BackoffStrategy.LINEAR,
    exceptions=(ConnectionError, TimeoutError),
)
async def connect_to_service():
    return await service.connect()
```
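To make the interplay of `base_delay`, `multiplier`, `max_delay` and `jitter` concrete, here is a minimal sketch of one plausible exponential schedule. The helper `exponential_delay`, its `rng` argument, the formula and the jitter scheme are all assumptions made for illustration; the reference above does not state how `retry` computes its delays.

```python
import random


def exponential_delay(attempt, base_delay=1.0, max_delay=60.0,
                      multiplier=2.0, jitter=False, rng=random):
    """Seconds to wait after failed attempt number `attempt` (1-based).

    Hypothetical helper: the formula and jitter scheme are assumptions,
    not taken from the dspu source.
    """
    # Grow the delay geometrically, then cap it at max_delay.
    delay = min(base_delay * multiplier ** (attempt - 1), max_delay)
    if jitter:
        # "Full jitter": pick a random wait between 0 and the capped delay.
        delay = rng.uniform(0.0, delay)
    return delay


delays = [exponential_delay(n) for n in range(1, 8)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

With the defaults, the seventh delay would be 64 seconds uncapped, so `max_delay` is what holds it at 60.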
Rate Limiting
dspu.aio.rate_limiter.RateLimiter
Async-safe rate limiter using the token bucket algorithm.
The token bucket algorithm allows bursts up to the bucket capacity while maintaining an average rate over time.
Attributes:
| Name | Type | Description |
|---|---|---|
| `rate` | | Number of tokens added per second |
| `capacity` | | Maximum number of tokens in the bucket |
| `tokens` | | Current number of available tokens |
| `last_update` | | Timestamp of last token update |
Initialize rate limiter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `rate` | `float` | Number of operations allowed per second | required |
| `capacity` | `float \| None` | Maximum burst capacity. If None, defaults to `rate`. A larger capacity allows bigger bursts. | `None` |
Example
```python
# Allow 10 requests per second with burst of 20
limiter = RateLimiter(rate=10.0, capacity=20.0)

# Allow 5 requests per second, no burst
limiter = RateLimiter(rate=5.0, capacity=5.0)
```
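The burst-versus-average behaviour of a token bucket is easier to see in code. The sketch below is a synchronous toy model, not the `RateLimiter` source: the class `TokenBucket`, its injectable `clock` and the assumption that the bucket starts full are all hypothetical.

```python
import time


class TokenBucket:
    """Hypothetical, non-async token bucket used only to illustrate the idea."""

    def __init__(self, rate, capacity=None, clock=time.monotonic):
        self.rate = rate
        self.capacity = rate if capacity is None else capacity
        self.tokens = self.capacity  # assumption: the bucket starts full
        self.clock = clock
        self.last_update = clock()

    def _refill(self):
        # Add rate * elapsed tokens, never exceeding the capacity.
        now = self.clock()
        elapsed = now - self.last_update
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_update = now

    def try_acquire(self, tokens=1.0):
        if tokens < 0 or tokens > self.capacity:
            raise ValueError("tokens must be between 0 and capacity")
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


fake_now = [0.0]  # fake clock so the example does not need to sleep
bucket = TokenBucket(rate=10.0, capacity=20.0, clock=lambda: fake_now[0])
burst = [bucket.try_acquire() for _ in range(21)]  # 20 succeed, the 21st fails
fake_now[0] += 0.5  # half a second later, 5 tokens have been added back
after_wait = [bucket.try_acquire() for _ in range(6)]  # 5 succeed, 1 fails
```

A waiting variant in the spirit of `acquire` would sleep for roughly `(tokens - self.tokens) / self.rate` seconds and try again instead of returning `False`.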
Functions
acquire (async)
Acquire tokens, waiting if necessary.
This method blocks until the requested number of tokens becomes available.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tokens` | `float` | Number of tokens to acquire | `1.0` |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If `tokens` is negative or exceeds capacity |
try_acquire (async)
Try to acquire tokens without waiting.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tokens` | `float` | Number of tokens to acquire | `1.0` |

Returns:
| Type | Description |
|---|---|
| `bool` | True if tokens were acquired, False otherwise |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If `tokens` is negative or exceeds capacity |
Circuit Breaker
dspu.aio.circuit_breaker.CircuitBreaker
```python
CircuitBreaker(
    *,
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 60.0,
    exceptions: tuple[type[Exception], ...] = (Exception,),
    on_state_change: Callable[
        [CircuitState, CircuitState], Awaitable[None]
    ] | None = None,
)
```
Async-safe circuit breaker for fault tolerance.
The circuit breaker prevents cascading failures by moving between three states:
- CLOSED: Normal operation; failures are counted.
- OPEN: After `failure_threshold` failures, requests are blocked for `timeout` seconds.
- HALF_OPEN: After the timeout, test requests are allowed through to check recovery.
Attributes:
| Name | Type | Description |
|---|---|---|
| `failure_threshold` | | Number of failures before opening circuit |
| `success_threshold` | | Number of successes in half-open to close circuit |
| `timeout` | | Seconds to wait before transitioning to half-open |
| `state` | `CircuitState` | Current circuit state |
| `failure_count` | `int` | Consecutive failures in closed state |
| `success_count` | `int` | Consecutive successes in half-open state |
| `last_failure_time` | `int` | Timestamp of last failure |
Initialize circuit breaker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `failure_threshold` | `int` | Failures before opening circuit | `5` |
| `success_threshold` | `int` | Successes needed to close circuit | `2` |
| `timeout` | `float` | Seconds to wait in open state | `60.0` |
| `exceptions` | `tuple[type[Exception], ...]` | Tuple of exceptions to count as failures | `(Exception,)` |
| `on_state_change` | `Callable[[CircuitState, CircuitState], Awaitable[None]] \| None` | Optional async callback for state changes. Receives `(old_state, new_state)` as arguments. | `None` |
Example
```python
# Basic circuit breaker
cb = CircuitBreaker(failure_threshold=3, timeout=30.0)

# With custom exception handling
cb = CircuitBreaker(
    failure_threshold=5,
    exceptions=(ConnectionError, TimeoutError),
)
```
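The transitions between CLOSED, OPEN and HALF_OPEN can be modelled as a small state machine. The following is a synchronous sketch under stated assumptions, not the `CircuitBreaker` implementation: `State`, `BreakerSketch`, the injectable `clock`, and the rule that a failed probe in half-open reopens the circuit are all hypothetical.

```python
import enum
import time


class State(enum.Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class BreakerSketch:
    """Hypothetical, synchronous model of the state machine only."""

    def __init__(self, failure_threshold=5, success_threshold=2,
                 timeout=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.clock = clock
        self.state = State.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = None

    def allow_request(self):
        """Return False while the circuit is open and the timeout has not passed."""
        if self.state is State.OPEN:
            if self.clock() - self.last_failure_time < self.timeout:
                return False
            self.state = State.HALF_OPEN  # timeout elapsed: let a probe through
            self.success_count = 0
        return True

    def record_success(self):
        if self.state is State.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = State.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0  # only consecutive failures count

    def record_failure(self):
        self.last_failure_time = self.clock()
        if self.state is State.HALF_OPEN:
            self.state = State.OPEN  # assumption: a failed probe reopens the circuit
            return
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = State.OPEN


now = [0.0]  # fake clock so the example does not need to sleep
cb = BreakerSketch(failure_threshold=3, timeout=30.0, clock=lambda: now[0])
for _ in range(3):
    cb.record_failure()
state_after_failures = cb.state   # State.OPEN
blocked = not cb.allow_request()  # True: still inside the timeout
now[0] = 30.0
probe_allowed = cb.allow_request()  # True: circuit is now HALF_OPEN
cb.record_success()
cb.record_success()
final_state = cb.state            # State.CLOSED
```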
Functions
call (async)
Call a function through the circuit breaker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[..., Awaitable[T]]` | The async function to call | required |
| `*args` | `Any` | Positional arguments for `func` | `()` |
| `**kwargs` | `Any` | Keyword arguments for `func` | `{}` |

Returns:
| Type | Description |
|---|---|
| `T` | Result of the function call |

Raises:
| Type | Description |
|---|---|
| `CircuitBreakerError` | If circuit is open |
| `Exception` | If function raises an exception |
reset (async)
Manually reset the circuit breaker to closed state.
Concurrency
dspu.aio.concurrency.gather_with_limit (async)
```python
gather_with_limit(
    *awaitables: Awaitable[T],
    limit: int,
    return_exceptions: bool = False,
) -> list[T]
```
Gather awaitables with concurrency limit.
Similar to asyncio.gather() but limits the number of concurrent operations. This is useful for rate limiting or resource management.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*awaitables` | `Awaitable[T]` | Awaitable objects to execute | `()` |
| `limit` | `int` | Maximum number of concurrent operations | required |
| `return_exceptions` | `bool` | If True, exceptions are returned as results instead of being raised | `False` |

Returns:
| Type | Description |
|---|---|
| `list[T]` | List of results in the same order as input awaitables |

Raises:
| Type | Description |
|---|---|
| `ValueError` | If `limit` is not positive |
| `Exception` | If any awaitable raises and `return_exceptions` is False |
Example
```python
tasks = [fetch_url(url) for url in urls]
results = await gather_with_limit(*tasks, limit=10)

# With exception handling
results = await gather_with_limit(
    *tasks,
    limit=5,
    return_exceptions=True,
)
```
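A common way to get "`asyncio.gather()` with a cap" is to wrap each awaitable in an `asyncio.Semaphore`. The sketch below shows that technique with a hypothetical stand-in named `gather_limited`; it is an assumption that `gather_with_limit` works similarly, since its source is not shown here.

```python
import asyncio


async def gather_limited(*awaitables, limit, return_exceptions=False):
    """Hypothetical stand-in for gather_with_limit built on asyncio.Semaphore."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    semaphore = asyncio.Semaphore(limit)

    async def run(awaitable):
        # At most `limit` wrappers are inside this block at any time.
        async with semaphore:
            return await awaitable

    # gather() preserves input order regardless of completion order.
    return await asyncio.gather(
        *(run(a) for a in awaitables), return_exceptions=return_exceptions
    )


async def demo():
    running = 0
    peak = 0

    async def job(i):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)  # track the highest observed concurrency
        await asyncio.sleep(0.01)
        running -= 1
        return i * i

    results = await gather_limited(*(job(i) for i in range(6)), limit=2)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)  # [0, 1, 4, 9, 16, 25] 2
```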
dspu.aio.concurrency.run_with_timeout (async)
Run an awaitable with a timeout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `awaitable` | `Awaitable[T]` | The awaitable to execute | required |
| `timeout` | `float` | Timeout in seconds | required |

Returns:
| Type | Description |
|---|---|
| `T` | Result of the awaitable |

Raises:
| Type | Description |
|---|---|
| `AsyncTimeoutError` | If operation exceeds timeout |
| `Exception` | If awaitable raises an exception |
Example
```python
result = await run_with_timeout(slow_operation(), timeout=5.0)
```
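For comparison, the standard library offers the same shape of behaviour through `asyncio.wait_for`. The sketch below uses a hypothetical stand-in, `with_timeout`, and raises `asyncio.TimeoutError`; per the table above, `run_with_timeout` raises `AsyncTimeoutError` instead, so treat this only as an illustration of the pattern.

```python
import asyncio


async def with_timeout(awaitable, timeout):
    """Hypothetical stand-in for run_with_timeout built on asyncio.wait_for."""
    return await asyncio.wait_for(awaitable, timeout=timeout)


async def demo():
    # Finishes well inside the limit, so the result is returned.
    fast = await with_timeout(asyncio.sleep(0.01, result="done"), timeout=1.0)
    try:
        # Needs 1 second but only gets 10 ms, so it is cancelled.
        await with_timeout(asyncio.sleep(1.0), timeout=0.01)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    return fast, timed_out


fast, timed_out = asyncio.run(demo())
```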
dspu.aio.concurrency.run_with_shield (async)
Shield an awaitable from cancellation.
The awaitable will continue running even if the caller is cancelled. However, the awaitable can still be cancelled directly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `awaitable` | `Awaitable[T]` | The awaitable to shield | required |

Returns:
| Type | Description |
|---|---|
| `T` | Result of the awaitable |
Example
```python
# Critical operation that should complete
result = await run_with_shield(commit_transaction())
```
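The shielding semantics described above match those of `asyncio.shield` in the standard library, so a small demonstration with it may help. This is a sketch of the general behaviour, on the assumption that `run_with_shield` behaves similarly; `commit` and `caller` are hypothetical names.

```python
import asyncio


async def demo():
    finished = []

    async def commit():
        await asyncio.sleep(0.05)
        finished.append("committed")

    # Keep a reference to the inner task so it can be awaited later.
    inner = asyncio.ensure_future(commit())

    async def caller():
        await asyncio.shield(inner)

    task = asyncio.create_task(caller())
    await asyncio.sleep(0.01)
    task.cancel()  # cancel the caller while the commit is still running
    try:
        await task
    except asyncio.CancelledError:
        caller_cancelled = True
    else:
        caller_cancelled = False

    await inner  # the shielded work was not cancelled and still completes
    return caller_cancelled, finished


caller_cancelled, finished = asyncio.run(demo())
```

The caller sees `CancelledError`, yet the shielded work runs to completion. Cancelling `inner` directly would still stop it.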
Sync/Async Bridge
dspu.aio.bridge.sync_to_async
```python
sync_to_async(
    func: Callable[..., T],
    *,
    executor: ThreadPoolExecutor | None = None,
) -> Callable[..., Awaitable[T]]
```
Convert a sync function to async.
The sync function will run in a thread pool executor to avoid blocking the event loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[..., T]` | The sync function to convert | required |
| `executor` | `ThreadPoolExecutor \| None` | Optional executor. If None, uses default thread pool. | `None` |

Returns:
| Type | Description |
|---|---|
| `Callable[..., Awaitable[T]]` | Async version of the function |
Example
```python
def blocking_operation(data):
    # CPU-intensive or blocking I/O
    return process(data)


async_operation = sync_to_async(blocking_operation)
result = await async_operation(data)
```
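Running a sync function in a thread pool from async code is usually done with `loop.run_in_executor`. The sketch below shows that pattern with a hypothetical stand-in, `to_async`; whether `sync_to_async` is written this way is an assumption.

```python
import asyncio
import functools
import threading


def to_async(func, *, executor=None):
    """Hypothetical stand-in for sync_to_async using loop.run_in_executor."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional args, so bind everything first.
        call = functools.partial(func, *args, **kwargs)
        # executor=None selects the loop's default thread pool.
        return await loop.run_in_executor(executor, call)

    return wrapper


def blocking_square(x):
    # Also report whether we ran on the main thread.
    on_main = threading.current_thread() is threading.main_thread()
    return x * x, on_main


async def main():
    return await to_async(blocking_square)(7)


value, ran_on_main_thread = asyncio.run(main())
```

Because the call runs on a worker thread, the event loop stays free to service other tasks while the blocking function executes.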
dspu.aio.bridge.async_to_sync
Convert an async function to sync.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[..., Awaitable[T]]` | The async function to convert | required |

Returns:
| Type | Description |
|---|---|
| `Callable[..., T]` | Sync version of the function |
Warning
This should not be called from async code as it will create a new event loop. Use await instead.
Example
```python
async def fetch_data(url):
    return await http_client.get(url)


sync_fetch = async_to_sync(fetch_data)
result = sync_fetch("https://api.example.com/data")
```
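The warning above is easiest to appreciate with a wrapper that starts a fresh event loop per call. The sketch below uses `asyncio.run` inside a hypothetical stand-in, `to_sync`; it is an assumption that `async_to_sync` fails the same way, but it shows why such a wrapper must not be called from inside a running loop.

```python
import asyncio


def to_sync(func):
    """Hypothetical stand-in for async_to_sync: one fresh event loop per call."""

    def wrapper(*args, **kwargs):
        coro = func(*args, **kwargs)
        try:
            return asyncio.run(coro)
        finally:
            # No-op after a normal run; avoids a "never awaited" warning on failure.
            coro.close()

    return wrapper


async def double(x):
    await asyncio.sleep(0)
    return 2 * x


sync_double = to_sync(double)
from_sync_code = sync_double(21)  # fine: no event loop is running here


async def misuse():
    try:
        sync_double(21)  # a loop is already running in this thread
    except RuntimeError as exc:
        return str(exc)
    return None


error_message = asyncio.run(misuse())
```

Inside `misuse`, `asyncio.run` refuses to start because an event loop is already running; async code should simply `await double(21)`.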
dspu.aio.bridge.run_async
Run an async coroutine from synchronous code.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `coro` | `Awaitable[T]` | The coroutine to run | required |
| `loop` | `AbstractEventLoop \| None` | Optional event loop. If None, creates a new one. | `None` |

Returns:
| Type | Description |
|---|---|
| `T` | Result of the coroutine |
Example
```python
async def fetch_data():
    return await api.get("/data")


# Call from sync code
result = run_async(fetch_data())
```
dspu.aio.bridge.iter_over_async
Convert an async iterable to a sync iterator.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `async_iterable` | `Any` | The async iterable to convert | required |
| `loop` | `AbstractEventLoop \| None` | Optional event loop. If None, creates a new one. | `None` |

Yields:
| Type | Description |
|---|---|
| `T` | Items from the async iterable |
Example
```python
async def async_generator():
    for i in range(10):
        await asyncio.sleep(0.1)
        yield i


# Use from sync code
for item in iter_over_async(async_generator()):
    print(item)
```
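One way to drive an async iterable from sync code is to pull one item at a time with `loop.run_until_complete`. The sketch below shows that approach with a hypothetical stand-in, `iterate_sync`; it is an assumption that `iter_over_async` follows the same pattern.

```python
import asyncio


def iterate_sync(async_iterable, loop=None):
    """Hypothetical stand-in for iter_over_async."""
    owns_loop = loop is None
    if owns_loop:
        loop = asyncio.new_event_loop()
    iterator = async_iterable.__aiter__()
    try:
        while True:
            try:
                # Run the loop just long enough to produce the next item.
                yield loop.run_until_complete(iterator.__anext__())
            except StopAsyncIteration:
                break
    finally:
        # Only close a loop this function created itself.
        if owns_loop:
            loop.close()


async def countdown(n):
    while n > 0:
        await asyncio.sleep(0)
        yield n
        n -= 1


items = list(iterate_sync(countdown(3)))
print(items)  # [3, 2, 1]
```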
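Usage
```python
import httpx

from dspu.aio import retry, RateLimiter, CircuitBreaker


# Exponential backoff is the default strategy.
@retry(max_attempts=3)
async def fetch_data(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)


async def process_with_rate_limit():
    limiter = RateLimiter(rate=10.0)  # 10 operations per second
    await limiter.acquire()  # waits until a token is available
    await process_item()


circuit_breaker = CircuitBreaker(failure_threshold=5)


async def call_api():
    ...


async def call_api_protected():
    # Raises CircuitBreakerError while the circuit is open.
    return await circuit_breaker.call(call_api)
```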