Observability¶
Structured logging, tracing, and rich output for better debugging and monitoring.
Overview¶
The observability module provides:
- Structured Logging: JSON logs with context
- Log Context: Automatic field propagation
- Tracing Decorators: Automatic function timing and tracing
- Rich Output: Colorful console output with tables, JSON, progress bars
- Production-Ready: Designed for log aggregation systems
Structured Logging¶
Why Structured Logging?¶
Traditional logging:
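A traditional call bakes the values into the message string, so they can only be recovered later by parsing text (stdlib sketch shown for contrast; not part of dspu):

import logging

logging.getLogger(__name__).info(f"User {user_id} logged in from {ip}")
# "User 123 logged in from 192.168.1.1" (values are buried in the message string)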
Structured logging:
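A structured call keeps the message constant and passes the values as separate fields, as shown in Basic Setup below:

logger.info("User logged in", user_id=123, ip="192.168.1.1")
# {"message": "User logged in", "user_id": 123, "ip": "192.168.1.1", ...}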
Benefits:
- Searchable fields in log aggregators
- Type-safe field values
- Consistent field names
- Easy to query and analyze
Basic Setup¶
from dspu.observability import configure_logging, get_logger
# Configure logging
configure_logging(level="INFO", format="json")
# Get logger
logger = get_logger(__name__)
# Log with fields
logger.info("User logged in", user_id=123, ip="192.168.1.1")
Output (JSON):
{"timestamp":"2024-12-05T10:30:45","level":"INFO","logger":"myapp","message":"User logged in","user_id":123,"ip":"192.168.1.1"}
Output Formats¶
Console (Development)¶
Human-readable text:
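A console line looks roughly like this (illustrative; the exact layout depends on the console formatter):

2024-12-05 10:30:45 INFO  myapp  User logged in  user_id=123 ip=192.168.1.1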
JSON (Production)¶
Structured NDJSON:
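One JSON object per line, in the same shape as the Basic Setup output above:

{"timestamp":"2024-12-05T10:30:45","level":"INFO","logger":"myapp","message":"User logged in","user_id":123,"ip":"192.168.1.1"}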
Use Case: Log aggregation (ELK, Datadog, CloudWatch)
Rich (Development)¶
Colorful with syntax highlighting:
Features:
- Colored log levels
- Syntax-highlighted exceptions
- Clickable file paths
- Progress bars
Log Context¶
Automatic Field Propagation¶
Add context once, included in all logs:
from dspu.observability import LogContext
with LogContext(request_id="req-123", user_id=456):
    logger.info("Processing request")  # Includes request_id, user_id
    process_order()
    logger.info("Request completed")  # Includes request_id, user_id
Nested Contexts¶
Contexts stack automatically:
with LogContext(request_id="req-123"):
    logger.info("Request started")  # request_id

    with LogContext(user_id=456):
        logger.info("User authenticated")  # request_id, user_id

        with LogContext(order_id=789):
            logger.info("Processing order")  # request_id, user_id, order_id
Persistent Context¶
Add fields for entire session:
from dspu.observability import bind_context
# Add at startup
bind_context(service="api", version="1.2.3", env="production")
# All logs include these fields
logger.info("Application started") # Includes service, version, env
Tracing Decorators¶
@timed¶
Automatic execution timing:
from dspu.observability import timed
@timed()
def slow_function():
    # ... expensive operation
    return result

# Logs: "slow_function completed in 2.34s"
@traced¶
Function entry/exit tracing:
from dspu.observability import traced
@traced(log_args=True, log_result=True)
def process_order(order_id: int, user_id: int):
    logger.info("Processing order")
    return {"status": "success"}
# Logs:
# → process_order(order_id=123, user_id=456)
# Processing order
# ← process_order returned {"status": "success"}
@logged_errors¶
Automatic error logging:
from dspu.observability import logged_errors
@logged_errors(reraise=True)
def risky_operation():
    # ... code that might fail
    ...
# Automatically logs exceptions with full traceback
Combining Decorators¶
Stack decorators for comprehensive tracing:
@timed()
@traced(log_args=True)
@logged_errors(reraise=True)
def api_endpoint(user_id: int):
    return process_request(user_id)
# Logs timing, args, errors automatically
Async Support¶
All decorators work with async functions:
import httpx

@timed()
@traced()
async def fetch_data(url: str):
    async with httpx.AsyncClient() as client:
        return await client.get(url)
Rich Output¶
Pretty JSON¶
Syntax-highlighted JSON:
from dspu.observability import print_json
data = {"user": {"name": "Alice", "age": 30}, "orders": [1, 2, 3]}
print_json(data)
Tables¶
Display tabular data:
from dspu.observability import print_table
data = [
    {"name": "Alice", "age": 30, "city": "NYC"},
    {"name": "Bob", "age": 25, "city": "SF"},
]
print_table(data, title="Users")
Trees¶
Hierarchical data:
from dspu.observability import print_tree
tree = {
    "root": {
        "child1": {"leaf1": "value1"},
        "child2": {"leaf2": "value2"},
    }
}
print_tree(tree, title="Config")
Progress Bars¶
Single progress:
from dspu.observability import Progress
with Progress() as progress:
    task = progress.add_task("Processing", total=100)
    for i in range(100):
        process_item(i)
        progress.update(task, advance=1)
Multiple tasks:
from dspu.observability import TaskProgress
with TaskProgress() as progress:
    task1 = progress.add_task("Downloading", total=100)
    task2 = progress.add_task("Processing", total=100)

    # Update both tasks
    progress.update(task1, advance=10)
    progress.update(task2, advance=5)
Common Patterns¶
Pattern 1: API Request Logging¶
from dspu.observability import configure_logging, get_logger, LogContext
import time
configure_logging(level="INFO", format="json")
logger = get_logger(__name__)
def handle_request(request_id: str, path: str, user_id: int):
    with LogContext(request_id=request_id, user_id=user_id):
        start = time.time()
        logger.info("Request started", path=path)
        try:
            result = process_request(path)
            status = "success"
            return result
        except Exception as e:
            logger.error("Request failed", error=str(e))
            status = "error"
            raise
        finally:
            duration_ms = (time.time() - start) * 1000
            logger.info("Request completed",
                        status=status,
                        duration_ms=duration_ms)
Pattern 2: Background Job¶
@timed()
@logged_errors()
def process_job(job_id: str):
    with LogContext(job_id=job_id):
        logger.info("Job started")

        # Process each step
        for step in steps:
            logger.info("Processing step", step=step)
            process_step(step)

        logger.info("Job completed")
Pattern 3: Data Pipeline¶
@timed()
def load_data(source: str):
    logger.info("Loading data", source=source)
    return data

@timed()
def transform_data(data):
    logger.info("Transforming data", rows=len(data))
    with Progress() as progress:
        task = progress.add_task("Transforming", total=len(data))
        results = []
        for item in data:
            results.append(transform(item))
            progress.update(task, advance=1)
    return results

@timed()
def save_data(data, dest: str):
    logger.info("Saving data", dest=dest, rows=len(data))
    save(dest, data)

# Run pipeline
data = load_data("input.csv")
transformed = transform_data(data)
save_data(transformed, "output.csv")
Pattern 4: Error Tracking¶
from dspu.observability import print_traceback
@logged_errors(reraise=False)
def process_with_fallback():
    try:
        return primary_method()
    except Exception:
        logger.warning("Primary method failed, using fallback")
        print_traceback(show_locals=True)  # Rich traceback
        return fallback_method()
Best Practices¶
Logging¶
✅ DO:
- Use structured fields instead of string formatting (see the sketch after this list)
- Add context (request_id, user_id, etc.)
- Use appropriate log levels
- Log errors with full context
- Use JSON format in production

❌ DON'T:
- Don't log sensitive data (passwords, tokens)
- Don't use string interpolation
- Don't overuse DEBUG in production
- Don't log inside tight loops
- Don't swallow exceptions without logging
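A minimal contrast of the two styles, using the logger from Basic Setup (values are illustrative):

# Good: constant message, queryable fields
logger.info("Payment processed", amount=42.50, currency="USD", user_id=123)

# Avoid: values interpolated into the message are hard to search and aggregate
logger.info(f"Payment of 42.50 USD processed for user 123")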
Context¶
✅ DO:
- Add request_id to all API requests
- Use context managers for scoped fields
- Include user_id for user actions
- Add correlation IDs for distributed tracing
- Clear context when done

❌ DON'T:
- Don't leak sensitive data in context
- Don't add too many fields (keep it relevant)
- Don't mutate global context without cleanup
- Don't skip context in error handlers
Performance¶
✅ DO:
- Use JSON format in production
- Log at appropriate levels (INFO for important events)
- Use sampling for high-frequency logs (see the sketch after this list)
- Use asynchronous logging for high throughput
- Use structured fields for efficient querying

❌ DON'T:
- Don't log inside tight loops
- Don't use rich format in production
- Don't log large objects (truncate or sample)
- Don't use synchronous logging in critical paths
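A minimal sampling sketch in plain Python (not a dspu feature) that keeps roughly 1% of a high-frequency event:

import random

if random.random() < 0.01:  # emit ~1% of these events
    logger.debug("Cache hit", key=key)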
Log Levels¶
When to Use Each Level¶
DEBUG:
- Detailed diagnostic information
- Variable values
- Function calls
- Development only

INFO:
- Normal operations
- Request started/completed
- Service started/stopped
- Business events

WARNING:
- Unexpected but handled
- Deprecated features
- Configuration issues
- Performance issues

ERROR:
- Exceptions
- Failed operations
- Data corruption
- Service failures
Example:
logger.debug("Cache miss", key=key) # Diagnostic
logger.info("Order created", order_id=123) # Business event
logger.warning("API slow", duration_ms=5000) # Performance issue
logger.error("Payment failed", error=str(e)) # Operation failure
Integration with Log Aggregators¶
ELK Stack¶
# JSON logs to stdout
configure_logging(level="INFO", format="json")
# Filebeat → Logstash → Elasticsearch
# Query: user_id:123 AND status:error
Datadog¶
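The same JSON-to-stdout setup applies; the sketch below assumes the Datadog Agent is configured to collect the process or container logs (it parses JSON attributes automatically):

# JSON logs to stdout
configure_logging(level="INFO", format="json")

# Datadog Agent → log pipeline → Log Explorer
# Query: @user_id:123 @status:error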
CloudWatch¶
# JSON logs to stdout
# Lambda/ECS sends to CloudWatch
# Query: fields @timestamp, user_id, status | filter status="error"