Quick Start¶
Get started with DSPU in minutes! This guide covers the essential features of each module.
Configuration Management¶
Load and manage configuration from multiple sources:
```python
from dspu.config import Config

# Load from file
config = Config.from_file("config.yaml")

# Access values with dot-path keys and defaults
db_host = config.get("database.host", default="localhost")
db_port = config.get("database.port", default=5432)

# Merge values from environment variables
config.merge_from_env(prefix="APP_")

# Watch for changes
from dspu.config import WatchedConfig

watched_config = WatchedConfig.from_file("config.yaml")
watched_config.start_watching()  # Auto-reload on file changes
```
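The dot-path lookups above assume `config.yaml` nests its keys. Here is a minimal sketch of a matching file, written from Python so you can paste it straight into a REPL; the nesting shown is our reading of how dot-paths resolve, based on the lookups above:

```python
from pathlib import Path

from dspu.config import Config

# Nested YAML keys correspond to dot-paths: "database.host" -> database: host:
Path("config.yaml").write_text(
    "database:\n"
    "  host: localhost\n"
    "  port: 5432\n"
)

config = Config.from_file("config.yaml")
assert config.get("database.host", default=None) == "localhost"
```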
I/O Operations¶
Unified interface for local and cloud storage:
```python
from dspu.io import Storage

# Local storage
storage = Storage.from_uri("file:///data")
storage.write("output.json", {"status": "success"})
data = storage.read("output.json")

# S3 storage -- same interface, different URI
s3_storage = Storage.from_uri("s3://my-bucket/path")
s3_storage.write("data.json", {"count": 100})

# Multi-format writing
storage.write_format("config.yaml", {"database": {"host": "localhost"}})
storage.write_format("data.csv", [{"name": "Alice", "age": 30}])
```
Async Utilities¶
Build resilient async operations with retries, rate limiting, and circuit breakers:
```python
import asyncio

import httpx

from dspu.aio import retry, RateLimiter, CircuitBreaker

# Retry with exponential backoff
@retry(max_attempts=3, backoff="exponential")
async def fetch_data(url):
    # Retried automatically on failure
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()

# Rate limiting
async def process_items(items):
    limiter = RateLimiter(rate=10, per=1.0)  # 10 operations per second
    for item in items:
        async with limiter:  # acquire a slot before each operation
            await process_item(item)  # process_item is your own coroutine

# Circuit breaker
circuit_breaker = CircuitBreaker(failure_threshold=5)

@circuit_breaker
async def call_external_api():
    # The circuit opens automatically after repeated failures
    ...

# Run
asyncio.run(fetch_data("https://api.example.com/data"))
```
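The three utilities can also be layered. Here is a sketch, assuming the decorators compose like ordinary Python decorators: `retry` absorbs transient failures, and if attempts keep exhausting, the surrounding circuit breaker opens and fails fast.

```python
import httpx

from dspu.aio import CircuitBreaker, retry

circuit_breaker = CircuitBreaker(failure_threshold=5)

@circuit_breaker
@retry(max_attempts=3, backoff="exponential")
async def fetch_with_protection(url):
    # The inner decorator retries each call; the outer breaker counts
    # calls whose retries are exhausted and opens after five of them.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()
```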
Validation¶
Data filtering and validation:
```python
from pydantic import BaseModel

from dspu.validation import (
    EmailNormalizationFilter,
    LowercaseFilter,
    StripWhitespaceFilter,
    pydantic_filter_validator,
)

# Compose filters; they run left to right
email_filter = (
    StripWhitespaceFilter()
    .then(LowercaseFilter())
    .then(EmailNormalizationFilter())
)

# Apply the composed filter
clean_email = email_filter(" Alice.Smith+Tag@GMAIL.COM ")
# Result: "alicesmith@gmail.com"

# Pydantic integration
class User(BaseModel):
    email: str
    name: str

    _email_filter = pydantic_filter_validator("email", email_filter)
    _name_filter = pydantic_filter_validator("name", StripWhitespaceFilter())

user = User(email=" Alice@EXAMPLE.COM ", name=" Alice ")
# Fields are cleaned automatically: "alice@example.com", "Alice"
```
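A composed filter is just a callable, so it drops into ordinary Python wherever a function fits, e.g. cleaning a batch of addresses. The expected outputs assume the Gmail-specific dot/tag normalization shown above applies only to Gmail domains:

```python
from dspu.validation import (
    EmailNormalizationFilter,
    LowercaseFilter,
    StripWhitespaceFilter,
)

email_filter = (
    StripWhitespaceFilter()
    .then(LowercaseFilter())
    .then(EmailNormalizationFilter())
)

raw_emails = ["  Bob@Example.COM ", " carol.j+news@GMAIL.com  "]
clean_emails = [email_filter(e) for e in raw_emails]
# Expected: ["bob@example.com", "carolj@gmail.com"]
```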
Security¶
Manage secrets, rotate tokens, and encrypt data:
```python
from dspu.security import SecretManager, RotatingToken, Fernet

# Secret management
secrets = SecretManager.from_env()
api_key = secrets.get("API_KEY")

# Or from HashiCorp Vault
vault_secrets = SecretManager.from_vault(url="http://vault:8200", token="...")
db_password = vault_secrets.get("database/password")

# Rotating tokens
async def refresh_token():
    # Fetch a new token from your auth API
    return {"access_token": "...", "expires_in": 3600}

async def call_with_token():
    async with RotatingToken(refresh_fn=refresh_token, refresh_interval=3300) as token:
        # Token refreshes automatically before expiry
        await make_api_call(token.get())  # make_api_call is your own coroutine

# Encryption
cipher = Fernet.generate()
encrypted = cipher.encrypt(b"sensitive data")
decrypted = cipher.decrypt(encrypted)
```
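These pieces combine naturally: a refresh callback can authenticate with a credential pulled from `SecretManager`. In the sketch below, the token endpoint URL and response shape are illustrative assumptions, not part of DSPU:

```python
import httpx

from dspu.security import SecretManager

secrets = SecretManager.from_env()

async def refresh_token():
    # Hypothetical auth endpoint; substitute your provider's token URL
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://auth.example.com/token",
            headers={"Authorization": f"Bearer {secrets.get('API_KEY')}"},
        )
        response.raise_for_status()
        return response.json()  # e.g. {"access_token": "...", "expires_in": 3600}
```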
Observability¶
Structured logging and rich output:
```python
from dspu.observability import configure_logging, get_logger, LogContext, timed

# Configure logging
configure_logging(level="INFO", format="rich")
logger = get_logger(__name__)

# Structured logging: key-value fields travel with the message
logger.info("User logged in", user_id=123, ip="192.168.1.1")

# Context management
with LogContext(request_id="req-123"):
    logger.info("Processing request")  # request_id attached automatically
    logger.info("Query executed", duration_ms=45)

# Timing decorator
@timed()
def slow_operation():
    # Execution time is logged automatically
    ...

# Rich output
from dspu.observability import print_json, print_table

data = {"user": {"name": "Alice", "age": 30}}
print_json(data)  # Syntax-highlighted JSON

users = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
print_table(users, title="Users")  # Pretty table
```
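Context fields can also be layered. The sketch below assumes `LogContext` nests the way structured-logging context managers conventionally do, with inner fields added on top of outer ones:

```python
from dspu.observability import LogContext, configure_logging, get_logger

configure_logging(level="INFO", format="rich")
logger = get_logger(__name__)

with LogContext(request_id="req-123"):
    with LogContext(user_id=123):
        # Assumed behavior: both request_id and user_id attach here
        logger.info("Handling request for user")
    logger.info("Request-level context only")  # just request_id
```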
ML Utilities¶
Reproducible ML pipelines:
```python
from dspu.ml import (
    SeedManager,
    DataSplitter,
    Scaler,
    Encoder,
    Stats,
    make_classification_data,
)

# Set a global seed for reproducibility
SeedManager.set_global_seed(42)

# Generate synthetic data
X, y = make_classification_data(n_samples=1000, n_features=10)

# Split data (no leakage!)
X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
    X, y, test_size=0.2, stratify=y
)

# Scale features: fit on train, apply the same transform to test
scaler = Scaler(method="standard")
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Save for production
scaler.save_to_file("artifacts/scaler.json")

# Encode categorical features
encoder = Encoder(method="label")
categories = ["cat", "dog", "bird", "cat", "dog"]
encoded = encoder.fit_transform(categories)  # [0, 1, 2, 0, 1]

# Statistics
correlation = Stats.correlation([1, 2, 3, 4, 5], [2, 4, 6, 8, 10])
print(f"Correlation: {correlation:.3f}")  # 1.000 (perfect)
```
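To see what `stratify=y` buys you, compare class proportions across the two splits; with stratification they should be nearly identical. Continuing from the split above, here is a quick check using only the standard library:

```python
from collections import Counter

def proportions(labels):
    """Return each label's share of the whole."""
    counts = Counter(labels)
    total = sum(counts.values())
    return {label: count / total for label, count in counts.items()}

# With stratify=y these distributions should match closely.
print(proportions(y_train))
print(proportions(y_test))
```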
Complete Example: ML Pipeline¶
Here's a complete example combining multiple modules:
```python
from dspu.config import Config
from dspu.io import Storage
from dspu.ml import SeedManager, DataSplitter, Scaler
from dspu.observability import configure_logging, get_logger, timed, LogContext

# Setup
configure_logging(level="INFO", format="rich")
logger = get_logger(__name__)

config = Config.from_file("config.yaml")
SeedManager.set_global_seed(config.get("seed", default=42))
storage = Storage.from_uri(config.get("storage.uri", default="file://./data"))

@timed()
def load_data():
    """Load features and labels from storage."""
    X = storage.read("features.json")
    y = storage.read("labels.json")
    logger.info("Data loaded", samples=len(X))
    return X, y

@timed()
def train_model(X_train, y_train):
    """Train a model (placeholder)."""
    logger.info("Training model", features=len(X_train[0]))
    # Your training code here
    return "model"

def main():
    with LogContext(pipeline="training", version="1.0"):
        # Load data
        X, y = load_data()

        # Split data
        X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
            X, y, test_size=0.2, stratify=y
        )
        logger.info("Data split", train=len(X_train), test=len(X_test))

        # Scale features (fit on train only)
        scaler = Scaler(method="standard")
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Persist the fitted scaler alongside other artifacts
        storage.write("artifacts/scaler.json", scaler.save_state())
        logger.info("Scaler saved")

        # Train model
        model = train_model(X_train_scaled, y_train)
        logger.info("Pipeline complete", status="success")

if __name__ == "__main__":
    main()
```
Next Steps¶
- Tutorial - Comprehensive walkthrough of all features
- User Guide - Deep dive into each module
- API Reference - Complete API documentation
- Examples - More practical examples