Security API Reference¶
Secret management, authentication, and encryption.
Secret Management¶
SecretManager¶
dspu.security.secrets.SecretManager
¶
Unified interface for secret management.
Provides a consistent API for accessing secrets from various backends (Vault, AWS Secrets Manager, environment variables, files).
Example
Auto-detect from environment¶
secrets = SecretManager.from_env() password = await secrets.get("database/password")
Explicit backend¶
from dspu.security.backends import EnvBackend secrets = SecretManager(EnvBackend(prefix="APP_")) api_key = await secrets.get("api/key")
Set secrets (if backend supports it)¶
await secrets.set("new/secret", "value")
Initialize secret manager with backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
backend
|
SecretBackend
|
Secret backend implementation. |
required |
Source code in src/dspu/security/secrets.py
Functions¶
from_env
classmethod
¶
from_env(**kwargs: Any) -> SecretManager
Create SecretManager from environment configuration.
Auto-detects backend from environment variables: - VAULT_ADDR + VAULT_TOKEN -> Vault backend - AWS_SECRET_BACKEND=true -> AWS Secrets Manager - Otherwise -> Environment variable backend
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
Any
|
Backend-specific options. |
{}
|
Returns:
| Type | Description |
|---|---|
SecretManager
|
SecretManager instance with detected backend. |
Example
With Vault¶
os.environ["VAULT_ADDR"] = "http://localhost:8200" os.environ["VAULT_TOKEN"] = "dev-token" secrets = SecretManager.from_env()
With environment variables¶
os.environ["SECRET_PREFIX"] = "APP_SECRET_" secrets = SecretManager.from_env()
Source code in src/dspu/security/secrets.py
from_vault
classmethod
¶
from_vault(
url: str, token: str, **kwargs: Any
) -> SecretManager
Create SecretManager with Vault backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
Vault server URL. |
required |
token
|
str
|
Vault authentication token. |
required |
**kwargs
|
Any
|
Additional Vault options. |
{}
|
Returns:
| Type | Description |
|---|---|
SecretManager
|
SecretManager with Vault backend. |
Raises:
| Type | Description |
|---|---|
ConfigurationError
|
If hvac package not installed. |
Example
secrets = SecretManager.from_vault( ... url="http://localhost:8200", ... token="dev-token" ... ) password = await secrets.get("secret/database/password")
Source code in src/dspu/security/secrets.py
from_aws
classmethod
¶
from_aws(
region: str = "us-east-1", **kwargs: Any
) -> SecretManager
Create SecretManager with AWS Secrets Manager backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
region
|
str
|
AWS region. |
'us-east-1'
|
**kwargs
|
Any
|
Additional AWS options. |
{}
|
Returns:
| Type | Description |
|---|---|
SecretManager
|
SecretManager with AWS backend. |
Raises:
| Type | Description |
|---|---|
ConfigurationError
|
If boto3 package not installed. |
Example
secrets = SecretManager.from_aws(region="us-west-2") api_key = await secrets.get("prod/api/key")
Source code in src/dspu/security/secrets.py
from_env_vars
classmethod
¶
from_env_vars(
prefix: str = "", **kwargs: Any
) -> SecretManager
Create SecretManager with environment variable backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
prefix
|
str
|
Prefix for environment variables. |
''
|
**kwargs
|
Any
|
Additional options. |
{}
|
Returns:
| Type | Description |
|---|---|
SecretManager
|
SecretManager with environment backend. |
Example
secrets = SecretManager.from_env_vars(prefix="APP_SECRET_")
Gets from APP_SECRET_API_KEY environment variable¶
api_key = await secrets.get("api/key")
Source code in src/dspu/security/secrets.py
from_file
classmethod
¶
from_file(path: str, **kwargs: Any) -> SecretManager
Create SecretManager with file-based backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str
|
Path to secrets file (JSON or YAML). |
required |
**kwargs
|
Any
|
Additional options. |
{}
|
Returns:
| Type | Description |
|---|---|
SecretManager
|
SecretManager with file backend. |
Example
secrets = SecretManager.from_file("secrets.yaml") password = await secrets.get("database/password")
Source code in src/dspu/security/secrets.py
get
async
¶
Get a secret value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Secret key/path. |
required |
default
|
str | None
|
Default value if secret not found. |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Secret value. |
Raises:
| Type | Description |
|---|---|
SecretNotFoundError
|
If secret not found and no default. |
SecurityError
|
If access denied or backend error. |
Example
password = await secrets.get("database/password") api_key = await secrets.get("api/key", default="dev-key")
Source code in src/dspu/security/secrets.py
set
async
¶
Set a secret value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Secret key/path. |
required |
value
|
str
|
Secret value. |
required |
Raises:
| Type | Description |
|---|---|
SecurityError
|
If write fails or not supported. |
Example
await secrets.set("api/key", "new-secret-key")
Source code in src/dspu/security/secrets.py
Secret Backends¶
dspu.security.backends.EnvBackend
¶
Environment variable backend for secrets.
Reads secrets from environment variables with optional prefix. Key paths are converted to uppercase with underscores.
Example
backend = EnvBackend(prefix="APP_SECRET_")
Gets from APP_SECRET_API_KEY¶
api_key = await backend.get("api/key")
Initialize environment backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
prefix
|
str
|
Prefix for all environment variables. |
''
|
separator
|
str
|
Separator for nested keys (default: _). |
'_'
|
Source code in src/dspu/security/backends.py
Functions¶
get
async
¶
Get secret from environment variable.
Source code in src/dspu/security/backends.py
set
async
¶
delete
async
¶
Delete environment variable.
exists
async
¶
list
async
¶
list(prefix: str = '') -> list[str]
List environment variables matching prefix.
Source code in src/dspu/security/backends.py
dspu.security.backends.FileBackend
¶
File-based backend for secrets (development only).
Reads secrets from a JSON or YAML file. This backend is intended for local development and testing only. DO NOT use in production.
Example
backend = FileBackend("secrets.yaml") password = await backend.get("database/password")
Initialize file backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path
|
str | Path
|
Path to secrets file (JSON or YAML). |
required |
auto_save
|
bool
|
Auto-save changes to file (default: True). |
True
|
Source code in src/dspu/security/backends.py
Functions¶
get
async
¶
set
async
¶
delete
async
¶
exists
async
¶
list
async
¶
list(prefix: str = '') -> list[str]
List secret keys from file.
Source code in src/dspu/security/backends.py
dspu.security.backends.VaultBackend
¶
HashiCorp Vault backend for secrets.
Connects to Vault server for secure secret storage and retrieval.
Example
backend = VaultBackend( ... url="http://localhost:8200", ... token="dev-token", ... mount_point="secret" ... ) password = await backend.get("database/password")
Initialize Vault backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
url
|
str
|
Vault server URL. |
required |
token
|
str
|
Vault authentication token. |
required |
mount_point
|
str
|
Vault mount point (default: "secret"). |
'secret'
|
**kwargs
|
Any
|
Additional hvac client options. |
{}
|
Raises:
| Type | Description |
|---|---|
ConfigurationError
|
If hvac not installed. |
Source code in src/dspu/security/backends.py
Functions¶
get
async
¶
Get secret from Vault.
Source code in src/dspu/security/backends.py
set
async
¶
Set secret in Vault.
Source code in src/dspu/security/backends.py
delete
async
¶
Delete secret from Vault.
Source code in src/dspu/security/backends.py
exists
async
¶
list
async
¶
list(prefix: str = '') -> list[str]
List secret keys from Vault.
Source code in src/dspu/security/backends.py
dspu.security.backends.AWSSecretBackend
¶
AWS Secrets Manager backend.
Uses AWS Secrets Manager for secure secret storage.
Example
backend = AWSSecretBackend(region="us-west-2") api_key = await backend.get("prod/api/key")
Initialize AWS Secrets Manager backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
region
|
str
|
AWS region. |
'us-east-1'
|
**kwargs
|
Any
|
Additional boto3 client options. |
{}
|
Raises:
| Type | Description |
|---|---|
ConfigurationError
|
If boto3 not installed. |
Source code in src/dspu/security/backends.py
Functions¶
get
async
¶
Get secret from AWS Secrets Manager.
Source code in src/dspu/security/backends.py
set
async
¶
Set secret in AWS Secrets Manager.
Source code in src/dspu/security/backends.py
delete
async
¶
Delete secret from AWS Secrets Manager.
Source code in src/dspu/security/backends.py
exists
async
¶
Check if secret exists in AWS Secrets Manager.
Source code in src/dspu/security/backends.py
list
async
¶
list(prefix: str = '') -> list[str]
List secret keys from AWS Secrets Manager.
Source code in src/dspu/security/backends.py
Token Rotation¶
dspu.security.rotating_token.RotatingToken
¶
RotatingToken(
fetch_fn: Callable[[], Awaitable[TokenData]],
refresh_interval: float | None = None,
refresh_before: float = 300.0,
on_refresh: Callable[[TokenData], Awaitable[None]]
| None = None,
)
Automatic token rotation context manager.
Manages tokens that need periodic refresh, automatically refreshing before expiration and providing the current valid token.
Example
async def fetch_token() -> TokenData: ... # Get token from auth service ... token = await auth.get_token() ... expires_at = time.time() + 3600 # 1 hour ... return TokenData(token, expires_at=expires_at)
async with RotatingToken( ... fetch_fn=fetch_token, ... refresh_interval=3600, ... refresh_before=300, # 5 minutes before expiry ... ) as rotating_token: ... # Use current token ... api_client.set_token(rotating_token.current) ... ... # Token will auto-refresh when needed ... await api_client.make_request()
Initialize rotating token manager.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fetch_fn
|
Callable[[], Awaitable[TokenData]]
|
Async function that returns TokenData. |
required |
refresh_interval
|
float | None
|
Interval in seconds to refresh (None = use token expiry). |
None
|
refresh_before
|
float
|
Seconds before expiry to refresh (default: 300). |
300.0
|
on_refresh
|
Callable[[TokenData], Awaitable[None]] | None
|
Optional callback called after token refresh. |
None
|
Example
token_manager = RotatingToken( ... fetch_fn=lambda: auth.get_token(), ... refresh_interval=3600, ... refresh_before=300, ... )
Source code in src/dspu/security/rotating_token.py
Attributes¶
current
property
¶
Get current valid token.
Returns:
| Type | Description |
|---|---|
str
|
Current token value. |
Raises:
| Type | Description |
|---|---|
TokenExpiredError
|
If token is expired and refresh hasn't completed. |
RuntimeError
|
If token not initialized. |
Example
token = rotating_token.current api_client.set_token(token)
Functions¶
refresh
async
¶
Manually trigger token refresh.
Example
await rotating_token.refresh()
Source code in src/dspu/security/rotating_token.py
is_expired
¶
Check if current token is expired.
Returns:
| Type | Description |
|---|---|
bool
|
True if expired or not initialized, False otherwise. |
dspu.security.rotating_token.simple_rotating_token
async
¶
simple_rotating_token(
fetch_fn: Callable[[], Awaitable[str]],
refresh_interval: float,
) -> RotatingToken
Create a simple rotating token with just a string fetch function.
Convenience function for cases where you just need periodic refresh without expiry tracking.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
fetch_fn
|
Callable[[], Awaitable[str]]
|
Async function that returns token string. |
required |
refresh_interval
|
float
|
How often to refresh in seconds. |
required |
Returns:
| Type | Description |
|---|---|
RotatingToken
|
RotatingToken instance. |
Example
async def get_token() -> str: ... return await auth_service.get_token()
async with simple_rotating_token(get_token, 3600) as token: ... api.set_token(token.current)
Source code in src/dspu/security/rotating_token.py
Authentication¶
Providers¶
dspu.security.auth.StaticTokenProvider
¶
Static token authentication provider.
Simple provider that returns a fixed token. Useful for development and services that use long-lived API keys.
Example
provider = StaticTokenProvider(token="my-api-key-123") token = await provider.get_token() print(token) # "my-api-key-123"
Initialize static token provider.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Static token value. |
required |
Source code in src/dspu/security/auth.py
Functions¶
get_token
async
¶
Get the static token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scopes
|
list[str] | None
|
Ignored for static tokens. |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Static token value. |
refresh_token
async
¶
Refresh token (returns same token for static provider).
Returns:
| Type | Description |
|---|---|
str
|
Static token value. |
revoke_token
async
¶
validate_token
async
¶
Validate token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token to validate. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if matches static token, False otherwise. |
dspu.security.auth.OAuth2Provider
¶
OAuth2 authentication provider.
Implements OAuth2 client credentials flow for service-to-service authentication.
Example
provider = OAuth2Provider( ... client_id="my-client-id", ... client_secret="my-client-secret", ... token_url="https://auth.example.com/oauth/token" ... ) token = await provider.get_token(scopes=["read", "write"])
Initialize OAuth2 provider.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
client_id
|
str
|
OAuth2 client ID. |
required |
client_secret
|
str
|
OAuth2 client secret. |
required |
token_url
|
str
|
Token endpoint URL. |
required |
**kwargs
|
Any
|
Additional httpx options. |
{}
|
Raises:
| Type | Description |
|---|---|
ConfigurationError
|
If httpx not installed. |
Source code in src/dspu/security/auth.py
Functions¶
get_token
async
¶
Get OAuth2 access token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scopes
|
list[str] | None
|
OAuth2 scopes to request. |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Access token. |
Raises:
| Type | Description |
|---|---|
AuthenticationError
|
If token request fails. |
Source code in src/dspu/security/auth.py
refresh_token
async
¶
Refresh OAuth2 token.
Returns:
| Type | Description |
|---|---|
str
|
New access token. |
Raises:
| Type | Description |
|---|---|
AuthenticationError
|
If refresh fails. |
Source code in src/dspu/security/auth.py
revoke_token
async
¶
validate_token
async
¶
Validate OAuth2 token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
Token to validate. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if token matches current valid token. |
Source code in src/dspu/security/auth.py
dspu.security.auth.JWTProvider
¶
JWTProvider(
secret_key: str,
algorithm: str = "HS256",
issuer: str | None = None,
audience: str | None = None,
expiry_seconds: int = 3600,
**kwargs: Any,
)
JWT (JSON Web Token) authentication provider.
Creates and validates JWT tokens for authentication.
Example
provider = JWTProvider( ... secret_key="my-secret-key", ... algorithm="HS256" ... ) token = await provider.get_token()
Initialize JWT provider.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
secret_key
|
str
|
Secret key for signing JWTs. |
required |
algorithm
|
str
|
JWT algorithm (default: HS256). |
'HS256'
|
issuer
|
str | None
|
Token issuer claim. |
None
|
audience
|
str | None
|
Token audience claim. |
None
|
expiry_seconds
|
int
|
Token expiry time in seconds (default: 3600). |
3600
|
**kwargs
|
Any
|
Additional JWT claims. |
{}
|
Raises:
| Type | Description |
|---|---|
ConfigurationError
|
If pyjwt not installed. |
Source code in src/dspu/security/auth.py
Functions¶
get_token
async
¶
Generate JWT token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scopes
|
list[str] | None
|
Token scopes/permissions. |
None
|
Returns:
| Type | Description |
|---|---|
str
|
JWT token string. |
Example
token = await provider.get_token(scopes=["read", "write"])
Source code in src/dspu/security/auth.py
refresh_token
async
¶
Generate new JWT token.
Returns:
| Type | Description |
|---|---|
str
|
New JWT token. |
Source code in src/dspu/security/auth.py
revoke_token
async
¶
Revoke JWT token (clears cached token).
Note: JWTs are stateless, so this only clears the local cache. The token will still be valid until it expires.
validate_token
async
¶
Validate JWT token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
JWT token to validate. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if token is valid, False otherwise. |
Source code in src/dspu/security/auth.py
decode_token
¶
Decode JWT token without validation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
token
|
str
|
JWT token to decode. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Decoded payload. |
Example
payload = provider.decode_token(token) print(payload["scopes"])
Source code in src/dspu/security/auth.py
dspu.security.auth.create_auth_provider
¶
Create authentication provider by type.
Factory function to create appropriate auth provider based on type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
provider_type
|
str
|
Provider type ("static", "oauth2", "jwt"). |
required |
**kwargs
|
Any
|
Provider-specific configuration. |
{}
|
Returns:
| Type | Description |
|---|---|
Any
|
Authentication provider instance. |
Raises:
| Type | Description |
|---|---|
ConfigurationError
|
If provider type is unknown. |
Example
Static token¶
provider = create_auth_provider("static", token="api-key-123")
OAuth2¶
provider = create_auth_provider( ... "oauth2", ... client_id="id", ... client_secret="secret", ... token_url="https://auth.example.com/token" ... )
JWT¶
provider = create_auth_provider( ... "jwt", ... secret_key="secret", ... algorithm="HS256" ... )
Source code in src/dspu/security/auth.py
Encryption¶
Symmetric Encryption¶
dspu.security.encryption.Fernet
¶
Symmetric encryption using Fernet (AES-128-CBC).
Provides simple symmetric encryption for data at rest. Uses the cryptography library's Fernet implementation.
Example
Generate a key¶
key = Fernet.generate_key()
Encrypt data¶
fernet = Fernet(key) encrypted = fernet.encrypt(b"secret data")
Decrypt data¶
decrypted = fernet.decrypt(encrypted) print(decrypted) # b"secret data"
Initialize Fernet cipher with key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
bytes | str
|
Encryption key (32 bytes or base64-encoded string). |
required |
Raises:
| Type | Description |
|---|---|
ConfigurationError
|
If cryptography not installed. |
EncryptionError
|
If key is invalid. |
Source code in src/dspu/security/encryption.py
Functions¶
encrypt
¶
Encrypt data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
bytes | str
|
Data to encrypt (bytes or string). |
required |
Returns:
| Type | Description |
|---|---|
bytes
|
Encrypted data (base64 encoded). |
Example
encrypted = fernet.encrypt(b"secret") encrypted = fernet.encrypt("secret") # Also works
Source code in src/dspu/security/encryption.py
decrypt
¶
Decrypt data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
encrypted_data
|
bytes
|
Encrypted data to decrypt. |
required |
Returns:
| Type | Description |
|---|---|
bytes
|
Decrypted data as bytes. |
Example
decrypted = fernet.decrypt(encrypted) print(decrypted.decode()) # Convert to string
Source code in src/dspu/security/encryption.py
dspu.security.encryption.AES
¶
AES encryption utilities.
Provides AES-256-GCM encryption for data at rest and in transit.
Example
Generate a key¶
key = AES.generate_key()
Encrypt data¶
aes = AES(key) encrypted, nonce = aes.encrypt(b"secret data")
Decrypt data¶
decrypted = aes.decrypt(encrypted, nonce)
Initialize AES cipher with key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
bytes
|
32-byte encryption key. |
required |
Raises:
| Type | Description |
|---|---|
ConfigurationError
|
If cryptography not installed. |
EncryptionError
|
If key is invalid. |
Source code in src/dspu/security/encryption.py
Functions¶
generate_key
staticmethod
¶
Generate a new AES-256 key.
Returns:
| Type | Description |
|---|---|
bytes
|
32-byte encryption key. |
Example
key = AES.generate_key()
encrypt
¶
Encrypt data using AES-256-GCM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
bytes | str
|
Data to encrypt. |
required |
associated_data
|
bytes | None
|
Optional authenticated but unencrypted data. |
None
|
Returns:
| Type | Description |
|---|---|
tuple[bytes, bytes]
|
Tuple of (encrypted_data, nonce). |
Example
encrypted, nonce = aes.encrypt(b"secret")
Store both encrypted and nonce¶
Source code in src/dspu/security/encryption.py
decrypt
¶
Decrypt data using AES-256-GCM.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
encrypted_data
|
bytes
|
Data to decrypt. |
required |
nonce
|
bytes
|
Nonce used during encryption. |
required |
associated_data
|
bytes | None
|
Optional authenticated data (must match encryption). |
None
|
Returns:
| Type | Description |
|---|---|
bytes
|
Decrypted data. |
Example
decrypted = aes.decrypt(encrypted, nonce)
Source code in src/dspu/security/encryption.py
Password Hashing¶
dspu.security.encryption.hash_password
¶
Hash a password using PBKDF2.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
password
|
str
|
Password to hash. |
required |
salt
|
bytes | None
|
Salt bytes (generates new one if None). |
None
|
Returns:
| Type | Description |
|---|---|
tuple[str, bytes]
|
Tuple of (hash_string, salt_bytes). |
Example
hash_str, salt = hash_password("my-password")
Store hash_str and salt in database¶
Later, verify password¶
is_valid = verify_password("my-password", hash_str, salt)
Source code in src/dspu/security/encryption.py
dspu.security.encryption.verify_password
¶
Verify a password against its hash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
password
|
str
|
Password to verify. |
required |
hash_str
|
str
|
Base64-encoded password hash. |
required |
salt
|
bytes
|
Salt bytes used for hashing. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if password matches, False otherwise. |
Example
After storing hash and salt¶
is_valid = verify_password("user-input", stored_hash, stored_salt) if is_valid: ... print("Password correct!")
Source code in src/dspu/security/encryption.py
Utilities¶
dspu.security.encryption.generate_token
¶
Generate a cryptographically secure random token.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
length
|
int
|
Token length in bytes (default: 32). |
32
|
Returns:
| Type | Description |
|---|---|
str
|
URL-safe base64-encoded token. |
Example
api_key = generate_token(32) session_id = generate_token(16)
Source code in src/dspu/security/encryption.py
dspu.security.encryption.constant_time_compare
¶
Compare two strings in constant time.
Prevents timing attacks when comparing secrets like tokens or passwords.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
a
|
str
|
First string. |
required |
b
|
str
|
Second string. |
required |
Returns:
| Type | Description |
|---|---|
bool
|
True if strings are equal, False otherwise. |
Example
token1 = "secret-token-123" token2 = user_provided_token if constant_time_compare(token1, token2): ... print("Valid token")
Source code in src/dspu/security/encryption.py
dspu.security.encryption.hash_data
¶
Hash data using specified algorithm.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data
|
bytes | str
|
Data to hash. |
required |
algorithm
|
str
|
Hash algorithm (sha256, sha512, etc.). |
'sha256'
|
Returns:
| Type | Description |
|---|---|
str
|
Hexadecimal hash string. |
Example
file_hash = hash_data(file_contents, algorithm="sha256") checksum = hash_data(b"data", algorithm="md5")
Source code in src/dspu/security/encryption.py
Usage¶
Secret Management¶
from dspu.security import SecretManager
# From environment
secrets = SecretManager.from_env()
api_key = secrets.get("API_KEY")
# From Vault
vault_secrets = SecretManager.from_vault(
url="http://vault:8200",
token="s.abc123"
)
db_password = vault_secrets.get("database/password")
Token Rotation¶
from dspu.security import RotatingToken
async def refresh_token():
# Fetch new token from API
return {"access_token": "...", "expires_in": 3600}
async with RotatingToken(refresh_fn=refresh_token, refresh_interval=3300) as token:
# Token automatically refreshes
await make_api_call(token.get())
Encryption¶
from dspu.security import Fernet, hash_password, verify_password
# Encrypt data
cipher = Fernet.generate()
encrypted = cipher.encrypt(b"sensitive data")
decrypted = cipher.decrypt(encrypted)
# Password hashing
hashed = hash_password("my_password")
is_valid = verify_password("my_password", hashed) # True