Skip to content

I/O API Reference

Storage abstraction and multi-format file operations.

Storage

dspu.io.storage.Storage

Storage(backend: StorageBackend)

Unified storage interface with automatic serialization.

Provides a high-level API for reading/writing data to any storage backend with automatic format detection and serialization.

Example

Local filesystem

storage = Storage.from_uri("/data")
await storage.write("config.json", {"debug": True})
config = await storage.read("config.json")

S3 (requires fsspec)

storage = Storage.from_uri("s3://my-bucket/path") await storage.write("data.msgpack", large_dataset)

Streaming for large files

async for chunk in storage.read_stream("large_file.csv"):
    process(chunk)

Initialize storage with backend.

Parameters:

Name Type Description Default
backend StorageBackend

Storage backend implementation.

required
Source code in src/dspu/io/storage.py
def __init__(self, backend: StorageBackend) -> None:
    """Create a Storage facade on top of a concrete backend.

    Args:
        backend: Backend implementation that performs the raw byte I/O.
    """
    # Every public method delegates the actual I/O to this backend.
    self._backend: StorageBackend = backend

Functions

from_uri classmethod

from_uri(uri: str, **kwargs: Any) -> Storage

Create Storage from URI.

Auto-detects backend from URI scheme.

Parameters:

Name Type Description Default
uri str

Storage URI (e.g., "file:///path", "s3://bucket").

required
**kwargs Any

Backend-specific options.

{}

Returns:

Type Description
Storage

Storage instance with appropriate backend.

Raises:

Type Description
ConfigurationError

If backend is not supported or unavailable.

Example

storage = Storage.from_uri("/data/local") storage = Storage.from_uri("s3://my-bucket/prefix")

Source code in src/dspu/io/storage.py
@classmethod
def from_uri(cls, uri: str, **kwargs: Any) -> "Storage":
    """Build a Storage instance from a URI string.

    The backend is selected from the URI scheme. Cloud backends are
    imported lazily so that a local-only installation does not need
    fsspec or the provider-specific filesystem packages.

    Args:
        uri: Storage URI (e.g., "file:///path", "s3://bucket").
        **kwargs: Backend-specific options.

    Returns:
        Storage instance with appropriate backend.

    Raises:
        ConfigurationError: If backend is not supported or unavailable.

    Example:
        >>> storage = Storage.from_uri("/data/local")
        >>> storage = Storage.from_uri("s3://my-bucket/prefix")
    """
    scheme, path = parse_uri(uri)

    if scheme == "file":
        # Local filesystem needs no optional dependencies.
        return cls(cast(StorageBackend, LocalBackend(path)))

    if scheme == "s3":
        try:
            # Deferred import: fsspec/s3fs are optional extras.
            from dspu.io.cloud import S3Backend  # noqa: PLC0415
        except ImportError as e:
            raise ConfigurationError(
                "S3 backend requires fsspec and s3fs packages",
                suggestion="Install with: pip install dspu[io]",
            ) from e
        return cls(cast(StorageBackend, S3Backend(str(path), **kwargs)))

    if scheme in {"gs", "gcs"}:
        try:
            from dspu.io.cloud import GCSBackend  # noqa: PLC0415
        except ImportError as e:
            raise ConfigurationError(
                "GCS backend requires fsspec and gcsfs packages",
                suggestion="Install with: pip install dspu[io]",
            ) from e
        return cls(cast(StorageBackend, GCSBackend(str(path), **kwargs)))

    if scheme in {"az", "azure"}:
        try:
            from dspu.io.cloud import AzureBackend  # noqa: PLC0415
        except ImportError as e:
            raise ConfigurationError(
                "Azure backend requires fsspec and adlfs packages",
                suggestion="Install with: pip install dspu[io]",
            ) from e
        return cls(cast(StorageBackend, AzureBackend(str(path), **kwargs)))

    # No branch matched: the scheme is not one we know how to serve.
    raise ConfigurationError(
        f"Unsupported storage scheme: {scheme}",
        suggestion="Supported schemes: file, s3, gs/gcs, az/azure",
    )

read async

read(
    path: str,
    *,
    format: str | None = None,
    raw: bool = False,
) -> Any

Read and deserialize data from storage.

Parameters:

Name Type Description Default
path str

Path to file.

required
format str | None

Serialization format (auto-detected if None).

None
raw bool

If True, return raw bytes without deserialization.

False

Returns:

Type Description
Any

Deserialized data or raw bytes if raw=True.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

SerializationError

If deserialization fails.

Example

config = await storage.read("config.json") data = await storage.read("data.msgpack", format="msgpack") raw_bytes = await storage.read("file.bin", raw=True)

Source code in src/dspu/io/storage.py
async def read(
    self,
    path: str,
    *,
    format: str | None = None,  # noqa: A002
    raw: bool = False,
) -> Any:
    """Read and deserialize data from storage.

    Args:
        path: Path to file.
        format: Serialization format (auto-detected if None).
        raw: If True, return raw bytes without deserialization.

    Returns:
        Deserialized data or raw bytes if raw=True.

    Raises:
        FileNotFoundError: If file doesn't exist.
        SerializationError: If deserialization fails.

    Example:
        >>> config = await storage.read("config.json")
        >>> data = await storage.read("data.msgpack", format="msgpack")
        >>> raw_bytes = await storage.read("file.bin", raw=True)
    """
    data = await self._backend.read(path)

    if raw:
        return data

    # Auto-detect format if not specified
    fmt = format or detect_format(path)

    return deserialize(data, fmt)

write async

write(
    path: str,
    data: Any,
    *,
    format: str | None = None,
    raw: bool = False,
) -> None

Serialize and write data to storage.

Parameters:

Name Type Description Default
path str

Path to file.

required
data Any

Data to write (or bytes if raw=True).

required
format str | None

Serialization format (auto-detected if None).

None
raw bool

If True, write data as-is (must be bytes).

False

Raises:

Type Description
SerializationError

If serialization fails.

TypeError

If raw=True and data is not bytes.

Example

await storage.write("config.json", {"debug": True}) await storage.write("data.msgpack", dataset, format="msgpack") await storage.write("file.bin", b"raw data", raw=True)

Source code in src/dspu/io/storage.py
async def write(
    self,
    path: str,
    data: Any,
    *,
    format: str | None = None,  # noqa: A002
    raw: bool = False,
) -> None:
    """Serialize and write data to storage.

    Args:
        path: Path to file.
        data: Data to write (or bytes if raw=True).
        format: Serialization format (auto-detected if None).
        raw: If True, write data as-is (must be bytes).

    Raises:
        SerializationError: If serialization fails.
        TypeError: If raw=True and data is not bytes.

    Example:
        >>> await storage.write("config.json", {"debug": True})
        >>> await storage.write("data.msgpack", dataset, format="msgpack")
        >>> await storage.write("file.bin", b"raw data", raw=True)
    """
    if raw:
        if not isinstance(data, bytes):
            raise TypeError("raw=True requires bytes data")
        serialized = data
    else:
        # Auto-detect format if not specified
        fmt = format or detect_format(path)
        serialized = serialize(data, fmt)

    await self._backend.write(path, serialized)

exists async

exists(path: str) -> bool

Check if file exists.

Parameters:

Name Type Description Default
path str

Path to check.

required

Returns:

Type Description
bool

True if exists, False otherwise.

Example

if await storage.exists("config.json"):
    config = await storage.read("config.json")

Source code in src/dspu/io/storage.py
async def exists(self, path: str) -> bool:
    """Report whether a file is present in the backend.

    Args:
        path: Path to check.

    Returns:
        True when the path exists, False otherwise.

    Example:
        >>> if await storage.exists("config.json"):
        ...     config = await storage.read("config.json")
    """
    return await self._backend.exists(path)

list async

list(pattern: str = '*') -> list[FileInfo]

List files matching pattern.

Parameters:

Name Type Description Default
pattern str

Glob pattern.

'*'

Returns:

Type Description
list[FileInfo]

List of FileInfo for matching files.

Example

files = await storage.list("*.json")
for file in files:
    print(f"{file.path}: {file.size} bytes")

Source code in src/dspu/io/storage.py
async def list(self, pattern: str = "*") -> list[FileInfo]:
    """Collect every backend entry that matches a glob pattern.

    Args:
        pattern: Glob pattern.

    Returns:
        FileInfo entries for all matching files, fully materialized.

    Example:
        >>> files = await storage.list("*.json")
        >>> for file in files:
        ...     print(f"{file.path}: {file.size} bytes")
    """
    # Drain the backend's async generator into a list.
    # Type ignore: pyrefly has issues with async generator protocol variance
    return [info async for info in self._backend.list(pattern)]  # type: ignore[misc]

delete async

delete(path: str) -> None

Delete file or directory.

Parameters:

Name Type Description Default
path str

Path to delete.

required

Raises:

Type Description
FileNotFoundError

If path doesn't exist.

Example

await storage.delete("old_data.json")

Source code in src/dspu/io/storage.py
async def delete(self, path: str) -> None:
    """Remove a file or directory from the backend.

    Args:
        path: Path to delete.

    Raises:
        FileNotFoundError: If path doesn't exist.

    Example:
        >>> await storage.delete("old_data.json")
    """
    await self._backend.delete(path)

read_stream async

read_stream(
    path: str, chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file contents.

Useful for large files to avoid loading everything in memory.

Parameters:

Name Type Description Default
path str

Path to file.

required
chunk_size int

Size of chunks in bytes.

8192

Yields:

Type Description
AsyncIterator[bytes]

Chunks of file data.

Example

async for chunk in storage.read_stream("large_file.csv"): ... process(chunk)

Source code in src/dspu/io/storage.py
async def read_stream(
    self,
    path: str,
    chunk_size: int = 8192,
) -> AsyncIterator[bytes]:
    """Yield a file's contents chunk by chunk.

    Avoids loading the whole file into memory, which matters for
    large files.

    Args:
        path: Path to file.
        chunk_size: Size of chunks in bytes.

    Yields:
        Chunks of file data.

    Example:
        >>> async for chunk in storage.read_stream("large_file.csv"):
        ...     process(chunk)
    """
    # Type ignore: pyrefly has issues with async generator protocol variance
    async for piece in self._backend.read_stream(path, chunk_size):  # type: ignore[misc]
        yield piece

write_stream async

write_stream(path: str, data: AsyncIterator[bytes]) -> None

Write file from stream.

Parameters:

Name Type Description Default
path str

Path to write to.

required
data AsyncIterator[bytes]

Async iterator of data chunks.

required
Example

async def generate_data():
    for i in range(1000):
        yield f"line {i}\n".encode()

await storage.write_stream("output.txt", generate_data())

Source code in src/dspu/io/storage.py
async def write_stream(
    self,
    path: str,
    data: AsyncIterator[bytes],
) -> None:
    """Persist a file from an async stream of byte chunks.

    Args:
        path: Path to write to.
        data: Async iterator of data chunks.

    Example:
        >>> async def generate_data():
        ...     for i in range(1000):
        ...         yield f"line {i}\\n".encode()
        ...
        >>> await storage.write_stream("output.txt", generate_data())
    """
    await self._backend.write_stream(path, data)

write_format async

write_format(
    path: str,
    data: Any,
    *,
    format: str | None = None,
    format_options: dict[str, Any] | None = None,
) -> None

Write data using format writer (text-based formats).

This method uses the format system for text-based structured formats like YAML, TOML, CSV, .env files, etc. For binary formats (msgpack, pickle), use the regular write() method.

Parameters:

Name Type Description Default
path str

Path to file.

required
data Any

Data to write.

required
format str | None

Format name (auto-detected from path if None).

None
format_options dict[str, Any] | None

Format-specific options (indent, delimiter, etc).

None

Raises:

Type Description
FormatError

If format operations fail.

Example
YAML with custom options

await storage.write_format( ... 'config.yaml', ... {'debug': True}, ... format_options={'sort_keys': True} ... )

CSV with headers

await storage.write_format( ... 'data.csv', ... [{'name': 'Alice', 'age': 30}], ... format_options={'header': True} ... )

Source code in src/dspu/io/storage.py
async def write_format(
    self,
    path: str,
    data: Any,
    *,
    format: str | None = None,  # noqa: A002
    format_options: dict[str, Any] | None = None,
) -> None:
    """Write data through a text-format writer (YAML, TOML, CSV, ...).

    Use this for text-based structured formats handled by the format
    system. Binary formats (msgpack, pickle) go through write() instead.

    Args:
        path: Path to file.
        data: Data to write.
        format: Format name (auto-detected from path if None).
        format_options: Format-specific options (indent, delimiter, etc).

    Raises:
        FormatError: If format operations fail.

    Example:
        >>> # YAML with custom options
        >>> await storage.write_format(
        ...     'config.yaml',
        ...     {'debug': True},
        ...     format_options={'sort_keys': True}
        ... )
        >>>
        >>> # CSV with headers
        >>> await storage.write_format(
        ...     'data.csv',
        ...     [{'name': 'Alice', 'age': 30}],
        ...     format_options={'header': True}
        ... )
    """
    # Deferred imports keep the format subsystem optional at import time.
    from dspu.io.formats import (  # noqa: PLC0415
        detect_format_from_path,
        get_format,
    )
    from dspu.io.formats.base import FormatError  # noqa: PLC0415

    fmt_name = format or detect_format_from_path(path)
    fmt = get_format(fmt_name, **(format_options or {}))

    # Reject data the chosen format cannot represent before serializing.
    if not fmt.can_write(data):
        raise FormatError(
            f"Format '{fmt_name}' cannot write {type(data).__name__}",
            format=fmt_name,
            suggestion="Check data type matches format requirements",
        )

    # Serialize to bytes, then hand off to the backend.
    await self._backend.write(path, fmt.write(data, path))

read_format async

read_format(
    path: str,
    *,
    format: str | None = None,
    format_options: dict[str, Any] | None = None,
) -> Any

Read data using format reader (text-based formats).

This method uses the format system for text-based structured formats. For binary formats, use the regular read() method.

Parameters:

Name Type Description Default
path str

Path to file.

required
format str | None

Format name (auto-detected from path if None).

None
format_options dict[str, Any] | None

Format-specific options.

None

Returns:

Type Description
Any

Parsed data (type depends on format).

Raises:

Type Description
FormatError

If format operations fail.

Example

config = await storage.read_format('config.yaml') data = await storage.read_format('data.csv')

Source code in src/dspu/io/storage.py
async def read_format(
    self,
    path: str,
    *,
    format: str | None = None,  # noqa: A002
    format_options: dict[str, Any] | None = None,
) -> Any:
    """Read data through a text-format parser (YAML, TOML, CSV, ...).

    Use this for text-based structured formats handled by the format
    system. Binary formats go through read() instead.

    Args:
        path: Path to file.
        format: Format name (auto-detected from path if None).
        format_options: Format-specific options.

    Returns:
        Parsed data (type depends on format).

    Raises:
        FormatError: If format operations fail.

    Example:
        >>> config = await storage.read_format('config.yaml')
        >>> data = await storage.read_format('data.csv')
    """
    # Deferred imports keep the format subsystem optional at import time.
    from dspu.io.formats import (  # noqa: PLC0415
        detect_format_from_path,
        get_format,
    )

    # Fetch raw bytes first so missing-file errors surface before
    # any format resolution happens.
    raw_bytes = await self._backend.read(path)

    fmt_name = format or detect_format_from_path(path)
    fmt = get_format(fmt_name, **(format_options or {}))

    return fmt.read(raw_bytes, path)

Storage Backends

dspu.io.local.LocalBackend

LocalBackend(root: str | Path)

Local filesystem storage backend.

Provides async file operations on the local filesystem.

Example

backend = LocalBackend("/data")
await backend.write("test.txt", b"Hello")
data = await backend.read("test.txt")
print(data.decode())  # Hello

Initialize local backend.

Parameters:

Name Type Description Default
root str | Path

Root directory for storage.

required
Source code in src/dspu/io/local.py
def __init__(self, root: str | Path) -> None:
    """Initialize local backend.

    Args:
        root: Root directory for storage.
    """
    self.root = Path(root).expanduser().resolve()
    self.root.mkdir(parents=True, exist_ok=True)

Functions

read async

read(path: str) -> bytes

Read file contents.

Parameters:

Name Type Description Default
path str

Path to file relative to root.

required

Returns:

Type Description
bytes

File contents as bytes.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/local.py
async def read(self, path: str) -> bytes:
    """Read a file's full contents from the local filesystem.

    Args:
        path: Path to file relative to root.

    Returns:
        File contents as bytes.

    Raises:
        FileNotFoundError: If file doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    target = self._resolve_path(path)

    try:
        contents: bytes = await anyio.Path(target).read_bytes()
    except FileNotFoundError:
        # Missing files surface as-is so callers can handle them directly.
        raise
    except Exception as exc:
        raise DSPUIOError(
            f"Failed to read file: {path}",
            path=path,
            operation="read",
        ) from exc

    return contents

write async

write(path: str, data: bytes) -> None

Write data to file.

Creates parent directories if needed.

Parameters:

Name Type Description Default
path str

Path to file relative to root.

required
data bytes

Data to write.

required

Raises:

Type Description
DSPUIOError

For I/O errors.

Source code in src/dspu/io/local.py
async def write(self, path: str, data: bytes) -> None:
    """Write bytes to a file, creating parent directories as needed.

    Args:
        path: Path to file relative to root.
        data: Data to write.

    Raises:
        DSPUIOError: For I/O errors.
    """
    destination = self._resolve_path(path)

    try:
        # Ensure the directory chain exists before writing.
        destination.parent.mkdir(parents=True, exist_ok=True)

        # NOTE(review): this is a plain overwrite, not an atomic
        # replace — a crash mid-write can leave a partial file.
        await anyio.Path(destination).write_bytes(data)
    except Exception as exc:
        raise DSPUIOError(
            f"Failed to write file: {path}",
            path=path,
            operation="write",
        ) from exc

exists async

exists(path: str) -> bool

Check if path exists.

Parameters:

Name Type Description Default
path str

Path to check.

required

Returns:

Type Description
bool

True if exists, False otherwise.

Source code in src/dspu/io/local.py
async def exists(self, path: str) -> bool:
    """Report whether a path exists under the root.

    Args:
        path: Path to check.

    Returns:
        True if exists, False otherwise.
    """
    try:
        target = self._resolve_path(path)
        return bool(await anyio.Path(target).exists())
    except DSPUIOError:
        # Paths that fail resolution are treated as absent.
        return False

delete async

delete(path: str) -> None

Delete file or directory.

Parameters:

Name Type Description Default
path str

Path to delete.

required

Raises:

Type Description
FileNotFoundError

If path doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/local.py
async def delete(self, path: str) -> None:
    """Remove a file or directory (recursively) under the root.

    Args:
        path: Path to delete.

    Raises:
        FileNotFoundError: If path doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    target = self._resolve_path(path)

    if not target.exists():
        raise FileNotFoundError(f"Path not found: {path}")

    try:
        if target.is_dir():
            # shutil.rmtree is synchronous; run it off the event loop.
            await anyio.to_thread.run_sync(shutil.rmtree, target)
        else:
            await anyio.Path(target).unlink()
    except Exception as exc:
        raise DSPUIOError(
            f"Failed to delete: {path}",
            path=path,
            operation="delete",
        ) from exc

list async

list(pattern: str = '*') -> AsyncIterator[FileInfo]

List files matching pattern.

Parameters:

Name Type Description Default
pattern str

Glob pattern.

'*'

Yields:

Type Description
AsyncIterator[FileInfo]

FileInfo for each matching file/directory.

Source code in src/dspu/io/local.py
async def list(self, pattern: str = "*") -> AsyncIterator[FileInfo]:
    """List files matching pattern.

    Entries whose metadata cannot be read (e.g. broken symlinks or
    permission errors) are skipped, matching the behavior of the
    cloud backends rather than aborting the whole listing.

    Args:
        pattern: Glob pattern.

    Yields:
        FileInfo for each matching file/directory.

    Raises:
        DSPUIOError: If the glob itself fails.
    """
    try:
        # Materialize the glob so a failure here is clearly a
        # listing error, not a per-entry stat error.
        matches = list(self.root.glob(pattern))
    except Exception as e:
        raise DSPUIOError(
            f"Failed to list files: {pattern}",
            path=str(self.root),
            operation="list",
        ) from e

    for path in matches:
        try:
            stat = path.stat()
            is_dir = path.is_dir()
        except OSError:
            # Skip entries we can't stat — consistent with the
            # cloud backends' list() behavior.
            continue

        # Report paths relative to the backend root.
        rel_path = path.relative_to(self.root)

        yield FileInfo(
            path=str(rel_path),
            size=stat.st_size,
            modified=datetime.fromtimestamp(stat.st_mtime, tz=UTC),
            is_dir=is_dir,
        )

read_stream async

read_stream(
    path: str, chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file contents in chunks.

Parameters:

Name Type Description Default
path str

Path to file.

required
chunk_size int

Chunk size in bytes.

8192

Yields:

Type Description
AsyncIterator[bytes]

Chunks of file data.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/local.py
async def read_stream(
    self,
    path: str,
    chunk_size: int = 8192,
) -> AsyncIterator[bytes]:
    """Yield a local file's contents chunk by chunk.

    Args:
        path: Path to file.
        chunk_size: Chunk size in bytes.

    Yields:
        Chunks of file data.

    Raises:
        FileNotFoundError: If file doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    target = self._resolve_path(path)

    if not target.exists():
        raise FileNotFoundError(f"File not found: {path}")

    try:
        async with await anyio.open_file(target, "rb") as handle:
            # Read until EOF (read() returns b"" at end of file).
            while piece := await handle.read(chunk_size):
                yield piece
    except FileNotFoundError:
        raise
    except Exception as exc:
        raise DSPUIOError(
            f"Failed to stream file: {path}",
            path=path,
            operation="read_stream",
        ) from exc

write_stream async

write_stream(path: str, data: AsyncIterator[bytes]) -> None

Write file from stream.

Parameters:

Name Type Description Default
path str

Path to write to.

required
data AsyncIterator[bytes]

Async iterator of data chunks.

required

Raises:

Type Description
DSPUIOError

For I/O errors.

Source code in src/dspu/io/local.py
async def write_stream(
    self,
    path: str,
    data: AsyncIterator[bytes],
) -> None:
    """Write a local file from an async stream of chunks.

    Args:
        path: Path to write to.
        data: Async iterator of data chunks.

    Raises:
        DSPUIOError: For I/O errors.
    """
    destination = self._resolve_path(path)

    try:
        # Ensure the directory chain exists before opening the file.
        destination.parent.mkdir(parents=True, exist_ok=True)

        async with await anyio.open_file(destination, "wb") as handle:
            async for piece in data:
                await handle.write(piece)
    except Exception as exc:
        raise DSPUIOError(
            f"Failed to write stream: {path}",
            path=path,
            operation="write_stream",
        ) from exc

dspu.io.cloud.S3Backend

S3Backend(path: str, **kwargs: Any)

Bases: _FsspecBackend

Amazon S3 storage backend.

Requires: pip install s3fs

Example

backend = S3Backend( ... "my-bucket/prefix", ... key="ACCESS_KEY", ... secret="SECRET_KEY", ... ) await backend.write("data.json", b'{"key": "value"}')

Initialize S3 backend.

Parameters:

Name Type Description Default
path str

S3 path (bucket/prefix).

required
**kwargs Any

S3-specific options (key, secret, endpoint_url, etc.).

{}
Source code in src/dspu/io/cloud.py
def __init__(self, path: str, **kwargs: Any) -> None:
    """Create an S3 backend rooted at the given bucket/prefix.

    Args:
        path: S3 path (bucket/prefix).
        **kwargs: S3-specific options (key, secret, endpoint_url, etc.).
    """
    # Delegate filesystem construction to the shared fsspec base class.
    super().__init__("s3", path, **kwargs)

Functions

read async

read(path: str) -> bytes

Read file contents.

Parameters:

Name Type Description Default
path str

Path to file.

required

Returns:

Type Description
bytes

File contents as bytes.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/cloud.py
async def read(self, path: str) -> bytes:
    """Read a file's full contents from the remote filesystem.

    Args:
        path: Path to file.

    Returns:
        File contents as bytes.

    Raises:
        FileNotFoundError: If file doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    target = self._resolve_path(path)

    try:
        # Prefer fsspec's async API when the filesystem provides it.
        async_cat = getattr(self.fs, "_cat_file", None)
        if async_cat is not None:
            data: bytes = await async_cat(target)
        else:
            # Fall back to the blocking API.
            data = self.fs.cat_file(target)
        return data

    except FileNotFoundError:
        raise
    except Exception as exc:
        raise DSPUIOError(
            f"Failed to read from {self.protocol}: {path}",
            path=path,
            operation="read",
        ) from exc

write async

write(path: str, data: bytes) -> None

Write data to file.

Parameters:

Name Type Description Default
path str

Path to file.

required
data bytes

Data to write.

required

Raises:

Type Description
DSPUIOError

For I/O errors.

Source code in src/dspu/io/cloud.py
async def write(self, path: str, data: bytes) -> None:
    """Write bytes to a file on the remote filesystem.

    Args:
        path: Path to file.
        data: Data to write.

    Raises:
        DSPUIOError: For I/O errors.
    """
    target = self._resolve_path(path)

    try:
        # Prefer fsspec's async API when the filesystem provides it.
        async_pipe = getattr(self.fs, "_pipe_file", None)
        if async_pipe is not None:
            await async_pipe(target, data)
        else:
            # Fall back to the blocking API.
            self.fs.pipe_file(target, data)

    except Exception as exc:
        raise DSPUIOError(
            f"Failed to write to {self.protocol}: {path}",
            path=path,
            operation="write",
        ) from exc

exists async

exists(path: str) -> bool

Check if path exists.

Parameters:

Name Type Description Default
path str

Path to check.

required

Returns:

Type Description
bool

True if exists, False otherwise.

Source code in src/dspu/io/cloud.py
async def exists(self, path: str) -> bool:
    """Report whether a path exists on the remote filesystem.

    Args:
        path: Path to check.

    Returns:
        True if exists, False otherwise.
    """
    target = self._resolve_path(path)

    try:
        return bool(self.fs.exists(target))
    except Exception:
        # Any backend failure is reported as "does not exist".
        return False

delete async

delete(path: str) -> None

Delete file or directory.

Parameters:

Name Type Description Default
path str

Path to delete.

required

Raises:

Type Description
FileNotFoundError

If path doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/cloud.py
async def delete(self, path: str) -> None:
    """Remove a file or directory (recursively) on the remote filesystem.

    Args:
        path: Path to delete.

    Raises:
        FileNotFoundError: If path doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    target = self._resolve_path(path)

    if not self.fs.exists(target):
        raise FileNotFoundError(f"Path not found: {path}")

    try:
        self.fs.rm(target, recursive=True)
    except Exception as exc:
        raise DSPUIOError(
            f"Failed to delete from {self.protocol}: {path}",
            path=path,
            operation="delete",
        ) from exc

list async

list(pattern: str = '*') -> AsyncIterator[FileInfo]

List files matching pattern.

Parameters:

Name Type Description Default
pattern str

Glob pattern.

'*'

Yields:

Type Description
AsyncIterator[FileInfo]

FileInfo for each matching file.

Source code in src/dspu/io/cloud.py
async def list(self, pattern: str = "*") -> AsyncIterator[FileInfo]:
    """Yield FileInfo for remote entries matching a glob pattern.

    Args:
        pattern: Glob pattern.

    Yields:
        FileInfo for each matching file.
    """
    try:
        # Glob against the fully-resolved pattern (root + pattern).
        matches = self.fs.glob(self._resolve_path(pattern))

        for entry in matches:
            try:
                meta = self.fs.info(entry)

                # Report paths relative to the backend root.
                rel = str(Path(entry).relative_to(self.path))

                yield FileInfo(
                    path=rel,
                    size=meta.get("size", 0),
                    modified=datetime.fromtimestamp(
                        meta.get("mtime", 0),
                        tz=UTC,
                    ),
                    is_dir=meta.get("type") == "directory",
                )
            except Exception:  # noqa: S112
                # Skip entries whose metadata can't be fetched.
                continue

    except Exception as exc:
        raise DSPUIOError(
            f"Failed to list files in {self.protocol}: {pattern}",
            path=self.path,
            operation="list",
        ) from exc

read_stream async

read_stream(
    path: str, chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file contents.

Parameters:

Name Type Description Default
path str

Path to file.

required
chunk_size int

Chunk size in bytes.

8192

Yields:

Type Description
AsyncIterator[bytes]

Chunks of file data.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/cloud.py
async def read_stream(
    self,
    path: str,
    chunk_size: int = 8192,
) -> AsyncIterator[bytes]:
    """Yield a remote file's contents chunk by chunk.

    Args:
        path: Path to file.
        chunk_size: Chunk size in bytes.

    Yields:
        Chunks of file data.

    Raises:
        FileNotFoundError: If file doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    target = self._resolve_path(path)

    if not self.fs.exists(target):
        raise FileNotFoundError(f"File not found: {path}")

    try:
        with self.fs.open(target, "rb") as handle:
            # Read until EOF (read() returns b"" at end of file).
            while piece := handle.read(chunk_size):
                yield piece

    except FileNotFoundError:
        raise
    except Exception as exc:
        raise DSPUIOError(
            f"Failed to stream from {self.protocol}: {path}",
            path=path,
            operation="read_stream",
        ) from exc

write_stream async

write_stream(path: str, data: AsyncIterator[bytes]) -> None

Write file from stream.

Parameters:

Name Type Description Default
path str

Path to write to.

required
data AsyncIterator[bytes]

Async iterator of data chunks.

required

Raises:

Type Description
DSPUIOError

For I/O errors.

Source code in src/dspu/io/cloud.py
async def write_stream(
    self,
    path: str,
    data: AsyncIterator[bytes],
) -> None:
    """Write a file by consuming an async iterator of byte chunks.

    Args:
        path: Path to write to.
        data: Async iterator of data chunks.

    Raises:
        DSPUIOError: For I/O errors.
    """
    full_path = self._resolve_path(path)

    try:
        # NOTE(review): fs.open/f.write are synchronous fsspec calls and
        # block the event loop between chunks — confirm acceptable.
        with self.fs.open(full_path, "wb") as f:
            async for chunk in data:
                f.write(chunk)

    except Exception as e:
        # Normalize backend-specific failures into the package I/O error.
        raise DSPUIOError(
            f"Failed to write stream to {self.protocol}: {path}",
            path=path,
            operation="write_stream",
        ) from e

dspu.io.cloud.GCSBackend

GCSBackend(path: str, **kwargs: Any)

Bases: _FsspecBackend

Google Cloud Storage backend.

Requires: pip install gcsfs

Example

backend = GCSBackend( ... "my-bucket/prefix", ... token="path/to/credentials.json", ... ) await backend.write("data.json", b'{"key": "value"}')

Initialize GCS backend.

Parameters:

Name Type Description Default
path str

GCS path (bucket/prefix).

required
**kwargs Any

GCS-specific options (token, project, etc.).

{}
Source code in src/dspu/io/cloud.py
def __init__(self, path: str, **kwargs: Any) -> None:
    """Set up a Google Cloud Storage filesystem rooted at *path*.

    Args:
        path: GCS path (bucket/prefix).
        **kwargs: GCS-specific options (token, project, etc.),
            forwarded verbatim to the underlying filesystem.
    """
    super().__init__("gs", path, **kwargs)

Functions

read async

read(path: str) -> bytes

Read file contents.

Parameters:

Name Type Description Default
path str

Path to file.

required

Returns:

Type Description
bytes

File contents as bytes.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/cloud.py
async def read(self, path: str) -> bytes:
    """Read an entire file and return its raw bytes.

    Args:
        path: Path to file.

    Returns:
        File contents as bytes.

    Raises:
        FileNotFoundError: If file doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    target = self._resolve_path(path)

    try:
        # Prefer fsspec's native async read when the filesystem offers
        # one; otherwise fall back to the blocking API.
        if hasattr(self.fs, "_cat_file"):
            payload: bytes = await self.fs._cat_file(target)
        else:
            payload = self.fs.cat_file(target)
        return payload

    except FileNotFoundError:
        raise
    except Exception as e:
        raise DSPUIOError(
            f"Failed to read from {self.protocol}: {path}",
            path=path,
            operation="read",
        ) from e

write async

write(path: str, data: bytes) -> None

Write data to file.

Parameters:

Name Type Description Default
path str

Path to file.

required
data bytes

Data to write.

required

Raises:

Type Description
DSPUIOError

For I/O errors.

Source code in src/dspu/io/cloud.py
async def write(self, path: str, data: bytes) -> None:
    """Write raw bytes to a file, creating or replacing it.

    Args:
        path: Path to file.
        data: Data to write.

    Raises:
        DSPUIOError: For I/O errors.
    """
    target = self._resolve_path(path)

    try:
        # Prefer the filesystem's native async write when available,
        # otherwise use the blocking call.
        if hasattr(self.fs, "_pipe_file"):
            await self.fs._pipe_file(target, data)
        else:
            self.fs.pipe_file(target, data)

    except Exception as e:
        raise DSPUIOError(
            f"Failed to write to {self.protocol}: {path}",
            path=path,
            operation="write",
        ) from e

exists async

exists(path: str) -> bool

Check if path exists.

Parameters:

Name Type Description Default
path str

Path to check.

required

Returns:

Type Description
bool

True if exists, False otherwise.

Source code in src/dspu/io/cloud.py
async def exists(self, path: str) -> bool:
    """Report whether the given path exists on the backend.

    Args:
        path: Path to check.

    Returns:
        True if exists, False otherwise.
    """
    target = self._resolve_path(path)

    try:
        found: bool = self.fs.exists(target)
    except Exception:
        # Any backend failure is treated as "not there".
        return False
    return found

delete async

delete(path: str) -> None

Delete file or directory.

Parameters:

Name Type Description Default
path str

Path to delete.

required

Raises:

Type Description
FileNotFoundError

If path doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/cloud.py
async def delete(self, path: str) -> None:
    """Delete file or directory.

    Args:
        path: Path to delete.

    Raises:
        FileNotFoundError: If path doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    full_path = self._resolve_path(path)

    # Check first so a missing path surfaces as FileNotFoundError
    # rather than as a wrapped backend error.
    if not self.fs.exists(full_path):
        raise FileNotFoundError(f"Path not found: {path}")

    try:
        # recursive=True removes directories together with their contents.
        self.fs.rm(full_path, recursive=True)
    except Exception as e:
        raise DSPUIOError(
            f"Failed to delete from {self.protocol}: {path}",  # noqa: S608
            path=path,
            operation="delete",
        ) from e

list async

list(pattern: str = '*') -> AsyncIterator[FileInfo]

List files matching pattern.

Parameters:

Name Type Description Default
pattern str

Glob pattern.

'*'

Yields:

Type Description
AsyncIterator[FileInfo]

FileInfo for each matching file.

Source code in src/dspu/io/cloud.py
async def list(self, pattern: str = "*") -> AsyncIterator[FileInfo]:
    """List files matching pattern.

    Args:
        pattern: Glob pattern, resolved relative to the backend root.

    Yields:
        FileInfo for each matching file; unreadable entries are skipped.

    Raises:
        DSPUIOError: If the glob itself fails.
    """
    try:
        # Use glob to find matching files
        full_pattern = self._resolve_path(pattern)
        files = self.fs.glob(full_pattern)

        for file_path in files:
            try:
                info = self.fs.info(file_path)

                # Extract relative path
                rel_path = str(Path(file_path).relative_to(self.path))

                yield FileInfo(
                    path=rel_path,
                    size=info.get("size", 0),
                    # NOTE(review): assumes info["mtime"] is a POSIX
                    # timestamp; some fsspec backends return a datetime
                    # object here instead — confirm per backend.
                    modified=datetime.fromtimestamp(
                        info.get("mtime", 0),
                        tz=UTC,
                    ),
                    is_dir=info.get("type") == "directory",
                )
            except Exception:  # noqa: S112
                # Skip files we can't stat
                continue

    except Exception as e:
        raise DSPUIOError(
            f"Failed to list files in {self.protocol}: {pattern}",
            path=self.path,
            operation="list",
        ) from e

read_stream async

read_stream(
    path: str, chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file contents.

Parameters:

Name Type Description Default
path str

Path to file.

required
chunk_size int

Chunk size in bytes.

8192

Yields:

Type Description
AsyncIterator[bytes]

Chunks of file data.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/cloud.py
async def read_stream(
    self,
    path: str,
    chunk_size: int = 8192,
) -> AsyncIterator[bytes]:
    """Stream file contents in fixed-size chunks.

    Args:
        path: Path to file, relative to the backend root.
        chunk_size: Chunk size in bytes (the final chunk may be shorter).

    Yields:
        Chunks of file data.

    Raises:
        FileNotFoundError: If file doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    full_path = self._resolve_path(path)

    # Fail fast with a specific error before opening a handle.
    # NOTE(review): fs.exists/fs.open/f.read are synchronous fsspec calls,
    # so they block the event loop while running — confirm acceptable.
    if not self.fs.exists(full_path):
        raise FileNotFoundError(f"File not found: {path}")

    try:
        with self.fs.open(full_path, "rb") as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:  # empty read signals EOF
                    break
                yield chunk

    except FileNotFoundError:
        # Preserve the specific exception type for callers.
        raise
    except Exception as e:
        # Normalize backend-specific failures into the package I/O error.
        raise DSPUIOError(
            f"Failed to stream from {self.protocol}: {path}",
            path=path,
            operation="read_stream",
        ) from e

write_stream async

write_stream(path: str, data: AsyncIterator[bytes]) -> None

Write file from stream.

Parameters:

Name Type Description Default
path str

Path to write to.

required
data AsyncIterator[bytes]

Async iterator of data chunks.

required

Raises:

Type Description
DSPUIOError

For I/O errors.

Source code in src/dspu/io/cloud.py
async def write_stream(
    self,
    path: str,
    data: AsyncIterator[bytes],
) -> None:
    """Write a file by draining an async iterator of byte chunks.

    Args:
        path: Path to write to.
        data: Async iterator of data chunks.

    Raises:
        DSPUIOError: For I/O errors.
    """
    destination = self._resolve_path(path)

    try:
        with self.fs.open(destination, "wb") as sink:
            async for piece in data:
                sink.write(piece)

    except Exception as e:
        raise DSPUIOError(
            f"Failed to write stream to {self.protocol}: {path}",
            path=path,
            operation="write_stream",
        ) from e

dspu.io.cloud.AzureBackend

AzureBackend(path: str, **kwargs: Any)

Bases: _FsspecBackend

Azure Blob Storage backend.

Requires: pip install adlfs

Example

backend = AzureBackend( ... "container/prefix", ... account_name="myaccount", ... account_key="KEY", ... ) await backend.write("data.json", b'{"key": "value"}')

Initialize Azure backend.

Parameters:

Name Type Description Default
path str

Azure path (container/prefix).

required
**kwargs Any

Azure-specific options (account_name, account_key, etc.).

{}
Source code in src/dspu/io/cloud.py
def __init__(self, path: str, **kwargs: Any) -> None:
    """Set up an Azure Blob Storage filesystem rooted at *path*.

    Args:
        path: Azure path (container/prefix).
        **kwargs: Azure-specific options (account_name, account_key,
            etc.), forwarded verbatim to the underlying filesystem.
    """
    super().__init__("az", path, **kwargs)

Functions

read async

read(path: str) -> bytes

Read file contents.

Parameters:

Name Type Description Default
path str

Path to file.

required

Returns:

Type Description
bytes

File contents as bytes.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/cloud.py
async def read(self, path: str) -> bytes:
    """Read file contents.

    Args:
        path: Path to file.

    Returns:
        File contents as bytes.

    Raises:
        FileNotFoundError: If file doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    full_path = self._resolve_path(path)

    try:
        # fsspec provides async methods on async-capable filesystems;
        # use the private _cat_file coroutine when present.
        if hasattr(self.fs, "_cat_file"):
            data: bytes = await self.fs._cat_file(full_path)
            return data

        # Fallback to sync method (blocks the event loop for the call)
        result: bytes = self.fs.cat_file(full_path)
        return result

    except FileNotFoundError:
        # Preserve the specific exception type for callers.
        raise
    except Exception as e:
        # Normalize backend-specific errors into the package I/O error.
        raise DSPUIOError(
            f"Failed to read from {self.protocol}: {path}",
            path=path,
            operation="read",
        ) from e

write async

write(path: str, data: bytes) -> None

Write data to file.

Parameters:

Name Type Description Default
path str

Path to file.

required
data bytes

Data to write.

required

Raises:

Type Description
DSPUIOError

For I/O errors.

Source code in src/dspu/io/cloud.py
async def write(self, path: str, data: bytes) -> None:
    """Write data to file.

    Args:
        path: Path to file.
        data: Data to write.

    Raises:
        DSPUIOError: For I/O errors.
    """
    full_path = self._resolve_path(path)

    try:
        # fsspec provides async methods on async-capable filesystems;
        # use the private _pipe_file coroutine when present.
        if hasattr(self.fs, "_pipe_file"):
            await self.fs._pipe_file(full_path, data)
        else:
            # Fallback to sync method (blocks the event loop for the call)
            self.fs.pipe_file(full_path, data)

    except Exception as e:
        # Normalize backend-specific errors into the package I/O error.
        raise DSPUIOError(
            f"Failed to write to {self.protocol}: {path}",
            path=path,
            operation="write",
        ) from e

exists async

exists(path: str) -> bool

Check if path exists.

Parameters:

Name Type Description Default
path str

Path to check.

required

Returns:

Type Description
bool

True if exists, False otherwise.

Source code in src/dspu/io/cloud.py
async def exists(self, path: str) -> bool:
    """Report whether the given path exists on the backend.

    Args:
        path: Path to check.

    Returns:
        True if exists, False otherwise.
    """
    target = self._resolve_path(path)

    try:
        found: bool = self.fs.exists(target)
    except Exception:
        # Any backend failure is treated as "not there".
        return False
    return found

delete async

delete(path: str) -> None

Delete file or directory.

Parameters:

Name Type Description Default
path str

Path to delete.

required

Raises:

Type Description
FileNotFoundError

If path doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/cloud.py
async def delete(self, path: str) -> None:
    """Delete file or directory.

    Args:
        path: Path to delete.

    Raises:
        FileNotFoundError: If path doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    full_path = self._resolve_path(path)

    # Check first so a missing path surfaces as FileNotFoundError
    # rather than as a wrapped backend error.
    if not self.fs.exists(full_path):
        raise FileNotFoundError(f"Path not found: {path}")

    try:
        # recursive=True removes directories together with their contents.
        self.fs.rm(full_path, recursive=True)
    except Exception as e:
        raise DSPUIOError(
            f"Failed to delete from {self.protocol}: {path}",  # noqa: S608
            path=path,
            operation="delete",
        ) from e

list async

list(pattern: str = '*') -> AsyncIterator[FileInfo]

List files matching pattern.

Parameters:

Name Type Description Default
pattern str

Glob pattern.

'*'

Yields:

Type Description
AsyncIterator[FileInfo]

FileInfo for each matching file.

Source code in src/dspu/io/cloud.py
async def list(self, pattern: str = "*") -> AsyncIterator[FileInfo]:
    """List files matching pattern.

    Args:
        pattern: Glob pattern, resolved relative to the backend root.

    Yields:
        FileInfo for each matching file; unreadable entries are skipped.

    Raises:
        DSPUIOError: If the glob itself fails.
    """
    try:
        # Use glob to find matching files
        full_pattern = self._resolve_path(pattern)
        files = self.fs.glob(full_pattern)

        for file_path in files:
            try:
                info = self.fs.info(file_path)

                # Extract relative path
                rel_path = str(Path(file_path).relative_to(self.path))

                yield FileInfo(
                    path=rel_path,
                    size=info.get("size", 0),
                    # NOTE(review): assumes info["mtime"] is a POSIX
                    # timestamp; some fsspec backends return a datetime
                    # object here instead — confirm per backend.
                    modified=datetime.fromtimestamp(
                        info.get("mtime", 0),
                        tz=UTC,
                    ),
                    is_dir=info.get("type") == "directory",
                )
            except Exception:  # noqa: S112
                # Skip files we can't stat
                continue

    except Exception as e:
        raise DSPUIOError(
            f"Failed to list files in {self.protocol}: {pattern}",
            path=self.path,
            operation="list",
        ) from e

read_stream async

read_stream(
    path: str, chunk_size: int = 8192
) -> AsyncIterator[bytes]

Stream file contents.

Parameters:

Name Type Description Default
path str

Path to file.

required
chunk_size int

Chunk size in bytes.

8192

Yields:

Type Description
AsyncIterator[bytes]

Chunks of file data.

Raises:

Type Description
FileNotFoundError

If file doesn't exist.

DSPUIOError

For other I/O errors.

Source code in src/dspu/io/cloud.py
async def read_stream(
    self,
    path: str,
    chunk_size: int = 8192,
) -> AsyncIterator[bytes]:
    """Stream file contents in fixed-size chunks.

    Args:
        path: Path to file, relative to the backend root.
        chunk_size: Chunk size in bytes (the final chunk may be shorter).

    Yields:
        Chunks of file data.

    Raises:
        FileNotFoundError: If file doesn't exist.
        DSPUIOError: For other I/O errors.
    """
    full_path = self._resolve_path(path)

    # Fail fast with a specific error before opening a handle.
    # NOTE(review): fs.exists/fs.open/f.read are synchronous fsspec calls,
    # so they block the event loop while running — confirm acceptable.
    if not self.fs.exists(full_path):
        raise FileNotFoundError(f"File not found: {path}")

    try:
        with self.fs.open(full_path, "rb") as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:  # empty read signals EOF
                    break
                yield chunk

    except FileNotFoundError:
        # Preserve the specific exception type for callers.
        raise
    except Exception as e:
        # Normalize backend-specific failures into the package I/O error.
        raise DSPUIOError(
            f"Failed to stream from {self.protocol}: {path}",
            path=path,
            operation="read_stream",
        ) from e

write_stream async

write_stream(path: str, data: AsyncIterator[bytes]) -> None

Write file from stream.

Parameters:

Name Type Description Default
path str

Path to write to.

required
data AsyncIterator[bytes]

Async iterator of data chunks.

required

Raises:

Type Description
DSPUIOError

For I/O errors.

Source code in src/dspu/io/cloud.py
async def write_stream(
    self,
    path: str,
    data: AsyncIterator[bytes],
) -> None:
    """Write a file by draining an async iterator of byte chunks.

    Args:
        path: Path to write to.
        data: Async iterator of data chunks.

    Raises:
        DSPUIOError: For I/O errors.
    """
    destination = self._resolve_path(path)

    try:
        with self.fs.open(destination, "wb") as sink:
            async for piece in data:
                sink.write(piece)

    except Exception as e:
        raise DSPUIOError(
            f"Failed to write stream to {self.protocol}: {path}",
            path=path,
            operation="write_stream",
        ) from e

Path Resolution

dspu.io.paths.PathResolver

PathResolver(source_file: str, basis: str = '.')

Resolve file paths relative to a source file.

This class helps resolve file paths relative to the location of a source file, useful for loading configuration files, data files, etc. that are stored relative to your code.

Example

In your module file

resolver = PathResolver(__file__) config_path = resolver.resolve("../configs/app.yaml") assert config_path.is_absolute()

With a basis directory

resolver = PathResolver(__file__, basis="../configs") config_path = resolver.resolve("app.yaml")

Security
  • All paths are resolved to absolute paths
  • Path traversal protection ensures files stay within basis
  • Symlinks are followed and validated

Initialize path resolver.

Parameters:

Name Type Description Default
source_file str

Starting point for resolution (typically `__file__`).

required
basis str

Relative path from source_file's directory to use as root. Default "." means the directory containing source_file.

'.'
Example

Resolve relative to current file's directory

resolver = PathResolver(__file__)

Resolve relative to parent directory

resolver = PathResolver(__file__, basis="..")

Resolve relative to configs directory

resolver = PathResolver(__file__, basis="../configs")

Source code in src/dspu/io/paths.py
def __init__(self, source_file: str, basis: str = ".") -> None:
    """Record the anchor file and the relative basis directory.

    Args:
        source_file: Starting point for resolution (typically __file__).
        basis: Relative path from source_file's directory to use as root.
              Default "." means the directory containing source_file.

    Example:
        >>> # Resolve relative to current file's directory
        >>> resolver = PathResolver(__file__)
        >>>
        >>> # Resolve relative to parent directory
        >>> resolver = PathResolver(__file__, basis="..")
        >>>
        >>> # Resolve relative to configs directory
        >>> resolver = PathResolver(__file__, basis="../configs")
    """
    # Both values are stored as-is; path resolution happens later.
    self.basis = basis
    self.source_file = Path(source_file)

Functions

resolve

resolve(
    filename: str,
    *,
    check_exists: bool = False,
    must_be_file: bool = False,
    must_be_dir: bool = False,
) -> Path

Resolve filename relative to basis directory.

Parameters:

Name Type Description Default
filename str

File or directory name to resolve.

required
check_exists bool

If True, verify path exists.

False
must_be_file bool

If True, verify path is a file.

False
must_be_dir bool

If True, verify path is a directory.

False

Returns:

Type Description
Path

Absolute path to the resolved file/directory.

Raises:

Type Description
FileNotFoundError

If check_exists=True and path doesn't exist.

NotADirectoryError

If must_be_dir=True and path is not a directory.

IsADirectoryError

If must_be_file=True and path is a directory.

DSPUIOError

If path escapes basis directory.

Example

resolver = PathResolver(__file__, basis="../configs") path = resolver.resolve("app.yaml", check_exists=True) path = resolver.resolve("data", must_be_dir=True)

Source code in src/dspu/io/paths.py
def resolve(
    self,
    filename: str,
    *,
    check_exists: bool = False,
    must_be_file: bool = False,
    must_be_dir: bool = False,
) -> Path:
    """Resolve filename relative to basis directory.

    Args:
        filename: File or directory name to resolve.
        check_exists: If True, verify path exists.
        must_be_file: If True, verify path is a file.
        must_be_dir: If True, verify path is a directory.

    Returns:
        Absolute path to the resolved file/directory.

    Raises:
        FileNotFoundError: If check_exists=True and path doesn't exist.
        NotADirectoryError: If must_be_dir=True and path is not a directory.
        IsADirectoryError: If must_be_file=True and path is a directory.
        DSPUIOError: If path escapes basis directory.

    Example:
        >>> resolver = PathResolver(__file__, basis="../configs")
        >>> path = resolver.resolve("app.yaml", check_exists=True)
        >>> path = resolver.resolve("data", must_be_dir=True)
    """
    basis_path = self.get_basis_path()
    full_path = (basis_path / filename).resolve()

    # Security: ensure path stays within basis directory
    try:
        full_path.relative_to(basis_path)
    except ValueError as e:
        # FIX: the message previously contained the literal text
        # "(unknown)" instead of the offending filename.
        raise DSPUIOError(
            f"Path '{filename}' escapes basis directory: {basis_path}",
            path=str(full_path),
            operation="resolve",
        ) from e

    # Validation checks
    if check_exists and not full_path.exists():
        raise FileNotFoundError(f"Path not found: {full_path}")

    if must_be_file:
        if not full_path.exists():
            raise FileNotFoundError(f"File not found: {full_path}")
        if not full_path.is_file():
            raise IsADirectoryError(f"Path is a directory, not a file: {full_path}")

    if must_be_dir:
        if not full_path.exists():
            raise FileNotFoundError(f"Directory not found: {full_path}")
        if not full_path.is_dir():
            raise NotADirectoryError(f"Path is a file, not a directory: {full_path}")

    return full_path

resolve_all

resolve_all(
    *filenames: str, check_exists: bool = False
) -> list[Path]

Resolve multiple filenames at once.

Parameters:

Name Type Description Default
*filenames str

File or directory names to resolve.

()
check_exists bool

If True, verify all paths exist.

False

Returns:

Type Description
list[Path]

List of absolute paths.

Raises:

Type Description
FileNotFoundError

If check_exists=True and any path doesn't exist.

DSPUIOError

If any path escapes basis directory.

Example

resolver = PathResolver(__file__) paths = resolver.resolve_all("a.txt", "b.txt", "c.txt")

Source code in src/dspu/io/paths.py
def resolve_all(self, *filenames: str, check_exists: bool = False) -> list[Path]:
    """Resolve several names against the basis directory in one call.

    Args:
        *filenames: File or directory names to resolve.
        check_exists: If True, verify all paths exist.

    Returns:
        List of absolute paths, in the same order as *filenames*.

    Raises:
        FileNotFoundError: If check_exists=True and any path doesn't exist.
        DSPUIOError: If any path escapes basis directory.

    Example:
        >>> resolver = PathResolver(__file__)
        >>> paths = resolver.resolve_all("a.txt", "b.txt", "c.txt")
    """
    resolved: list[Path] = []
    for name in filenames:
        resolved.append(self.resolve(name, check_exists=check_exists))
    return resolved

check_path_within staticmethod

check_path_within(
    path: str | Path,
    basis: str | Path,
    *,
    resolve: bool = True,
) -> Path

Check that a path is within a basis directory.

Static utility method for one-off path validation without creating a PathResolver instance.

Parameters:

Name Type Description Default
path str | Path

Path to check.

required
basis str | Path

Directory that path must be within.

required
resolve bool

If True, resolve paths to absolute before checking.

True

Returns:

Type Description
Path

The resolved path if valid.

Raises:

Type Description
DSPUIOError

If path is outside basis directory.

Example

safe_path = PathResolver.check_path_within( ... "/data/configs/app.yaml", ... "/data", ... )

Source code in src/dspu/io/paths.py
@staticmethod
def check_path_within(
    path: str | Path,
    basis: str | Path,
    *,
    resolve: bool = True,
) -> Path:
    """Check that a path is within a basis directory.

    Static utility method for one-off path validation without creating
    a PathResolver instance.

    Args:
        path: Path to check.
        basis: Directory that path must be within.
        resolve: If True, resolve paths to absolute before checking.

    Returns:
        The resolved path if valid.

    Raises:
        DSPUIOError: If path is outside basis directory.

    Example:
        >>> safe_path = PathResolver.check_path_within(
        ...     "/data/configs/app.yaml",
        ...     "/data",
        ... )
    """
    path_obj = Path(path)
    basis_obj = Path(basis)

    if resolve:
        path_obj = path_obj.resolve()
        basis_obj = basis_obj.resolve()

    try:
        path_obj.relative_to(basis_obj)
    except ValueError as e:
        raise DSPUIOError(
            f"Path '{path}' is outside basis directory: {basis}",
            path=str(path_obj),
            operation="check",
        ) from e

    return path_obj

Usage

from dspu.io import Storage

# Local storage
storage = Storage.from_uri("file:///data")
data = await storage.read("file.json")
await storage.write("output.json", {"status": "success"})

# S3 storage
s3 = Storage.from_uri("s3://bucket/path")
await s3.write("data.json", {"count": 100})

# Multi-format
storage.write_format("config.yaml", {"database": {"host": "localhost"}})
storage.write_format("data.csv", [{"name": "Alice", "age": 30}])

See Also