Skip to content

Async API Reference

Async utilities for resilient operations.

Retry

dspu.aio.retry.retry

retry(
    *,
    max_attempts: int = 3,
    strategy: BackoffStrategy = EXPONENTIAL,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    multiplier: float = 2.0,
    jitter: bool = True,
    exceptions: tuple[type[Exception], ...] = (Exception,),
    on_retry: Callable[[int, Exception], Awaitable[None]]
    | None = None,
) -> Callable[
    [Callable[..., Awaitable[T]]],
    Callable[..., Awaitable[T]],
]

Decorator to retry async functions with configurable backoff strategies.

Parameters:

Name Type Description Default
max_attempts int

Maximum number of attempts (default: 3)

3
strategy BackoffStrategy

Backoff strategy to use (default: EXPONENTIAL)

EXPONENTIAL
base_delay float

Base delay in seconds (default: 1.0)

1.0
max_delay float

Maximum delay in seconds (default: 60.0)

60.0
multiplier float

Multiplier for exponential/linear backoff (default: 2.0)

2.0
jitter bool

Whether to add random jitter (default: True)

True
exceptions tuple[type[Exception], ...]

Tuple of exception types to retry on (default: (Exception,))

(Exception,)
on_retry Callable[[int, Exception], Awaitable[None]] | None

Optional async callback function called on each retry. Receives attempt number and exception as arguments.

None

Returns:

Type Description
Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]

Decorated function with retry logic

Example

@retry(max_attempts=5, strategy=BackoffStrategy.EXPONENTIAL)
async def fetch_data():
    # May fail transiently
    return await api.get("/data")

@retry(
    max_attempts=3,
    strategy=BackoffStrategy.LINEAR,
    exceptions=(ConnectionError, TimeoutError),
)
async def connect_to_service():
    return await service.connect()

Rate Limiting

dspu.aio.rate_limiter.RateLimiter

RateLimiter(rate: float, *, capacity: float | None = None)

Async-safe rate limiter using token bucket algorithm.

The token bucket algorithm allows bursts up to the bucket capacity while maintaining an average rate over time.

Attributes:

Name Type Description
rate

Number of tokens added per second

capacity

Maximum number of tokens in the bucket

tokens

Current number of available tokens

last_update

Timestamp of last token update

Initialize rate limiter.

Parameters:

Name Type Description Default
rate float

Number of operations allowed per second

required
capacity float | None

Maximum burst capacity. If None, defaults to rate. A larger capacity allows bigger bursts.

None
Example

Allow 10 requests per second with burst of 20

limiter = RateLimiter(rate=10.0, capacity=20.0)

Allow 5 requests per second, no burst

limiter = RateLimiter(rate=5.0, capacity=5.0)

Source code in src/dspu/aio/rate_limiter.py
def __init__(self, rate: float, *, capacity: float | None = None) -> None:
    """Initialize rate limiter.

    The bucket starts full, so up to ``capacity`` tokens can be taken
    immediately after construction.

    Args:
        rate: Number of operations allowed per second. Must be positive.
        capacity: Maximum burst capacity. If None, defaults to rate.
            A larger capacity allows bigger bursts. Must be positive
            when given.

    Raises:
        ValueError: If rate, or an explicitly given capacity, is not positive.

    Example:
        >>> # Allow 10 requests per second with burst of 20
        >>> limiter = RateLimiter(rate=10.0, capacity=20.0)
        >>>
        >>> # Allow 5 requests per second, no burst
        >>> limiter = RateLimiter(rate=5.0, capacity=5.0)
    """
    if rate <= 0:
        msg = "Rate must be positive"
        raise ValueError(msg)

    # A bucket that can never hold a positive amount of tokens cannot
    # satisfy any request, so reject it here instead of on first use.
    if capacity is not None and capacity <= 0:
        msg = "Capacity must be positive"
        raise ValueError(msg)

    self.rate = rate
    self.capacity = capacity if capacity is not None else rate
    self.tokens = self.capacity  # start with a full bucket
    self.last_update = time.monotonic()
    self._lock = asyncio.Lock()

Functions

acquire async

acquire(tokens: float = 1.0) -> None

Acquire tokens, waiting if necessary.

This method blocks until the requested number of tokens becomes available.

Parameters:

Name Type Description Default
tokens float

Number of tokens to acquire (default: 1.0)

1.0

Raises:

Type Description
ValueError

If tokens is not positive or exceeds capacity

Source code in src/dspu/aio/rate_limiter.py
async def acquire(self, tokens: float = 1.0) -> None:
    """Acquire tokens, waiting if necessary.

    Suspends the calling task until the requested number of tokens is
    available. The internal lock is held only while the bucket is refilled
    and inspected, never while sleeping, so other ``acquire`` and
    ``try_acquire`` calls are not stalled behind a waiting task.

    Args:
        tokens: Number of tokens to acquire (default: 1.0)

    Raises:
        ValueError: If tokens is not positive or exceeds capacity
    """
    if tokens <= 0:
        msg = "Tokens must be positive"
        raise ValueError(msg)

    if tokens > self.capacity:
        msg = f"Requested {tokens} tokens exceeds capacity {self.capacity}"
        raise ValueError(msg)

    while True:
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_update

            # Add tokens based on elapsed time, never exceeding capacity
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now

            if self.tokens >= tokens:
                # We have enough tokens
                self.tokens -= tokens
                return

            # Time needed to refill the missing tokens at the configured rate
            tokens_needed = tokens - self.tokens
            wait_time = tokens_needed / self.rate

        # The lock is released at this point. Another caller may take
        # tokens while we sleep, so the loop re-checks the bucket afterwards
        # instead of assuming the tokens are ours.
        await asyncio.sleep(wait_time)

try_acquire async

try_acquire(tokens: float = 1.0) -> bool

Try to acquire tokens without waiting.

Parameters:

Name Type Description Default
tokens float

Number of tokens to acquire (default: 1.0)

1.0

Returns:

Type Description
bool

True if tokens were acquired, False otherwise

Raises:

Type Description
ValueError

If tokens is not positive or exceeds capacity

Source code in src/dspu/aio/rate_limiter.py
async def try_acquire(self, tokens: float = 1.0) -> bool:
    """Take tokens only if they are available right now.

    The bucket is first topped up for the time elapsed since the last
    update (capped at capacity); the request is then granted or refused
    without sleeping.

    Args:
        tokens: Number of tokens to acquire (default: 1.0)

    Returns:
        True if the tokens were deducted from the bucket, False otherwise

    Raises:
        ValueError: If tokens is not positive or exceeds capacity
    """
    if tokens <= 0:
        msg = "Tokens must be positive"
        raise ValueError(msg)

    limit = self.capacity
    if tokens > limit:
        msg = f"Requested {tokens} tokens exceeds capacity {limit}"
        raise ValueError(msg)

    async with self._lock:
        timestamp = time.monotonic()
        since_last = timestamp - self.last_update
        self.last_update = timestamp

        # Refill for the elapsed interval before deciding
        self.tokens = min(self.capacity, self.tokens + since_last * self.rate)

        if self.tokens < tokens:
            return False

        self.tokens -= tokens
        return True

Circuit Breaker

dspu.aio.circuit_breaker.CircuitBreaker

CircuitBreaker(
    *,
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 60.0,
    exceptions: tuple[type[Exception], ...] = (Exception,),
    on_state_change: Callable[
        [CircuitState, CircuitState], Awaitable[None]
    ]
    | None = None,
)

Async-safe circuit breaker for fault tolerance.

The circuit breaker prevents cascading failures by moving between three states:

- CLOSED: Normal operation, counting failures
- OPEN: After threshold failures, blocking requests for the timeout period
- HALF_OPEN: After the timeout, allowing test requests to check recovery

Attributes:

Name Type Description
failure_threshold

Number of failures before opening circuit

success_threshold

Number of successes in half-open to close circuit

timeout

Seconds to wait before transitioning to half-open

state CircuitState

Current circuit state

failure_count int

Consecutive failures in closed state

success_count int

Consecutive successes in half-open state

last_failure_time float | None

Timestamp of last failure

Initialize circuit breaker.

Parameters:

Name Type Description Default
failure_threshold int

Failures before opening circuit (default: 5)

5
success_threshold int

Successes needed to close circuit (default: 2)

2
timeout float

Seconds to wait in open state (default: 60.0)

60.0
exceptions tuple[type[Exception], ...]

Tuple of exceptions to count as failures

(Exception,)
on_state_change Callable[[CircuitState, CircuitState], Awaitable[None]] | None

Optional async callback for state changes. Receives (old_state, new_state) as arguments.

None
Example

Basic circuit breaker

cb = CircuitBreaker(failure_threshold=3, timeout=30.0)

With custom exception handling

cb = CircuitBreaker(
    failure_threshold=5,
    exceptions=(ConnectionError, TimeoutError),
)

Source code in src/dspu/aio/circuit_breaker.py
def __init__(
    self,
    *,
    failure_threshold: int = 5,
    success_threshold: int = 2,
    timeout: float = 60.0,
    exceptions: tuple[type[Exception], ...] = (Exception,),
    on_state_change: Callable[[CircuitState, CircuitState], Awaitable[None]] | None = None,
) -> None:
    """Set up a circuit breaker that starts in the closed state.

    Args:
        failure_threshold: Failures before opening circuit (default: 5)
        success_threshold: Successes needed to close circuit (default: 2)
        timeout: Seconds to wait in open state (default: 60.0)
        exceptions: Tuple of exceptions to count as failures
        on_state_change: Optional async callback for state changes.
            Receives (old_state, new_state) as arguments.

    Raises:
        ValueError: If failure_threshold, success_threshold or timeout
            is not positive.

    Example:
        >>> # Basic circuit breaker
        >>> cb = CircuitBreaker(failure_threshold=3, timeout=30.0)
        >>>
        >>> # With custom exception handling
        >>> cb = CircuitBreaker(
        >>>     failure_threshold=5,
        >>>     exceptions=(ConnectionError, TimeoutError)
        >>> )
    """
    # Checked in declaration order so the first offending argument is reported.
    positive_checks = (
        (failure_threshold, "Failure threshold must be positive"),
        (success_threshold, "Success threshold must be positive"),
        (timeout, "Timeout must be positive"),
    )
    for value, msg in positive_checks:
        if value <= 0:
            raise ValueError(msg)

    # Public configuration
    self.failure_threshold = failure_threshold
    self.success_threshold = success_threshold
    self.timeout = timeout
    self.exceptions = exceptions
    self.on_state_change = on_state_change

    # Internal bookkeeping; no failure has been recorded yet
    self._lock = asyncio.Lock()
    self._state = CircuitState.CLOSED
    self._failure_count = 0
    self._success_count = 0
    self._last_failure_time: float | None = None

Functions

call async

call(
    func: Callable[..., Awaitable[T]],
    *args: Any,
    **kwargs: Any,
) -> T

Call a function through the circuit breaker.

Parameters:

Name Type Description Default
func Callable[..., Awaitable[T]]

The async function to call

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
T

Result of the function call

Raises:

Type Description
CircuitBreakerError

If circuit is open

Exception

If function raises an exception

Source code in src/dspu/aio/circuit_breaker.py
async def call(
    self,
    func: Callable[..., Awaitable[T]],
    *args: Any,
    **kwargs: Any,
) -> T:
    """Call a function through the circuit breaker.

    The breaker state is checked under the internal lock, the function is
    awaited without holding the lock, and the outcome is then recorded
    under the lock again. Only exceptions raised by ``func`` itself are
    counted as failures; an error raised while recording a success (for
    example from a state transition) propagates without being counted.

    Args:
        func: The async function to call
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result of the function call

    Raises:
        CircuitBreakerError: If circuit is open
        Exception: If function raises an exception
    """
    async with self._lock:
        # NOTE(review): _check_state is defined outside this view; presumably
        # it moves an OPEN circuit to HALF_OPEN once the timeout has elapsed
        # — confirm.
        await self._check_state()

        # Block requests if circuit is open
        if self._state == CircuitState.OPEN:
            msg = f"Circuit breaker is open (failures: {self._failure_count})"
            raise CircuitBreakerError(
                msg,
                state=self._state,
                failure_count=self._failure_count,
            )

    # Keep the try body to the protected call alone so that nothing else
    # can be mistaken for a failure of func.
    try:
        result = await func(*args, **kwargs)
    except self.exceptions:
        # Failure - update state
        async with self._lock:
            self._last_failure_time = time.monotonic()

            if self._state == CircuitState.HALF_OPEN:
                # Failed test request - back to open
                await self._transition_to(CircuitState.OPEN)
            elif self._state == CircuitState.CLOSED:
                self._failure_count += 1
                if self._failure_count >= self.failure_threshold:
                    await self._transition_to(CircuitState.OPEN)

        # Bare raise re-raises the active exception with its traceback intact
        raise

    # Success - update state
    async with self._lock:
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                await self._transition_to(CircuitState.CLOSED)
        elif self._state == CircuitState.CLOSED:
            # Reset failure count on success
            self._failure_count = 0

    return result

reset async

reset() -> None

Manually reset the circuit breaker to closed state.

Source code in src/dspu/aio/circuit_breaker.py
async def reset(self) -> None:
    """Force the breaker back to the closed state.

    While holding the internal lock, transitions to ``CircuitState.CLOSED``
    and then clears the recorded time of the last failure.
    """
    guard = self._lock
    async with guard:
        # Transition first; the failure timestamp is cleared only afterwards.
        await self._transition_to(CircuitState.CLOSED)
        self._last_failure_time = None

Concurrency

dspu.aio.concurrency.gather_with_limit async

gather_with_limit(
    *awaitables: Awaitable[T],
    limit: int,
    return_exceptions: bool = False,
) -> list[T]

Gather awaitables with concurrency limit.

Similar to asyncio.gather() but limits the number of concurrent operations. This is useful for rate limiting or resource management.

Parameters:

Name Type Description Default
*awaitables Awaitable[T]

Awaitable objects to execute

()
limit int

Maximum number of concurrent operations

required
return_exceptions bool

If True, exceptions are returned as results instead of being raised (default: False)

False

Returns:

Type Description
list[T]

List of results in the same order as input awaitables

Raises:

Type Description
ValueError

If limit is not positive

Exception

If any awaitable raises and return_exceptions is False

Example

tasks = [fetch_url(url) for url in urls]
results = await gather_with_limit(*tasks, limit=10)

With exception handling

results = await gather_with_limit(
    *tasks,
    limit=5,
    return_exceptions=True,
)

Source code in src/dspu/aio/concurrency.py
async def gather_with_limit(
    *awaitables: Awaitable[T],
    limit: int,
    return_exceptions: bool = False,
) -> list[T]:
    """Run awaitables like asyncio.gather(), capping how many run at once.

    Each awaitable is awaited only after it obtains a slot from a semaphore
    sized by ``limit``; the wrapped awaitables are then handed to
    asyncio.gather().

    Args:
        *awaitables: Awaitable objects to execute
        limit: Maximum number of concurrent operations
        return_exceptions: If True, exceptions are returned as results instead
            of being raised (default: False)

    Returns:
        List of results in the same order as input awaitables

    Raises:
        ValueError: If limit is not positive
        Exception: If any awaitable raises and return_exceptions is False

    Example:
        >>> tasks = [fetch_url(url) for url in urls]
        >>> results = await gather_with_limit(*tasks, limit=10)
        >>>
        >>> # With exception handling
        >>> results = await gather_with_limit(
        >>>     *tasks,
        >>>     limit=5,
        >>>     return_exceptions=True
        >>> )
    """
    if limit <= 0:
        msg = "Limit must be positive"
        raise ValueError(msg)

    if len(awaitables) == 0:
        return []

    # At most `limit` holders of this semaphore are awaiting at any time
    gate = asyncio.Semaphore(limit)

    async def run_gated(job: Awaitable[T]) -> T:
        """Await job once a semaphore slot is free."""
        async with gate:
            outcome = await job
        return outcome

    gathered = await asyncio.gather(
        *map(run_gated, awaitables),
        return_exceptions=return_exceptions,
    )
    return cast(list[T], gathered)

dspu.aio.concurrency.run_with_timeout async

run_with_timeout(
    awaitable: Awaitable[T], *, timeout: float
) -> T

Run an awaitable with a timeout.

Parameters:

Name Type Description Default
awaitable Awaitable[T]

The awaitable to execute

required
timeout float

Timeout in seconds

required

Returns:

Type Description
T

Result of the awaitable

Raises:

Type Description
AsyncTimeoutError

If operation exceeds timeout

Exception

If awaitable raises an exception

Example

result = await run_with_timeout(slow_operation(), timeout=5.0)

Source code in src/dspu/aio/concurrency.py
async def run_with_timeout(
    awaitable: Awaitable[T],
    *,
    timeout: float,
) -> T:
    """Await an awaitable, giving up after ``timeout`` seconds.

    Delegates to asyncio.wait_for() and converts a TimeoutError into
    AsyncTimeoutError, chained to the original exception.

    Args:
        awaitable: The awaitable to execute
        timeout: Timeout in seconds

    Returns:
        Result of the awaitable

    Raises:
        AsyncTimeoutError: If operation exceeds timeout
        Exception: If awaitable raises an exception

    Example:
        >>> result = await run_with_timeout(slow_operation(), timeout=5.0)
    """
    try:
        outcome = await asyncio.wait_for(awaitable, timeout=timeout)
    except TimeoutError as exc:
        msg = f"Operation timed out after {timeout} seconds"
        raise AsyncTimeoutError(msg, timeout=timeout) from exc
    else:
        return outcome

dspu.aio.concurrency.run_with_shield async

run_with_shield(awaitable: Awaitable[T]) -> T

Shield an awaitable from cancellation.

The awaitable will continue running even if the caller is cancelled. However, the awaitable can still be cancelled directly.

Parameters:

Name Type Description Default
awaitable Awaitable[T]

The awaitable to shield

required

Returns:

Type Description
T

Result of the awaitable

Example

Critical operation that should complete

result = await run_with_shield(commit_transaction())

Source code in src/dspu/aio/concurrency.py
async def run_with_shield(awaitable: Awaitable[T]) -> T:
    """Await an awaitable behind asyncio.shield().

    Cancelling the caller does not cancel the shielded awaitable, which
    keeps running. The awaitable itself can still be cancelled directly.

    Args:
        awaitable: The awaitable to shield

    Returns:
        Result of the awaitable

    Example:
        >>> # Critical operation that should complete
        >>> result = await run_with_shield(commit_transaction())
    """
    protected = asyncio.shield(awaitable)
    return await protected

Sync/Async Bridge

dspu.aio.bridge.sync_to_async

sync_to_async(
    func: Callable[..., T],
    *,
    executor: ThreadPoolExecutor | None = None,
) -> Callable[..., Awaitable[T]]

Convert a sync function to async.

The sync function will run in a thread pool executor to avoid blocking the event loop.

Parameters:

Name Type Description Default
func Callable[..., T]

The sync function to convert

required
executor ThreadPoolExecutor | None

Optional executor. If None, uses default thread pool.

None

Returns:

Type Description
Callable[..., Awaitable[T]]

Async version of the function

Example

def blocking_operation(data):
    # CPU-intensive or blocking I/O
    return process(data)

async_operation = sync_to_async(blocking_operation)
result = await async_operation(data)

Source code in src/dspu/aio/bridge.py
def sync_to_async(
    func: Callable[..., T],
    *,
    executor: ThreadPoolExecutor | None = None,
) -> Callable[..., Awaitable[T]]:
    """Wrap a blocking function so it can be awaited.

    Each call of the returned coroutine function binds the arguments to
    ``func`` and submits it to the running loop's ``run_in_executor``, so
    the event loop is not blocked while ``func`` runs.

    Args:
        func: The sync function to convert
        executor: Optional executor. If None, uses default thread pool.

    Returns:
        Async version of the function

    Example:
        >>> def blocking_operation(data):
        >>>     # CPU-intensive or blocking I/O
        >>>     return process(data)
        >>>
        >>> async_operation = sync_to_async(blocking_operation)
        >>> result = await async_operation(data)
    """

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> T:
        running_loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional arguments, so bind
        # everything up front.
        bound_call = functools.partial(func, *args, **kwargs)
        pending = running_loop.run_in_executor(executor, bound_call)
        return await pending

    return wrapper

dspu.aio.bridge.async_to_sync

async_to_sync(
    func: Callable[..., Awaitable[T]],
) -> Callable[..., T]

Convert an async function to sync.

Parameters:

Name Type Description Default
func Callable[..., Awaitable[T]]

The async function to convert

required

Returns:

Type Description
Callable[..., T]

Sync version of the function

Warning

Do not call the returned function from async code: it blocks on the coroutine, and doing so while an event loop is already running in the same thread raises RuntimeError. Use await instead.

Example

async def fetch_data(url):
    return await http_client.get(url)

sync_fetch = async_to_sync(fetch_data)
result = sync_fetch("https://api.example.com/data")

Source code in src/dspu/aio/bridge.py
def async_to_sync(func: Callable[..., Awaitable[T]]) -> Callable[..., T]:
    """Wrap an async function so it can be called from synchronous code.

    The returned function calls ``func`` with the given arguments and hands
    the resulting awaitable to ``run_async``, returning its result.

    Args:
        func: The async function to convert

    Returns:
        Sync version of the function

    Warning:
        This should not be called from async code; it blocks until the
        coroutine has finished. Use await instead.

    Example:
        >>> async def fetch_data(url):
        >>>     return await http_client.get(url)
        >>>
        >>> sync_fetch = async_to_sync(fetch_data)
        >>> result = sync_fetch("https://api.example.com/data")
    """

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> T:
        return run_async(func(*args, **kwargs))

    return wrapper

dspu.aio.bridge.run_async

run_async(
    coro: Awaitable[T],
    *,
    loop: AbstractEventLoop | None = None,
) -> T

Run an async coroutine from synchronous code.

Parameters:

Name Type Description Default
coro Awaitable[T]

The coroutine to run

required
loop AbstractEventLoop | None

Optional event loop. If None, creates a new one.

None

Returns:

Type Description
T

Result of the coroutine

Example

async def fetch_data():
    return await api.get("/data")

Call from sync code

result = run_async(fetch_data())

Source code in src/dspu/aio/bridge.py
def run_async(
    coro: Awaitable[T],
    *,
    loop: asyncio.AbstractEventLoop | None = None,
) -> T:
    """Run an async coroutine from synchronous code.

    With an explicit ``loop`` the awaitable is driven by
    ``loop.run_until_complete``. Without one, it is run with
    ``asyncio.run``, which creates and closes a fresh event loop.

    Args:
        coro: The coroutine (or any other awaitable) to run
        loop: Optional event loop. If None, creates a new one.

    Returns:
        Result of the coroutine

    Raises:
        RuntimeError: If no loop is given and an event loop is already
            running in the current thread. Blocking on a coroutine there
            is not possible; use ``await`` instead.

    Example:
        >>> async def fetch_data():
        >>>     return await api.get("/data")
        >>>
        >>> # Call from sync code
        >>> result = run_async(fetch_data())
    """
    if loop is not None:
        # Caller-supplied loop: run_until_complete accepts any awaitable and
        # raises RuntimeError itself if that loop is already running.
        return loop.run_until_complete(coro)

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so starting one is safe.
        pass
    else:
        # A running loop cannot be re-entered with run_until_complete; fail
        # with an actionable message instead of the generic one.
        msg = (
            "run_async() cannot be called from a running event loop; "
            "use 'await' instead"
        )
        raise RuntimeError(msg)

    if asyncio.iscoroutine(coro):
        return asyncio.run(coro)

    # asyncio.run only accepts coroutine objects, so wrap other awaitables.
    async def _await_it() -> T:
        return await coro

    return asyncio.run(_await_it())

dspu.aio.bridge.iter_over_async

iter_over_async(
    async_iterable: Any,
    *,
    loop: AbstractEventLoop | None = None,
) -> Iterator[T]

Convert an async iterable to a sync iterator.

Parameters:

Name Type Description Default
async_iterable Any

The async iterable to convert

required
loop AbstractEventLoop | None

Optional event loop. If None, creates new one.

None

Yields:

Type Description
T

Items from the async iterable

Example

async def async_generator():
    for i in range(10):
        await asyncio.sleep(0.1)
        yield i

Use from sync code

for item in iter_over_async(async_generator()):
    print(item)

Source code in src/dspu/aio/bridge.py
def iter_over_async(
    async_iterable: Any,
    *,
    loop: asyncio.AbstractEventLoop | None = None,
) -> Iterator[T]:
    """Convert an async iterable to a sync iterator.

    Each item is produced by driving the async iterator's ``__anext__``
    with ``loop.run_until_complete``. When iteration ends, whether by
    exhaustion, an error, or the consumer stopping early, the async
    iterator is closed (if it offers ``aclose``) so that its cleanup code
    runs. A loop created by this function is shut down and closed as well.

    Args:
        async_iterable: The async iterable to convert
        loop: Optional event loop. If None, creates new one. A loop passed
            in by the caller is never closed here.

    Yields:
        Items from the async iterable

    Example:
        >>> async def async_generator():
        >>>     for i in range(10):
        >>>         await asyncio.sleep(0.1)
        >>>         yield i
        >>>
        >>> # Use from sync code
        >>> for item in iter_over_async(async_generator()):
        >>>     print(item)
    """
    close_loop = loop is None
    if loop is None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    async_iter = None
    try:
        async_iter = async_iterable.__aiter__()

        while True:
            # Only the __anext__ call is guarded, so a StopAsyncIteration
            # from anywhere else is not mistaken for exhaustion.
            try:
                item = loop.run_until_complete(async_iter.__anext__())
            except StopAsyncIteration:
                break
            yield item

    finally:
        try:
            # Async generators expose aclose(); plain async iterators may not.
            aclose = getattr(async_iter, "aclose", None)
            if aclose is not None and not loop.is_closed() and not loop.is_running():
                loop.run_until_complete(aclose())
        finally:
            if close_loop:
                try:
                    # Finalize any async generators still tracked by our loop
                    loop.run_until_complete(loop.shutdown_asyncgens())
                finally:
                    # Do not leave a closed loop installed as the current one
                    asyncio.set_event_loop(None)
                    loop.close()

Usage

from dspu.aio import retry, RateLimiter, CircuitBreaker

# Exponential backoff is retry's default strategy, so only the attempt
# count is set here.
@retry(max_attempts=3)
async def fetch_data(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

async def process_with_rate_limit():
    # 10 operations per second; capacity defaults to the rate.
    limiter = RateLimiter(rate=10.0)
    # Wait until a token is available before doing the work.
    await limiter.acquire()
    await process_item()

circuit_breaker = CircuitBreaker(failure_threshold=5)

async def call_api():
    # Route the call through the breaker so repeated failures open the circuit.
    return await circuit_breaker.call(fetch_data, "https://api.example.com/data")

See Also