Skip to content

Observability

Structured logging, tracing, and rich output for better debugging and monitoring.

Overview

The observability module provides:

  • Structured Logging: JSON logs with context
  • Log Context: Automatic field propagation
  • Tracing Decorators: Automatic function timing and tracing
  • Rich Output: Colorful console output with tables, JSON, progress bars
  • Production-Ready: Designed for log aggregation systems

Structured Logging

Why Structured Logging?

Traditional logging:

logger.info(f"User {user_id} logged in from {ip}")

Structured logging:

logger.info("User logged in", user_id=user_id, ip=ip)

Benefits: - Searchable fields in log aggregators - Type-safe field values - Consistent field names - Easy to query and analyze

Basic Setup

from dspu.observability import configure_logging, get_logger

# Configure logging
configure_logging(level="INFO", format="json")

# Get logger
logger = get_logger(__name__)

# Log with fields
logger.info("User logged in", user_id=123, ip="192.168.1.1")

Output (JSON):

{"timestamp":"2024-12-05T10:30:45","level":"INFO","logger":"myapp","message":"User logged in","user_id":123,"ip":"192.168.1.1"}

Output Formats

Console (Development)

Human-readable text:

configure_logging(level="INFO", format="console")
2024-12-05 10:30:45 INFO myapp: User logged in (user_id=123, ip='192.168.1.1')

JSON (Production)

Structured NDJSON:

configure_logging(level="INFO", format="json")
{"timestamp":"2024-12-05T10:30:45","level":"INFO","message":"User logged in","user_id":123}

Use Case: Log aggregation (ELK, Datadog, CloudWatch)

Rich (Development)

Colorful with syntax highlighting:

configure_logging(level="INFO", format="rich")

Features: - Colored log levels - Syntax-highlighted exceptions - Clickable file paths - Progress bars

Log Context

Automatic Field Propagation

Add context once, included in all logs:

from dspu.observability import LogContext

with LogContext(request_id="req-123", user_id=456):
    logger.info("Processing request")  # Includes request_id, user_id
    process_order()
    logger.info("Request completed")   # Includes request_id, user_id

Nested Contexts

Contexts stack automatically:

with LogContext(request_id="req-123"):
    logger.info("Request started")  # request_id

    with LogContext(user_id=456):
        logger.info("User authenticated")  # request_id, user_id

        with LogContext(order_id=789):
            logger.info("Processing order")  # request_id, user_id, order_id

Persistent Context

Add fields for entire session:

from dspu.observability import bind_context

# Add at startup
bind_context(service="api", version="1.2.3", env="production")

# All logs include these fields
logger.info("Application started")  # Includes service, version, env

Tracing Decorators

@timed

Automatic execution timing:

from dspu.observability import timed

@timed()
def slow_function():
    # ... expensive operation
    return result

# Logs: "slow_function completed in 2.34s"

@traced

Function entry/exit tracing:

from dspu.observability import traced

@traced(log_args=True, log_result=True)
def process_order(order_id: int, user_id: int):
    logger.info("Processing order")
    return {"status": "success"}

# Logs:
# → process_order(order_id=123, user_id=456)
# Processing order
# ← process_order returned {"status": "success"}

@logged_errors

Automatic error logging:

from dspu.observability import logged_errors

@logged_errors(reraise=True)
def risky_operation():
    # ... code that might fail
    ...

# Automatically logs exceptions with full traceback

Combining Decorators

Stack decorators for comprehensive tracing:

@timed()
@traced(log_args=True)
@logged_errors(reraise=True)
def api_endpoint(user_id: int):
    return process_request(user_id)

# Logs timing, args, errors automatically

Async Support

All decorators work with async functions:

@timed()
@traced()
async def fetch_data(url: str):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

Rich Output

Pretty JSON

Syntax-highlighted JSON:

from dspu.observability import print_json

data = {"user": {"name": "Alice", "age": 30}, "orders": [1, 2, 3]}
print_json(data)

Tables

Display tabular data:

from dspu.observability import print_table

data = [
    {"name": "Alice", "age": 30, "city": "NYC"},
    {"name": "Bob", "age": 25, "city": "SF"},
]
print_table(data, title="Users")

Trees

Hierarchical data:

from dspu.observability import print_tree

tree = {
    "root": {
        "child1": {"leaf1": "value1"},
        "child2": {"leaf2": "value2"},
    }
}
print_tree(tree, title="Config")

Progress Bars

Single progress:

from dspu.observability import Progress

with Progress() as progress:
    task = progress.add_task("Processing", total=100)

    for i in range(100):
        process_item(i)
        progress.update(task, advance=1)

Multiple tasks:

from dspu.observability import TaskProgress

with TaskProgress() as progress:
    task1 = progress.add_task("Downloading", total=100)
    task2 = progress.add_task("Processing", total=100)

    # Update both tasks
    progress.update(task1, advance=10)
    progress.update(task2, advance=5)

Common Patterns

Pattern 1: API Request Logging

from dspu.observability import configure_logging, get_logger, LogContext
import time

configure_logging(level="INFO", format="json")
logger = get_logger(__name__)

def handle_request(request_id: str, path: str, user_id: int):
    with LogContext(request_id=request_id, user_id=user_id):
        start = time.time()

        logger.info("Request started", path=path)

        try:
            result = process_request(path)
            status = "success"
        except Exception as e:
            logger.error("Request failed", error=str(e))
            status = "error"
            raise
        finally:
            duration_ms = (time.time() - start) * 1000
            logger.info("Request completed",
                       status=status,
                       duration_ms=duration_ms)

Pattern 2: Background Job

@timed()
@logged_errors()
def process_job(job_id: str):
    with LogContext(job_id=job_id):
        logger.info("Job started")

        # Process job
        for step in steps:
            logger.info(f"Processing {step}", step=step)
            process_step(step)

        logger.info("Job completed")

Pattern 3: Data Pipeline

@timed()
def load_data(source: str):
    logger.info("Loading data", source=source)
    return data

@timed()
def transform_data(data):
    logger.info("Transforming data", rows=len(data))

    with Progress() as progress:
        task = progress.add_task("Transforming", total=len(data))

        results = []
        for item in data:
            results.append(transform(item))
            progress.update(task, advance=1)

    return results

@timed()
def save_data(data, dest: str):
    logger.info("Saving data", dest=dest, rows=len(data))
    save(dest, data)

# Run pipeline
data = load_data("input.csv")
transformed = transform_data(data)
save_data(transformed, "output.csv")

Pattern 4: Error Tracking

from dspu.observability import print_traceback

@logged_errors(reraise=False)
def process_with_fallback():
    try:
        return primary_method()
    except Exception:
        logger.warning("Primary method failed, using fallback")
        print_traceback(show_locals=True)  # Rich traceback
        return fallback_method()

Best Practices

Logging

DO: - Use structured fields instead of string formatting - Add context (request_id, user_id, etc.) - Use appropriate log levels - Log errors with full context - Use JSON format in production

DON'T: - Don't log sensitive data (passwords, tokens) - Don't use string interpolation - Don't overuse DEBUG in production - Don't log inside tight loops - Don't swallow exceptions without logging

Context

DO: - Add request_id to all API requests - Use context managers for scoped fields - Include user_id for user actions - Add correlation IDs for distributed tracing - Clear context when done

DON'T: - Don't leak sensitive data in context - Don't add too many fields (keep it relevant) - Don't mutate global context without cleanup - Don't skip context in error handlers

Performance

DO: - Use JSON format in production - Log at appropriate levels (INFO for important events) - Use sampling for high-frequency logs - Asynchronous logging for high throughput - Structured fields for efficient querying

DON'T: - Don't log inside tight loops - Don't use rich format in production - Don't log large objects (truncate/sample) - Don't synchronous logging in critical paths

Log Levels

When to Use Each Level

DEBUG: - Detailed diagnostic information - Variable values - Function calls - Development only

INFO: - Normal operations - Request started/completed - Service started/stopped - Business events

WARNING: - Unexpected but handled - Deprecated features - Configuration issues - Performance issues

ERROR: - Exceptions - Failed operations - Data corruption - Service failures

Example:

logger.debug("Cache miss", key=key)  # Diagnostic
logger.info("Order created", order_id=123)  # Business event
logger.warning("API slow", duration_ms=5000)  # Performance issue
logger.error("Payment failed", error=str(e))  # Operation failure

Integration with Log Aggregators

ELK Stack

# JSON logs to stdout
configure_logging(level="INFO", format="json")

# Filebeat → Logstash → Elasticsearch
# Query: user_id:123 AND status:error

Datadog

# JSON logs with DD fields
bind_context(
    service="api",
    env="production",
    version="1.2.3"
)

CloudWatch

# JSON logs to stdout
# Lambda/ECS sends to CloudWatch
# Query: fields @timestamp, user_id, status | filter status="error"

Installation

# Observability included in base installation
pip install dspu

Next Steps