Quick Start

Get started with DSPU in minutes! This guide covers the essential features of each module.

Configuration Management

Load and manage configuration from multiple sources:

from dspu.config import Config

# Load from file
config = Config.from_file("config.yaml")

# Access values
db_host = config.get("database.host", default="localhost")
db_port = config.get("database.port", default=5432)

# Merge from environment
config.merge_from_env(prefix="APP_")

# Watch for changes
from dspu.config import WatchedConfig

watched_config = WatchedConfig.from_file("config.yaml")
watched_config.start_watching()  # Auto-reload on file changes
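
Environment merging is useful for per-deployment overrides. A minimal sketch of that pattern, assuming merge_from_env maps a variable such as APP_DATABASE_HOST onto the database.host key and lets environment values win over file values (both of these mappings are assumptions, and the hostname is made up):

import os

from dspu.config import Config

# Hypothetical override set by the deployment environment
os.environ["APP_DATABASE_HOST"] = "db.internal"

config = Config.from_file("config.yaml")
config.merge_from_env(prefix="APP_")

# If environment values take precedence, this now returns "db.internal"
print(config.get("database.host", default="localhost"))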

I/O Operations

Unified interface for local and cloud storage:

from dspu.io import Storage

# Local storage
storage = Storage.from_uri("file:///data")
storage.write("output.json", {"status": "success"})
data = storage.read("output.json")

# S3 storage
s3_storage = Storage.from_uri("s3://my-bucket/path")
s3_storage.write("data.json", {"count": 100})

# Multi-format writing
storage.write_format("config.yaml", {"database": {"host": "localhost"}})
storage.write_format("data.csv", [{"name": "Alice", "age": 30}])

Async Utilities

Resilient async operations:

import asyncio
import httpx
from dspu.aio import retry, RateLimiter, CircuitBreaker

# Retry with exponential backoff
@retry(max_attempts=3, backoff="exponential")
async def fetch_data(url):
    # Will retry on failure
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()

# Rate limiting
async def process_items(items):
    limiter = RateLimiter(rate=10, per=1.0)  # 10 requests per second

    for item in items:
        # Acquire one slot per item so calls stay within the 10/second budget
        async with limiter:
            await process_item(item)

# Circuit breaker
circuit_breaker = CircuitBreaker(failure_threshold=5)

@circuit_breaker
async def call_external_api():
    # Automatically opens circuit after failures
    ...

# Run
asyncio.run(fetch_data("https://api.example.com/data"))
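
The three primitives can be combined on a single call path. A sketch, assuming they stack like ordinary Python decorators and that the limiter should be acquired once per request; the endpoint, decorator order, and fetch_user helper are illustrative, not prescribed by dspu:

import asyncio
import httpx
from dspu.aio import retry, RateLimiter, CircuitBreaker

limiter = RateLimiter(rate=10, per=1.0)
breaker = CircuitBreaker(failure_threshold=5)

@breaker
@retry(max_attempts=3, backoff="exponential")
async def fetch_user(user_id: int) -> dict:
    # One rate-limit slot per request; failures are retried,
    # and the breaker opens after repeated failures
    async with limiter:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"https://api.example.com/users/{user_id}")
            response.raise_for_status()
            return response.json()

async def main():
    return await asyncio.gather(*(fetch_user(i) for i in range(25)))

asyncio.run(main())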

Validation

Data filtering and validation:

from dspu.validation import (
    StripWhitespaceFilter,
    LowercaseFilter,
    EmailNormalizationFilter,
    pydantic_filter_validator,
)
from pydantic import BaseModel

# Compose filters
email_filter = StripWhitespaceFilter().then(LowercaseFilter()).then(EmailNormalizationFilter())

# Apply filter
clean_email = email_filter("  Alice.Smith+Tag@GMAIL.COM  ")
# Result: "alicesmith@gmail.com"

# Pydantic integration
class User(BaseModel):
    email: str
    name: str

    _email_filter = pydantic_filter_validator("email", email_filter)
    _name_filter = pydantic_filter_validator("name", StripWhitespaceFilter())

user = User(email="  Alice@EXAMPLE.COM  ", name="  Alice  ")
# Fields are cleaned automatically: email == "alice@example.com", name == "Alice"
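
Filters are plain callables, as the direct email_filter call above shows, so they also work outside Pydantic, for example over a batch of raw values. The sample addresses below are made up:

raw_emails = ["  Bob@Example.COM ", "Carol.Jones+news@GMAIL.com", " dave@example.org"]

# Apply the composed filter to each raw value
cleaned = [email_filter(e) for e in raw_emails]
print(cleaned)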

Security

Manage secrets and encryption:

from dspu.security import SecretManager, RotatingToken, Fernet

# Secret management
secrets = SecretManager.from_env()
api_key = secrets.get("API_KEY")

# Or from HashiCorp Vault
vault_secrets = SecretManager.from_vault(url="http://vault:8200", token="...")
db_password = vault_secrets.get("database/password")

# Rotating tokens
async def refresh_token():
    # Fetch new token from API
    return {"access_token": "...", "expires_in": 3600}

async def call_api_with_token():
    async with RotatingToken(refresh_fn=refresh_token, refresh_interval=3300) as token:
        # Token automatically refreshes before expiry
        await make_api_call(token.get())

# Encryption
cipher = Fernet.generate()
encrypted = cipher.encrypt(b"sensitive data")
decrypted = cipher.decrypt(encrypted)
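
For local development, the environment-backed manager is often enough. A sketch, assuming from_env reads the process environment as the name suggests; the variable name and value are placeholders, and real keys should never be hard-coded or committed:

import os

from dspu.security import SecretManager

# Placeholder for local runs only
os.environ["API_KEY"] = "dev-only-key"

secrets = SecretManager.from_env()
print(secrets.get("API_KEY"))  # "dev-only-key"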

Observability

Structured logging and rich output:

from dspu.observability import configure_logging, get_logger, LogContext, timed

# Configure logging
configure_logging(level="INFO", format="rich")
logger = get_logger(__name__)

# Structured logging
logger.info("User logged in", user_id=123, ip="192.168.1.1")

# Context management
with LogContext(request_id="req-123"):
    logger.info("Processing request")  # Automatically includes request_id
    logger.info("Query executed", duration_ms=45)

# Timing decorator
@timed()
def slow_operation():
    # Execution time automatically logged
    ...

# Rich output
from dspu.observability import print_json, print_table

data = {"user": {"name": "Alice", "age": 30}}
print_json(data)  # Syntax-highlighted JSON

users = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
print_table(users, title="Users")  # Pretty table
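
The pieces also combine: a timed function called inside a LogContext keeps its log lines grouped under the same request. A sketch, assuming (not confirmed by the source) that records emitted by @timed inherit fields from the active context; handle_request is a hypothetical example function:

from dspu.observability import LogContext, get_logger, timed

logger = get_logger(__name__)

@timed()
def handle_request(payload: dict) -> dict:
    logger.info("Handling request", size=len(payload))
    return {"ok": True}

with LogContext(request_id="req-456"):
    handle_request({"q": "search"})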

ML Utilities

Reproducible ML pipelines:

from dspu.ml import (
    SeedManager,
    DataSplitter,
    Scaler,
    Encoder,
    Stats,
    make_classification_data
)

# Set seed for reproducibility
SeedManager.set_global_seed(42)

# Generate synthetic data
X, y = make_classification_data(n_samples=1000, n_features=10)

# Split data (no leakage!)
X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
    X, y, test_size=0.2, stratify=y
)

# Scale features
scaler = Scaler(method="standard")
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Save for production
scaler.save_to_file("artifacts/scaler.json")

# Encode categorical features
encoder = Encoder(method="label")
categories = ["cat", "dog", "bird", "cat", "dog"]
encoded = encoder.fit_transform(categories)  # [0, 1, 2, 0, 1]

# Statistics
correlation = Stats.correlation([1, 2, 3, 4, 5], [2, 4, 6, 8, 10])
print(f"Correlation: {correlation:.3f}")  # 1.000 (perfect)

Complete Example: ML Pipeline

Here's a complete example combining multiple modules:

from dspu.config import Config
from dspu.ml import SeedManager, DataSplitter, Scaler
from dspu.observability import configure_logging, get_logger, timed, LogContext
from dspu.io import Storage

# Setup
configure_logging(level="INFO", format="rich")
logger = get_logger(__name__)

config = Config.from_file("config.yaml")
SeedManager.set_global_seed(config.get("seed", 42))

storage = Storage.from_uri(config.get("storage.uri", "file://./data"))

@timed()
def load_data():
    """Load data from storage."""
    X = storage.read("features.json")
    y = storage.read("labels.json")
    logger.info("Data loaded", samples=len(X))
    return X, y

@timed()
def train_model(X_train, y_train):
    """Train model (placeholder)."""
    logger.info("Training model", features=len(X_train[0]))
    # Your training code here
    return "model"

def main():
    with LogContext(pipeline="training", version="1.0"):
        # Load data
        X, y = load_data()

        # Split data
        X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
            X, y, test_size=0.2, stratify=y
        )
        logger.info("Data split", train=len(X_train), test=len(X_test))

        # Scale features
        scaler = Scaler(method="standard")
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Save scaler
        storage.write("artifacts/scaler.json", scaler.save_state())
        logger.info("Scaler saved")

        # Train model
        model = train_model(X_train_scaled, y_train)

        logger.info("Pipeline complete", status="success")

if __name__ == "__main__":
    main()

Next Steps