Skip to content

Security API Reference

Secret management, authentication, and encryption.

Secret Management

SecretManager

dspu.security.secrets.SecretManager

SecretManager(backend: SecretBackend)

Unified interface for secret management.

Provides a consistent API for accessing secrets from various backends (Vault, AWS Secrets Manager, environment variables, files).

Example

Auto-detect from environment

secrets = SecretManager.from_env()
password = await secrets.get("database/password")

Explicit backend

from dspu.security.backends import EnvBackend
secrets = SecretManager(EnvBackend(prefix="APP_"))
api_key = await secrets.get("api/key")

Set secrets (if backend supports it)

await secrets.set("new/secret", "value")

Initialize secret manager with backend.

Parameters:

Name Type Description Default
backend SecretBackend

Secret backend implementation.

required
Source code in src/dspu/security/secrets.py
def __init__(self, backend: SecretBackend) -> None:
    """Bind this manager to a concrete secret backend.

    Args:
        backend: Backend object that secret reads and writes are
            delegated to.
    """
    # Stored privately; the manager's own methods are the access path.
    self._backend = backend

Functions

from_env classmethod

from_env(**kwargs: Any) -> SecretManager

Create SecretManager from environment configuration.

Auto-detects backend from environment variables:

- VAULT_ADDR + VAULT_TOKEN -> Vault backend
- AWS_SECRET_BACKEND=true -> AWS Secrets Manager
- Otherwise -> Environment variable backend

Parameters:

Name Type Description Default
**kwargs Any

Backend-specific options.

{}

Returns:

Type Description
SecretManager

SecretManager instance with detected backend.

Example
With Vault

os.environ["VAULT_ADDR"] = "http://localhost:8200" os.environ["VAULT_TOKEN"] = "dev-token" secrets = SecretManager.from_env()

With environment variables

os.environ["SECRET_PREFIX"] = "APP_SECRET_" secrets = SecretManager.from_env()

Source code in src/dspu/security/secrets.py
@classmethod
def from_env(cls, **kwargs: Any) -> "SecretManager":
    """Build a SecretManager whose backend is chosen from the environment.

    Selection order:
    - VAULT_ADDR and VAULT_TOKEN both set -> Vault backend
    - AWS_SECRET_BACKEND equal to "true" (case-insensitive) -> AWS
      Secrets Manager, using AWS_REGION (default "us-east-1")
    - Anything else -> environment variable backend, using
      SECRET_PREFIX (default "") as the prefix

    Args:
        **kwargs: Extra options forwarded to the selected backend.

    Returns:
        SecretManager wrapping the selected backend.

    Raises:
        ConfigurationError: If importing the Vault or AWS backend fails.

    Example:
        >>> # With Vault
        >>> os.environ["VAULT_ADDR"] = "http://localhost:8200"
        >>> os.environ["VAULT_TOKEN"] = "dev-token"
        >>> secrets = SecretManager.from_env()
        >>>
        >>> # With environment variables
        >>> os.environ["SECRET_PREFIX"] = "APP_SECRET_"
        >>> secrets = SecretManager.from_env()
    """
    # 1) Vault wins when both its address and token are configured.
    addr, token = os.getenv("VAULT_ADDR"), os.getenv("VAULT_TOKEN")
    if addr and token:
        try:
            from dspu.security.backends import VaultBackend
        except ImportError as exc:
            raise ConfigurationError(
                "Vault backend requires hvac package",
                suggestion="Install with: pip install dspu[security]",
            ) from exc

        return cls(VaultBackend(url=addr, token=token, **kwargs))

    # 2) AWS must be opted into explicitly.
    aws_enabled = os.getenv("AWS_SECRET_BACKEND", "").lower() == "true"
    if aws_enabled:
        try:
            from dspu.security.backends import AWSSecretBackend
        except ImportError as exc:
            raise ConfigurationError(
                "AWS backend requires boto3 package",
                suggestion="Install with: pip install dspu[security]",
            ) from exc

        aws_region = os.getenv("AWS_REGION", "us-east-1")
        return cls(AWSSecretBackend(region=aws_region, **kwargs))

    # 3) Fallback: plain environment variables.
    from dspu.security.backends import EnvBackend

    env_prefix = os.getenv("SECRET_PREFIX", "")
    return cls(EnvBackend(prefix=env_prefix, **kwargs))

from_vault classmethod

from_vault(
    url: str, token: str, **kwargs: Any
) -> SecretManager

Create SecretManager with Vault backend.

Parameters:

Name Type Description Default
url str

Vault server URL.

required
token str

Vault authentication token.

required
**kwargs Any

Additional Vault options.

{}

Returns:

Type Description
SecretManager

SecretManager with Vault backend.

Raises:

Type Description
ConfigurationError

If hvac package not installed.

Example

secrets = SecretManager.from_vault( ... url="http://localhost:8200", ... token="dev-token" ... ) password = await secrets.get("secret/database/password")

Source code in src/dspu/security/secrets.py
@classmethod
def from_vault(
    cls,
    url: str,
    token: str,
    **kwargs: Any,
) -> "SecretManager":
    """Build a SecretManager backed by HashiCorp Vault.

    Args:
        url: Vault server URL.
        token: Vault authentication token.
        **kwargs: Extra options forwarded to VaultBackend.

    Returns:
        SecretManager wrapping a VaultBackend.

    Raises:
        ConfigurationError: If the Vault backend cannot be imported.

    Example:
        >>> secrets = SecretManager.from_vault(
        ...     url="http://localhost:8200",
        ...     token="dev-token"
        ... )
        >>> password = await secrets.get("secret/database/password")
    """
    # Imported lazily so the dependency is only needed when Vault is used.
    try:
        from dspu.security.backends import VaultBackend
    except ImportError as exc:
        raise ConfigurationError(
            "Vault backend requires hvac package",
            suggestion="Install with: pip install dspu[security]",
        ) from exc

    return cls(VaultBackend(url=url, token=token, **kwargs))

from_aws classmethod

from_aws(
    region: str = "us-east-1", **kwargs: Any
) -> SecretManager

Create SecretManager with AWS Secrets Manager backend.

Parameters:

Name Type Description Default
region str

AWS region.

'us-east-1'
**kwargs Any

Additional AWS options.

{}

Returns:

Type Description
SecretManager

SecretManager with AWS backend.

Raises:

Type Description
ConfigurationError

If boto3 package not installed.

Example

secrets = SecretManager.from_aws(region="us-west-2") api_key = await secrets.get("prod/api/key")

Source code in src/dspu/security/secrets.py
@classmethod
def from_aws(cls, region: str = "us-east-1", **kwargs: Any) -> "SecretManager":
    """Create SecretManager with AWS Secrets Manager backend.

    Args:
        region: AWS region.
        **kwargs: Additional AWS options, forwarded to AWSSecretBackend.

    Returns:
        SecretManager with AWS backend.

    Raises:
        ConfigurationError: If the AWS backend cannot be imported.

    Example:
        >>> secrets = SecretManager.from_aws(region="us-west-2")
        >>> api_key = await secrets.get("prod/api/key")
    """
    try:
        from dspu.security.backends import AWSSecretBackend
    except ImportError as e:
        # Same install hint as from_env() and from_vault(), so users get
        # one consistent instruction regardless of the constructor used.
        raise ConfigurationError(
            "AWS backend requires boto3 package",
            suggestion="Install with: pip install dspu[security]",
        ) from e

    backend = AWSSecretBackend(region=region, **kwargs)
    return cls(backend)

from_env_vars classmethod

from_env_vars(
    prefix: str = "", **kwargs: Any
) -> SecretManager

Create SecretManager with environment variable backend.

Parameters:

Name Type Description Default
prefix str

Prefix for environment variables.

''
**kwargs Any

Additional options.

{}

Returns:

Type Description
SecretManager

SecretManager with environment backend.

Example

secrets = SecretManager.from_env_vars(prefix="APP_SECRET_")

Gets from APP_SECRET_API_KEY environment variable

api_key = await secrets.get("api/key")

Source code in src/dspu/security/secrets.py
@classmethod
def from_env_vars(cls, prefix: str = "", **kwargs: Any) -> "SecretManager":
    """Build a SecretManager that reads secrets from environment variables.

    Args:
        prefix: Prefix handed to EnvBackend.
        **kwargs: Extra options forwarded to EnvBackend.

    Returns:
        SecretManager wrapping an EnvBackend.

    Example:
        >>> secrets = SecretManager.from_env_vars(prefix="APP_SECRET_")
        >>> # Gets from APP_SECRET_API_KEY environment variable
        >>> api_key = await secrets.get("api/key")
    """
    from dspu.security.backends import EnvBackend

    return cls(EnvBackend(prefix=prefix, **kwargs))

from_file classmethod

from_file(path: str, **kwargs: Any) -> SecretManager

Create SecretManager with file-based backend.

Parameters:

Name Type Description Default
path str

Path to secrets file (JSON or YAML).

required
**kwargs Any

Additional options.

{}

Returns:

Type Description
SecretManager

SecretManager with file backend.

Example

secrets = SecretManager.from_file("secrets.yaml") password = await secrets.get("database/password")

Source code in src/dspu/security/secrets.py
@classmethod
def from_file(cls, path: str, **kwargs: Any) -> "SecretManager":
    """Build a SecretManager that reads secrets from a file.

    Args:
        path: Path to secrets file (JSON or YAML).
        **kwargs: Extra options forwarded to FileBackend.

    Returns:
        SecretManager wrapping a FileBackend.

    Example:
        >>> secrets = SecretManager.from_file("secrets.yaml")
        >>> password = await secrets.get("database/password")
    """
    from dspu.security.backends import FileBackend

    return cls(FileBackend(path=path, **kwargs))

get async

get(key: str, default: str | None = None) -> str

Get a secret value.

Parameters:

Name Type Description Default
key str

Secret key/path.

required
default str | None

Default value if secret not found.

None

Returns:

Type Description
str

Secret value.

Raises:

Type Description
SecretNotFoundError

If secret not found and no default.

SecurityError

If access denied or backend error.

Example

password = await secrets.get("database/password") api_key = await secrets.get("api/key", default="dev-key")

Source code in src/dspu/security/secrets.py
async def get(self, key: str, default: str | None = None) -> str:
    """Fetch a secret from the configured backend.

    Args:
        key: Secret key/path.
        default: Value returned when the backend reports the key as
            missing. ``None`` means "no fallback".

    Returns:
        The backend's value, or ``default`` when the key is missing and
        a fallback was supplied.

    Raises:
        SecretNotFoundError: If the backend raises KeyError and no
            default was given.

    Example:
        >>> password = await secrets.get("database/password")
        >>> api_key = await secrets.get("api/key", default="dev-key")
    """
    try:
        secret = await self._backend.get(key)
    except KeyError:
        # Backends signal "missing" with KeyError; translate it here.
        if default is None:
            raise SecretNotFoundError(
                f"Secret not found: {key}",
                key=key,
            ) from None
        return default
    return secret

set async

set(key: str, value: str) -> None

Set a secret value.

Parameters:

Name Type Description Default
key str

Secret key/path.

required
value str

Secret value.

required

Raises:

Type Description
SecurityError

If write fails or not supported.

Example

await secrets.set("api/key", "new-secret-key")

Source code in src/dspu/security/secrets.py
async def set(self, key: str, value: str) -> None:
    """Store a secret through the configured backend.

    Args:
        key: Secret key/path.
        value: Secret value.

    Raises:
        SecurityError: If write fails or not supported.

    Example:
        >>> await secrets.set("api/key", "new-secret-key")
    """
    # Pure delegation: the backend decides how the write happens.
    backend = self._backend
    await backend.set(key, value)

Secret Backends

dspu.security.backends.EnvBackend

EnvBackend(prefix: str = '', separator: str = '_')

Environment variable backend for secrets.

Reads secrets from environment variables with optional prefix. Key paths are converted to uppercase with underscores.

Example

backend = EnvBackend(prefix="APP_SECRET_")

Gets from APP_SECRET_API_KEY

api_key = await backend.get("api/key")

Initialize environment backend.

Parameters:

Name Type Description Default
prefix str

Prefix for all environment variables.

''
separator str

Separator for nested keys (default: _).

'_'
Source code in src/dspu/security/backends.py
def __init__(self, prefix: str = "", separator: str = "_") -> None:
    """Record the naming settings for the environment backend.

    Args:
        prefix: Prefix for all environment variables.
        separator: Separator for nested keys (default: _).
    """
    self.prefix, self.separator = prefix, separator

Functions

get async

get(key: str) -> str

Get secret from environment variable.

Source code in src/dspu/security/backends.py
async def get(self, key: str) -> str:
    """Return the value of the environment variable mapped from ``key``.

    Raises:
        KeyError: If the mapped environment variable is not set.
    """
    variable = self._key_to_env(key)
    found = os.environ.get(variable)
    if found is not None:
        return found
    raise KeyError(f"Environment variable not found: {variable}")

set async

set(key: str, value: str) -> None

Set environment variable (runtime only, not persistent).

Source code in src/dspu/security/backends.py
async def set(self, key: str, value: str) -> None:
    """Assign ``value`` to the environment variable mapped from ``key``.

    The change is made in this process's ``os.environ`` only (runtime
    only, not persistent).
    """
    os.environ[self._key_to_env(key)] = value

delete async

delete(key: str) -> None

Delete environment variable.

Source code in src/dspu/security/backends.py
async def delete(self, key: str) -> None:
    """Remove the environment variable mapped from ``key``.

    Raises:
        KeyError: If the mapped environment variable is not set.
    """
    variable = self._key_to_env(key)
    if variable in os.environ:
        del os.environ[variable]
        return
    raise KeyError(f"Environment variable not found: {variable}")

exists async

exists(key: str) -> bool

Check if environment variable exists.

Source code in src/dspu/security/backends.py
async def exists(self, key: str) -> bool:
    """Report whether the environment variable mapped from ``key`` is set."""
    return self._key_to_env(key) in os.environ

list async

list(prefix: str = '') -> list[str]

List environment variables matching prefix.

Source code in src/dspu/security/backends.py
async def list(self, prefix: str = "") -> list[str]:
    """Return the sorted secret keys found in the environment.

    Args:
        prefix: Optional key prefix. When empty, every variable whose
            name starts with the backend's own prefix is included.

    Returns:
        Sorted keys, converted back from environment variable names.
    """
    # Pick the variable-name prefix to scan for.
    if prefix:
        wanted = self._key_to_env(prefix)
    else:
        wanted = self.prefix

    matches = []
    for name in os.environ:
        if not name.startswith(wanted):
            continue
        logical = self._env_to_key(name)
        # Re-check on the converted key as well as on the variable name.
        if prefix and not logical.startswith(prefix):
            continue
        matches.append(logical)

    matches.sort()
    return matches

dspu.security.backends.FileBackend

FileBackend(path: str | Path, auto_save: bool = True)

File-based backend for secrets (development only).

Reads secrets from a JSON or YAML file. This backend is intended for local development and testing only. DO NOT use in production.

Example

backend = FileBackend("secrets.yaml") password = await backend.get("database/password")

Initialize file backend.

Parameters:

Name Type Description Default
path str | Path

Path to secrets file (JSON or YAML).

required
auto_save bool

Auto-save changes to file (default: True).

True
Source code in src/dspu/security/backends.py
def __init__(self, path: str | Path, auto_save: bool = True) -> None:
    """Set up the file backend without touching the file yet.

    Args:
        path: Path to secrets file (JSON or YAML).
        auto_save: Auto-save changes to file (default: True).
    """
    # In-memory state starts empty and is flagged as not yet loaded.
    self._loaded = False
    self._secrets: dict[str, Any] = {}
    self.auto_save = auto_save
    self.path = Path(path)

Functions

get async

get(key: str) -> str

Get secret from file.

Source code in src/dspu/security/backends.py
async def get(self, key: str) -> str:
    """Load the secrets (via ``_load``) and look up ``key``."""
    await self._load()
    value = self._get_nested(key)
    return value

set async

set(key: str, value: str) -> None

Set secret in file.

Source code in src/dspu/security/backends.py
async def set(self, key: str, value: str) -> None:
    """Store ``value`` under ``key``.

    Runs in three steps: load, update the nested entry, then save.
    """
    await self._load()
    self._set_nested(key, value)
    # The save step is always invoked after the update.
    await self._save()

delete async

delete(key: str) -> None

Delete secret from file.

Source code in src/dspu/security/backends.py
async def delete(self, key: str) -> None:
    """Remove ``key``.

    Runs in three steps: load, delete the nested entry, then save.
    """
    await self._load()
    self._delete_nested(key)
    # The save step is always invoked after the removal.
    await self._save()

exists async

exists(key: str) -> bool

Check if secret exists in file.

Source code in src/dspu/security/backends.py
async def exists(self, key: str) -> bool:
    """Report whether ``key`` can be looked up after loading.

    A KeyError from the nested lookup is treated as "absent".
    """
    await self._load()
    try:
        self._get_nested(key)
    except KeyError:
        return False
    return True

list async

list(prefix: str = '') -> list[str]

List secret keys from file.

Source code in src/dspu/security/backends.py
async def list(self, prefix: str = "") -> list[str]:
    """Return the sorted leaf keys of the loaded secrets.

    Nested dicts are flattened into "/"-joined paths; only non-dict
    values produce a key.

    Args:
        prefix: When non-empty, keep only paths starting with it.

    Returns:
        Sorted key paths.
    """
    await self._load()

    # Walk the tree with an explicit stack of (path so far, dict) pairs.
    leaves = []
    pending = [("", self._secrets)]
    while pending:
        base, node = pending.pop()
        for name, child in node.items():
            full = f"{base}/{name}" if base else name
            if isinstance(child, dict):
                pending.append((full, child))
            else:
                leaves.append(full)

    if prefix:
        leaves = [p for p in leaves if p.startswith(prefix)]
    return sorted(leaves)

dspu.security.backends.VaultBackend

VaultBackend(
    url: str,
    token: str,
    mount_point: str = "secret",
    **kwargs: Any,
)

HashiCorp Vault backend for secrets.

Connects to Vault server for secure secret storage and retrieval.

Example

backend = VaultBackend( ... url="http://localhost:8200", ... token="dev-token", ... mount_point="secret" ... ) password = await backend.get("database/password")

Initialize Vault backend.

Parameters:

Name Type Description Default
url str

Vault server URL.

required
token str

Vault authentication token.

required
mount_point str

Vault mount point (default: "secret").

'secret'
**kwargs Any

Additional hvac client options.

{}

Raises:

Type Description
ConfigurationError

If hvac not installed.

Source code in src/dspu/security/backends.py
def __init__(
    self,
    url: str,
    token: str,
    mount_point: str = "secret",
    **kwargs: Any,
) -> None:
    """Create an hvac client and verify that it is authenticated.

    Args:
        url: Vault server URL.
        token: Vault authentication token.
        mount_point: Vault mount point (default: "secret").
        **kwargs: Additional options passed to ``hvac.Client``.

    Raises:
        ConfigurationError: If hvac not installed.
        SecurityError: If the client reports it is not authenticated.
    """
    # hvac is optional, so it is imported only when this backend is built.
    try:
        import hvac
    except ImportError as exc:
        from dspu.core.exceptions import ConfigurationError

        raise ConfigurationError(
            "Vault backend requires hvac package",
            suggestion="Install with: pip install hvac",
        ) from exc

    self.url, self.token, self.mount_point = url, token, mount_point
    self._client = hvac.Client(url=url, token=token, **kwargs)

    # Check authentication now rather than on first use.
    if self._client.is_authenticated():
        return
    raise SecurityError(
        "Vault authentication failed",
        reason="Invalid token or server unreachable",
        resource=url,
    )

Functions

get async

get(key: str) -> str

Get secret from Vault.

Source code in src/dspu/security/backends.py
async def get(self, key: str) -> str:
    """Read a secret from the KV v2 engine at this backend's mount point.

    Returns:
        ``str()`` of the secret's "value" field when present, otherwise
        the whole secret payload serialized as JSON.

    Raises:
        KeyError: If the failure message contains "404" or "not found".
        SecurityError: For any other failure.
    """
    try:
        reply = self._client.secrets.kv.v2.read_secret_version(
            path=key,
            mount_point=self.mount_point,
        )
        payload = reply["data"]["data"]

        # Secrets written by set() carry a single "value" field.
        if "value" not in payload:
            return json.dumps(payload)
        return str(payload["value"])

    except Exception as exc:
        text = str(exc)
        # Not-found is recognized from the error message text.
        missing = "404" in text or "not found" in text.lower()
        if missing:
            raise KeyError(f"Secret not found: {key}") from exc
        raise SecurityError(
            f"Failed to read secret from Vault: {exc}",
            reason=text,
            resource=key,
        ) from exc

set async

set(key: str, value: str) -> None

Set secret in Vault.

Source code in src/dspu/security/backends.py
async def set(self, key: str, value: str) -> None:
    """Write a secret to the KV v2 engine at this backend's mount point.

    The value is stored under a single "value" field.

    Raises:
        SecurityError: If the write fails for any reason.
    """
    payload = {"value": value}
    try:
        self._client.secrets.kv.v2.create_or_update_secret(
            path=key,
            secret=payload,
            mount_point=self.mount_point,
        )
    except Exception as exc:
        raise SecurityError(
            f"Failed to write secret to Vault: {exc}",
            reason=str(exc),
            resource=key,
        ) from exc

delete async

delete(key: str) -> None

Delete secret from Vault.

Source code in src/dspu/security/backends.py
async def delete(self, key: str) -> None:
    """Delete a secret's metadata and all of its versions from Vault.

    Raises:
        KeyError: If the failure message contains "404" or "not found".
        SecurityError: For any other failure.
    """
    try:
        self._client.secrets.kv.v2.delete_metadata_and_all_versions(
            path=key,
            mount_point=self.mount_point,
        )
    except Exception as exc:
        text = str(exc)
        # Not-found is recognized from the error message text.
        missing = "404" in text or "not found" in text.lower()
        if missing:
            raise KeyError(f"Secret not found: {key}") from exc
        raise SecurityError(
            f"Failed to delete secret from Vault: {exc}",
            reason=text,
            resource=key,
        ) from exc

exists async

exists(key: str) -> bool

Check if secret exists in Vault.

Source code in src/dspu/security/backends.py
async def exists(self, key: str) -> bool:
    """Report whether ``key`` can be read from Vault.

    Implemented as a read attempt: a KeyError from ``get`` means the
    secret is absent; any other exception propagates.
    """
    try:
        await self.get(key)
    except KeyError:
        return False
    return True

list async

list(prefix: str = '') -> list[str]

List secret keys from Vault.

Source code in src/dspu/security/backends.py
async def list(self, prefix: str = "") -> list[str]:
    """List secret keys from Vault under ``prefix``.

    Args:
        prefix: Path to list. Trailing slashes are ignored, so "app" and
            "app/" give the same result; an empty (or all-slash) prefix
            lists the mount root.

    Returns:
        Sorted keys. With a prefix, each key is returned as
        "<prefix>/<name>" with any trailing slash removed; without one,
        the names are returned exactly as Vault reports them.

    Raises:
        SecurityError: If listing fails for a reason other than the path
            not being found (which yields an empty list).
    """
    # Normalize once and reuse for both the request and the rebuilt keys.
    # Rebuilding from the raw prefix produced "app//key" for "app/".
    path = prefix.rstrip("/")
    try:
        response = self._client.secrets.kv.v2.list_secrets(
            path=path,
            mount_point=self.mount_point,
        )
        keys = response["data"]["keys"]

        # Add the (normalized) prefix back to the keys.
        if path:
            return sorted(f"{path}/{k}".rstrip("/") for k in keys)
        return sorted(keys)

    except Exception as e:
        # Not-found is recognized from the error message text.
        if "404" in str(e) or "not found" in str(e).lower():
            return []
        raise SecurityError(
            f"Failed to list secrets from Vault: {e}",
            reason=str(e),
        ) from e

dspu.security.backends.AWSSecretBackend

AWSSecretBackend(region: str = 'us-east-1', **kwargs: Any)

AWS Secrets Manager backend.

Uses AWS Secrets Manager for secure secret storage.

Example

backend = AWSSecretBackend(region="us-west-2") api_key = await backend.get("prod/api/key")

Initialize AWS Secrets Manager backend.

Parameters:

Name Type Description Default
region str

AWS region.

'us-east-1'
**kwargs Any

Additional boto3 client options.

{}

Raises:

Type Description
ConfigurationError

If boto3 not installed.

Source code in src/dspu/security/backends.py
def __init__(self, region: str = "us-east-1", **kwargs: Any) -> None:
    """Create a boto3 Secrets Manager client for ``region``.

    Args:
        region: AWS region.
        **kwargs: Additional options passed to ``boto3.client``.

    Raises:
        ConfigurationError: If boto3 not installed.
    """
    # boto3 is optional, so it is imported only when this backend is built.
    try:
        import boto3
    except ImportError as exc:
        from dspu.core.exceptions import ConfigurationError

        raise ConfigurationError(
            "AWS backend requires boto3 package",
            suggestion="Install with: pip install boto3",
        ) from exc

    self.region = region
    self._client = boto3.client(
        "secretsmanager",
        region_name=region,
        **kwargs,
    )

Functions

get async

get(key: str) -> str

Get secret from AWS Secrets Manager.

Source code in src/dspu/security/backends.py
async def get(self, key: str) -> str:
    """Read a secret from AWS Secrets Manager.

    Returns:
        The "SecretString" field when the response has one; otherwise
        the "SecretBinary" field, base64-decoded and then decoded as
        UTF-8.

    Raises:
        KeyError: If the client raises ResourceNotFoundException.
        SecurityError: For any other failure.
    """
    try:
        reply = self._client.get_secret_value(SecretId=key)

        if "SecretString" not in reply:
            # Binary secret: convert it to text.
            import base64

            raw = base64.b64decode(reply["SecretBinary"])
            return raw.decode("utf-8")

        return reply["SecretString"]

    except self._client.exceptions.ResourceNotFoundException:
        raise KeyError(f"Secret not found: {key}") from None
    except Exception as exc:
        raise SecurityError(
            f"Failed to read secret from AWS: {exc}",
            reason=str(exc),
            resource=key,
        ) from exc

set async

set(key: str, value: str) -> None

Set secret in AWS Secrets Manager.

Source code in src/dspu/security/backends.py
async def set(self, key: str, value: str) -> None:
    """Write a secret to AWS Secrets Manager, creating it if necessary.

    An update is attempted first; if the client reports the secret as
    not found, it is created instead.

    Raises:
        SecurityError: If the update fails for another reason, or if the
            fallback creation fails.
    """
    client = self._client
    try:
        # Update path: the secret already exists.
        client.update_secret(SecretId=key, SecretString=value)
    except client.exceptions.ResourceNotFoundException:
        # Create path: there was nothing to update.
        try:
            client.create_secret(Name=key, SecretString=value)
        except Exception as exc:
            raise SecurityError(
                f"Failed to create secret in AWS: {exc}",
                reason=str(exc),
                resource=key,
            ) from exc
    except Exception as exc:
        raise SecurityError(
            f"Failed to write secret to AWS: {exc}",
            reason=str(exc),
            resource=key,
        ) from exc

delete async

delete(key: str) -> None

Delete secret from AWS Secrets Manager.

Source code in src/dspu/security/backends.py
async def delete(self, key: str) -> None:
    """Delete a secret from AWS Secrets Manager.

    The request sets ``ForceDeleteWithoutRecovery=True``.

    Raises:
        KeyError: If the client raises ResourceNotFoundException.
        SecurityError: For any other failure.
    """
    client = self._client
    try:
        client.delete_secret(SecretId=key, ForceDeleteWithoutRecovery=True)
    except client.exceptions.ResourceNotFoundException:
        raise KeyError(f"Secret not found: {key}") from None
    except Exception as exc:
        raise SecurityError(
            f"Failed to delete secret from AWS: {exc}",
            reason=str(exc),
            resource=key,
        ) from exc

exists async

exists(key: str) -> bool

Check if secret exists in AWS Secrets Manager.

Source code in src/dspu/security/backends.py
async def exists(self, key: str) -> bool:
    """Check if secret exists in AWS Secrets Manager.

    Returns:
        True if ``describe_secret`` succeeds, False if the client raises
        ResourceNotFoundException.

    Raises:
        SecurityError: If the lookup fails for any other reason. Such
            failures were previously reported as "does not exist", which
            hid real problems from callers; they now surface the same
            way as in the sibling get/delete/list methods.
    """
    try:
        self._client.describe_secret(SecretId=key)
    except self._client.exceptions.ResourceNotFoundException:
        return False
    except Exception as e:
        raise SecurityError(
            f"Failed to check secret in AWS: {e}",
            reason=str(e),
            resource=key,
        ) from e
    return True

list async

list(prefix: str = '') -> list[str]

List secret keys from AWS Secrets Manager.

Source code in src/dspu/security/backends.py
async def list(self, prefix: str = "") -> list[str]:
    """Return the sorted names of secrets in AWS Secrets Manager.

    Every page of the "list_secrets" paginator is read; the prefix
    filter is applied to the returned names in this method.

    Args:
        prefix: When non-empty, keep only names starting with it.

    Raises:
        SecurityError: If listing fails for any reason.
    """
    try:
        pages = self._client.get_paginator("list_secrets").paginate()
        names = [
            entry["Name"]
            for page in pages
            for entry in page["SecretList"]
        ]
        if prefix:
            names = [name for name in names if name.startswith(prefix)]
        return sorted(names)

    except Exception as exc:
        raise SecurityError(
            f"Failed to list secrets from AWS: {exc}",
            reason=str(exc),
        ) from exc

Token Rotation

dspu.security.rotating_token.RotatingToken

RotatingToken(
    fetch_fn: Callable[[], Awaitable[TokenData]],
    refresh_interval: float | None = None,
    refresh_before: float = 300.0,
    on_refresh: Callable[[TokenData], Awaitable[None]]
    | None = None,
)

Automatic token rotation context manager.

Manages tokens that need periodic refresh, automatically refreshing before expiration and providing the current valid token.

Example

async def fetch_token() -> TokenData: ... # Get token from auth service ... token = await auth.get_token() ... expires_at = time.time() + 3600 # 1 hour ... return TokenData(token, expires_at=expires_at)

async with RotatingToken( ... fetch_fn=fetch_token, ... refresh_interval=3600, ... refresh_before=300, # 5 minutes before expiry ... ) as rotating_token: ... # Use current token ... api_client.set_token(rotating_token.current) ... ... # Token will auto-refresh when needed ... await api_client.make_request()

Initialize rotating token manager.

Parameters:

Name Type Description Default
fetch_fn Callable[[], Awaitable[TokenData]]

Async function that returns TokenData.

required
refresh_interval float | None

Interval in seconds to refresh (None = use token expiry).

None
refresh_before float

Seconds before expiry to refresh (default: 300).

300.0
on_refresh Callable[[TokenData], Awaitable[None]] | None

Optional callback called after token refresh.

None
Example

token_manager = RotatingToken( ... fetch_fn=lambda: auth.get_token(), ... refresh_interval=3600, ... refresh_before=300, ... )

Source code in src/dspu/security/rotating_token.py
def __init__(
    self,
    fetch_fn: Callable[[], Awaitable[TokenData]],
    refresh_interval: float | None = None,
    refresh_before: float = 300.0,
    on_refresh: Callable[[TokenData], Awaitable[None]] | None = None,
) -> None:
    """Store the rotation settings and create empty runtime state.

    No token is fetched here; the manager starts without token data and
    without a refresh task.

    Args:
        fetch_fn: Async function that returns TokenData.
        refresh_interval: Interval in seconds to refresh (None = use token expiry).
        refresh_before: Seconds before expiry to refresh (default: 300).
        on_refresh: Optional callback called after token refresh.

    Example:
        >>> token_manager = RotatingToken(
        ...     fetch_fn=lambda: auth.get_token(),
        ...     refresh_interval=3600,
        ...     refresh_before=300,
        ... )
    """
    # Runtime state, filled in later.
    self._token_data: TokenData | None = None
    self._refresh_task: asyncio.Task[None] | None = None
    self._stop_event = asyncio.Event()

    # Caller-supplied configuration.
    self._fetch_fn = fetch_fn
    self._on_refresh = on_refresh
    self._refresh_interval = refresh_interval
    self._refresh_before = refresh_before

Attributes

current property

current: str

Get current valid token.

Returns:

Type Description
str

Current token value.

Raises:

Type Description
TokenExpiredError

If token is expired and refresh hasn't completed.

RuntimeError

If token not initialized.

Example

token = rotating_token.current api_client.set_token(token)

Functions

refresh async

refresh() -> None

Manually trigger token refresh.

Example

await rotating_token.refresh()

Source code in src/dspu/security/rotating_token.py
async def refresh(self) -> None:
    """Fetch a new token now and notify the refresh callback, if any.

    The fetched data replaces the stored token data. The callback is
    awaited only when one was configured and the fetch returned a
    non-None value.

    Example:
        >>> await rotating_token.refresh()
    """
    self._token_data = await self._fetch_fn()

    callback = self._on_refresh
    if callback is None or self._token_data is None:
        return
    await callback(self._token_data)

is_expired

is_expired() -> bool

Check if current token is expired.

Returns:

Type Description
bool

True if expired or not initialized, False otherwise.

Source code in src/dspu/security/rotating_token.py
def is_expired(self) -> bool:
    """Report whether there is no usable token right now.

    Returns:
        True when no token data is stored; otherwise whatever the
        stored token data's own ``is_expired()`` returns.
    """
    data = self._token_data
    return True if data is None else data.is_expired()

dspu.security.rotating_token.simple_rotating_token async

simple_rotating_token(
    fetch_fn: Callable[[], Awaitable[str]],
    refresh_interval: float,
) -> RotatingToken

Create a simple rotating token with just a string fetch function.

Convenience function for cases where you just need periodic refresh without expiry tracking.

Parameters:

Name Type Description Default
fetch_fn Callable[[], Awaitable[str]]

Async function that returns token string.

required
refresh_interval float

How often to refresh in seconds.

required

Returns:

Type Description
RotatingToken

RotatingToken instance.

Example

async def get_token() -> str: ... return await auth_service.get_token()

async with await simple_rotating_token(get_token, 3600) as token:
    api.set_token(token.current)

Source code in src/dspu/security/rotating_token.py
async def simple_rotating_token(
    fetch_fn: Callable[[], Awaitable[str]],
    refresh_interval: float,
) -> RotatingToken:
    """Create a simple rotating token with just a string fetch function.

    Convenience function for cases where you just need periodic refresh
    without expiry tracking. The string returned by ``fetch_fn`` is wrapped
    in a ``TokenData`` before being handed to ``RotatingToken``.

    Args:
        fetch_fn: Async function that returns token string.
        refresh_interval: How often to refresh in seconds.

    Returns:
        RotatingToken instance.

    Example:
        >>> async def get_token() -> str:
        ...     return await auth_service.get_token()
        >>>
        >>> # This factory is itself a coroutine function, so await it
        >>> # first and then enter the RotatingToken it returns.
        >>> async with await simple_rotating_token(get_token, 3600) as token:
        ...     api.set_token(token.current)
    """

    # Adapter: turn the plain-string fetcher into one that yields TokenData.
    async def fetch_token_data() -> TokenData:
        token = await fetch_fn()
        return TokenData(token=token)

    return RotatingToken(
        fetch_fn=fetch_token_data,
        refresh_interval=refresh_interval,
    )

Authentication

Providers

dspu.security.auth.StaticTokenProvider

StaticTokenProvider(token: str)

Static token authentication provider.

Simple provider that returns a fixed token. Useful for development and services that use long-lived API keys.

Example

provider = StaticTokenProvider(token="my-api-key-123") token = await provider.get_token() print(token) # "my-api-key-123"

Initialize static token provider.

Parameters:

Name Type Description Default
token str

Static token value.

required
Source code in src/dspu/security/auth.py
def __init__(self, token: str) -> None:
    """Initialize static token provider.

    Args:
        token: Static token value. It is stored unchanged on the instance
            and returned by the token accessors.
    """
    self._token = token

Functions

get_token async

get_token(scopes: list[str] | None = None) -> str

Get the static token.

Parameters:

Name Type Description Default
scopes list[str] | None

Ignored for static tokens.

None

Returns:

Type Description
str

Static token value.

Source code in src/dspu/security/auth.py
async def get_token(self, scopes: list[str] | None = None) -> str:
    """Get the static token.

    Args:
        scopes: Ignored for static tokens.

    Returns:
        Static token value.
    """
    return self._token

refresh_token async

refresh_token() -> str

Refresh token (returns same token for static provider).

Returns:

Type Description
str

Static token value.

Source code in src/dspu/security/auth.py
async def refresh_token(self) -> str:
    """Return the token again; a static token has nothing to refresh.

    Returns:
        The static token value.
    """
    static_token = self._token
    return static_token

revoke_token async

revoke_token() -> None

Revoke token (no-op for static provider).

Source code in src/dspu/security/auth.py
async def revoke_token(self) -> None:
    """Do nothing: a static token cannot be revoked by this provider."""
    return None

validate_token async

validate_token(token: str) -> bool

Validate token.

Parameters:

Name Type Description Default
token str

Token to validate.

required

Returns:

Type Description
bool

True if matches static token, False otherwise.

Source code in src/dspu/security/auth.py
async def validate_token(self, token: str) -> bool:
    """Validate a token against the configured static token.

    The comparison runs in constant time so the check does not leak, via
    response timing, how much of a guessed token is correct.

    Args:
        token: Token to validate.

    Returns:
        True if matches static token, False otherwise (including when
        ``token`` is not a string).
    """
    import hmac

    # A non-string can never equal the stored token; reject it up front
    # instead of failing on ``.encode``.
    if not isinstance(token, str):
        return False

    # Compare as UTF-8 bytes: compare_digest rejects non-ASCII ``str``.
    return hmac.compare_digest(
        token.encode("utf-8"),
        self._token.encode("utf-8"),
    )

dspu.security.auth.OAuth2Provider

OAuth2Provider(
    client_id: str,
    client_secret: str,
    token_url: str,
    **kwargs: Any,
)

OAuth2 authentication provider.

Implements OAuth2 client credentials flow for service-to-service authentication.

Example

provider = OAuth2Provider( ... client_id="my-client-id", ... client_secret="my-client-secret", ... token_url="https://auth.example.com/oauth/token" ... ) token = await provider.get_token(scopes=["read", "write"])

Initialize OAuth2 provider.

Parameters:

Name Type Description Default
client_id str

OAuth2 client ID.

required
client_secret str

OAuth2 client secret.

required
token_url str

Token endpoint URL.

required
**kwargs Any

Additional httpx options.

{}

Raises:

Type Description
ConfigurationError

If httpx not installed.

Source code in src/dspu/security/auth.py
def __init__(
    self,
    client_id: str,
    client_secret: str,
    token_url: str,
    **kwargs: Any,
) -> None:
    """Set up an OAuth2 provider for the given client credentials.

    Args:
        client_id: OAuth2 client ID.
        client_secret: OAuth2 client secret.
        token_url: Token endpoint URL.
        **kwargs: Additional httpx options.

    Raises:
        ConfigurationError: If httpx not installed.
    """
    # httpx is an optional dependency, so it is imported lazily here.
    try:
        import httpx
    except ImportError as exc:
        raise ConfigurationError(
            "OAuth2 provider requires httpx package",
            suggestion="Install with: pip install dspu[async]",
        ) from exc
    self._httpx = httpx

    self.client_id = client_id
    self.client_secret = client_secret
    self.token_url = token_url
    self.kwargs = kwargs

    # No token has been requested yet.
    self._current_token: TokenData | None = None

Functions

get_token async

get_token(scopes: list[str] | None = None) -> str

Get OAuth2 access token.

Parameters:

Name Type Description Default
scopes list[str] | None

OAuth2 scopes to request.

None

Returns:

Type Description
str

Access token.

Raises:

Type Description
AuthenticationError

If token request fails.

Source code in src/dspu/security/auth.py
async def get_token(self, scopes: list[str] | None = None) -> str:
    """Get OAuth2 access token.

    Args:
        scopes: OAuth2 scopes to request.

    Returns:
        Access token.

    Raises:
        AuthenticationError: If token request fails.
    """
    # Check if we have a valid cached token
    if self._current_token is not None and not self._current_token.is_expired():
        # Check if scopes match
        if scopes is None or set(scopes).issubset(set(self._current_token.scopes)):
            return self._current_token.token

    # Request new token
    await self._request_token(scopes)

    if self._current_token is None:
        raise AuthenticationError(
            "Failed to obtain OAuth2 token",
            provider="oauth2",
        )

    return self._current_token.token

refresh_token async

refresh_token() -> str

Refresh OAuth2 token.

Returns:

Type Description
str

New access token.

Raises:

Type Description
AuthenticationError

If refresh fails.

Source code in src/dspu/security/auth.py
async def refresh_token(self) -> str:
    """Request a new OAuth2 token, keeping the current token's scopes.

    When no token is currently held, the request is made without scopes.

    Returns:
        New access token.

    Raises:
        AuthenticationError: If refresh fails.
    """
    current = self._current_token
    requested_scopes = current.scopes if current else None
    await self._request_token(requested_scopes)

    renewed = self._current_token
    if renewed is None:
        raise AuthenticationError(
            "Failed to refresh OAuth2 token",
            provider="oauth2",
        )
    return renewed.token

revoke_token async

revoke_token() -> None

Revoke OAuth2 token locally by clearing the cached token (no revocation request is sent; the issued token expires naturally).

Source code in src/dspu/security/auth.py
async def revoke_token(self) -> None:
    """Drop the locally cached OAuth2 token.

    Only the cached token is cleared here; no revocation request is sent,
    so an already-issued token is left to expire naturally.
    """
    self._current_token = None

validate_token async

validate_token(token: str) -> bool

Validate OAuth2 token.

Parameters:

Name Type Description Default
token str

Token to validate.

required

Returns:

Type Description
bool

True if token matches current valid token.

Source code in src/dspu/security/auth.py
async def validate_token(self, token: str) -> bool:
    """Validate a token against the currently cached OAuth2 token.

    The token strings are compared in constant time so the check does not
    leak, via response timing, how much of a guessed token is correct.

    Args:
        token: Token to validate.

    Returns:
        True if token matches current valid token; False when no token is
        cached, ``token`` is not a string, the values differ, or the cached
        token has expired.
    """
    import hmac

    current = self._current_token
    if current is None or not isinstance(token, str):
        return False

    # Compare as UTF-8 bytes: compare_digest rejects non-ASCII ``str``.
    matches = hmac.compare_digest(
        token.encode("utf-8"),
        current.token.encode("utf-8"),
    )
    # Expiry is only consulted for a matching token, as before.
    return matches and not current.is_expired()

dspu.security.auth.JWTProvider

JWTProvider(
    secret_key: str,
    algorithm: str = "HS256",
    issuer: str | None = None,
    audience: str | None = None,
    expiry_seconds: int = 3600,
    **kwargs: Any,
)

JWT (JSON Web Token) authentication provider.

Creates and validates JWT tokens for authentication.

Example

provider = JWTProvider( ... secret_key="my-secret-key", ... algorithm="HS256" ... ) token = await provider.get_token()

Initialize JWT provider.

Parameters:

Name Type Description Default
secret_key str

Secret key for signing JWTs.

required
algorithm str

JWT algorithm (default: HS256).

'HS256'
issuer str | None

Token issuer claim.

None
audience str | None

Token audience claim.

None
expiry_seconds int

Token expiry time in seconds (default: 3600).

3600
**kwargs Any

Additional JWT claims.

{}

Raises:

Type Description
ConfigurationError

If pyjwt not installed.

Source code in src/dspu/security/auth.py
def __init__(
    self,
    secret_key: str,
    algorithm: str = "HS256",
    issuer: str | None = None,
    audience: str | None = None,
    expiry_seconds: int = 3600,
    **kwargs: Any,
) -> None:
    """Set up a JWT provider with its signing key and claim settings.

    Args:
        secret_key: Secret key for signing JWTs.
        algorithm: JWT algorithm (default: HS256).
        issuer: Token issuer claim.
        audience: Token audience claim.
        expiry_seconds: Token expiry time in seconds (default: 3600).
        **kwargs: Additional JWT claims.

    Raises:
        ConfigurationError: If pyjwt not installed.
    """
    # pyjwt is an optional dependency, so it is imported lazily here.
    try:
        import jwt
    except ImportError as exc:
        raise ConfigurationError(
            "JWT provider requires pyjwt package",
            suggestion="Install with: pip install dspu[security]",
        ) from exc
    self._jwt = jwt

    self.secret_key = secret_key
    self.algorithm = algorithm
    self.issuer = issuer
    self.audience = audience
    self.expiry_seconds = expiry_seconds
    self.kwargs = kwargs

    # No token has been generated yet.
    self._current_token: TokenData | None = None

Functions

get_token async

get_token(scopes: list[str] | None = None) -> str

Generate JWT token.

Parameters:

Name Type Description Default
scopes list[str] | None

Token scopes/permissions.

None

Returns:

Type Description
str

JWT token string.

Example

token = await provider.get_token(scopes=["read", "write"])

Source code in src/dspu/security/auth.py
async def get_token(self, scopes: list[str] | None = None) -> str:
    """Generate JWT token.

    Args:
        scopes: Token scopes/permissions.

    Returns:
        JWT token string.

    Example:
        >>> token = await provider.get_token(scopes=["read", "write"])
    """
    # Check if we have a valid cached token
    if self._current_token is not None and not self._current_token.is_expired():
        if scopes is None or set(scopes).issubset(set(self._current_token.scopes)):
            return self._current_token.token

    # Generate new token
    await self._generate_token(scopes)

    if self._current_token is None:
        raise AuthenticationError(
            "Failed to generate JWT token",
            provider="jwt",
        )

    return self._current_token.token

refresh_token async

refresh_token() -> str

Generate new JWT token.

Returns:

Type Description
str

New JWT token.

Source code in src/dspu/security/auth.py
async def refresh_token(self) -> str:
    """Generate a replacement JWT, keeping the current token's scopes.

    When no token is currently held, the new token is generated without
    scopes.

    Returns:
        New JWT token.

    Raises:
        AuthenticationError: If no token is available after generation.
    """
    current = self._current_token
    requested_scopes = current.scopes if current else None
    await self._generate_token(requested_scopes)

    renewed = self._current_token
    if renewed is None:
        raise AuthenticationError(
            "Failed to refresh JWT token",
            provider="jwt",
        )
    return renewed.token

revoke_token async

revoke_token() -> None

Revoke JWT token (clears cached token).

Note: JWTs are stateless, so this only clears the local cache. The token will still be valid until it expires.

Source code in src/dspu/security/auth.py
async def revoke_token(self) -> None:
    """Revoke JWT token (clears cached token).

    Note: JWTs are stateless, so this only clears the local cache.
    The token will still be valid until it expires. After this call the
    provider holds no cached token.
    """
    self._current_token = None

validate_token async

validate_token(token: str) -> bool

Validate JWT token.

Parameters:

Name Type Description Default
token str

JWT token to validate.

required

Returns:

Type Description
bool

True if token is valid, False otherwise.

Source code in src/dspu/security/auth.py
async def validate_token(self, token: str) -> bool:
    """Check a JWT against this provider's key, algorithm and claims.

    The token is decoded with the configured secret key, algorithm,
    audience and issuer. Any exception raised while decoding or inspecting
    the payload is treated as "invalid".

    Args:
        token: JWT token to validate.

    Returns:
        True if token is valid, False otherwise.
    """
    try:
        claims = self._jwt.decode(
            token,
            self.secret_key,
            algorithms=[self.algorithm],
            audience=self.audience,
            issuer=self.issuer,
        )
        # A present "exp" claim that is not in the future means expired.
        expires_at = claims.get("exp")
        return not (expires_at and time.time() >= expires_at)
    except Exception:
        return False

decode_token

decode_token(token: str) -> dict[str, Any]

Decode JWT token without validation.

Parameters:

Name Type Description Default
token str

JWT token to decode.

required

Returns:

Type Description
dict[str, Any]

Decoded payload.

Example

payload = provider.decode_token(token) print(payload["scopes"])

Source code in src/dspu/security/auth.py
def decode_token(self, token: str) -> dict[str, Any]:
    """Read a JWT's payload with signature verification switched off.

    Because the signature is not checked, the returned claims must not be
    trusted for authorization decisions.

    Args:
        token: JWT token to decode.

    Returns:
        Decoded payload.

    Example:
        >>> payload = provider.decode_token(token)
        >>> print(payload["scopes"])
    """
    unverified = {"verify_signature": False}
    return self._jwt.decode(token, options=unverified)

dspu.security.auth.create_auth_provider

create_auth_provider(
    provider_type: str, **kwargs: Any
) -> Any

Create authentication provider by type.

Factory function to create appropriate auth provider based on type.

Parameters:

Name Type Description Default
provider_type str

Provider type ("static", "oauth2", "jwt").

required
**kwargs Any

Provider-specific configuration.

{}

Returns:

Type Description
Any

Authentication provider instance.

Raises:

Type Description
ConfigurationError

If provider type is unknown.

Example

Static token

provider = create_auth_provider("static", token="api-key-123")

OAuth2

provider = create_auth_provider( ... "oauth2", ... client_id="id", ... client_secret="secret", ... token_url="https://auth.example.com/token" ... )

JWT

provider = create_auth_provider( ... "jwt", ... secret_key="secret", ... algorithm="HS256" ... )

Source code in src/dspu/security/auth.py
def create_auth_provider(provider_type: str, **kwargs: Any) -> Any:
    """Build the authentication provider registered under a type name.

    The required options for the chosen provider are checked before the
    provider is constructed; all keyword arguments are then forwarded to
    its constructor.

    Args:
        provider_type: Provider type ("static", "oauth2", "jwt").
        **kwargs: Provider-specific configuration.

    Returns:
        Authentication provider instance.

    Raises:
        ConfigurationError: If provider type is unknown, or a required
            option for the chosen provider is missing.

    Example:
        >>> # Static token
        >>> provider = create_auth_provider("static", token="api-key-123")
        >>>
        >>> # OAuth2
        >>> provider = create_auth_provider(
        ...     "oauth2",
        ...     client_id="id",
        ...     client_secret="secret",
        ...     token_url="https://auth.example.com/token"
        ... )
        >>>
        >>> # JWT
        >>> provider = create_auth_provider(
        ...     "jwt",
        ...     secret_key="secret",
        ...     algorithm="HS256"
        ... )
    """
    if provider_type == "static":
        if "token" in kwargs:
            return StaticTokenProvider(**kwargs)
        raise ConfigurationError(
            "Static provider requires 'token' parameter",
            field="token",
        )

    if provider_type == "oauth2":
        absent = [
            name
            for name in ("client_id", "client_secret", "token_url")
            if name not in kwargs
        ]
        if not absent:
            return OAuth2Provider(**kwargs)
        # Report every missing option, but tag the error with the first.
        raise ConfigurationError(
            f"OAuth2 provider requires: {', '.join(absent)}",
            field=absent[0],
        )

    if provider_type == "jwt":
        if "secret_key" in kwargs:
            return JWTProvider(**kwargs)
        raise ConfigurationError(
            "JWT provider requires 'secret_key' parameter",
            field="secret_key",
        )

    raise ConfigurationError(
        f"Unknown auth provider type: {provider_type}",
        suggestion="Supported types: static, oauth2, jwt",
    )

Encryption

Symmetric Encryption

dspu.security.encryption.Fernet

Fernet(key: bytes | str)

Symmetric encryption using Fernet (AES-128-CBC).

Provides simple symmetric encryption for data at rest. Uses the cryptography library's Fernet implementation.

Example

Generate a key

key = Fernet.generate_key()

Encrypt data

fernet = Fernet(key) encrypted = fernet.encrypt(b"secret data")

Decrypt data

decrypted = fernet.decrypt(encrypted) print(decrypted) # b"secret data"

Initialize Fernet cipher with key.

Parameters:

Name Type Description Default
key bytes | str

Encryption key: 32 bytes encoded as URL-safe base64, given as bytes or str (for example the value returned by Fernet.generate_key()).

required

Raises:

Type Description
ConfigurationError

If cryptography not installed.

EncryptionError

If key is invalid.

Source code in src/dspu/security/encryption.py
def __init__(self, key: bytes | str) -> None:
    """Initialize Fernet cipher with key.

    Args:
        key: Encryption key: 32 bytes encoded as URL-safe base64, given as
            bytes or str. A str key is encoded to UTF-8 bytes before being
            handed to the underlying cipher.

    Raises:
        ConfigurationError: If cryptography not installed.
        EncryptionError: If key is invalid.
    """
    # cryptography is an optional dependency, so it is imported lazily here.
    try:
        from cryptography.fernet import Fernet as CryptoFernet

        self._fernet_class = CryptoFernet
    except ImportError as e:
        raise ConfigurationError(
            "Encryption requires cryptography package",
            suggestion="Install with: pip install dspu[security]",
        ) from e

    # Convert string key to bytes if needed
    if isinstance(key, str):
        key = key.encode("utf-8")

    # Any failure while constructing the cipher is reported as an invalid key.
    try:
        self._cipher = self._fernet_class(key)
    except Exception as e:
        raise EncryptionError(
            f"Invalid encryption key: {e}",
            algorithm="fernet",
        ) from e

Functions

encrypt

encrypt(data: bytes | str) -> bytes

Encrypt data.

Parameters:

Name Type Description Default
data bytes | str

Data to encrypt (bytes or string).

required

Returns:

Type Description
bytes

Encrypted data (base64 encoded).

Example

encrypted = fernet.encrypt(b"secret") encrypted = fernet.encrypt("secret") # Also works

Source code in src/dspu/security/encryption.py
def encrypt(self, data: bytes | str) -> bytes:
    """Encrypt data.

    Args:
        data: Data to encrypt (bytes or string).

    Returns:
        Encrypted data (base64 encoded).

    Example:
        >>> encrypted = fernet.encrypt(b"secret")
        >>> encrypted = fernet.encrypt("secret")  # Also works
    """
    if isinstance(data, str):
        data = data.encode("utf-8")

    try:
        return self._cipher.encrypt(data)
    except Exception as e:
        raise EncryptionError(
            f"Encryption failed: {e}",
            operation="encrypt",
            algorithm="fernet",
        ) from e

decrypt

decrypt(encrypted_data: bytes) -> bytes

Decrypt data.

Parameters:

Name Type Description Default
encrypted_data bytes

Encrypted data to decrypt.

required

Returns:

Type Description
bytes

Decrypted data as bytes.

Example

decrypted = fernet.decrypt(encrypted) print(decrypted.decode()) # Convert to string

Source code in src/dspu/security/encryption.py
def decrypt(self, encrypted_data: bytes) -> bytes:
    """Decrypt data produced by the matching Fernet cipher.

    Args:
        encrypted_data: Encrypted data to decrypt.

    Returns:
        Decrypted data as bytes.

    Raises:
        EncryptionError: If the underlying cipher fails.

    Example:
        >>> decrypted = fernet.decrypt(encrypted)
        >>> print(decrypted.decode())  # Convert to string
    """
    try:
        plaintext = self._cipher.decrypt(encrypted_data)
    except Exception as exc:
        raise EncryptionError(
            f"Decryption failed: {exc}",
            operation="decrypt",
            algorithm="fernet",
        ) from exc
    return plaintext

dspu.security.encryption.AES

AES(key: bytes)

AES encryption utilities.

Provides AES-256-GCM encryption for data at rest and in transit.

Example

Generate a key

key = AES.generate_key()

Encrypt data

aes = AES(key) encrypted, nonce = aes.encrypt(b"secret data")

Decrypt data

decrypted = aes.decrypt(encrypted, nonce)

Initialize AES cipher with key.

Parameters:

Name Type Description Default
key bytes

32-byte encryption key.

required

Raises:

Type Description
ConfigurationError

If cryptography not installed.

EncryptionError

If key is invalid.

Source code in src/dspu/security/encryption.py
def __init__(self, key: bytes) -> None:
    """Set up an AES-GCM cipher for the given key.

    Args:
        key: 32-byte encryption key.

    Raises:
        ConfigurationError: If cryptography not installed.
        EncryptionError: If key is invalid.
    """
    # cryptography is an optional dependency, so it is imported lazily here.
    try:
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM
    except ImportError as exc:
        raise ConfigurationError(
            "Encryption requires cryptography package",
            suggestion="Install with: pip install dspu[security]",
        ) from exc
    self._aesgcm_class = AESGCM

    # Only 32-byte keys are accepted.
    key_is_valid = len(key) == 32
    if not key_is_valid:
        raise EncryptionError(
            "AES key must be 32 bytes",
            algorithm="aes-256-gcm",
        )

    self._cipher = self._aesgcm_class(key)

Functions

generate_key staticmethod

generate_key() -> bytes

Generate a new AES-256 key.

Returns:

Type Description
bytes

32-byte encryption key.

Example

key = AES.generate_key()

Source code in src/dspu/security/encryption.py
@staticmethod
def generate_key() -> bytes:
    """Create a fresh random key sized for AES-256.

    Returns:
        32-byte encryption key.

    Example:
        >>> key = AES.generate_key()
    """
    key_size_bytes = 32
    return secrets.token_bytes(key_size_bytes)

encrypt

encrypt(
    data: bytes | str, associated_data: bytes | None = None
) -> tuple[bytes, bytes]

Encrypt data using AES-256-GCM.

Parameters:

Name Type Description Default
data bytes | str

Data to encrypt.

required
associated_data bytes | None

Optional authenticated but unencrypted data.

None

Returns:

Type Description
tuple[bytes, bytes]

Tuple of (encrypted_data, nonce).

Example

encrypted, nonce = aes.encrypt(b"secret")

Store both encrypted and nonce
Source code in src/dspu/security/encryption.py
def encrypt(
    self,
    data: bytes | str,
    associated_data: bytes | None = None,
) -> tuple[bytes, bytes]:
    """Encrypt data using AES-256-GCM.

    Args:
        data: Data to encrypt.
        associated_data: Optional authenticated but unencrypted data.

    Returns:
        Tuple of (encrypted_data, nonce).

    Example:
        >>> encrypted, nonce = aes.encrypt(b"secret")
        >>> # Store both encrypted and nonce
    """
    if isinstance(data, str):
        data = data.encode("utf-8")

    try:
        # Generate random nonce (12 bytes for GCM)
        nonce = secrets.token_bytes(12)

        encrypted = self._cipher.encrypt(nonce, data, associated_data)
        return encrypted, nonce

    except Exception as e:
        raise EncryptionError(
            f"AES encryption failed: {e}",
            operation="encrypt",
            algorithm="aes-256-gcm",
        ) from e

decrypt

decrypt(
    encrypted_data: bytes,
    nonce: bytes,
    associated_data: bytes | None = None,
) -> bytes

Decrypt data using AES-256-GCM.

Parameters:

Name Type Description Default
encrypted_data bytes

Data to decrypt.

required
nonce bytes

Nonce used during encryption.

required
associated_data bytes | None

Optional authenticated data (must match encryption).

None

Returns:

Type Description
bytes

Decrypted data.

Example

decrypted = aes.decrypt(encrypted, nonce)

Source code in src/dspu/security/encryption.py
def decrypt(
    self,
    encrypted_data: bytes,
    nonce: bytes,
    associated_data: bytes | None = None,
) -> bytes:
    """Decrypt data using AES-256-GCM.

    Args:
        encrypted_data: Data to decrypt.
        nonce: Nonce used during encryption.
        associated_data: Optional authenticated data (must match encryption).

    Returns:
        Decrypted data.

    Example:
        >>> decrypted = aes.decrypt(encrypted, nonce)
    """
    try:
        return self._cipher.decrypt(nonce, encrypted_data, associated_data)
    except Exception as e:
        raise EncryptionError(
            f"AES decryption failed: {e}",
            operation="decrypt",
            algorithm="aes-256-gcm",
        ) from e

Password Hashing

dspu.security.encryption.hash_password

hash_password(
    password: str, salt: bytes | None = None
) -> tuple[str, bytes]

Hash a password using PBKDF2.

Parameters:

Name Type Description Default
password str

Password to hash.

required
salt bytes | None

Salt bytes (generates new one if None).

None

Returns:

Type Description
tuple[str, bytes]

Tuple of (hash_string, salt_bytes).

Example

hash_str, salt = hash_password("my-password")

Store hash_str and salt in database

Later, verify password

is_valid = verify_password("my-password", hash_str, salt)

Source code in src/dspu/security/encryption.py
def hash_password(password: str, salt: bytes | None = None) -> tuple[str, bytes]:
    """Hash a password using PBKDF2.

    Args:
        password: Password to hash.
        salt: Salt bytes (generates new one if None).

    Returns:
        Tuple of (hash_string, salt_bytes).

    Example:
        >>> hash_str, salt = hash_password("my-password")
        >>> # Store hash_str and salt in database
        >>>
        >>> # Later, verify password
        >>> is_valid = verify_password("my-password", hash_str, salt)
    """
    if salt is None:
        salt = secrets.token_bytes(32)

    # Use PBKDF2 with SHA256
    key = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        salt,
        iterations=100000,  # OWASP recommendation
        dklen=32,
    )

    # Return base64-encoded hash and salt
    hash_str = base64.b64encode(key).decode("utf-8")
    return hash_str, salt

dspu.security.encryption.verify_password

verify_password(
    password: str, hash_str: str, salt: bytes
) -> bool

Verify a password against its hash.

Parameters:

Name Type Description Default
password str

Password to verify.

required
hash_str str

Base64-encoded password hash.

required
salt bytes

Salt bytes used for hashing.

required

Returns:

Type Description
bool

True if password matches, False otherwise.

Example

After storing hash and salt

is_valid = verify_password("user-input", stored_hash, stored_salt) if is_valid: ... print("Password correct!")

Source code in src/dspu/security/encryption.py
def verify_password(password: str, hash_str: str, salt: bytes) -> bool:
    """Verify a password against its hash.

    The password is re-hashed with the supplied salt and the result is
    compared with ``hash_str`` in constant time.

    Args:
        password: Password to verify.
        hash_str: Base64-encoded password hash.
        salt: Salt bytes used for hashing.

    Returns:
        True if password matches, False otherwise.

    Example:
        >>> # After storing hash and salt
        >>> is_valid = verify_password("user-input", stored_hash, stored_salt)
        >>> if is_valid:
        ...     print("Password correct!")
    """
    # Hash the password with the same salt
    computed_hash, _ = hash_password(password, salt)

    # Constant-time comparison to prevent timing attacks. Compare UTF-8
    # bytes rather than str: compare_digest raises TypeError for str
    # containing non-ASCII characters, so a malformed stored hash would
    # otherwise crash instead of simply failing to match.
    return secrets.compare_digest(
        computed_hash.encode("utf-8"),
        hash_str.encode("utf-8"),
    )

Utilities

dspu.security.encryption.generate_token

generate_token(length: int = 32) -> str

Generate a cryptographically secure random token.

Parameters:

Name Type Description Default
length int

Token length in bytes (default: 32).

32

Returns:

Type Description
str

URL-safe base64-encoded token.

Example

api_key = generate_token(32) session_id = generate_token(16)

Source code in src/dspu/security/encryption.py
def generate_token(length: int = 32) -> str:
    """Produce a random token from the operating system's secure source.

    Args:
        length: Token length in bytes (default: 32). This is the number of
            random bytes, not the length of the returned string.

    Returns:
        URL-safe base64-encoded token.

    Example:
        >>> api_key = generate_token(32)
        >>> session_id = generate_token(16)
    """
    return base64.urlsafe_b64encode(secrets.token_bytes(length)).decode("utf-8")

dspu.security.encryption.constant_time_compare

constant_time_compare(a: str, b: str) -> bool

Compare two strings in constant time.

Prevents timing attacks when comparing secrets like tokens or passwords.

Parameters:

Name Type Description Default
a str

First string.

required
b str

Second string.

required

Returns:

Type Description
bool

True if strings are equal, False otherwise.

Example

token1 = "secret-token-123" token2 = user_provided_token if constant_time_compare(token1, token2): ... print("Valid token")

Source code in src/dspu/security/encryption.py
def constant_time_compare(a: str, b: str) -> bool:
    """Check two strings for equality without leaking timing information.

    Both strings are encoded as UTF-8 and compared with
    ``secrets.compare_digest``, which is meant for comparing secrets such
    as tokens or passwords.

    Args:
        a: First string.
        b: Second string.

    Returns:
        True if strings are equal, False otherwise.

    Example:
        >>> token1 = "secret-token-123"
        >>> token2 = user_provided_token
        >>> if constant_time_compare(token1, token2):
        ...     print("Valid token")
    """
    left = a.encode("utf-8")
    right = b.encode("utf-8")
    return secrets.compare_digest(left, right)

dspu.security.encryption.hash_data

hash_data(
    data: bytes | str, algorithm: str = "sha256"
) -> str

Hash data using specified algorithm.

Parameters:

Name Type Description Default
data bytes | str

Data to hash.

required
algorithm str

Hash algorithm (sha256, sha512, etc.).

'sha256'

Returns:

Type Description
str

Hexadecimal hash string.

Example

file_hash = hash_data(file_contents, algorithm="sha256") checksum = hash_data(b"data", algorithm="md5")

Source code in src/dspu/security/encryption.py
def hash_data(data: bytes | str, algorithm: str = "sha256") -> str:
    """Hash data using specified algorithm.

    Args:
        data: Data to hash.
        algorithm: Hash algorithm (sha256, sha512, etc.).

    Returns:
        Hexadecimal hash string.

    Example:
        >>> file_hash = hash_data(file_contents, algorithm="sha256")
        >>> checksum = hash_data(b"data", algorithm="md5")
    """
    if isinstance(data, str):
        data = data.encode("utf-8")

    hasher = hashlib.new(algorithm)
    hasher.update(data)
    return hasher.hexdigest()

Usage

Secret Management

from dspu.security import SecretManager

# From environment
secrets = SecretManager.from_env()
api_key = await secrets.get("API_KEY")

# From Vault
vault_secrets = SecretManager.from_vault(
    url="http://vault:8200",
    token="s.abc123"
)
db_password = await vault_secrets.get("database/password")

Token Rotation

from dspu.security import RotatingToken
from dspu.security.rotating_token import TokenData

async def fetch_token() -> TokenData:
    # Fetch new token from API (e.g. one that is valid for about an hour)
    return TokenData(token="...")

async with RotatingToken(fetch_fn=fetch_token, refresh_interval=3300) as token:
    # Token automatically refreshes
    await make_api_call(token.current)

Encryption

from dspu.security import Fernet, hash_password, verify_password

# Encrypt data
key = Fernet.generate_key()
cipher = Fernet(key)
encrypted = cipher.encrypt(b"sensitive data")
decrypted = cipher.decrypt(encrypted)

# Password hashing
hashed, salt = hash_password("my_password")
is_valid = verify_password("my_password", hashed, salt)  # True

See Also