Tutorial: Building a Data Processing API

This tutorial walks you through building a complete data processing API using DSPU modules.

What We'll Build

A REST API that:

  • Processes CSV data uploads
  • Applies ML transformations (scaling, encoding)
  • Stores results in cloud storage
  • Provides structured logging and monitoring
  • Handles authentication and secrets

Prerequisites

# Install DSPU with all extras
pip install 'dspu[all]'

# Or with uv
uv pip install 'dspu[all]'

Project Structure

data-processor/
├── config.yaml
├── .env.example
├── app.py
├── models.py
├── processing.py
└── artifacts/
    ├── scaler.json
    └── encoder.json

Step 1: Configuration

Define Configuration Model

Create models.py:

from pydantic import BaseModel, Field

class DatabaseConfig(BaseModel):
    host: str = "localhost"
    port: int = Field(ge=1, le=65535)
    name: str
    user: str
    password: str = ""  # injected at startup from the secret manager

class StorageConfig(BaseModel):
    type: str  # "local" or "s3"
    uri: str

class AppConfig(BaseModel):
    debug: bool = False
    log_level: str = "INFO"
    database: DatabaseConfig
    storage: StorageConfig

Create Configuration File

Create config.yaml:

debug: false
log_level: INFO

database:
  host: localhost
  port: 5432
  name: dataproc
  user: admin

storage:
  type: local
  uri: ./data

Load Configuration

Add to app.py:

from dspu.config import Config, FileSource, EnvSource
from dspu.security import SecretManager
from models import AppConfig

# Load configuration
config = Config.load(
    AppConfig,
    sources=[
        FileSource("config.yaml"),
        EnvSource(prefix="APP_", separator="__"),
    ],
)

# Load secrets (secrets.get is awaitable, so these lines must run in an
# async context, e.g. the startup handler in Step 7)
secrets = SecretManager.from_env()
db_password = await secrets.get("database/password")
config.database.password = db_password
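
Environment variables override file values because EnvSource is listed after FileSource (assuming later sources take precedence); for example:

# With APP_DATABASE__PORT=5433 exported, EnvSource splits on "__"
# and overrides the nested field from config.yaml
print(config.database.port)  # 5433
print(config.database.host)  # "localhost", still from config.yaml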

Step 2: Logging Setup

Configure Structured Logging

Add to app.py:

from dspu.observability import (
    configure_logging,
    get_logger,
    LogContext,
    timed,
)

# Configure logging
configure_logging(
    level=config.log_level,
    format="json" if not config.debug else "rich"
)

logger = get_logger(__name__)
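
LogContext and @timed come into play in later steps; a quick check that structured fields come through:

with LogContext(component="bootstrap"):
    logger.info("Configuration loaded", debug=config.debug)
# emits one JSON record with "component" and "debug" attached as fields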

Step 3: Storage Setup

Initialize Storage

from dspu.io import Storage

# Create storage instance
storage = Storage.from_uri(config.storage.uri)

logger.info("Storage initialized",
           storage_type=config.storage.type,
           uri=config.storage.uri)
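
write_format is used later in this tutorial to persist results; a startup smoke test for the configured backend (read_format is assumed to be the matching read API):

async def storage_healthcheck():
    # Round-trip a small payload through the configured backend
    await storage.write_format("healthcheck.json", {"ok": True})
    return await storage.read_format("healthcheck.json")  # assumed API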

Step 4: ML Processing Pipeline

Create Processing Module

Create processing.py:

from pathlib import Path

from dspu.ml import SeedManager, Scaler, Encoder
from dspu.observability import get_logger, timed

logger = get_logger(__name__)

class DataProcessor:
    def __init__(self, scaler_path: Path | None = None, encoder_path: Path | None = None):
        """Initialize processor with optional saved artifacts."""
        # Set reproducibility
        SeedManager.set_global_seed(42)

        # Load or create transformers
        if scaler_path and scaler_path.exists():
            self.scaler = Scaler.load_from_file(scaler_path)
        else:
            self.scaler = Scaler(method="standard")

        if encoder_path and encoder_path.exists():
            self.encoder = Encoder.load_from_file(encoder_path)
        else:
            self.encoder = Encoder(method="label")

    @timed()
    def fit(self, X: list[list[float]], y: list[str]):
        """Fit transformers on training data."""
        logger.info("Fitting transformers", samples=len(X))

        # Fit scaler
        self.scaler.fit(X)

        # Fit encoder
        self.encoder.fit(y)

        logger.info("Transformers fitted")

    @timed()
    def transform(self, X: list[list[float]], y: list[str] | None = None):
        """Transform data using fitted transformers."""
        logger.info("Transforming data", samples=len(X))

        # Scale features
        X_scaled = self.scaler.transform(X)

        # Encode labels if provided
        y_encoded = None
        if y is not None:
            y_encoded = self.encoder.transform(y)

        logger.info("Data transformed")

        return X_scaled, y_encoded

    def save_artifacts(self, scaler_path: str, encoder_path: str):
        """Save fitted transformers."""
        self.scaler.save_to_file(scaler_path)
        self.encoder.save_to_file(encoder_path)

        logger.info("Artifacts saved",
                   scaler_path=scaler_path,
                   encoder_path=encoder_path)
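
A minimal usage sketch of the processor on toy data:

processor = DataProcessor()
processor.fit([[1.0, 2.0], [3.0, 4.0]], ["cat", "dog"])  # fit scaler + encoder
X_scaled, y_encoded = processor.transform([[1.0, 2.0]], ["cat"])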

Step 5: API Endpoints

Create Resilient API Client

from dspu.aio import retry, RateLimiter, CircuitBreaker

# Setup resilience
rate_limiter = RateLimiter(rate=10.0)  # 10 requests/sec
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60.0)

@retry(max_attempts=3, backoff="exponential")
async def process_upload(data: dict, request_id: str):
    """Process uploaded data with full error handling."""
    with LogContext(request_id=request_id):
        logger.info("Processing upload started")

        try:
            # Rate limit
            async with rate_limiter:
                # Circuit breaker
                async with circuit_breaker.protect():
                    # Process data
                    result = await process_data(data)

            logger.info("Processing completed", status="success")
            return result

        except Exception as e:
            logger.error("Processing failed", error=str(e), status="error")
            raise
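
process_data is referenced above but never defined; a minimal sketch that wires it to the DataProcessor from Step 4 (the artifact paths match the save_artifacts call in the train endpoint below; the wiring itself is an assumption):

from pathlib import Path
from processing import DataProcessor

async def process_data(data: dict) -> list[dict]:
    """Apply the fitted transformers to parsed CSV rows (sketch)."""
    processor = DataProcessor(
        Path("artifacts/scaler.json"),
        Path("artifacts/encoder.json"),
    )
    X_scaled, _ = processor.transform(data["features"])
    return [{"features": row} for row in X_scaled]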

Upload Endpoint

from fastapi import FastAPI, UploadFile, HTTPException
from dspu.ml import IDGenerator
from dspu.validation import StripWhitespaceFilter, LowercaseFilter

app = FastAPI()

# Validation filter: strip whitespace, then lowercase
email_filter = StripWhitespaceFilter().then(LowercaseFilter())

@app.post("/upload")
async def upload_file(
    file: UploadFile,
    user_email: str,
    request_id: str | None = None,
):
    """Upload and process CSV file."""
    if request_id is None:
        request_id = IDGenerator.ulid()

    with LogContext(request_id=request_id):
        logger.info("Upload started", filename=file.filename)

        try:
            # Validate email
            user_email = email_filter(user_email)

            # Read and parse the CSV (parse_csv is sketched below)
            content = await file.read()
            data = parse_csv(content)

            # Process with ML pipeline
            result = await process_upload(data, request_id)

            # Save to storage
            output_path = f"processed/{request_id}.json"
            await storage.write_format(output_path, result)

            logger.info("Upload completed",
                       filename=file.filename,
                       output_path=output_path,
                       rows=len(result))

            return {
                "request_id": request_id,
                "status": "success",
                "rows_processed": len(result),
                "output_path": output_path,
            }

        except Exception as e:
            logger.error("Upload failed", error=str(e))
            raise HTTPException(status_code=500, detail=str(e))
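
upload_file calls a parse_csv helper that DSPU does not provide; a minimal standard-library sketch, assuming every CSV column is numeric:

import csv
import io

def parse_csv(content: bytes) -> dict:
    """Parse uploaded CSV bytes into a feature matrix (sketch)."""
    reader = csv.reader(io.StringIO(content.decode("utf-8")))
    rows = [[float(value) for value in row] for row in reader]
    return {"features": rows}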

Train Endpoint

@app.post("/train")
async def train_model(
    X_train: list[list[float]],
    y_train: list[str],
):
    """Train ML transformers."""
    request_id = IDGenerator.ulid()

    with LogContext(request_id=request_id):
        logger.info("Training started", samples=len(X_train))

        try:
            # Create processor
            processor = DataProcessor()

            # Fit transformers
            processor.fit(X_train, y_train)

            # Save artifacts
            processor.save_artifacts(
                "artifacts/scaler.json",
                "artifacts/encoder.json"
            )

            logger.info("Training completed")

            return {
                "request_id": request_id,
                "status": "success",
                "samples_trained": len(X_train),
            }

        except Exception as e:
            logger.error("Training failed", error=str(e))
            raise HTTPException(status_code=500, detail=str(e))
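
With the server running, the endpoint can be exercised directly; FastAPI embeds multiple body parameters as top-level JSON keys:

curl -X POST "http://localhost:8000/train" \
  -H "Content-Type: application/json" \
  -d '{"X_train": [[1.0, 2.0], [3.0, 4.0]], "y_train": ["cat", "dog"]}'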

Step 6: Authentication

Add JWT Authentication

from dspu.security import JWTProvider
from fastapi import Depends, Header

# Get the JWT secret from the secrets manager (awaitable, so run this
# during async startup)
jwt_secret = await secrets.get("jwt/secret")
jwt_provider = JWTProvider(
    secret_key=jwt_secret,
    issuer="data-processor",
    expiry_seconds=3600
)

async def verify_token(authorization: str = Header(...)):
    """Verify JWT token."""
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization header")

    token = authorization[7:]  # Remove "Bearer "

    try:
        claims = jwt_provider.verify_token(token)
        return claims
    except Exception:
        raise HTTPException(status_code=401, detail="Invalid token")

# Protected endpoint
@app.post("/upload", dependencies=[Depends(verify_token)])
async def upload_file(...):
    ...
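
The verified claims can also be consumed inside a handler; a small sketch ("sub" is the conventional subject claim, but the claim set depends on how your tokens are issued):

@app.get("/me")
async def whoami(claims: dict = Depends(verify_token)):
    # Echo the subject claim from the verified token
    return {"subject": claims.get("sub")}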

Step 7: Run the Application

Complete app.py

from fastapi import FastAPI
from dspu.config import Config, FileSource, EnvSource
from dspu.observability import configure_logging, get_logger
from dspu.security import SecretManager
from dspu.io import Storage
from models import AppConfig
from processing import DataProcessor

# Load configuration
config = Config.load(
    AppConfig,
    sources=[
        FileSource("config.yaml"),
        EnvSource(prefix="APP_", separator="__"),
    ],
)

# Configure logging
configure_logging(
    level=config.log_level,
    format="json" if not config.debug else "rich"
)

logger = get_logger(__name__)

# Initialize components
async def startup():
    """Initialize application on startup."""
    logger.info("Application starting", version="1.0.0")

    # Load secrets
    global secrets
    secrets = SecretManager.from_env()

    # Initialize storage
    global storage
    storage = Storage.from_uri(config.storage.uri)

    logger.info("Application started")

# Create FastAPI app
app = FastAPI(on_startup=[startup])

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Environment Variables

Create .env.example:

# Database
APP_DATABASE__PASSWORD=secret123

# JWT
APP_JWT_SECRET=your-secret-key

# Storage (for production)
APP_STORAGE__TYPE=s3
APP_STORAGE__URI=s3://my-bucket/data

# Logging
APP_LOG_LEVEL=INFO
APP_DEBUG=false

Run

# Development
python app.py

# Production
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4

Step 8: Testing

Upload Data

curl -X POST "http://localhost:8000/upload" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -F "file=@data.csv" \
  -F "user_email=  USER@EXAMPLE.COM  "

Response

{
  "request_id": "01ARZ3NDEKTSV4RRFFQ69G5FAV",
  "status": "success",
  "rows_processed": 1000,
  "output_path": "processed/01ARZ3NDEKTSV4RRFFQ69G5FAV.json"
}

Logs

{"timestamp":"2024-12-05T10:30:45","level":"INFO","message":"Upload started","request_id":"01ARZ3NDEKTSV4RRFFQ69G5FAV","filename":"data.csv"}
{"timestamp":"2024-12-05T10:30:45","level":"INFO","message":"Processing upload started","request_id":"01ARZ3NDEKTSV4RRFFQ69G5FAV"}
{"timestamp":"2024-12-05T10:30:46","level":"INFO","message":"Transforming data","samples":1000}
{"timestamp":"2024-12-05T10:30:47","level":"INFO","message":"Data transformed"}
{"timestamp":"2024-12-05T10:30:47","level":"INFO","message":"Processing completed","status":"success"}
{"timestamp":"2024-12-05T10:30:48","level":"INFO","message":"Upload completed","filename":"data.csv","output_path":"processed/01ARZ3NDEKTSV4RRFFQ69G5FAV.json","rows":1000}

What We've Learned

Configuration

  • Multi-source configuration (file + env vars)
  • Type-safe configuration with Pydantic
  • Secret management separation

Observability

  • Structured logging with JSON output
  • Request correlation with request_id
  • Automatic timing with @timed decorator
  • Rich output for development

ML Processing

  • Reproducible experiments with seed management
  • Stateful transformers (fit/transform pattern)
  • Artifact persistence for production

Resilience

  • Retry with exponential backoff
  • Rate limiting for API protection
  • Circuit breakers for fault tolerance

Storage

  • Storage abstraction (local/S3/GCS)
  • Multi-format support
  • Streaming for large files

Security

  • Secret management (environment/Vault/AWS)
  • JWT authentication
  • Input validation with filters

Next Steps

Production Deployment

  1. Docker:

    FROM python:3.11-slim
    COPY . /app
    WORKDIR /app
    RUN pip install -r requirements.txt
    CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

  2. Kubernetes:

     • ConfigMap for config.yaml
     • Secrets for sensitive data
     • Service for load balancing

  3. Monitoring:

     • Export JSON logs to ELK/Datadog
     • Add metrics endpoint
     • Set up alerts

Additional Features

  • Add caching with Redis
  • Implement batch processing
  • Add async background jobs
  • Set up monitoring dashboard

Complete Example

The complete code is available in examples/tutorials/data-processor/.

Further Reading