
I/O Operations

Storage abstraction and multi-format file operations for local and cloud storage.

Overview

The I/O module provides:

  • Unified API: Same interface for local, S3, GCS, Azure storage
  • Multi-format: Support for JSON, YAML, TOML, CSV, and more
  • Streaming: Handle large files efficiently
  • Path safety: Prevent path traversal attacks
  • Type-safe: Full type hints and validation

Storage Abstraction

Creating Storage Instances

from dspu.io import Storage

# Local filesystem
storage = Storage.from_uri("file:///data")
storage = Storage.from_uri("./data")  # Relative path

# Amazon S3
storage = Storage.from_uri("s3://bucket/path")

# Google Cloud Storage
storage = Storage.from_uri("gs://bucket/path")

# Azure Blob Storage
storage = Storage.from_uri("azure://container/path")

Basic Operations

# Write
await storage.write("file.txt", b"Hello, World!")

# Read
data = await storage.read("file.txt")

# Check existence
exists = await storage.exists("file.txt")

# List files
files = await storage.list("*.json")

# Delete
await storage.delete("file.txt")

Multi-Format Support

Supported Formats

Format     Extensions      Use Case
JSON       .json           Structured data, APIs
YAML       .yaml, .yml     Human-readable configs
TOML       .toml           Python projects
HOCON      .conf, .hocon   Complex hierarchical configs
CSV        .csv            Tabular data
Markdown   .md             Documentation
Python     .py             Code generation
Env        .env            Environment variables
Text       .txt            Plain text

Format Auto-Detection

# Auto-detect from extension
await storage.write_format("config.json", {"key": "value"})
await storage.write_format("config.yaml", {"key": "value"})
await storage.write_format("config.toml", {"key": "value"})

# Read with auto-detection
config = await storage.read_format("config.yaml")

Format Options

# JSON with indentation
await storage.write_format(
    "config.json",
    data,
    format_options={"indent": 2, "sort_keys": True}
)

# CSV with header
await storage.write_format(
    "data.csv",
    rows,
    format_options={"header": True, "delimiter": ","}
)

# Python code validation
await storage.write_format(
    "utils.py",
    code,
    format_options={"validate_syntax": True}
)

Streaming Large Files

Why Streaming?

  • Handle files larger than RAM
  • Lower memory footprint
  • Start processing before download completes
  • Transform data on-the-fly

Reading Streams

# Stream file in chunks
async for chunk in storage.read_stream("large_file.csv", chunk_size=8192):
    process_chunk(chunk)
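
Note that chunks are raw bytes and can end in the middle of a line, so line-oriented processing has to carry the partial tail over to the next chunk. A minimal sketch, using the same read_stream call as above (the buffering logic is illustrative, not part of the API):

# Count CSV rows without loading the whole file,
# carrying any partial line over to the next chunk
buffer = b""
row_count = 0
async for chunk in storage.read_stream("large_file.csv", chunk_size=8192):
    buffer += chunk
    *lines, buffer = buffer.split(b"\n")  # last element is the unfinished tail
    row_count += len(lines)
if buffer:
    row_count += 1  # final line had no trailing newline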

Writing Streams

# Write data incrementally
async with storage.write_stream("output.txt") as stream:
    for data in generate_data():
        await stream.write(data.encode())

Transform While Streaming

# Read CSV in chunks, transform each batch of records, then write the JSON output once
transformed = []
async for chunk in storage.read_stream("input.csv"):
    records = parse_csv(chunk)
    transformed.extend(transform(r) for r in records)
await storage.write_format("output.json", transformed)

Path Resolution

Why Path Resolution?

  • Resolve paths relative to source files
  • Prevent path traversal attacks
  • Validate paths before use
  • Consistent path handling

Using PathResolver

from dspu.io import PathResolver

# Resolve relative to current file
resolver = PathResolver(__file__, basis="../configs")

# Resolve path
config_path = resolver.resolve("app.yaml")  # /path/to/configs/app.yaml

# Resolve with existence check
config_path = resolver.resolve("app.yaml", check_exists=True)

# Check path safety
is_safe = resolver.is_safe("../../etc/passwd")  # False

Security Checks

from dspu.io import PathResolver

resolver = PathResolver(__file__, basis="/var/app/data")

# ✅ Safe: within basis
resolver.resolve("user_data.json")  # OK

# ❌ Unsafe: tries to escape
resolver.resolve("../../etc/passwd")  # Raises SecurityError
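
When the path comes from user input, it can be screened with is_safe before resolving, rather than letting resolve raise. A small sketch (user_path is a hypothetical request parameter):

# Reject unsafe user-supplied paths before resolving them
user_path = request.params["path"]  # hypothetical user input
if not resolver.is_safe(user_path):
    raise ValueError(f"Rejected unsafe path: {user_path!r}")
data_path = resolver.resolve(user_path)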

Cloud Storage

AWS S3

# Setup
storage = Storage.from_uri("s3://my-bucket/data")

# Use exactly like local storage
await storage.write_format("config.json", config)
data = await storage.read_format("config.json")
files = await storage.list("*.json")

Authentication:

  • Uses AWS credentials from environment/profile
  • Set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
  • Or use IAM roles (recommended)

Google Cloud Storage

# Setup
storage = Storage.from_uri("gs://my-bucket/data")

# Same API
await storage.write_format("data.json", data)

Authentication:

  • Uses Google Cloud credentials
  • Set GOOGLE_APPLICATION_CREDENTIALS
  • Or use a service account

Azure Blob Storage

# Setup
storage = Storage.from_uri("azure://my-container/data")

# Same API
await storage.write_format("data.json", data)

Authentication:

  • Uses Azure credentials
  • Set AZURE_STORAGE_CONNECTION_STRING
  • Or use managed identity

Common Patterns

Pattern 1: Configuration Management

# Load config from the first format that exists
config_files = ["config.yaml", "config.json", "config.toml"]

for file in config_files:
    if await storage.exists(file):
        config = await storage.read_format(file)
        break
else:
    raise FileNotFoundError("No configuration file found")

Pattern 2: Data Export

# Export to multiple formats
formats = ["json", "yaml", "csv"]

for fmt in formats:
    filename = f"export.{fmt}"
    await storage.write_format(filename, data)

Pattern 3: Log Archival

# Stream logs to cloud storage
log_date = datetime.now().strftime("%Y-%m-%d")
s3_storage = Storage.from_uri(f"s3://logs-bucket/{log_date}")

async with s3_storage.write_stream("app.log") as stream:
    async for log_line in read_logs():
        await stream.write(log_line.encode())

Pattern 4: Data Migration

# Migrate from local to cloud
local = Storage.from_uri("./data")
cloud = Storage.from_uri("s3://backup/data")

files = await local.list("*.json")
for file in files:
    data = await local.read(file)
    await cloud.write(file, data)
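
For files too large to hold in memory, the same migration can be done with the streaming API from earlier, copying chunk by chunk. A sketch, assuming read_stream and write_stream behave across backends as shown above:

# Stream each file from local to cloud without loading it fully into memory
files = await local.list("*.json")
for file in files:
    async with cloud.write_stream(file) as stream:
        async for chunk in local.read_stream(file, chunk_size=8192):
            await stream.write(chunk)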

Pattern 5: Multi-Cloud Backup

# Backup to multiple clouds
primary = Storage.from_uri("s3://primary-bucket/data")
backup_gcs = Storage.from_uri("gs://backup-bucket/data")
backup_azure = Storage.from_uri("azure://backup-container/data")

data = await primary.read("important.json")
await backup_gcs.write("important.json", data)
await backup_azure.write("important.json", data)
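
The two backup writes are independent of each other, so they can also run concurrently with asyncio.gather from the standard library (a sketch of the same pattern):

import asyncio

# Write both backup copies concurrently instead of sequentially
data = await primary.read("important.json")
await asyncio.gather(
    backup_gcs.write("important.json", data),
    backup_azure.write("important.json", data),
)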

Format-Specific Details

JSON

# Pretty print
await storage.write_format(
    "config.json",
    data,
    format_options={
        "indent": 2,
        "sort_keys": True,
        "ensure_ascii": False,
    }
)

# Compact
await storage.write_format(
    "config.json",
    data,
    format_options={"separators": (",", ":")}
)

YAML

# YAML with flow style
await storage.write_format(
    "config.yaml",
    data,
    format_options={"default_flow_style": False}
)

CSV

# With header
rows = [
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
]

await storage.write_format(
    "data.csv",
    rows,
    format_options={
        "header": True,
        "delimiter": ",",
    }
)

# Result:
# name,age
# Alice,30
# Bob,25

Environment Files

# Write .env file
env_vars = {
    "DATABASE_URL": "postgresql://...",
    "API_KEY": "secret",
}

await storage.write_format(".env", env_vars)

# Result:
# DATABASE_URL=postgresql://...
# API_KEY=secret

Error Handling

Common Errors

from dspu.io import FormatError, StorageError

try:
    # Format error: invalid data for format
    await storage.write_format("config.toml", ["list", "at", "root"])
except FormatError as e:
    print(f"Format error: {e}")
    print(f"Suggestion: {e.suggestion}")

try:
    # Storage error: file not found
    data = await storage.read("nonexistent.json")
except StorageError as e:
    print(f"Storage error: {e}")

Graceful Degradation

# Try multiple storage backends
backends = [
    "s3://primary-bucket/data",
    "gs://backup-bucket/data",
    "./local-cache",
]

data = None
for uri in backends:
    try:
        storage = Storage.from_uri(uri)
        data = await storage.read("file.json")
        break
    except StorageError:
        continue

if data is None:
    raise RuntimeError("All storage backends failed")

Best Practices

Storage

DO:

  • Use the Storage abstraction for portability
  • Use streaming for large files
  • Handle errors gracefully
  • Use cloud storage for production

DON'T:

  • Don't load entire large files into memory
  • Don't hardcode storage URIs
  • Don't skip error handling
  • Don't mix storage backends unnecessarily (see the sketch below for configurable URIs)
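
For example, the storage URI can come from the environment rather than being hardcoded; a sketch (the STORAGE_URI variable name is illustrative):

import os

# Read the storage location from configuration, with a local default for development
storage_uri = os.environ.get("STORAGE_URI", "./data")
storage = Storage.from_uri(storage_uri)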

Formats

DO:

  • Use format auto-detection from extensions
  • Validate data before writing
  • Use appropriate formats (JSON for APIs, YAML for configs)
  • Specify format options when needed

DON'T:

  • Don't ignore format errors
  • Don't use the wrong format for the data type
  • Don't skip validation
  • Don't assume the format from the name alone

Paths

DO:

  • Use PathResolver for security
  • Validate paths before use
  • Use absolute paths when possible
  • Check path existence when needed

DON'T:

  • Don't construct paths with string concatenation
  • Don't trust user-provided paths
  • Don't skip security checks
  • Don't use relative paths without a basis

Installation

# Basic I/O
pip install dspu

# With cloud storage support
pip install 'dspu[io]'

Next Steps