Tutorial: Building a Data Processing API¶
This tutorial walks you through building a complete data processing API using DSPU modules.
What We'll Build¶
A REST API that:
- Processes CSV data uploads
- Applies ML transformations (scaling, encoding)
- Stores results in cloud storage
- Provides structured logging and monitoring
- Handles authentication and secrets
Prerequisites¶
- Python 3.10 or newer
- DSPU installed in your environment
- FastAPI and uvicorn for the web layer
Project Structure¶
data-processor/
├── config.yaml
├── .env.example
├── app.py
├── models.py
├── processing.py
└── artifacts/
    ├── scaler.json
    └── encoder.json
Step 1: Configuration¶
Define Configuration Model¶
Create models.py:
from pydantic import BaseModel, Field


class DatabaseConfig(BaseModel):
    host: str = "localhost"
    port: int = Field(ge=1, le=65535)
    name: str
    user: str
    password: str = ""  # filled in at runtime from the secret manager


class StorageConfig(BaseModel):
    type: str  # "local" or "s3"
    uri: str


class AppConfig(BaseModel):
    debug: bool = False
    log_level: str = "INFO"
    database: DatabaseConfig
    storage: StorageConfig
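Because these are plain Pydantic models, you can sanity-check a configuration dict before wiring up any DSPU machinery. A minimal sketch (assuming Pydantic v2; on v1 use AppConfig(**raw) instead of model_validate):

from pydantic import ValidationError
from models import AppConfig

raw = {
    "database": {"host": "db", "port": 5432, "name": "dataproc", "user": "admin"},
    "storage": {"type": "local", "uri": "./data"},
}

config = AppConfig.model_validate(raw)
print(config.database.port)  # 5432

try:
    AppConfig.model_validate({**raw, "database": {**raw["database"], "port": 99999}})
except ValidationError as exc:
    print(exc)  # port fails the le=65535 constraint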
Create Configuration File¶
Create config.yaml:
debug: false
log_level: INFO
database:
  host: localhost
  port: 5432
  name: dataproc
  user: admin
storage:
  type: local
  uri: ./data
Load Configuration¶
Add to app.py:
from dspu.config import Config, FileSource, EnvSource
from dspu.security import SecretManager
from models import AppConfig
# Load configuration
config = Config.load(
    AppConfig,
    sources=[
        FileSource("config.yaml"),
        EnvSource(prefix="APP_", separator="__"),
    ],
)

# Load secrets (secrets.get() is a coroutine; in the assembled app.py it
# runs inside the async startup hook shown in Step 7)
secrets = SecretManager.from_env()
db_password = await secrets.get("database/password")
config.database.password = db_password
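To see the prefix/separator mapping in action, you can set a couple of environment variables before loading. This is a sketch assuming the usual convention that sources listed later (here EnvSource) take precedence over earlier ones; check your DSPU version's precedence rules:

import os

# Hypothetical overrides: APP_DATABASE__HOST maps to database.host,
# APP_LOG_LEVEL maps to log_level (prefix "APP_", separator "__")
os.environ["APP_DATABASE__HOST"] = "db.internal"
os.environ["APP_LOG_LEVEL"] = "DEBUG"

config = Config.load(
    AppConfig,
    sources=[
        FileSource("config.yaml"),
        EnvSource(prefix="APP_", separator="__"),
    ],
)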
Step 2: Logging Setup¶
Configure Structured Logging¶
Add to app.py:
from dspu.observability import (
    configure_logging,
    get_logger,
    LogContext,
    timed,
)

# Configure logging
configure_logging(
    level=config.log_level,
    format="rich" if config.debug else "json",
)

logger = get_logger(__name__)
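As a quick check, emit one structured event. Keyword arguments become fields in the JSON output, and LogContext (used throughout the endpoints below) attaches shared fields such as request_id to every line logged inside the block:

with LogContext(request_id="startup-check"):
    logger.info(
        "Configuration loaded",
        debug=config.debug,
        storage_type=config.storage.type,
    )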
Step 3: Storage Setup¶
Initialize Storage¶
from dspu.io import Storage
# Create storage instance
storage = Storage.from_uri(config.storage.uri)

logger.info(
    "Storage initialized",
    storage_type=config.storage.type,
    uri=config.storage.uri,
)
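Optionally, write a small test object to confirm the backend is reachable. This sketch uses write_format, the same helper the upload endpoint relies on later; because it is async, it has to run inside a coroutine:

import asyncio

async def storage_smoke_test():
    # Writes a tiny JSON document under the configured URI
    await storage.write_format("healthcheck/ping.json", {"ok": True})
    logger.info("Storage smoke test passed")

asyncio.run(storage_smoke_test())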
Step 4: ML Processing Pipeline¶
Create Processing Module¶
Create processing.py:
from pathlib import Path

from dspu.ml import SeedManager, Scaler, Encoder
from dspu.observability import get_logger, timed

logger = get_logger(__name__)


class DataProcessor:
    def __init__(self, scaler_path: Path | None = None, encoder_path: Path | None = None):
        """Initialize processor with optional saved artifacts."""
        # Set reproducibility
        SeedManager.set_global_seed(42)

        # Load or create transformers
        if scaler_path and scaler_path.exists():
            self.scaler = Scaler.load_from_file(scaler_path)
        else:
            self.scaler = Scaler(method="standard")

        if encoder_path and encoder_path.exists():
            self.encoder = Encoder.load_from_file(encoder_path)
        else:
            self.encoder = Encoder(method="label")

    @timed()
    def fit(self, X: list[list[float]], y: list[str]):
        """Fit transformers on training data."""
        logger.info("Fitting transformers", samples=len(X))

        # Fit scaler
        self.scaler.fit(X)

        # Fit encoder
        self.encoder.fit(y)

        logger.info("Transformers fitted")

    @timed()
    def transform(self, X: list[list[float]], y: list[str] | None = None):
        """Transform data using fitted transformers."""
        logger.info("Transforming data", samples=len(X))

        # Scale features
        X_scaled = self.scaler.transform(X)

        # Encode labels if provided
        y_encoded = None
        if y is not None:
            y_encoded = self.encoder.transform(y)

        logger.info("Data transformed")
        return X_scaled, y_encoded

    def save_artifacts(self, scaler_path: str, encoder_path: str):
        """Save fitted transformers."""
        self.scaler.save_to_file(scaler_path)
        self.encoder.save_to_file(encoder_path)
        logger.info(
            "Artifacts saved",
            scaler_path=scaler_path,
            encoder_path=encoder_path,
        )
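The processor can be exercised on its own before wiring it into the API. A minimal round trip with made-up data (paths are illustrative):

from pathlib import Path
from processing import DataProcessor

X = [[1.0, 200.0], [2.0, 180.0], [3.0, 220.0]]
y = ["cat", "dog", "cat"]

processor = DataProcessor()
processor.fit(X, y)
X_scaled, y_encoded = processor.transform(X, y)

processor.save_artifacts("artifacts/scaler.json", "artifacts/encoder.json")

# A new instance restores the fitted state from the saved artifacts
warm = DataProcessor(Path("artifacts/scaler.json"), Path("artifacts/encoder.json"))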
Step 5: API Endpoints¶
Create Resilient API Client¶
from dspu.aio import retry, RateLimiter, CircuitBreaker
# Setup resilience
rate_limiter = RateLimiter(rate=10.0)  # 10 requests/sec
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60.0)


@retry(max_attempts=3, backoff="exponential")
async def process_upload(data: dict, request_id: str):
    """Process uploaded data with full error handling."""
    with LogContext(request_id=request_id):
        logger.info("Processing upload started")

        try:
            # Rate limit
            async with rate_limiter:
                # Circuit breaker
                async with circuit_breaker.protect():
                    # Process data (see the process_data sketch below)
                    result = await process_data(data)

            logger.info("Processing completed", status="success")
            return result
        except Exception as e:
            logger.error("Processing failed", error=str(e), status="error")
            raise
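process_upload delegates to a process_data coroutine that is not part of DSPU; it is whatever your application does with the rows. One possible sketch, reusing the DataProcessor from Step 4 and assuming a payload shaped like {"features": [...], "labels": [...]}:

from pathlib import Path
from processing import DataProcessor

processor = DataProcessor(
    Path("artifacts/scaler.json"),
    Path("artifacts/encoder.json"),
)

async def process_data(data: dict) -> list[dict]:
    """Run uploaded rows through the fitted transformers."""
    X = data["features"]
    y = data.get("labels")
    X_scaled, y_encoded = processor.transform(X, y)
    labels = y_encoded if y_encoded is not None else [None] * len(X_scaled)
    return [
        {"features": row, "label": label}
        for row, label in zip(X_scaled, labels)
    ]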
Upload Endpoint¶
from fastapi import FastAPI, UploadFile, Form, HTTPException
from dspu.ml import IDGenerator
from dspu.validation import StripWhitespaceFilter, LowercaseFilter

app = FastAPI()

# Validation filter
email_filter = StripWhitespaceFilter().then(LowercaseFilter())


@app.post("/upload")
async def upload_file(
    file: UploadFile,
    user_email: str = Form(...),
    request_id: str | None = None,
):
    """Upload and process CSV file."""
    if request_id is None:
        request_id = IDGenerator.ulid()

    with LogContext(request_id=request_id):
        logger.info("Upload started", filename=file.filename)

        try:
            # Validate email
            user_email = email_filter(user_email)

            # Read CSV (see the parse_csv sketch below)
            content = await file.read()
            data = parse_csv(content)

            # Process with ML pipeline
            result = await process_upload(data, request_id)

            # Save to storage
            output_path = f"processed/{request_id}.json"
            await storage.write_format(output_path, result)

            logger.info(
                "Upload completed",
                filename=file.filename,
                output_path=output_path,
                rows=len(result),
            )

            return {
                "request_id": request_id,
                "status": "success",
                "rows_processed": len(result),
                "output_path": output_path,
            }
        except Exception as e:
            logger.error("Upload failed", error=str(e))
            raise HTTPException(status_code=500, detail=str(e))
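parse_csv is likewise application code, not a DSPU helper. A small stdlib-only sketch that assumes a headerless CSV where the last column is the label and the remaining columns are numeric features:

import csv
import io

def parse_csv(content: bytes) -> dict:
    """Turn uploaded CSV bytes into the payload shape process_data expects."""
    reader = csv.reader(io.StringIO(content.decode("utf-8")))
    rows = [row for row in reader if row]
    return {
        "features": [[float(value) for value in row[:-1]] for row in rows],
        "labels": [row[-1] for row in rows],
    }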
Train Endpoint¶
@app.post("/train")
async def train_model(
X_train: list[list[float]],
y_train: list[str],
):
"""Train ML transformers."""
request_id = IDGenerator.ulid()
with LogContext(request_id=request_id):
logger.info("Training started", samples=len(X_train))
try:
# Create processor
processor = DataProcessor()
# Fit transformers
processor.fit(X_train, y_train)
# Save artifacts
processor.save_artifacts(
"artifacts/scaler.json",
"artifacts/encoder.json"
)
logger.info("Training completed")
return {
"request_id": request_id,
"status": "success",
"samples_trained": len(X_train),
}
except Exception as e:
logger.error("Training failed", error=str(e))
raise HTTPException(status_code=500, detail=str(e))
Step 6: Authentication¶
Add JWT Authentication¶
from dspu.security import JWTProvider
from fastapi import Depends, Header

# Get the JWT secret from the secrets manager.
# secrets.get() is a coroutine; in the assembled app.py this runs inside
# the async startup hook shown in Step 7.
jwt_secret = await secrets.get("jwt/secret")

jwt_provider = JWTProvider(
    secret_key=jwt_secret,
    issuer="data-processor",
    expiry_seconds=3600,
)


async def verify_token(authorization: str = Header(...)):
    """Verify JWT token."""
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization header")

    token = authorization[7:]  # Remove "Bearer "
    try:
        claims = jwt_provider.verify_token(token)
        return claims
    except Exception:
        raise HTTPException(status_code=401, detail="Invalid token")


# Protected endpoint: add the dependency to the route from Step 5
@app.post("/upload", dependencies=[Depends(verify_token)])
async def upload_file(...):
    ...
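To verify the guard without a browser, FastAPI's TestClient (which requires httpx) can hit the protected route; anything that is not a Bearer token is rejected before the request body is processed:

from fastapi.testclient import TestClient

client = TestClient(app)

response = client.post(
    "/upload",
    headers={"Authorization": "Token not-a-jwt"},
    files={"file": ("data.csv", b"1.0,2.0,cat\n")},
    data={"user_email": "user@example.com"},
)
assert response.status_code == 401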
Step 7: Run the Application¶
Complete app.py¶
import asyncio
from fastapi import FastAPI
from dspu.config import Config, FileSource, EnvSource
from dspu.observability import configure_logging, get_logger
from dspu.security import SecretManager
from dspu.io import Storage
from models import AppConfig
from processing import DataProcessor
# Load configuration
config = Config.load(
    AppConfig,
    sources=[
        FileSource("config.yaml"),
        EnvSource(prefix="APP_", separator="__"),
    ],
)

# Configure logging
configure_logging(
    level=config.log_level,
    format="rich" if config.debug else "json",
)

logger = get_logger(__name__)


# Initialize components
async def startup():
    """Initialize application on startup."""
    logger.info("Application starting", version="1.0.0")

    # Load secrets
    global secrets
    secrets = SecretManager.from_env()
    config.database.password = await secrets.get("database/password")

    # Initialize storage
    global storage
    storage = Storage.from_uri(config.storage.uri)

    logger.info("Application started")


# Create FastAPI app
app = FastAPI(on_startup=[startup])

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Environment Variables¶
Create .env.example:
# Database
APP_DATABASE__PASSWORD=secret123
# JWT
APP_JWT_SECRET=your-secret-key
# Storage (for production)
APP_STORAGE__TYPE=s3
APP_STORAGE__URI=s3://my-bucket/data
# Logging
APP_LOG_LEVEL=INFO
APP_DEBUG=false
Run¶
python app.py
Step 8: Testing¶
Upload Data¶
curl -X POST "http://localhost:8000/upload" \
-H "Authorization: Bearer YOUR_TOKEN" \
-F "file=@data.csv" \
-F "user_email= USER@EXAMPLE.COM "
Response¶
{
  "request_id": "01ARZ3NDEKTSV4RRFFQ69G5FAV",
  "status": "success",
  "rows_processed": 1000,
  "output_path": "processed/01ARZ3NDEKTSV4RRFFQ69G5FAV.json"
}
Logs¶
{"timestamp":"2024-12-05T10:30:45","level":"INFO","message":"Upload started","request_id":"01ARZ3NDEKTSV4RRFFQ69G5FAV","filename":"data.csv"}
{"timestamp":"2024-12-05T10:30:45","level":"INFO","message":"Processing upload started","request_id":"01ARZ3NDEKTSV4RRFFQ69G5FAV"}
{"timestamp":"2024-12-05T10:30:46","level":"INFO","message":"Transforming data","samples":1000}
{"timestamp":"2024-12-05T10:30:47","level":"INFO","message":"Data transformed"}
{"timestamp":"2024-12-05T10:30:47","level":"INFO","message":"Processing completed","status":"success"}
{"timestamp":"2024-12-05T10:30:48","level":"INFO","message":"Upload completed","filename":"data.csv","output_path":"processed/01ARZ3NDEKTSV4RRFFQ69G5FAV.json","rows":1000}
What We've Learned¶
Configuration¶
- Multi-source configuration (file + env vars)
- Type-safe configuration with Pydantic
- Secret management separation
Observability¶
- Structured logging with JSON output
- Request correlation with request_id
- Automatic timing with @timed decorator
- Rich output for development
ML Processing¶
- Reproducible experiments with seed management
- Stateful transformers (fit/transform pattern)
- Artifact persistence for production
Resilience¶
- Retry with exponential backoff
- Rate limiting for API protection
- Circuit breakers for fault tolerance
Storage¶
- Storage abstraction (local/S3/GCS)
- Multi-format support
- Streaming for large files
Security¶
- Secret management (environment/Vault/AWS)
- JWT authentication
- Input validation with filters
Next Steps¶
Production Deployment¶
- Docker: package the service as a container image
- Kubernetes:
  - ConfigMap for config.yaml
  - Secrets for sensitive data
  - Service for load balancing
- Monitoring:
  - Export JSON logs to ELK/Datadog
  - Add metrics endpoint
  - Set up alerts
Additional Features¶
- Add caching with Redis
- Implement batch processing
- Add async background jobs
- Set up monitoring dashboard
Complete Example¶
The complete code is available in examples/tutorials/data-processor/.