# I/O API Reference
Storage abstraction and multi-format file operations.
## Storage

### dspu.io.storage.Storage

Unified storage interface with automatic serialization.

Provides a high-level API for reading and writing data to any storage backend, with automatic format detection and serialization.
Example:

Local filesystem:

```python
storage = Storage.from_uri("/data")
await storage.write("config.json", {"debug": True})
config = await storage.read("config.json")
```

S3 (requires fsspec):

```python
storage = Storage.from_uri("s3://my-bucket/path")
await storage.write("data.msgpack", large_dataset)
```

Streaming for large files:

```python
async for chunk in storage.read_stream("large_file.csv"):
    process(chunk)
```

Initialize storage with backend.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `backend` | `StorageBackend` | Storage backend implementation. | *required* |

Source code in src/dspu/io/storage.py
#### Functions
##### from_uri *(classmethod)*

```python
from_uri(uri: str, **kwargs: Any) -> Storage
```

Create Storage from URI.

Auto-detects the backend from the URI scheme.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `uri` | `str` | Storage URI (e.g., `"file:///path"`, `"s3://bucket"`). | *required* |
| `**kwargs` | `Any` | Backend-specific options. | `{}` |

Returns:

| Type | Description |
|---|---|
| `Storage` | Storage instance with appropriate backend. |

Raises:

| Type | Description |
|---|---|
| `ConfigurationError` | If backend is not supported or unavailable. |

Example:

```python
storage = Storage.from_uri("/data/local")
storage = Storage.from_uri("s3://my-bucket/prefix")
```
Source code in src/dspu/io/storage.py
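The scheme-based dispatch that `from_uri` performs can be sketched with the standard library alone. The backend-name table below is illustrative, not dspu's actual registry:

```python
from urllib.parse import urlparse

# Hypothetical scheme -> backend-name table; dspu's real registry may differ.
_BACKENDS = {"": "local", "file": "local", "s3": "s3", "gs": "gcs", "az": "azure"}


def detect_backend(uri: str) -> tuple[str, str]:
    """Return an assumed (backend_name, path) pair for a storage URI."""
    parsed = urlparse(uri)
    if parsed.scheme not in _BACKENDS:
        raise ValueError(f"Unsupported URI scheme: {parsed.scheme!r}")
    # For cloud schemes the bucket lives in netloc; join it back onto the path.
    path = (parsed.netloc + parsed.path) if parsed.netloc else parsed.path
    return _BACKENDS[parsed.scheme], path


backend, path = detect_backend("s3://my-bucket/prefix")
```

A bare path like `"/data/local"` parses with an empty scheme, which is why the table maps `""` to the local backend.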
##### read *(async)*

Read and deserialize data from storage.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `format` | `str \| None` | Serialization format (auto-detected if None). | `None` |
| `raw` | `bool` | If True, return raw bytes without deserialization. | `False` |

Returns:

| Type | Description |
|---|---|
| `Any` | Deserialized data, or raw bytes if `raw=True`. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If file doesn't exist. |
| `SerializationError` | If deserialization fails. |

Example:

```python
config = await storage.read("config.json")
data = await storage.read("data.msgpack", format="msgpack")
raw_bytes = await storage.read("file.bin", raw=True)
```
Source code in src/dspu/io/storage.py
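Format auto-detection from the file extension can be illustrated with a small stdlib-only sketch. The extension map and `deserialize` helper here are assumptions for illustration, not dspu internals:

```python
import json
import pickle
from pathlib import PurePosixPath
from typing import Any

# Illustrative extension -> format map; dspu supports more formats than this.
_EXT_FORMATS = {".json": "json", ".pkl": "pickle", ".pickle": "pickle"}


def detect_format(path: str) -> str:
    """Guess the serialization format from the file extension."""
    suffix = PurePosixPath(path).suffix
    try:
        return _EXT_FORMATS[suffix]
    except KeyError:
        raise ValueError(f"Cannot auto-detect format for {path!r}") from None


def deserialize(raw: bytes, format: str) -> Any:
    """Decode raw bytes according to the chosen format."""
    if format == "json":
        return json.loads(raw)
    if format == "pickle":
        return pickle.loads(raw)
    raise ValueError(f"Unknown format: {format!r}")


config = deserialize(b'{"debug": true}', detect_format("config.json"))
```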
##### write *(async)*

Serialize and write data to storage.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `data` | `Any` | Data to write (or bytes if `raw=True`). | *required* |
| `format` | `str \| None` | Serialization format (auto-detected if None). | `None` |
| `raw` | `bool` | If True, write data as-is (must be bytes). | `False` |

Raises:

| Type | Description |
|---|---|
| `SerializationError` | If serialization fails. |
| `TypeError` | If `raw=True` and data is not bytes. |

Example:

```python
await storage.write("config.json", {"debug": True})
await storage.write("data.msgpack", dataset, format="msgpack")
await storage.write("file.bin", b"raw data", raw=True)
```
Source code in src/dspu/io/storage.py
##### exists *(async)*

Check if file exists.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to check. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if exists, False otherwise. |

Example:

```python
if await storage.exists("config.json"):
    config = await storage.read("config.json")
```
Source code in src/dspu/io/storage.py
##### list *(async)*

```python
list(pattern: str = '*') -> list[FileInfo]
```

List files matching pattern.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pattern` | `str` | Glob pattern. | `'*'` |

Returns:

| Type | Description |
|---|---|
| `list[FileInfo]` | List of FileInfo for matching files. |

Example:

```python
files = await storage.list("*.json")
for file in files:
    print(f"{file.path}: {file.size} bytes")
```
Source code in src/dspu/io/storage.py
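A minimal stand-in for `list()` over a local directory, using pathlib globbing. The `FileInfo` fields below (`path`, `size`) match the example above but are a guess at the real dataclass:

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class FileInfo:  # assumed shape; dspu's FileInfo may carry more metadata
    path: str
    size: int


def list_files(root: Path, pattern: str = "*") -> list[FileInfo]:
    """Collect FileInfo records for files under root matching a glob pattern."""
    return [
        FileInfo(path=str(p.relative_to(root)), size=p.stat().st_size)
        for p in sorted(root.glob(pattern))
        if p.is_file()
    ]


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "a.json").write_text("{}")
    (root / "b.txt").write_text("hello")
    matches = list_files(root, "*.json")
```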
##### delete *(async)*

Delete file or directory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to delete. | *required* |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If path doesn't exist. |

Example:

```python
await storage.delete("old_data.json")
```
Source code in src/dspu/io/storage.py
##### read_stream *(async)*

Stream file contents.

Useful for large files, to avoid loading everything into memory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `chunk_size` | `int` | Size of chunks in bytes. | `8192` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[bytes]` | Chunks of file data. |

Example:

```python
async for chunk in storage.read_stream("large_file.csv"):
    process(chunk)
```
Source code in src/dspu/io/storage.py
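How chunked streaming can work in principle is shown below with a stdlib-only sketch (not dspu's implementation): blocking reads are offloaded with `asyncio.to_thread` and yielded one chunk at a time:

```python
import asyncio
import tempfile
from collections.abc import AsyncIterator
from pathlib import Path


async def read_stream(path: Path, chunk_size: int = 8192) -> AsyncIterator[bytes]:
    """Yield a file's contents in fixed-size chunks without blocking the loop."""
    with path.open("rb") as f:
        # Offload each blocking read so the event loop stays responsive.
        while chunk := await asyncio.to_thread(f.read, chunk_size):
            yield chunk


async def main() -> int:
    # Write a 20 kB scratch file, then stream it back and count the bytes.
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(b"x" * 20000)
        path = Path(f.name)
    total = 0
    async for chunk in read_stream(path, chunk_size=8192):
        total += len(chunk)
    path.unlink()
    return total


total = asyncio.run(main())
```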
##### write_stream *(async)*

Write file from stream.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to write to. | *required* |
| `data` | `AsyncIterator[bytes]` | Async iterator of data chunks. | *required* |

Example:

```python
async def generate_data():
    for i in range(1000):
        yield f"line {i}\n".encode()

await storage.write_stream("output.txt", generate_data())
```
Source code in src/dspu/io/storage.py
##### write_format *(async)*

```python
write_format(
    path: str,
    data: Any,
    *,
    format: str | None = None,
    format_options: dict[str, Any] | None = None,
) -> None
```

Write data using a format writer (text-based formats).

This method uses the format system for text-based structured formats such as YAML, TOML, CSV, and .env files. For binary formats (msgpack, pickle), use the regular `write()` method.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `data` | `Any` | Data to write. | *required* |
| `format` | `str \| None` | Format name (auto-detected from path if None). | `None` |
| `format_options` | `dict[str, Any] \| None` | Format-specific options (indent, delimiter, etc.). | `None` |

Raises:

| Type | Description |
|---|---|
| `FormatError` | If format operations fail. |

Example:

YAML with custom options:

```python
await storage.write_format(
    "config.yaml",
    {"debug": True},
    format_options={"sort_keys": True},
)
```

CSV with headers:

```python
await storage.write_format(
    "data.csv",
    [{"name": "Alice", "age": 30}],
    format_options={"header": True},
)
```
Source code in src/dspu/io/storage.py
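The CSV example above maps naturally onto the stdlib `csv` module. The sketch below shows roughly what a CSV format writer might do; the `header` and `delimiter` option names mirror the example but are assumptions about dspu's options:

```python
import csv
import io
from typing import Any


def write_csv(
    rows: list[dict[str, Any]], *, header: bool = True, delimiter: str = ","
) -> str:
    """Render a list of dicts as CSV text, using the first row's keys as columns."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0]), delimiter=delimiter)
    if header:
        writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


text = write_csv([{"name": "Alice", "age": 30}])
```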
##### read_format *(async)*

```python
read_format(
    path: str,
    *,
    format: str | None = None,
    format_options: dict[str, Any] | None = None,
) -> Any
```

Read data using a format reader (text-based formats).

This method uses the format system for text-based structured formats. For binary formats, use the regular `read()` method.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `format` | `str \| None` | Format name (auto-detected from path if None). | `None` |
| `format_options` | `dict[str, Any] \| None` | Format-specific options. | `None` |

Returns:

| Type | Description |
|---|---|
| `Any` | Parsed data (type depends on format). |

Raises:

| Type | Description |
|---|---|
| `FormatError` | If format operations fail. |

Example:

```python
config = await storage.read_format("config.yaml")
data = await storage.read_format("data.csv")
```

Source code in src/dspu/io/storage.py
## Storage Backends

### dspu.io.local.LocalBackend
Local filesystem storage backend.

Provides async file operations on the local filesystem.

Example:

```python
backend = LocalBackend("/data")
await backend.write("test.txt", b"Hello")
data = await backend.read("test.txt")
print(data.decode())  # Hello
```

Initialize local backend.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `root` | `str \| Path` | Root directory for storage. | *required* |
Source code in src/dspu/io/local.py
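A stripped-down version of what a local backend's `read`/`write` pair can look like, using `asyncio.to_thread` to keep file I/O off the event loop. This is an illustrative sketch, not dspu's LocalBackend:

```python
import asyncio
import tempfile
from pathlib import Path


class MiniLocalBackend:  # illustrative; the real backend adds error mapping, streaming, etc.
    def __init__(self, root: str) -> None:
        self.root = Path(root)

    async def write(self, path: str, data: bytes) -> None:
        target = self.root / path
        # Create parent directories if needed, as the docs above describe.
        target.parent.mkdir(parents=True, exist_ok=True)
        await asyncio.to_thread(target.write_bytes, data)

    async def read(self, path: str) -> bytes:
        return await asyncio.to_thread((self.root / path).read_bytes)


async def demo() -> bytes:
    with tempfile.TemporaryDirectory() as tmp:
        backend = MiniLocalBackend(tmp)
        await backend.write("nested/test.txt", b"Hello")
        return await backend.read("nested/test.txt")


data = asyncio.run(demo())
```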
#### Functions
##### read *(async)*

Read file contents.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file relative to root. | *required* |

Returns:

| Type | Description |
|---|---|
| `bytes` | File contents as bytes. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If file doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/local.py

##### write *(async)*

Write data to file.

Creates parent directories if needed.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file relative to root. | *required* |
| `data` | `bytes` | Data to write. | *required* |

Raises:

| Type | Description |
|---|---|
| `DSPUIOError` | For I/O errors. |

Source code in src/dspu/io/local.py

##### exists *(async)*

Check if path exists.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to check. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if exists, False otherwise. |

Source code in src/dspu/io/local.py

##### delete *(async)*

Delete file or directory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to delete. | *required* |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If path doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/local.py

##### list *(async)*

List files matching pattern.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pattern` | `str` | Glob pattern. | `'*'` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[FileInfo]` | FileInfo for each matching file/directory. |

Source code in src/dspu/io/local.py

##### read_stream *(async)*

Stream file contents in chunks.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `chunk_size` | `int` | Chunk size in bytes. | `8192` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[bytes]` | Chunks of file data. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If file doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/local.py

##### write_stream *(async)*

Write file from stream.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to write to. | *required* |
| `data` | `AsyncIterator[bytes]` | Async iterator of data chunks. | *required* |

Raises:

| Type | Description |
|---|---|
| `DSPUIOError` | For I/O errors. |

Source code in src/dspu/io/local.py
### dspu.io.cloud.S3Backend

Bases: `_FsspecBackend`

Amazon S3 storage backend.

Requires: `pip install s3fs`

Example:

```python
backend = S3Backend(
    "my-bucket/prefix",
    key="ACCESS_KEY",
    secret="SECRET_KEY",
)
await backend.write("data.json", b'{"key": "value"}')
```

Initialize S3 backend.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | S3 path (bucket/prefix). | *required* |
| `**kwargs` | `Any` | S3-specific options (key, secret, endpoint_url, etc.). | `{}` |

Source code in src/dspu/io/cloud.py

#### Functions

##### read *(async)*

Read file contents.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |

Returns:

| Type | Description |
|---|---|
| `bytes` | File contents as bytes. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If file doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/cloud.py

##### write *(async)*

Write data to file.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `data` | `bytes` | Data to write. | *required* |

Raises:

| Type | Description |
|---|---|
| `DSPUIOError` | For I/O errors. |

Source code in src/dspu/io/cloud.py

##### exists *(async)*

Check if path exists.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to check. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if exists, False otherwise. |

Source code in src/dspu/io/cloud.py

##### delete *(async)*

Delete file or directory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to delete. | *required* |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If path doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/cloud.py

##### list *(async)*

List files matching pattern.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pattern` | `str` | Glob pattern. | `'*'` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[FileInfo]` | FileInfo for each matching file. |

Source code in src/dspu/io/cloud.py

##### read_stream *(async)*

Stream file contents.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `chunk_size` | `int` | Chunk size in bytes. | `8192` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[bytes]` | Chunks of file data. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If file doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/cloud.py

##### write_stream *(async)*

Write file from stream.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to write to. | *required* |
| `data` | `AsyncIterator[bytes]` | Async iterator of data chunks. | *required* |

Raises:

| Type | Description |
|---|---|
| `DSPUIOError` | For I/O errors. |

Source code in src/dspu/io/cloud.py
### dspu.io.cloud.GCSBackend

Bases: `_FsspecBackend`

Google Cloud Storage backend.

Requires: `pip install gcsfs`

Example:

```python
backend = GCSBackend(
    "my-bucket/prefix",
    token="path/to/credentials.json",
)
await backend.write("data.json", b'{"key": "value"}')
```

Initialize GCS backend.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | GCS path (bucket/prefix). | *required* |
| `**kwargs` | `Any` | GCS-specific options (token, project, etc.). | `{}` |

Source code in src/dspu/io/cloud.py

#### Functions

##### read *(async)*

Read file contents.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |

Returns:

| Type | Description |
|---|---|
| `bytes` | File contents as bytes. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If file doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/cloud.py

##### write *(async)*

Write data to file.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `data` | `bytes` | Data to write. | *required* |

Raises:

| Type | Description |
|---|---|
| `DSPUIOError` | For I/O errors. |

Source code in src/dspu/io/cloud.py

##### exists *(async)*

Check if path exists.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to check. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if exists, False otherwise. |

Source code in src/dspu/io/cloud.py

##### delete *(async)*

Delete file or directory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to delete. | *required* |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If path doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/cloud.py

##### list *(async)*

List files matching pattern.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pattern` | `str` | Glob pattern. | `'*'` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[FileInfo]` | FileInfo for each matching file. |

Source code in src/dspu/io/cloud.py

##### read_stream *(async)*

Stream file contents.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `chunk_size` | `int` | Chunk size in bytes. | `8192` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[bytes]` | Chunks of file data. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If file doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/cloud.py

##### write_stream *(async)*

Write file from stream.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to write to. | *required* |
| `data` | `AsyncIterator[bytes]` | Async iterator of data chunks. | *required* |

Raises:

| Type | Description |
|---|---|
| `DSPUIOError` | For I/O errors. |

Source code in src/dspu/io/cloud.py
### dspu.io.cloud.AzureBackend

Bases: `_FsspecBackend`

Azure Blob Storage backend.

Requires: `pip install adlfs`

Example:

```python
backend = AzureBackend(
    "container/prefix",
    account_name="myaccount",
    account_key="KEY",
)
await backend.write("data.json", b'{"key": "value"}')
```

Initialize Azure backend.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Azure path (container/prefix). | *required* |
| `**kwargs` | `Any` | Azure-specific options (account_name, account_key, etc.). | `{}` |

Source code in src/dspu/io/cloud.py

#### Functions

##### read *(async)*

Read file contents.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |

Returns:

| Type | Description |
|---|---|
| `bytes` | File contents as bytes. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If file doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/cloud.py

##### write *(async)*

Write data to file.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `data` | `bytes` | Data to write. | *required* |

Raises:

| Type | Description |
|---|---|
| `DSPUIOError` | For I/O errors. |

Source code in src/dspu/io/cloud.py

##### exists *(async)*

Check if path exists.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to check. | *required* |

Returns:

| Type | Description |
|---|---|
| `bool` | True if exists, False otherwise. |

Source code in src/dspu/io/cloud.py

##### delete *(async)*

Delete file or directory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to delete. | *required* |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If path doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/cloud.py

##### list *(async)*

List files matching pattern.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pattern` | `str` | Glob pattern. | `'*'` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[FileInfo]` | FileInfo for each matching file. |

Source code in src/dspu/io/cloud.py

##### read_stream *(async)*

Stream file contents.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to file. | *required* |
| `chunk_size` | `int` | Chunk size in bytes. | `8192` |

Yields:

| Type | Description |
|---|---|
| `AsyncIterator[bytes]` | Chunks of file data. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If file doesn't exist. |
| `DSPUIOError` | For other I/O errors. |

Source code in src/dspu/io/cloud.py

##### write_stream *(async)*

Write file from stream.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Path to write to. | *required* |
| `data` | `AsyncIterator[bytes]` | Async iterator of data chunks. | *required* |

Raises:

| Type | Description |
|---|---|
| `DSPUIOError` | For I/O errors. |

Source code in src/dspu/io/cloud.py
## Path Resolution

### dspu.io.paths.PathResolver

Resolve file paths relative to a source file.

This class helps resolve file paths relative to the location of a source file. This is useful for loading configuration files, data files, and other resources stored relative to your code.
Example:

In your module file:

```python
resolver = PathResolver(__file__)
config_path = resolver.resolve("../configs/app.yaml")
assert config_path.is_absolute()
```

With a basis directory:

```python
resolver = PathResolver(__file__, basis="../configs")
config_path = resolver.resolve("app.yaml")
```

Security:

- All paths are resolved to absolute paths
- Path traversal protection ensures files stay within the basis directory
- Symlinks are followed and validated
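The traversal protection described above can be sketched with `pathlib` alone. `check_within` here is a hypothetical helper for illustration, not the dspu API:

```python
from pathlib import Path


def check_within(path: str, basis: str) -> Path:
    """Resolve path under basis and raise ValueError if it escapes basis."""
    resolved = Path(basis, path).resolve()
    basis_resolved = Path(basis).resolve()
    if not resolved.is_relative_to(basis_resolved):  # Python 3.9+
        raise ValueError(f"{path!r} escapes {basis!r}")
    return resolved


safe = check_within("configs/app.yaml", "/data")
```

A relative path containing `..` segments that climbs out of the basis directory fails this check, which is the attack the resolver guards against.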
Initialize path resolver.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `source_file` | `str` | Starting point for resolution (typically `__file__`). | *required* |
| `basis` | `str` | Relative path from source_file's directory to use as root. Default `"."` means the directory containing source_file. | `'.'` |

Example:

```python
# Resolve relative to current file's directory
resolver = PathResolver(__file__)

# Resolve relative to parent directory
resolver = PathResolver(__file__, basis="..")

# Resolve relative to configs directory
resolver = PathResolver(__file__, basis="../configs")
```
Source code in src/dspu/io/paths.py
#### Functions
##### resolve

```python
resolve(
    filename: str,
    *,
    check_exists: bool = False,
    must_be_file: bool = False,
    must_be_dir: bool = False,
) -> Path
```

Resolve filename relative to the basis directory.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `filename` | `str` | File or directory name to resolve. | *required* |
| `check_exists` | `bool` | If True, verify path exists. | `False` |
| `must_be_file` | `bool` | If True, verify path is a file. | `False` |
| `must_be_dir` | `bool` | If True, verify path is a directory. | `False` |

Returns:

| Type | Description |
|---|---|
| `Path` | Absolute path to the resolved file/directory. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If `check_exists=True` and path doesn't exist. |
| `NotADirectoryError` | If `must_be_dir=True` and path is not a directory. |
| `IsADirectoryError` | If `must_be_file=True` and path is a directory. |
| `DSPUIOError` | If path escapes basis directory. |

Example:

```python
resolver = PathResolver(__file__, basis="../configs")
path = resolver.resolve("app.yaml", check_exists=True)
path = resolver.resolve("data", must_be_dir=True)
```

Source code in src/dspu/io/paths.py
##### resolve_all

Resolve multiple filenames at once.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `*filenames` | `str` | File or directory names to resolve. | `()` |
| `check_exists` | `bool` | If True, verify all paths exist. | `False` |

Returns:

| Type | Description |
|---|---|
| `list[Path]` | List of absolute paths. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | If `check_exists=True` and any path doesn't exist. |
| `DSPUIOError` | If any path escapes basis directory. |

Example:

```python
resolver = PathResolver(__file__)
paths = resolver.resolve_all("a.txt", "b.txt", "c.txt")
```

Source code in src/dspu/io/paths.py
##### check_path_within *(staticmethod)*

Check that a path is within a basis directory.

Static utility method for one-off path validation without creating a PathResolver instance.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str \| Path` | Path to check. | *required* |
| `basis` | `str \| Path` | Directory that path must be within. | *required* |
| `resolve` | `bool` | If True, resolve paths to absolute before checking. | `True` |

Returns:

| Type | Description |
|---|---|
| `Path` | The resolved path if valid. |

Raises:

| Type | Description |
|---|---|
| `DSPUIOError` | If path is outside basis directory. |

Example:

```python
safe_path = PathResolver.check_path_within(
    "/data/configs/app.yaml",
    "/data",
)
```

Source code in src/dspu/io/paths.py
## Usage

All Storage methods are coroutines, so calls must be awaited from async code:

```python
import asyncio

from dspu.io import Storage


async def main() -> None:
    # Local storage
    storage = Storage.from_uri("file:///data")
    data = await storage.read("file.json")
    await storage.write("output.json", {"status": "success"})

    # S3 storage
    s3 = Storage.from_uri("s3://bucket/path")
    await s3.write("data.json", {"count": 100})

    # Multi-format
    await storage.write_format("config.yaml", {"database": {"host": "localhost"}})
    await storage.write_format("data.csv", [{"name": "Alice", "age": 30}])


asyncio.run(main())
```