ML API Reference

Practical machine learning utilities that complement sklearn/pandas.

Overview

The dspu.ml module provides utilities for:

  • Reproducibility: Unified seed management across libraries
  • ID Generation: UUID, ULID, hash-based identifiers
  • Data Splitting: Train/val/test splits with no leakage
  • Statistics: Correlation, hypothesis tests, bootstrap, A/B testing
  • Feature Scaling: Standard, min-max, robust scaling with state persistence
  • Categorical Encoding: Label, one-hot, ordinal, frequency encoding
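
For a sense of how these pieces fit together, here is a minimal end-to-end sketch. It assumes the classes and functions are importable from the module paths documented below (dspu.ml.random and dspu.ml.splits); your installation may re-export them elsewhere.

from dspu.ml.random import SeedManager, make_classification_data
from dspu.ml.splits import DataSplitter

# Seed everything once for a reproducible run
SeedManager.set_global_seed(42)

# Generate a small synthetic dataset
X, y = make_classification_data(n_samples=200, n_features=5, random_state=42)

# Stratified train/test split with no extra dependencies
X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
    X, y, test_size=0.25, stratify=y, random_state=42
)
print(len(X_train), len(X_test))  # roughly 150 / 50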

Random & Reproducibility

SeedManager

dspu.ml.random.SeedManager

Unified seed management for reproducibility.

Manages random seeds across multiple libraries (Python random, NumPy, PyTorch, TensorFlow) to ensure reproducible results.

Example

>>> SeedManager.set_global_seed(42)
>>> # Now all random operations are seeded
>>> with SeedManager.seed_context(123):
...     # Temporary seed for this block
...     pass

Functions

set_global_seed classmethod

set_global_seed(seed: int) -> None

Set seed for all available random number generators.

Seeds the following libraries if available:

  • Python's random module
  • NumPy
  • PyTorch (CPU and CUDA)
  • TensorFlow

Parameters:

Name Type Description Default
seed int

Random seed value (non-negative integer)

required

Raises:

Type Description
RandomError

If seed is negative

Example

>>> SeedManager.set_global_seed(42)
>>> import random
>>> random.random()  # Will be deterministic

Source code in src/dspu/ml/random.py
@classmethod
def set_global_seed(cls, seed: int) -> None:
    """Set seed for all available random number generators.

    Seeds the following libraries if available:
    - Python's random module
    - NumPy
    - PyTorch (CPU and CUDA)
    - TensorFlow

    Args:
        seed: Random seed value (non-negative integer)

    Raises:
        RandomError: If seed is negative

    Example:
        >>> SeedManager.set_global_seed(42)
        >>> import random
        >>> random.random()  # Will be deterministic
    """
    if seed < 0:
        raise RandomError(f"Seed must be non-negative, got {seed}")

    cls._current_seed = seed

    # Python random
    random.seed(seed)

    # NumPy
    try:
        import numpy as np

        np.random.seed(seed)
    except ImportError:
        pass

    # PyTorch
    try:
        import torch

        torch.manual_seed(seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(seed)
        # Make PyTorch deterministic (may impact performance)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False
    except ImportError:
        pass

    # TensorFlow
    try:
        import tensorflow as tf

        tf.random.set_seed(seed)
    except ImportError:
        pass
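
As a quick illustration of what global seeding buys you, the following sketch (using only the standard library random module) re-seeds with the same value and reproduces the same draw.

import random
from dspu.ml.random import SeedManager

SeedManager.set_global_seed(7)
first = random.random()

SeedManager.set_global_seed(7)   # re-seed with the same value
second = random.random()

assert first == second  # identical draws after identical seeding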

seed_context classmethod

seed_context(seed: int) -> Generator[None, None, None]

Context manager for temporary seeding.

Sets a seed for the duration of the context, then restores the previous seed state.

Parameters:

Name Type Description Default
seed int

Temporary seed value

required

Yields:

Type Description
None

None

Example

>>> SeedManager.set_global_seed(42)
>>> with SeedManager.seed_context(123):
...     # Operations here use seed 123
...     data = random.random()
>>> # Back to seed 42

Source code in src/dspu/ml/random.py
@classmethod
@contextmanager
def seed_context(cls, seed: int) -> Generator[None, None, None]:
    """Context manager for temporary seeding.

    Sets a seed for the duration of the context, then restores
    the previous seed state.

    Args:
        seed: Temporary seed value

    Yields:
        None

    Example:
        >>> SeedManager.set_global_seed(42)
        >>> with SeedManager.seed_context(123):
        ...     # Operations here use seed 123
        ...     data = random.random()
        >>> # Back to seed 42
    """
    # Save current state
    old_seed = cls._current_seed
    old_python_state = random.getstate()

    old_numpy_state = None
    try:
        import numpy as np

        old_numpy_state = np.random.get_state()
    except ImportError:
        pass

    # Set new seed
    cls.set_global_seed(seed)

    try:
        yield
    finally:
        # Restore old state
        random.setstate(old_python_state)

        if old_numpy_state is not None:
            try:
                import numpy as np

                np.random.set_state(old_numpy_state)
            except ImportError:
                pass

        cls._current_seed = old_seed
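
The state restoration can be checked directly: draws taken after the context continue the sequence the surrounding seed would have produced anyway, as in this small sketch.

import random
from dspu.ml.random import SeedManager

SeedManager.set_global_seed(42)
before = random.random()          # first draw under seed 42

with SeedManager.seed_context(123):
    inside = random.random()      # draws here use seed 123

after = random.random()           # continues the seed-42 stream

# Re-run the seed-42 stream without the context to compare
SeedManager.set_global_seed(42)
expected = [random.random(), random.random()]
assert [before, after] == expected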

get_rng classmethod

get_rng(
    seed: int | None = None, backend: str = "python"
) -> Any

Get a random number generator instance.

Parameters:

Name Type Description Default
seed int | None

Optional seed for the RNG. If None, uses current global seed.

None
backend str

RNG backend - "python" or "numpy"

'python'

Returns:

Type Description
Any

Random number generator instance

Raises:

Type Description
RandomError

If backend is invalid or not available

Example

>>> rng = SeedManager.get_rng(seed=42, backend="python")
>>> rng.random()

Source code in src/dspu/ml/random.py
@classmethod
def get_rng(cls, seed: int | None = None, backend: str = "python") -> Any:
    """Get a random number generator instance.

    Args:
        seed: Optional seed for the RNG. If None, uses current global seed.
        backend: RNG backend - "python" or "numpy"

    Returns:
        Random number generator instance

    Raises:
        RandomError: If backend is invalid or not available

    Example:
        >>> rng = SeedManager.get_rng(seed=42, backend="python")
        >>> rng.random()
    """
    if seed is None:
        seed = cls._current_seed

    if backend == "python":
        rng = random.Random(seed)
        return rng
    if backend == "numpy":
        try:
            import numpy as np

            if seed is not None:
                return np.random.default_rng(seed)
            return np.random.default_rng()
        except ImportError:
            raise RandomError("NumPy not available, cannot create numpy RNG")
    else:
        raise RandomError(f"Unknown backend: {backend}. Use 'python' or 'numpy'")

get_current_seed classmethod

get_current_seed() -> int | None

Get the current global seed value.

Returns:

Type Description
int | None

Current seed or None if not set

Source code in src/dspu/ml/random.py
@classmethod
def get_current_seed(cls) -> int | None:
    """Get the current global seed value.

    Returns:
        Current seed or None if not set
    """
    return cls._current_seed

Synthetic Data Generation

dspu.ml.random.make_classification_data

make_classification_data(
    n_samples: int = 100,
    n_features: int = 5,
    n_classes: int = 2,
    n_informative: int | None = None,
    noise: float = 0.1,
    random_state: int | None = None,
) -> tuple[list[list[float]], list[int]]

Generate synthetic classification dataset.

Creates a simple classification dataset with optional noise. Features are drawn from a normal distribution, with informative features correlated with the class labels.

Parameters:

Name Type Description Default
n_samples int

Number of samples to generate

100
n_features int

Number of features

5
n_classes int

Number of classes

2
n_informative int | None

Number of informative features (default: n_features // 2)

None
noise float

Standard deviation of Gaussian noise added to features

0.1
random_state int | None

Random seed for reproducibility

None

Returns:

Type Description
tuple[list[list[float]], list[int]]

Tuple of (X, y) where X is features and y is labels

Example

>>> X, y = make_classification_data(n_samples=100, n_features=5)
>>> len(X), len(y)
(100, 100)
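
In practice the generated labels can be fed straight into the splitting utilities. A brief sketch (the class-fraction check below assumes the default two-class setup):

from dspu.ml.random import make_classification_data
from dspu.ml.splits import DataSplitter

X, y = make_classification_data(n_samples=100, n_features=5, random_state=0)

# Keep the class mix comparable between train and test
X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=0
)
print(sum(y_train) / len(y_train), sum(y_test) / len(y_test))  # similar fractions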

dspu.ml.random.make_regression_data

make_regression_data(
    n_samples: int = 100,
    n_features: int = 5,
    n_informative: int | None = None,
    noise: float = 0.1,
    random_state: int | None = None,
) -> tuple[list[list[float]], list[float]]

Generate synthetic regression dataset.

Creates a simple regression dataset with a linear relationship between the informative features and the target, plus Gaussian noise.

Parameters:

Name Type Description Default
n_samples int

Number of samples to generate

100
n_features int

Number of features

5
n_informative int | None

Number of informative features (default: n_features // 2)

None
noise float

Standard deviation of Gaussian noise added to target

0.1
random_state int | None

Random seed for reproducibility

None

Returns:

Type Description
tuple[list[list[float]], list[float]]

Tuple of (X, y) where X is features and y is continuous targets

Example

>>> X, y = make_regression_data(n_samples=100, n_features=5)
>>> len(X), len(y)
(100, 100)

dspu.ml.random.make_time_series

make_time_series(
    length: int = 100,
    pattern: str = "trend+seasonality",
    trend_slope: float = 0.1,
    seasonality_period: int = 12,
    seasonality_amplitude: float = 1.0,
    noise: float = 0.1,
    random_state: int | None = None,
) -> list[float]

Generate synthetic time series data.

Creates time series with configurable patterns:

  • trend: Linear trend
  • seasonality: Sinusoidal seasonality
  • trend+seasonality: Both components
  • random: Pure random walk

Parameters:

Name Type Description Default
length int

Number of time steps

100
pattern str

Pattern type - "trend", "seasonality", "trend+seasonality", "random"

'trend+seasonality'
trend_slope float

Slope of linear trend

0.1
seasonality_period int

Period of seasonal component

12
seasonality_amplitude float

Amplitude of seasonal component

1.0
noise float

Standard deviation of Gaussian noise

0.1
random_state int | None

Random seed for reproducibility

None

Returns:

Type Description
list[float]

List of time series values

Raises:

Type Description
RandomError

If pattern is invalid

Example

>>> ts = make_time_series(length=100, pattern="trend+seasonality")
>>> len(ts)
100
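
Because the pattern argument switches the generating process, a short comparative sketch can help; the comments describe the expected shape only loosely, since exact values depend on the generator's internals.

from dspu.ml.random import make_time_series

trend_only = make_time_series(length=24, pattern="trend", trend_slope=0.5,
                              noise=0.0, random_state=1)
seasonal = make_time_series(length=24, pattern="seasonality",
                            seasonality_period=12, seasonality_amplitude=2.0,
                            noise=0.0, random_state=1)

print(trend_only[:3])  # roughly linear increase
print(seasonal[:3])    # oscillates with period 12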

ID Generation

IDGenerator

dspu.ml.identifiers.IDGenerator

Utilities for generating and working with identifiers.

Supports multiple ID formats:

  • UUID v4: Random, globally unique
  • UUID v1: Time-based, globally unique
  • ULID: Time-based, lexicographically sortable
  • Hash-based: Deterministic from input data

Example

>>> id1 = IDGenerator.uuid4()
>>> id2 = IDGenerator.ulid()
>>> id3 = IDGenerator.hash_text("data")

Functions

uuid4 staticmethod

uuid4() -> str

Generate random UUID v4.

UUID v4 is randomly generated and has extremely low collision probability.

Returns:

Type Description
str

UUID v4 as string (e.g., "550e8400-e29b-41d4-a716-446655440000")

Example

>>> id = IDGenerator.uuid4()
>>> len(id)
36

Source code in src/dspu/ml/identifiers.py
@staticmethod
def uuid4() -> str:
    """Generate random UUID v4.

    UUID v4 is randomly generated and has extremely low collision probability.

    Returns:
        UUID v4 as string (e.g., "550e8400-e29b-41d4-a716-446655440000")

    Example:
        >>> id = IDGenerator.uuid4()
        >>> len(id)
        36
    """
    return str(uuid.uuid4())

uuid1 staticmethod

uuid1() -> str

Generate time-based UUID v1.

UUID v1 includes timestamp and MAC address. Useful when you need time-ordered IDs but not necessarily sortable.

Returns:

Type Description
str

UUID v1 as string

Example

>>> id = IDGenerator.uuid1()
>>> len(id)
36

Source code in src/dspu/ml/identifiers.py
@staticmethod
def uuid1() -> str:
    """Generate time-based UUID v1.

    UUID v1 includes timestamp and MAC address. Useful when you need
    time-ordered IDs but not necessarily sortable.

    Returns:
        UUID v1 as string

    Example:
        >>> id = IDGenerator.uuid1()
        >>> len(id)
        36
    """
    return str(uuid.uuid1())

ulid staticmethod

ulid() -> str

Generate ULID (Universally Unique Lexicographically Sortable Identifier).

ULIDs are:

  • 26 characters (vs 36 for UUID)
  • Lexicographically sortable by creation time
  • Case-insensitive (base32 encoded)
  • Timestamp + randomness

Returns:

Type Description
str

ULID as string (e.g., "01ARZ3NDEKTSV4RRFFQ69G5FAV")

Example

>>> id1 = IDGenerator.ulid()
>>> time.sleep(0.001)
>>> id2 = IDGenerator.ulid()
>>> id1 < id2  # Sortable by time
True

Source code in src/dspu/ml/identifiers.py
@staticmethod
def ulid() -> str:
    """Generate ULID (Universally Unique Lexicographically Sortable Identifier).

    ULIDs are:
    - 26 characters (vs 36 for UUID)
    - Lexicographically sortable by creation time
    - Case-insensitive (base32 encoded)
    - Timestamp + randomness

    Returns:
        ULID as string (e.g., "01ARZ3NDEKTSV4RRFFQ69G5FAV")

    Example:
        >>> id1 = IDGenerator.ulid()
        >>> time.sleep(0.001)
        >>> id2 = IDGenerator.ulid()
        >>> id1 < id2  # Sortable by time
        True
    """
    # ULID format: 48-bit timestamp + 80-bit randomness
    # Encoded as 26 character base32 string

    # Timestamp (48 bits = 10 chars in base32)
    timestamp_ms = int(time.time() * 1000)

    # Randomness (80 bits = 16 chars in base32)
    import random

    randomness = random.getrandbits(80)

    # Encode to base32 (Crockford's base32)
    # Using custom alphabet that excludes ambiguous characters (I, L, O, U)
    alphabet = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

    # Encode timestamp (10 chars)
    ulid_str = ""
    ts = timestamp_ms
    for _ in range(10):
        ulid_str = alphabet[ts & 0x1F] + ulid_str
        ts >>= 5

    # Encode randomness (16 chars)
    rand = randomness
    rand_str = ""
    for _ in range(16):
        rand_str = alphabet[rand & 0x1F] + rand_str
        rand >>= 5

    return ulid_str + rand_str

hash_text staticmethod

hash_text(
    text: str,
    algo: Literal[
        "md5", "sha1", "sha256", "sha512"
    ] = "sha256",
    length: int | None = None,
) -> str

Generate hash-based ID from text.

Creates deterministic ID by hashing the input text. Useful for deduplication or creating stable IDs from content.

Parameters:

Name Type Description Default
text str

Input text to hash

required
algo Literal['md5', 'sha1', 'sha256', 'sha512']

Hash algorithm - "md5", "sha1", "sha256", "sha512"

'sha256'
length int | None

Optional truncation length (in characters)

None

Returns:

Type Description
str

Hex-encoded hash string

Raises:

Type Description
IDError

If algorithm is invalid

Example

>>> id1 = IDGenerator.hash_text("hello")
>>> id2 = IDGenerator.hash_text("hello")
>>> id1 == id2  # Same input = same ID
True

Source code in src/dspu/ml/identifiers.py
@staticmethod
def hash_text(
    text: str,
    algo: Literal["md5", "sha1", "sha256", "sha512"] = "sha256",
    length: int | None = None,
) -> str:
    """Generate hash-based ID from text.

    Creates deterministic ID by hashing the input text.
    Useful for deduplication or creating stable IDs from content.

    Args:
        text: Input text to hash
        algo: Hash algorithm - "md5", "sha1", "sha256", "sha512"
        length: Optional truncation length (in characters)

    Returns:
        Hex-encoded hash string

    Raises:
        IDError: If algorithm is invalid

    Example:
        >>> id1 = IDGenerator.hash_text("hello")
        >>> id2 = IDGenerator.hash_text("hello")
        >>> id1 == id2  # Same input = same ID
        True
    """
    valid_algos = {"md5", "sha1", "sha256", "sha512"}
    if algo not in valid_algos:
        raise IDError(f"Invalid algorithm: {algo}. Use one of {valid_algos}")

    if algo == "md5":
        h = hashlib.md5(text.encode("utf-8"))
    elif algo == "sha1":
        h = hashlib.sha1(text.encode("utf-8"))
    elif algo == "sha256":
        h = hashlib.sha256(text.encode("utf-8"))
    else:  # sha512
        h = hashlib.sha512(text.encode("utf-8"))

    digest = h.hexdigest()

    if length is not None:
        return digest[:length]

    return digest

hash_row staticmethod

hash_row(
    row: dict[str, Any],
    columns: list[str] | None = None,
    algo: Literal[
        "md5", "sha1", "sha256", "sha512"
    ] = "sha256",
    length: int | None = None,
) -> str

Generate stable hash-based ID for a row.

Creates deterministic ID by hashing selected column values. Useful for composite keys or content-based deduplication.

Parameters:

Name Type Description Default
row dict[str, Any]

Dictionary representing a row

required
columns list[str] | None

Columns to include in hash (None = all columns, sorted)

None
algo Literal['md5', 'sha1', 'sha256', 'sha512']

Hash algorithm

'sha256'
length int | None

Optional truncation length

None

Returns:

Type Description
str

Hex-encoded hash string

Example

>>> row = {"name": "Alice", "age": 30}
>>> id = IDGenerator.hash_row(row, columns=["name"])

Source code in src/dspu/ml/identifiers.py
@staticmethod
def hash_row(
    row: dict[str, Any],
    columns: list[str] | None = None,
    algo: Literal["md5", "sha1", "sha256", "sha512"] = "sha256",
    length: int | None = None,
) -> str:
    """Generate stable hash-based ID for a row.

    Creates deterministic ID by hashing selected column values.
    Useful for composite keys or content-based deduplication.

    Args:
        row: Dictionary representing a row
        columns: Columns to include in hash (None = all columns, sorted)
        algo: Hash algorithm
        length: Optional truncation length

    Returns:
        Hex-encoded hash string

    Example:
        >>> row = {"name": "Alice", "age": 30}
        >>> id = IDGenerator.hash_row(row, columns=["name"])
    """
    if columns is None:
        columns = sorted(row.keys())

    # Create stable string representation
    parts: list[str] = []
    for col in columns:
        value = row.get(col)
        if value is None:
            parts.append("")
        else:
            parts.append(str(value))

    text = "|".join(parts)
    return IDGenerator.hash_text(text, algo=algo, length=length)
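
One common use is content-based deduplication, as in this rough sketch (the column choice is only an example):

from dspu.ml.identifiers import IDGenerator

rows = [
    {"email": "a@example.com", "amount": 10},
    {"email": "b@example.com", "amount": 20},
    {"email": "a@example.com", "amount": 10},  # duplicate content
]

seen: set[str] = set()
unique_rows = []
for row in rows:
    key = IDGenerator.hash_row(row, columns=["email", "amount"])
    if key not in seen:
        seen.add(key)
        unique_rows.append(row)

print(len(unique_rows))  # 2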

add_id_column staticmethod

add_id_column(
    table: list[dict[str, Any]],
    id_col: str = "_id",
    method: Literal[
        "uuid4", "uuid1", "ulid", "hash", "sequential"
    ] = "uuid4",
    hash_columns: list[str] | None = None,
    start: int = 1,
) -> list[dict[str, Any]]

Add ID column to a table (list of dicts).

Modifies the table in-place by adding an ID column to each row.

Parameters:

Name Type Description Default
table list[dict[str, Any]]

List of dictionaries (rows)

required
id_col str

Name of ID column to add

'_id'
method Literal['uuid4', 'uuid1', 'ulid', 'hash', 'sequential']

ID generation method - "uuid4", "uuid1", "ulid", "hash", "sequential"

'uuid4'
hash_columns list[str] | None

Columns to use for hash-based IDs (for method="hash")

None
start int

Starting value for sequential IDs

1

Returns:

Type Description
list[dict[str, Any]]

Modified table (same object, modified in-place)

Raises:

Type Description
IDError

If method is invalid or hash_columns not provided for hash method

Example

>>> table = [{"name": "Alice"}, {"name": "Bob"}]
>>> IDGenerator.add_id_column(table, id_col="id", method="uuid4")
>>> "id" in table[0]
True

Source code in src/dspu/ml/identifiers.py
@staticmethod
def add_id_column(
    table: list[dict[str, Any]],
    id_col: str = "_id",
    method: Literal["uuid4", "uuid1", "ulid", "hash", "sequential"] = "uuid4",
    hash_columns: list[str] | None = None,
    start: int = 1,
) -> list[dict[str, Any]]:
    """Add ID column to a table (list of dicts).

    Modifies the table in-place by adding an ID column to each row.

    Args:
        table: List of dictionaries (rows)
        id_col: Name of ID column to add
        method: ID generation method - "uuid4", "uuid1", "ulid", "hash", "sequential"
        hash_columns: Columns to use for hash-based IDs (for method="hash")
        start: Starting value for sequential IDs

    Returns:
        Modified table (same object, modified in-place)

    Raises:
        IDError: If method is invalid or hash_columns not provided for hash method

    Example:
        >>> table = [{"name": "Alice"}, {"name": "Bob"}]
        >>> IDGenerator.add_id_column(table, id_col="id", method="uuid4")
        >>> "id" in table[0]
        True
    """
    valid_methods = {"uuid4", "uuid1", "ulid", "hash", "sequential"}
    if method not in valid_methods:
        raise IDError(f"Invalid method: {method}. Use one of {valid_methods}")

    if method == "hash" and hash_columns is None:
        raise IDError("hash_columns must be provided for method='hash'")

    for i, row in enumerate(table):
        if method == "uuid4":
            row[id_col] = IDGenerator.uuid4()
        elif method == "uuid1":
            row[id_col] = IDGenerator.uuid1()
        elif method == "ulid":
            row[id_col] = IDGenerator.ulid()
        elif method == "hash":
            row[id_col] = IDGenerator.hash_row(row, columns=hash_columns)
        elif method == "sequential":
            row[id_col] = start + i

    return table
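
For deterministic, content-derived IDs, method="hash" requires hash_columns; a brief sketch:

from dspu.ml.identifiers import IDGenerator

table = [
    {"name": "Alice", "city": "Oslo"},
    {"name": "Bob", "city": "Lima"},
]

# Hash-based IDs are stable across runs for identical content
IDGenerator.add_id_column(table, id_col="row_id", method="hash",
                          hash_columns=["name", "city"])
print(table[0]["row_id"][:12])  # first characters of the sha256 hex digest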

Data Splitting

DataSplitter

dspu.ml.splits.DataSplitter

Data splitting strategies for ML workflows.

Provides various train/test splitting methods with proper stratification and group handling to prevent data leakage.

Example

>>> X = [[1], [2], [3], [4]]
>>> y = [0, 1, 0, 1]
>>> X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
...     X, y, test_size=0.25
... )

Functions

train_test_split staticmethod

train_test_split(
    X: list[Any],
    y: list[Any] | None = None,
    test_size: float = 0.25,
    stratify: list[Any] | None = None,
    random_state: int | None = None,
) -> tuple[
    list[Any], list[Any], list[Any] | None, list[Any] | None
]

Split data into train and test sets.

Parameters:

Name Type Description Default
X list[Any]

Input data (list of samples)

required
y list[Any] | None

Target labels (optional)

None
test_size float

Fraction of data for test set (0 to 1)

0.25
stratify list[Any] | None

If provided, perform stratified split to maintain class distribution

None
random_state int | None

Random seed for reproducibility

None

Returns:

Type Description
tuple[list[Any], list[Any], list[Any] | None, list[Any] | None]

Tuple of (X_train, X_test, y_train, y_test). If y is None, returns (X_train, X_test, None, None)

Raises:

Type Description
SplitError

If test_size is invalid or data sizes don't match

Example

>>> X = [[1], [2], [3], [4]]
>>> y = [0, 1, 0, 1]
>>> X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
...     X, y, test_size=0.25, random_state=42
... )

Source code in src/dspu/ml/splits.py
@staticmethod
def train_test_split(
    X: list[Any],
    y: list[Any] | None = None,
    test_size: float = 0.25,
    stratify: list[Any] | None = None,
    random_state: int | None = None,
) -> tuple[list[Any], list[Any], list[Any] | None, list[Any] | None]:
    """Split data into train and test sets.

    Args:
        X: Input data (list of samples)
        y: Target labels (optional)
        test_size: Fraction of data for test set (0 to 1)
        stratify: If provided, perform stratified split to maintain class distribution
        random_state: Random seed for reproducibility

    Returns:
        Tuple of (X_train, X_test, y_train, y_test)
        If y is None, returns (X_train, X_test, None, None)

    Raises:
        SplitError: If test_size is invalid or data sizes don't match

    Example:
        >>> X = [[1], [2], [3], [4]]
        >>> y = [0, 1, 0, 1]
        >>> X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
        ...     X, y, test_size=0.25, random_state=42
        ... )
    """
    if not 0 < test_size < 1:
        raise SplitError(f"test_size must be between 0 and 1, got {test_size}")

    n_samples = len(X)

    if y is not None and len(y) != n_samples:
        raise SplitError(f"X and y have different lengths: {n_samples} vs {len(y)}")

    if stratify is not None and len(stratify) != n_samples:
        raise SplitError(
            f"X and stratify have different lengths: {n_samples} vs {len(stratify)}"
        )

    n_test = max(1, int(n_samples * test_size))
    n_train = n_samples - n_test

    rng = random.Random(random_state)

    if stratify is not None:
        # Stratified split - maintain class distribution
        indices_by_class = defaultdict(list)
        for idx, label in enumerate(stratify):
            indices_by_class[label].append(idx)

        train_indices = []
        test_indices = []

        for label, indices in indices_by_class.items():
            n_class = len(indices)
            n_class_test = max(1, int(n_class * test_size))

            # Shuffle within class
            shuffled = list(indices)
            rng.shuffle(shuffled)

            test_indices.extend(shuffled[:n_class_test])
            train_indices.extend(shuffled[n_class_test:])

    else:
        # Random split
        indices = list(range(n_samples))
        rng.shuffle(indices)

        test_indices = indices[:n_test]
        train_indices = indices[n_test:]

    # Split data
    X_train = [X[i] for i in train_indices]
    X_test = [X[i] for i in test_indices]

    if y is not None:
        y_train = [y[i] for i in train_indices]
        y_test = [y[i] for i in test_indices]
        return X_train, X_test, y_train, y_test
    return X_train, X_test, None, None
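
To see the effect of stratify, the following sketch compares class counts in train and test; the comments give approximate counts, since per-class test sizes are rounded.

from collections import Counter
from dspu.ml.splits import DataSplitter

X = list(range(100))
y = [0] * 80 + [1] * 20          # imbalanced labels

_, _, y_train, y_test = DataSplitter.train_test_split(
    X, y, test_size=0.25, stratify=y, random_state=0
)

print(Counter(y_train))  # roughly 60 of class 0, 15 of class 1
print(Counter(y_test))   # roughly 20 of class 0, 5 of class 1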

train_val_test_split staticmethod

train_val_test_split(
    X: list[Any],
    y: list[Any] | None = None,
    test_size: float = 0.2,
    val_size: float = 0.1,
    stratify: list[Any] | None = None,
    random_state: int | None = None,
) -> tuple[
    list[Any],
    list[Any],
    list[Any],
    list[Any] | None,
    list[Any] | None,
    list[Any] | None,
]

Split data into train, validation, and test sets.

Parameters:

Name Type Description Default
X list[Any]

Input data

required
y list[Any] | None

Target labels (optional)

None
test_size float

Fraction for test set

0.2
val_size float

Fraction for validation set

0.1
stratify list[Any] | None

If provided, perform stratified split

None
random_state int | None

Random seed

None

Returns:

Type Description
tuple[list[Any], list[Any], list[Any], list[Any] | None, list[Any] | None, list[Any] | None]

Tuple of (X_train, X_val, X_test, y_train, y_val, y_test)

Raises:

Type Description
SplitError

If sizes are invalid

Example

>>> X = list(range(100))
>>> y = [i % 2 for i in range(100)]
>>> splits = DataSplitter.train_val_test_split(X, y, test_size=0.2, val_size=0.1)
>>> X_train, X_val, X_test, y_train, y_val, y_test = splits

Source code in src/dspu/ml/splits.py
@staticmethod
def train_val_test_split(
    X: list[Any],
    y: list[Any] | None = None,
    test_size: float = 0.2,
    val_size: float = 0.1,
    stratify: list[Any] | None = None,
    random_state: int | None = None,
) -> tuple[
    list[Any], list[Any], list[Any], list[Any] | None, list[Any] | None, list[Any] | None
]:
    """Split data into train, validation, and test sets.

    Args:
        X: Input data
        y: Target labels (optional)
        test_size: Fraction for test set
        val_size: Fraction for validation set
        stratify: If provided, perform stratified split
        random_state: Random seed

    Returns:
        Tuple of (X_train, X_val, X_test, y_train, y_val, y_test)

    Raises:
        SplitError: If sizes are invalid

    Example:
        >>> X = list(range(100))
        >>> y = [i % 2 for i in range(100)]
        >>> splits = DataSplitter.train_val_test_split(X, y, test_size=0.2, val_size=0.1)
        >>> X_train, X_val, X_test, y_train, y_val, y_test = splits
    """
    if test_size + val_size >= 1:
        raise SplitError(f"test_size + val_size must be < 1, got {test_size + val_size}")

    # First split: train+val vs test
    X_temp, X_test, y_temp, y_test = DataSplitter.train_test_split(
        X, y, test_size=test_size, stratify=stratify, random_state=random_state
    )

    # Second split: train vs val (from remaining data)
    val_size_adjusted = val_size / (1 - test_size)
    X_train, X_val, y_train, y_val = DataSplitter.train_test_split(
        X_temp,
        y_temp,
        test_size=val_size_adjusted,
        stratify=stratify,
        random_state=random_state,
    )

    return X_train, X_val, X_test, y_train, y_val, y_test

kfold_split staticmethod

kfold_split(
    X: list[Any],
    n_splits: int = 5,
    shuffle: bool = True,
    random_state: int | None = None,
) -> list[tuple[list[int], list[int]]]

K-fold cross-validation indices.

Splits data into K consecutive folds, yielding train/validation indices for each fold.

Parameters:

Name Type Description Default
X list[Any]

Input data (only length is used)

required
n_splits int

Number of folds

5
shuffle bool

Whether to shuffle data before splitting

True
random_state int | None

Random seed (only used if shuffle=True)

None

Returns:

Type Description
list[tuple[list[int], list[int]]]

List of (train_indices, val_indices) tuples

Raises:

Type Description
SplitError

If n_splits is invalid

Example

>>> X = list(range(10))
>>> folds = DataSplitter.kfold_split(X, n_splits=3)
>>> for train_idx, val_idx in folds:
...     print(len(train_idx), len(val_idx))

Source code in src/dspu/ml/splits.py
@staticmethod
def kfold_split(
    X: list[Any],
    n_splits: int = 5,
    shuffle: bool = True,
    random_state: int | None = None,
) -> list[tuple[list[int], list[int]]]:
    """K-fold cross-validation indices.

    Splits data into K consecutive folds, yielding train/validation
    indices for each fold.

    Args:
        X: Input data (only length is used)
        n_splits: Number of folds
        shuffle: Whether to shuffle data before splitting
        random_state: Random seed (only used if shuffle=True)

    Returns:
        List of (train_indices, val_indices) tuples

    Raises:
        SplitError: If n_splits is invalid

    Example:
        >>> X = list(range(10))
        >>> folds = DataSplitter.kfold_split(X, n_splits=3)
        >>> for train_idx, val_idx in folds:
        ...     print(len(train_idx), len(val_idx))
    """
    n_samples = len(X)

    if n_splits < 2:
        raise SplitError(f"n_splits must be >= 2, got {n_splits}")

    if n_splits > n_samples:
        raise SplitError(f"n_splits ({n_splits}) cannot exceed n_samples ({n_samples})")

    indices = list(range(n_samples))

    if shuffle:
        rng = random.Random(random_state)
        rng.shuffle(indices)

    fold_sizes = [n_samples // n_splits] * n_splits
    # Distribute remainder
    for i in range(n_samples % n_splits):
        fold_sizes[i] += 1

    folds = []
    current = 0
    fold_indices = []

    for size in fold_sizes:
        fold_indices.append(indices[current : current + size])
        current += size

    # Generate train/val splits
    for i in range(n_splits):
        val_indices = fold_indices[i]
        train_indices = []
        for j in range(n_splits):
            if j != i:
                train_indices.extend(fold_indices[j])

        folds.append((train_indices, val_indices))

    return folds

stratified_kfold staticmethod

stratified_kfold(
    y: list[Any],
    n_splits: int = 5,
    random_state: int | None = None,
) -> list[tuple[list[int], list[int]]]

Stratified K-fold cross-validation indices.

Like K-fold but preserves class distribution in each fold. Useful for imbalanced datasets.

Parameters:

Name Type Description Default
y list[Any]

Target labels

required
n_splits int

Number of folds

5
random_state int | None

Random seed

None

Returns:

Type Description
list[tuple[list[int], list[int]]]

List of (train_indices, val_indices) tuples

Raises:

Type Description
SplitError

If any class has fewer samples than n_splits

Example

>>> y = [0, 0, 0, 1, 1, 1]
>>> folds = DataSplitter.stratified_kfold(y, n_splits=3)

Source code in src/dspu/ml/splits.py
@staticmethod
def stratified_kfold(
    y: list[Any],
    n_splits: int = 5,
    random_state: int | None = None,
) -> list[tuple[list[int], list[int]]]:
    """Stratified K-fold cross-validation indices.

    Like K-fold but preserves class distribution in each fold.
    Useful for imbalanced datasets.

    Args:
        y: Target labels
        n_splits: Number of folds
        random_state: Random seed

    Returns:
        List of (train_indices, val_indices) tuples

    Raises:
        SplitError: If any class has fewer samples than n_splits

    Example:
        >>> y = [0, 0, 0, 1, 1, 1]
        >>> folds = DataSplitter.stratified_kfold(y, n_splits=3)
    """
    n_samples = len(y)

    if n_splits < 2:
        raise SplitError(f"n_splits must be >= 2, got {n_splits}")

    # Group indices by class
    class_indices = defaultdict(list)
    for idx, label in enumerate(y):
        class_indices[label].append(idx)

    # Verify each class has enough samples
    for label, indices in class_indices.items():
        if len(indices) < n_splits:
            raise SplitError(
                f"Class {label} has only {len(indices)} samples, "
                f"need at least {n_splits} for {n_splits}-fold CV"
            )

    rng = random.Random(random_state)

    # Shuffle within each class
    for indices in class_indices.values():
        rng.shuffle(indices)

    # Distribute samples from each class across folds
    folds = [[] for _ in range(n_splits)]

    for label, indices in class_indices.items():
        for i, idx in enumerate(indices):
            fold_idx = i % n_splits
            folds[fold_idx].append(idx)

    # Generate train/val splits
    result = []
    for i in range(n_splits):
        val_indices = folds[i]
        train_indices = []
        for j in range(n_splits):
            if j != i:
                train_indices.extend(folds[j])

        result.append((train_indices, val_indices))

    return result
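
A small sketch showing that each fold keeps the class mix of the full label set:

from collections import Counter
from dspu.ml.splits import DataSplitter

y = [0] * 9 + [1] * 3             # 3:1 imbalance
folds = DataSplitter.stratified_kfold(y, n_splits=3, random_state=0)

for train_idx, val_idx in folds:
    print(Counter(y[i] for i in val_idx))  # roughly Counter({0: 3, 1: 1}) per fold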

time_series_split staticmethod

time_series_split(
    X: list[Any],
    n_splits: int = 5,
    max_train_size: int | None = None,
) -> list[tuple[list[int], list[int]]]

Time-series cross-validation indices.

Sequential splits suitable for temporal data where future data must not leak into training. Each fold uses progressively more historical data.

Parameters:

Name Type Description Default
X list[Any]

Input data (only length is used)

required
n_splits int

Number of splits

5
max_train_size int | None

Maximum size of training set (None = unlimited)

None

Returns:

Type Description
list[tuple[list[int], list[int]]]

List of (train_indices, val_indices) tuples

Raises:

Type Description
SplitError

If n_splits is too large

Example

>>> X = list(range(20))
>>> splits = DataSplitter.time_series_split(X, n_splits=4)
>>> # Split 1: train [0-4], test [5-9]
>>> # Split 2: train [0-9], test [10-14]
>>> # Split 3: train [0-14], test [15-19]

Source code in src/dspu/ml/splits.py
@staticmethod
def time_series_split(
    X: list[Any],
    n_splits: int = 5,
    max_train_size: int | None = None,
) -> list[tuple[list[int], list[int]]]:
    """Time-series cross-validation indices.

    Sequential splits suitable for temporal data where future data
    must not leak into training. Each fold uses progressively more
    historical data.

    Args:
        X: Input data (only length is used)
        n_splits: Number of splits
        max_train_size: Maximum size of training set (None = unlimited)

    Returns:
        List of (train_indices, val_indices) tuples

    Raises:
        SplitError: If n_splits is too large

    Example:
        >>> X = list(range(20))
        >>> splits = DataSplitter.time_series_split(X, n_splits=4)
        >>> # Split 1: train [0-4], test [5-9]
        >>> # Split 2: train [0-9], test [10-14]
        >>> # Split 3: train [0-14], test [15-19]
    """
    n_samples = len(X)

    if n_splits < 2:
        raise SplitError(f"n_splits must be >= 2, got {n_splits}")

    # Each fold needs at least 1 sample for validation
    if n_splits >= n_samples:
        raise SplitError(f"n_splits ({n_splits}) must be less than n_samples ({n_samples})")

    # Calculate fold size for validation sets
    test_size = n_samples // (n_splits + 1)
    if test_size == 0:
        test_size = 1

    splits = []

    for i in range(n_splits):
        # Validation set: next chunk of data
        val_start = (i + 1) * test_size
        val_end = min(val_start + test_size, n_samples)

        if val_start >= n_samples:
            break

        # Training set: all data before validation set
        train_start = 0
        train_end = val_start

        if max_train_size is not None and (train_end - train_start) > max_train_size:
            train_start = train_end - max_train_size

        train_indices = list(range(train_start, train_end))
        val_indices = list(range(val_start, val_end))

        splits.append((train_indices, val_indices))

    return splits

group_split staticmethod

group_split(
    X: list[Any],
    groups: list[Any],
    y: list[Any] | None = None,
    test_size: float = 0.25,
    random_state: int | None = None,
) -> tuple[
    list[Any], list[Any], list[Any] | None, list[Any] | None
]

Split data by groups to prevent leakage.

Ensures that samples from the same group appear only in either train or test set, never both. Critical for scenarios like:

  • Patient data (keep all visits from same patient together)
  • Time series (keep all events from same entity together)
  • Hierarchical data (keep related samples together)

Parameters:

Name Type Description Default
X list[Any]

Input data

required
groups list[Any]

Group labels for each sample

required
y list[Any] | None

Target labels (optional)

None
test_size float

Approximate fraction for test set

0.25
random_state int | None

Random seed

None

Returns:

Type Description
tuple[list[Any], list[Any], list[Any] | None, list[Any] | None]

Tuple of (X_train, X_test, y_train, y_test)

Raises:

Type Description
SplitError

If data sizes don't match

Example

>>> X = [[1], [2], [3], [4], [5], [6]]
>>> y = [0, 0, 1, 1, 0, 1]
>>> groups = ["A", "A", "B", "B", "C", "C"]  # A, B, C are patients
>>> X_train, X_test, y_train, y_test = DataSplitter.group_split(
...     X, groups, y, test_size=0.33
... )
>>> # All samples from same patient stay together

Source code in src/dspu/ml/splits.py
@staticmethod
def group_split(
    X: list[Any],
    groups: list[Any],
    y: list[Any] | None = None,
    test_size: float = 0.25,
    random_state: int | None = None,
) -> tuple[list[Any], list[Any], list[Any] | None, list[Any] | None]:
    """Split data by groups to prevent leakage.

    Ensures that samples from the same group appear only in either
    train or test set, never both. Critical for scenarios like:
    - Patient data (keep all visits from same patient together)
    - Time series (keep all events from same entity together)
    - Hierarchical data (keep related samples together)

    Args:
        X: Input data
        groups: Group labels for each sample
        y: Target labels (optional)
        test_size: Approximate fraction for test set
        random_state: Random seed

    Returns:
        Tuple of (X_train, X_test, y_train, y_test)

    Raises:
        SplitError: If data sizes don't match

    Example:
        >>> X = [[1], [2], [3], [4], [5], [6]]
        >>> y = [0, 0, 1, 1, 0, 1]
        >>> groups = ["A", "A", "B", "B", "C", "C"]  # A, B, C are patients
        >>> X_train, X_test, y_train, y_test = DataSplitter.group_split(
        ...     X, groups, y, test_size=0.33
        ... )
        >>> # All samples from same patient stay together
    """
    n_samples = len(X)

    if len(groups) != n_samples:
        raise SplitError(f"X and groups have different lengths: {n_samples} vs {len(groups)}")

    if y is not None and len(y) != n_samples:
        raise SplitError(f"X and y have different lengths: {n_samples} vs {len(y)}")

    # Get unique groups and their indices
    group_indices = defaultdict(list)
    for idx, group in enumerate(groups):
        group_indices[group].append(idx)

    unique_groups = list(group_indices.keys())
    n_groups = len(unique_groups)

    # Split groups (not individual samples)
    rng = random.Random(random_state)
    rng.shuffle(unique_groups)

    n_test_groups = max(1, int(n_groups * test_size))
    test_groups = set(unique_groups[:n_test_groups])

    # Assign samples based on group membership
    train_indices = []
    test_indices = []

    for group, indices in group_indices.items():
        if group in test_groups:
            test_indices.extend(indices)
        else:
            train_indices.extend(indices)

    # Split data
    X_train = [X[i] for i in train_indices]
    X_test = [X[i] for i in test_indices]

    if y is not None:
        y_train = [y[i] for i in train_indices]
        y_test = [y[i] for i in test_indices]
        return X_train, X_test, y_train, y_test
    return X_train, X_test, None, None
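
To confirm there is no group leakage, you can check that the train and test group sets are disjoint. A small sketch (here each sample carries its own group label so it can be inspected after the split):

from dspu.ml.splits import DataSplitter

# Encode the group inside each sample so we can inspect it after the split
X = [("A", 1), ("A", 2), ("B", 3), ("B", 4), ("C", 5), ("C", 6)]
groups = [g for g, _ in X]

X_train, X_test, _, _ = DataSplitter.group_split(X, groups, test_size=0.33,
                                                 random_state=0)

train_groups = {g for g, _ in X_train}
test_groups = {g for g, _ in X_test}
assert train_groups.isdisjoint(test_groups)  # no group appears on both sides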

Statistical Utilities

Stats

dspu.ml.stats.Stats

Statistical utilities for data analysis and ML.

Provides common statistical functions without requiring scipy. For advanced statistical analysis, consider using scipy.stats.

Example

>>> x = [1, 2, 3, 4, 5]
>>> y = [2, 4, 5, 4, 5]
>>> corr = Stats.correlation(x, y)

Functions

mean staticmethod

mean(data: list[float]) -> float

Calculate arithmetic mean.

Parameters:

Name Type Description Default
data list[float]

List of numbers

required

Returns:

Type Description
float

Mean value

Raises:

Type Description
StatsError

If data is empty

Source code in src/dspu/ml/stats.py
@staticmethod
def mean(data: list[float]) -> float:
    """Calculate arithmetic mean.

    Args:
        data: List of numbers

    Returns:
        Mean value

    Raises:
        StatsError: If data is empty
    """
    if not data:
        raise StatsError("Cannot compute mean of empty data")
    return sum(data) / len(data)

median staticmethod

median(data: list[float]) -> float

Calculate median.

Parameters:

Name Type Description Default
data list[float]

List of numbers

required

Returns:

Type Description
float

Median value

Raises:

Type Description
StatsError

If data is empty

Source code in src/dspu/ml/stats.py
@staticmethod
def median(data: list[float]) -> float:
    """Calculate median.

    Args:
        data: List of numbers

    Returns:
        Median value

    Raises:
        StatsError: If data is empty
    """
    if not data:
        raise StatsError("Cannot compute median of empty data")

    sorted_data = sorted(data)
    n = len(sorted_data)

    if n % 2 == 0:
        return (sorted_data[n // 2 - 1] + sorted_data[n // 2]) / 2
    return sorted_data[n // 2]

std staticmethod

std(data: list[float], ddof: int = 0) -> float

Calculate standard deviation.

Parameters:

Name Type Description Default
data list[float]

List of numbers

required
ddof int

Delta degrees of freedom (0 for population, 1 for sample)

0

Returns:

Type Description
float

Standard deviation

Raises:

Type Description
StatsError

If data is empty or has insufficient samples

Source code in src/dspu/ml/stats.py
@staticmethod
def std(data: list[float], ddof: int = 0) -> float:
    """Calculate standard deviation.

    Args:
        data: List of numbers
        ddof: Delta degrees of freedom (0 for population, 1 for sample)

    Returns:
        Standard deviation

    Raises:
        StatsError: If data is empty or has insufficient samples
    """
    if not data:
        raise StatsError("Cannot compute std of empty data")

    if len(data) <= ddof:
        raise StatsError(f"Need more than {ddof} samples for ddof={ddof}")

    mean_val = Stats.mean(data)
    variance = sum((x - mean_val) ** 2 for x in data) / (len(data) - ddof)
    return math.sqrt(variance)

correlation staticmethod

correlation(
    x: list[float],
    y: list[float],
    method: Literal[
        "pearson", "spearman", "kendall"
    ] = "pearson",
) -> float

Calculate correlation between two variables.

Parameters:

Name Type Description Default
x list[float]

First variable

required
y list[float]

Second variable

required
method Literal['pearson', 'spearman', 'kendall']

Correlation method - "pearson", "spearman", or "kendall"

'pearson'

Returns:

Type Description
float

Correlation coefficient (between -1 and 1)

Raises:

Type Description
StatsError

If lengths don't match or method is invalid

Example

>>> x = [1, 2, 3, 4, 5]
>>> y = [2, 4, 5, 4, 5]
>>> corr = Stats.correlation(x, y, method="pearson")

Source code in src/dspu/ml/stats.py
@staticmethod
def correlation(
    x: list[float],
    y: list[float],
    method: Literal["pearson", "spearman", "kendall"] = "pearson",
) -> float:
    """Calculate correlation between two variables.

    Args:
        x: First variable
        y: Second variable
        method: Correlation method - "pearson", "spearman", or "kendall"

    Returns:
        Correlation coefficient (between -1 and 1)

    Raises:
        StatsError: If lengths don't match or method is invalid

    Example:
        >>> x = [1, 2, 3, 4, 5]
        >>> y = [2, 4, 5, 4, 5]
        >>> corr = Stats.correlation(x, y, method="pearson")
    """
    if len(x) != len(y):
        raise StatsError(f"x and y must have same length: {len(x)} vs {len(y)}")

    if len(x) < 2:
        raise StatsError("Need at least 2 samples for correlation")

    if method == "pearson":
        return Stats._pearson_correlation(x, y)
    if method == "spearman":
        # Spearman = Pearson on ranks
        x_ranks = Stats._rank_data(x)
        y_ranks = Stats._rank_data(y)
        return Stats._pearson_correlation(x_ranks, y_ranks)
    if method == "kendall":
        return Stats._kendall_tau(x, y)
    raise StatsError(f"Unknown correlation method: {method}")

t_test_independent staticmethod

t_test_independent(
    sample1: list[float],
    sample2: list[float],
    equal_var: bool = True,
) -> tuple[float, float]

Independent two-sample t-test.

Tests whether two samples have different means.

Parameters:

Name Type Description Default
sample1 list[float]

First sample

required
sample2 list[float]

Second sample

required
equal_var bool

Assume equal variances (True) or not (False, Welch's t-test)

True

Returns:

Type Description
tuple[float, float]

Tuple of (t_statistic, p_value_approx). Note: the p-value is approximate.

Raises:

Type Description
StatsError

If samples are too small

Example

>>> sample1 = [1, 2, 3, 4, 5]
>>> sample2 = [2, 3, 4, 5, 6]
>>> t_stat, p_val = Stats.t_test_independent(sample1, sample2)

Source code in src/dspu/ml/stats.py
@staticmethod
def t_test_independent(
    sample1: list[float],
    sample2: list[float],
    equal_var: bool = True,
) -> tuple[float, float]:
    """Independent two-sample t-test.

    Tests whether two samples have different means.

    Args:
        sample1: First sample
        sample2: Second sample
        equal_var: Assume equal variances (True) or not (False, Welch's t-test)

    Returns:
        Tuple of (t_statistic, p_value_approx)
        Note: p-value is approximate

    Raises:
        StatsError: If samples are too small

    Example:
        >>> sample1 = [1, 2, 3, 4, 5]
        >>> sample2 = [2, 3, 4, 5, 6]
        >>> t_stat, p_val = Stats.t_test_independent(sample1, sample2)
    """
    if len(sample1) < 2 or len(sample2) < 2:
        raise StatsError("Each sample must have at least 2 observations")

    n1, n2 = len(sample1), len(sample2)
    mean1 = Stats.mean(sample1)
    mean2 = Stats.mean(sample2)
    std1 = Stats.std(sample1, ddof=1)
    std2 = Stats.std(sample2, ddof=1)

    if equal_var:
        # Pooled standard deviation
        pooled_std = math.sqrt(((n1 - 1) * std1**2 + (n2 - 1) * std2**2) / (n1 + n2 - 2))
        t_stat = (mean1 - mean2) / (pooled_std * math.sqrt(1 / n1 + 1 / n2))
        df = n1 + n2 - 2
    else:
        # Welch's t-test
        s1 = std1**2 / n1
        s2 = std2**2 / n2
        t_stat = (mean1 - mean2) / math.sqrt(s1 + s2)

        # Welch-Satterthwaite degrees of freedom
        df = (s1 + s2) ** 2 / (s1**2 / (n1 - 1) + s2**2 / (n2 - 1))

    # Approximate p-value using normal approximation for large df
    # For small df, this is less accurate
    p_value = 2 * (1 - Stats._normal_cdf(abs(t_stat)))

    return t_stat, p_value
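
When the two samples have clearly different variances or sizes, Welch's variant (equal_var=False) is usually the safer choice. A brief sketch comparing both calls; keep in mind that, per the docstring, the p-values use a normal approximation.

from dspu.ml.stats import Stats

control = [10.0, 10.2, 9.9, 10.1, 10.0, 9.8, 10.3, 9.7]
treatment = [12.0, 8.0, 15.0, 9.5]          # smaller, noisier sample

# Pooled-variance t-test vs. Welch's t-test (no equal-variance assumption)
t_pooled, p_pooled = Stats.t_test_independent(control, treatment, equal_var=True)
t_welch, p_welch = Stats.t_test_independent(control, treatment, equal_var=False)

print(round(t_pooled, 3), round(p_pooled, 3))
print(round(t_welch, 3), round(p_welch, 3))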

bootstrap_ci staticmethod

bootstrap_ci(
    data: list[float],
    stat_fn: Callable[[list[float]], float] | None = None,
    n_bootstrap: int = 1000,
    confidence_level: float = 0.95,
    random_state: int | None = None,
) -> tuple[float, float, float]

Bootstrap confidence interval for a statistic.

Parameters:

Name Type Description Default
data list[float]

Input data

required
stat_fn Callable[[list[float]], float] | None

Statistic function (default: mean)

None
n_bootstrap int

Number of bootstrap samples

1000
confidence_level float

Confidence level (e.g., 0.95 for 95% CI)

0.95
random_state int | None

Random seed

None

Returns:

Type Description
tuple[float, float, float]

Tuple of (estimate, lower_bound, upper_bound)

Example

>>> data = [1, 2, 3, 4, 5]
>>> estimate, lower, upper = Stats.bootstrap_ci(data, n_bootstrap=1000)

Source code in src/dspu/ml/stats.py
@staticmethod
def bootstrap_ci(
    data: list[float],
    stat_fn: Callable[[list[float]], float] | None = None,
    n_bootstrap: int = 1000,
    confidence_level: float = 0.95,
    random_state: int | None = None,
) -> tuple[float, float, float]:
    """Bootstrap confidence interval for a statistic.

    Args:
        data: Input data
        stat_fn: Statistic function (default: mean)
        n_bootstrap: Number of bootstrap samples
        confidence_level: Confidence level (e.g., 0.95 for 95% CI)
        random_state: Random seed

    Returns:
        Tuple of (estimate, lower_bound, upper_bound)

    Example:
        >>> data = [1, 2, 3, 4, 5]
        >>> estimate, lower, upper = Stats.bootstrap_ci(data, n_bootstrap=1000)
    """
    if stat_fn is None:
        stat_fn = Stats.mean

    if not 0 < confidence_level < 1:
        raise StatsError(f"confidence_level must be between 0 and 1, got {confidence_level}")

    rng = random.Random(random_state)

    # Calculate original statistic
    estimate = stat_fn(data)

    # Bootstrap resampling
    bootstrap_stats = []
    n = len(data)

    for _ in range(n_bootstrap):
        # Resample with replacement
        sample = [rng.choice(data) for _ in range(n)]
        stat = stat_fn(sample)
        bootstrap_stats.append(stat)

    # Calculate percentile confidence interval
    bootstrap_stats.sort()
    alpha = 1 - confidence_level
    lower_idx = int(n_bootstrap * alpha / 2)
    upper_idx = int(n_bootstrap * (1 - alpha / 2))

    lower_bound = bootstrap_stats[lower_idx]
    upper_bound = bootstrap_stats[upper_idx]

    return estimate, lower_bound, upper_bound
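
Because stat_fn is pluggable, the same routine gives intervals for statistics other than the mean; a small sketch using the median:

from dspu.ml.stats import Stats

data = [2.0, 3.0, 3.5, 4.0, 4.5, 5.0, 20.0]   # one large value skews the mean

est, lo, hi = Stats.bootstrap_ci(
    data, stat_fn=Stats.median, n_bootstrap=2000,
    confidence_level=0.95, random_state=0,
)
print(est, lo, hi)  # median estimate with its bootstrap interval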

ab_test_uplift staticmethod

ab_test_uplift(
    group_a: list[float],
    group_b: list[float],
    metric_fn: Callable[[list[float]], float] | None = None,
    n_bootstrap: int = 1000,
    confidence_level: float = 0.95,
    random_state: int | None = None,
) -> dict[str, float]

A/B test uplift calculation with bootstrap confidence interval.

Parameters:

Name Type Description Default
group_a list[float]

Metric values for group A (control)

required
group_b list[float]

Metric values for group B (treatment)

required
metric_fn Callable[[list[float]], float] | None

Metric function (default: mean)

None
n_bootstrap int

Number of bootstrap samples

1000
confidence_level float

Confidence level

0.95
random_state int | None

Random seed

None

Returns:

Type Description
dict[str, float]

Dictionary with keys:

  • "a_mean": Mean for group A
  • "b_mean": Mean for group B
  • "absolute_uplift": B - A
  • "relative_uplift": (B - A) / A
  • "uplift_ci_lower": Lower confidence bound for uplift
  • "uplift_ci_upper": Upper confidence bound for uplift

Example

>>> group_a = [10, 12, 11, 13, 12]  # Control
>>> group_b = [15, 16, 14, 17, 16]  # Treatment
>>> result = Stats.ab_test_uplift(group_a, group_b)
>>> print(result["relative_uplift"])  # % improvement

Source code in src/dspu/ml/stats.py
@staticmethod
def ab_test_uplift(
    group_a: list[float],
    group_b: list[float],
    metric_fn: Callable[[list[float]], float] | None = None,
    n_bootstrap: int = 1000,
    confidence_level: float = 0.95,
    random_state: int | None = None,
) -> dict[str, float]:
    """A/B test uplift calculation with bootstrap confidence interval.

    Args:
        group_a: Metric values for group A (control)
        group_b: Metric values for group B (treatment)
        metric_fn: Metric function (default: mean)
        n_bootstrap: Number of bootstrap samples
        confidence_level: Confidence level
        random_state: Random seed

    Returns:
        Dictionary with keys:
        - "a_mean": Mean for group A
        - "b_mean": Mean for group B
        - "absolute_uplift": B - A
        - "relative_uplift": (B - A) / A
        - "uplift_ci_lower": Lower confidence bound for uplift
        - "uplift_ci_upper": Upper confidence bound for uplift

    Example:
        >>> group_a = [10, 12, 11, 13, 12]  # Control
        >>> group_b = [15, 16, 14, 17, 16]  # Treatment
        >>> result = Stats.ab_test_uplift(group_a, group_b)
        >>> print(result["relative_uplift"])  # % improvement
    """
    if metric_fn is None:
        metric_fn = Stats.mean

    rng = random.Random(random_state)

    # Calculate observed metrics
    a_mean = metric_fn(group_a)
    b_mean = metric_fn(group_b)
    absolute_uplift = b_mean - a_mean
    relative_uplift = (absolute_uplift / a_mean) if a_mean != 0 else 0.0

    # Bootstrap uplift distribution
    uplift_dist = []
    n_a = len(group_a)
    n_b = len(group_b)

    for _ in range(n_bootstrap):
        sample_a = [rng.choice(group_a) for _ in range(n_a)]
        sample_b = [rng.choice(group_b) for _ in range(n_b)]

        uplift = metric_fn(sample_b) - metric_fn(sample_a)
        uplift_dist.append(uplift)

    # Confidence interval for uplift
    uplift_dist.sort()
    alpha = 1 - confidence_level
    lower_idx = int(n_bootstrap * alpha / 2)
    upper_idx = int(n_bootstrap * (1 - alpha / 2))

    return {
        "a_mean": a_mean,
        "b_mean": b_mean,
        "absolute_uplift": absolute_uplift,
        "relative_uplift": relative_uplift,
        "uplift_ci_lower": uplift_dist[lower_idx],
        "uplift_ci_upper": uplift_dist[upper_idx],
    }
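
A common way to read the result is to check whether the uplift interval excludes zero; a rough sketch (thresholding on the CI is a heuristic, not a formal hypothesis test):

from dspu.ml.stats import Stats

control = [10, 12, 11, 13, 12, 11, 12, 13]
treatment = [15, 16, 14, 17, 16, 15, 14, 16]

result = Stats.ab_test_uplift(control, treatment, n_bootstrap=2000, random_state=0)

if result["uplift_ci_lower"] > 0:
    print(f"Likely improvement: {result['relative_uplift']:.1%} "
          f"(CI {result['uplift_ci_lower']:.2f} to {result['uplift_ci_upper']:.2f})")
else:
    print("Uplift interval includes zero; treat the result as inconclusive")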

detect_outliers staticmethod

detect_outliers(
    data: list[float],
    method: Literal["iqr", "zscore"] = "iqr",
    threshold: float | None = None,
) -> list[int]

Detect outliers in data.

Parameters:

Name Type Description Default
data list[float]

Input data

required
method Literal['iqr', 'zscore']

Detection method - "iqr" (interquartile range) or "zscore"

'iqr'
threshold float | None

Custom threshold (default: 1.5 for IQR, 3.0 for Z-score)

None

Returns:

Type Description
list[int]

List of indices of outlier values

Example

>>> data = [1, 2, 3, 4, 5, 100]
>>> outlier_indices = Stats.detect_outliers(data, method="iqr")
>>> [data[i] for i in outlier_indices]
[100]

Source code in src/dspu/ml/stats.py
@staticmethod
def detect_outliers(
    data: list[float],
    method: Literal["iqr", "zscore"] = "iqr",
    threshold: float | None = None,
) -> list[int]:
    """Detect outliers in data.

    Args:
        data: Input data
        method: Detection method - "iqr" (interquartile range) or "zscore"
        threshold: Custom threshold (default: 1.5 for IQR, 3.0 for Z-score)

    Returns:
        List of indices of outlier values

    Example:
        >>> data = [1, 2, 3, 4, 5, 100]
        >>> outlier_indices = Stats.detect_outliers(data, method="iqr")
        >>> [data[i] for i in outlier_indices]
        [100]
    """
    if method == "iqr":
        if threshold is None:
            threshold = 1.5

        # Calculate quartiles
        sorted_data = sorted(data)
        n = len(sorted_data)

        q1_idx = n // 4
        q3_idx = 3 * n // 4

        q1 = sorted_data[q1_idx]
        q3 = sorted_data[q3_idx]
        iqr = q3 - q1

        lower_bound = q1 - threshold * iqr
        upper_bound = q3 + threshold * iqr

        outliers = [i for i, x in enumerate(data) if x < lower_bound or x > upper_bound]

    elif method == "zscore":
        if threshold is None:
            threshold = 3.0

        mean_val = Stats.mean(data)
        std_val = Stats.std(data, ddof=1)

        if std_val == 0:
            return []

        outliers = [i for i, x in enumerate(data) if abs((x - mean_val) / std_val) > threshold]

    else:
        raise StatsError(f"Unknown method: {method}. Use 'iqr' or 'zscore'")

    return outliers

Feature Scaling

Scaler

dspu.ml.scaling.Scaler

Scaler(
    method: Literal[
        "standard", "minmax", "robust"
    ] = "standard",
    feature_range: tuple[float, float] = (0.0, 1.0),
)

Feature scaling with fit/transform pattern.

Supports multiple scaling methods:

  • standard: (x - mean) / std
  • minmax: (x - min) / (max - min)
  • robust: (x - median) / IQR

Example

>>> scaler = Scaler(method="standard")
>>> X = [[1.0, 2.0], [2.0, 4.0], [3.0, 6.0]]
>>> X_scaled = scaler.fit_transform(X)

Initialize scaler.

Parameters:

Name Type Description Default
method Literal['standard', 'minmax', 'robust']

Scaling method - "standard", "minmax", or "robust"

'standard'
feature_range tuple[float, float]

Target range for minmax scaling (default: (0, 1))

(0.0, 1.0)

Raises:

Type Description
ScalingError

If method is invalid

Source code in src/dspu/ml/scaling.py
def __init__(
    self,
    method: Literal["standard", "minmax", "robust"] = "standard",
    feature_range: tuple[float, float] = (0.0, 1.0),
):
    """Initialize scaler.

    Args:
        method: Scaling method - "standard", "minmax", or "robust"
        feature_range: Target range for minmax scaling (default: (0, 1))

    Raises:
        ScalingError: If method is invalid
    """
    valid_methods = {"standard", "minmax", "robust"}
    if method not in valid_methods:
        raise ScalingError(f"Invalid method: {method}. Use one of {valid_methods}")

    self.method = method
    self.feature_range = feature_range
    self._is_fitted = False

    # Statistics will be stored per feature
    self._means: list[float] | None = None
    self._stds: list[float] | None = None
    self._mins: list[float] | None = None
    self._maxs: list[float] | None = None
    self._medians: list[float] | None = None
    self._iqrs: list[float] | None = None
    self._n_features: int | None = None

Functions

fit

fit(X: list[list[float]]) -> Scaler

Fit scaler to data by computing statistics.

Parameters:

Name Type Description Default
X list[list[float]]

Training data (list of samples, each sample is list of features)

required

Returns:

Type Description
Scaler

self (for method chaining)

Raises:

Type Description
ScalingError

If data is empty or invalid

Example

scaler = Scaler(method="standard")
X = [[1.0, 2.0], [2.0, 4.0]]
scaler.fit(X)

Source code in src/dspu/ml/scaling.py
def fit(self, X: list[list[float]]) -> "Scaler":
    """Fit scaler to data by computing statistics.

    Args:
        X: Training data (list of samples, each sample is list of features)

    Returns:
        self (for method chaining)

    Raises:
        ScalingError: If data is empty or invalid

    Example:
        >>> scaler = Scaler(method="standard")
        >>> X = [[1.0, 2.0], [2.0, 4.0]]
        >>> scaler.fit(X)
    """
    if not X:
        raise ScalingError("Cannot fit on empty data")

    n_samples = len(X)
    n_features = len(X[0])

    if any(len(row) != n_features for row in X):
        raise ScalingError("All samples must have same number of features")

    self._n_features = n_features

    if self.method == "standard":
        # Compute mean and std for each feature
        self._means = []
        self._stds = []

        for feat_idx in range(n_features):
            values = [row[feat_idx] for row in X]
            mean = sum(values) / n_samples

            variance = sum((x - mean) ** 2 for x in values) / n_samples
            std = variance**0.5

            # Avoid division by zero
            if std == 0:
                std = 1.0

            self._means.append(mean)
            self._stds.append(std)

    elif self.method == "minmax":
        # Compute min and max for each feature
        self._mins = []
        self._maxs = []

        for feat_idx in range(n_features):
            values = [row[feat_idx] for row in X]
            min_val = min(values)
            max_val = max(values)

            # Avoid division by zero
            if max_val == min_val:
                max_val = min_val + 1.0

            self._mins.append(min_val)
            self._maxs.append(max_val)

    elif self.method == "robust":
        # Compute median and IQR for each feature
        self._medians = []
        self._iqrs = []

        for feat_idx in range(n_features):
            values = sorted([row[feat_idx] for row in X])
            n = len(values)

            # Median
            if n % 2 == 0:
                median = (values[n // 2 - 1] + values[n // 2]) / 2
            else:
                median = values[n // 2]

            # IQR (Q3 - Q1)
            q1_idx = n // 4
            q3_idx = 3 * n // 4
            q1 = values[q1_idx]
            q3 = values[q3_idx]
            iqr = q3 - q1

            # Avoid division by zero
            if iqr == 0:
                iqr = 1.0

            self._medians.append(median)
            self._iqrs.append(iqr)

    self._is_fitted = True
    return self
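
Because the robust method relies on the median and IQR computed here, a single extreme value barely affects the fitted statistics. A small illustrative sketch contrasting it with standard scaling:

from dspu.ml import Scaler

X = [[1.0], [2.0], [3.0], [4.0], [100.0]]  # one extreme value

standard = Scaler(method="standard").fit_transform(X)
robust = Scaler(method="robust").fit_transform(X)

# With standard scaling, the inflated mean and std squash the ordinary values
# into a narrow band; robust scaling keeps their spread and pushes only the
# outlier far from zero.
print("standard:", [round(row[0], 2) for row in standard])
print("robust:  ", [round(row[0], 2) for row in robust])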

transform

transform(X: list[list[float]]) -> list[list[float]]

Transform data using fitted parameters.

Parameters:

Name Type Description Default
X list[list[float]]

Data to transform

required

Returns:

Type Description
list[list[float]]

Scaled data

Raises:

Type Description
ScalingError

If scaler not fitted or data shape mismatch

Example

scaler = Scaler(method="standard")
X_train = [[1.0], [2.0], [3.0]]
scaler.fit(X_train)
X_test = [[1.5]]
X_test_scaled = scaler.transform(X_test)

Source code in src/dspu/ml/scaling.py
def transform(self, X: list[list[float]]) -> list[list[float]]:
    """Transform data using fitted parameters.

    Args:
        X: Data to transform

    Returns:
        Scaled data

    Raises:
        ScalingError: If scaler not fitted or data shape mismatch

    Example:
        >>> scaler = Scaler(method="standard")
        >>> X_train = [[1.0], [2.0], [3.0]]
        >>> scaler.fit(X_train)
        >>> X_test = [[1.5]]
        >>> X_test_scaled = scaler.transform(X_test)
    """
    if not self._is_fitted:
        raise ScalingError("Scaler not fitted. Call fit() first.")

    # Type narrowing: after fit(), these attributes are guaranteed to be set
    assert self._n_features is not None
    if self.method == "standard":
        assert self._means is not None and self._stds is not None
    elif self.method == "minmax":
        assert self._mins is not None and self._maxs is not None
    elif self.method == "robust":
        assert self._medians is not None and self._iqrs is not None

    if not X:
        return []

    n_features = len(X[0])
    if n_features != self._n_features:
        raise ScalingError(f"Expected {self._n_features} features, got {n_features}")

    X_scaled = []

    for row in X:
        if len(row) != self._n_features:
            raise ScalingError(f"All samples must have {self._n_features} features")

        scaled_row = []

        for feat_idx, value in enumerate(row):
            if self.method == "standard":
                assert self._means is not None and self._stds is not None
                scaled_value = (value - self._means[feat_idx]) / self._stds[feat_idx]

            elif self.method == "minmax":
                assert self._mins is not None and self._maxs is not None
                # Scale to [0, 1]
                min_val = self._mins[feat_idx]
                max_val = self._maxs[feat_idx]
                scaled_01 = (value - min_val) / (max_val - min_val)

                # Scale to feature_range
                range_min, range_max = self.feature_range
                scaled_value = scaled_01 * (range_max - range_min) + range_min

            elif self.method == "robust":
                assert self._medians is not None and self._iqrs is not None
                scaled_value = (value - self._medians[feat_idx]) / self._iqrs[feat_idx]

            else:
                raise ScalingError(f"Unknown method: {self.method}")  # Should never happen

            scaled_row.append(scaled_value)

        X_scaled.append(scaled_row)

    return X_scaled
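
The minmax branch first maps each feature to [0, 1] and then rescales into feature_range, so a symmetric target range is a one-line change. A brief sketch:

from dspu.ml import Scaler

X = [[0.0], [5.0], [10.0]]

scaler = Scaler(method="minmax", feature_range=(-1.0, 1.0))
print(scaler.fit_transform(X))    # [[-1.0], [0.0], [1.0]]
print(scaler.transform([[7.5]]))  # 7.5 -> 0.75 on [0, 1] -> [[0.5]]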

fit_transform

fit_transform(X: list[list[float]]) -> list[list[float]]

Fit scaler and transform data in one step.

Parameters:

Name Type Description Default
X list[list[float]]

Training data

required

Returns:

Type Description
list[list[float]]

Scaled data

Example

scaler = Scaler(method="standard")
X = [[1.0], [2.0], [3.0]]
X_scaled = scaler.fit_transform(X)

Source code in src/dspu/ml/scaling.py
def fit_transform(self, X: list[list[float]]) -> list[list[float]]:
    """Fit scaler and transform data in one step.

    Args:
        X: Training data

    Returns:
        Scaled data

    Example:
        >>> scaler = Scaler(method="standard")
        >>> X = [[1.0], [2.0], [3.0]]
        >>> X_scaled = scaler.fit_transform(X)
    """
    self.fit(X)
    return self.transform(X)

inverse_transform

inverse_transform(
    X_scaled: list[list[float]],
) -> list[list[float]]

Reverse scaling transformation.

Parameters:

Name Type Description Default
X_scaled list[list[float]]

Scaled data

required

Returns:

Type Description
list[list[float]]

Original-scale data

Raises:

Type Description
ScalingError

If scaler not fitted or inverse not supported

Example

scaler = Scaler(method="standard")
X = [[1.0], [2.0], [3.0]]
X_scaled = scaler.fit_transform(X)
X_recovered = scaler.inverse_transform(X_scaled)

Source code in src/dspu/ml/scaling.py
def inverse_transform(self, X_scaled: list[list[float]]) -> list[list[float]]:
    """Reverse scaling transformation.

    Args:
        X_scaled: Scaled data

    Returns:
        Original-scale data

    Raises:
        ScalingError: If scaler not fitted or inverse not supported

    Example:
        >>> scaler = Scaler(method="standard")
        >>> X = [[1.0], [2.0], [3.0]]
        >>> X_scaled = scaler.fit_transform(X)
        >>> X_recovered = scaler.inverse_transform(X_scaled)
    """
    if not self._is_fitted:
        raise ScalingError("Scaler not fitted. Call fit() first.")

    # Type narrowing: after fit(), these attributes are guaranteed to be set
    assert self._n_features is not None
    if self.method == "standard":
        assert self._means is not None and self._stds is not None
    elif self.method == "minmax":
        assert self._mins is not None and self._maxs is not None
    elif self.method == "robust":
        assert self._medians is not None and self._iqrs is not None

    if not X_scaled:
        return []

    X_original = []

    for row in X_scaled:
        if len(row) != self._n_features:
            raise ScalingError(f"All samples must have {self._n_features} features")

        original_row = []

        for feat_idx, scaled_value in enumerate(row):
            if self.method == "standard":
                assert self._means is not None and self._stds is not None
                original_value = scaled_value * self._stds[feat_idx] + self._means[feat_idx]

            elif self.method == "minmax":
                assert self._mins is not None and self._maxs is not None
                # Reverse feature_range scaling
                range_min, range_max = self.feature_range
                scaled_01 = (scaled_value - range_min) / (range_max - range_min)

                # Reverse [0, 1] scaling
                min_val = self._mins[feat_idx]
                max_val = self._maxs[feat_idx]
                original_value = scaled_01 * (max_val - min_val) + min_val

            elif self.method == "robust":
                assert self._medians is not None and self._iqrs is not None
                original_value = scaled_value * self._iqrs[feat_idx] + self._medians[feat_idx]

            else:
                raise ScalingError(f"Unknown method: {self.method}")  # Should never happen

            original_row.append(original_value)

        X_original.append(original_row)

    return X_original
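
A quick way to validate a fitted scaler is to round-trip data through transform and inverse_transform; the values should match the originals up to floating-point error. A minimal sketch:

from dspu.ml import Scaler

X = [[10.0, 0.5], [20.0, 0.7], [30.0, 0.9]]

scaler = Scaler(method="minmax")
X_back = scaler.inverse_transform(scaler.fit_transform(X))

for row, recovered in zip(X, X_back):
    for original, value in zip(row, recovered):
        assert abs(original - value) < 1e-9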

save_state

save_state() -> dict[str, Any]

Save scaler state for serialization.

Returns:

Type Description
dict[str, Any]

Dictionary containing scaler configuration and statistics

Raises:

Type Description
ScalingError

If scaler not fitted

Example

scaler = Scaler(method="standard")
scaler.fit([[1.0], [2.0]])
state = scaler.save_state()
# Save to file: json.dump(state, f)
Source code in src/dspu/ml/scaling.py
def save_state(self) -> dict[str, Any]:
    """Save scaler state for serialization.

    Returns:
        Dictionary containing scaler configuration and statistics

    Raises:
        ScalingError: If scaler not fitted

    Example:
        >>> scaler = Scaler(method="standard")
        >>> scaler.fit([[1.0], [2.0]])
        >>> state = scaler.save_state()
        >>> # Save to file: json.dump(state, f)
    """
    if not self._is_fitted:
        raise ScalingError("Cannot save state of unfitted scaler")

    state: dict[str, Any] = {
        "method": self.method,
        "feature_range": self.feature_range,
        "n_features": self._n_features,
    }

    if self.method == "standard":
        state["means"] = self._means
        state["stds"] = self._stds
    elif self.method == "minmax":
        state["mins"] = self._mins
        state["maxs"] = self._maxs
    elif self.method == "robust":
        state["medians"] = self._medians
        state["iqrs"] = self._iqrs

    return state

from_state classmethod

from_state(state: dict[str, Any]) -> Scaler

Load scaler from saved state.

Parameters:

Name Type Description Default
state dict[str, Any]

Dictionary from save_state()

required

Returns:

Type Description
Scaler

Configured scaler

Example

state = {"method": "standard", "means": [0.0], ...}
scaler = Scaler.from_state(state)

Source code in src/dspu/ml/scaling.py
@classmethod
def from_state(cls, state: dict[str, Any]) -> "Scaler":
    """Load scaler from saved state.

    Args:
        state: Dictionary from save_state()

    Returns:
        Configured scaler

    Example:
        >>> state = {"method": "standard", "means": [0.0], ...}
        >>> scaler = Scaler.from_state(state)
    """
    method = state["method"]
    feature_range_data = state["feature_range"]
    feature_range = (float(feature_range_data[0]), float(feature_range_data[1]))

    scaler = cls(method=method, feature_range=feature_range)
    scaler._n_features = state["n_features"]

    if method == "standard":
        scaler._means = state["means"]
        scaler._stds = state["stds"]
    elif method == "minmax":
        scaler._mins = state["mins"]
        scaler._maxs = state["maxs"]
    elif method == "robust":
        scaler._medians = state["medians"]
        scaler._iqrs = state["iqrs"]

    scaler._is_fitted = True
    return scaler

save_to_file

save_to_file(filepath: str) -> None

Save scaler state to JSON file.

Parameters:

Name Type Description Default
filepath str

Path to output file

required
Example

scaler.save_to_file("scaler.json")

Source code in src/dspu/ml/scaling.py
def save_to_file(self, filepath: str) -> None:
    """Save scaler state to JSON file.

    Args:
        filepath: Path to output file

    Example:
        >>> scaler.save_to_file("scaler.json")
    """
    state = self.save_state()
    with open(filepath, "w") as f:
        json.dump(state, f, indent=2)

load_from_file classmethod

load_from_file(filepath: str) -> Scaler

Load scaler from JSON file.

Parameters:

Name Type Description Default
filepath str

Path to state file

required

Returns:

Type Description
Scaler

Configured scaler

Example

scaler = Scaler.load_from_file("scaler.json")

Source code in src/dspu/ml/scaling.py
@classmethod
def load_from_file(cls, filepath: str) -> "Scaler":
    """Load scaler from JSON file.

    Args:
        filepath: Path to state file

    Returns:
        Configured scaler

    Example:
        >>> scaler = Scaler.load_from_file("scaler.json")
    """
    with open(filepath) as f:
        state = json.load(f)
    return cls.from_state(state)
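
Putting the persistence helpers together, a sketch of the usual train-time save / inference-time load round trip (the file name is just an example, and the script needs write access to the working directory):

from dspu.ml import Scaler

# Training time: fit and persist the learned statistics
scaler = Scaler(method="standard")
scaler.fit([[1.0], [2.0], [3.0]])
scaler.save_to_file("scaler.json")

# Inference time: reload and apply exactly the same transformation
restored = Scaler.load_from_file("scaler.json")
assert restored.transform([[2.5]]) == scaler.transform([[2.5]])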

Categorical Encoding

Encoder

dspu.ml.encoding.Encoder

Encoder(
    method: Literal[
        "label", "onehot", "ordinal", "frequency"
    ] = "label",
    categories: list[str] | None = None,
    handle_unknown: Literal[
        "error", "use_default", "ignore"
    ] = "error",
    unknown_value: int | list[int] | None = None,
)

Categorical encoding with fit/transform pattern.

Supports multiple encoding methods:
- label: Categories → integers (0, 1, 2, ...)
- onehot: Categories → binary vectors ([1,0,0], [0,1,0], ...)
- ordinal: Categories → ordered integers (custom order)
- frequency: Categories → occurrence frequency

Example

encoder = Encoder(method="label")
categories = ["A", "B", "A", "C"]
encoded = encoder.fit_transform(categories)

Initialize encoder.

Parameters:

Name Type Description Default
method Literal['label', 'onehot', 'ordinal', 'frequency']

Encoding method

'label'
categories list[str] | None

Pre-defined category order (for ordinal encoding)

None
handle_unknown Literal['error', 'use_default', 'ignore']

How to handle unknown categories:
- "error": Raise error (default)
- "use_default": Use unknown_value
- "ignore": Skip/remove unknown values

'error'
unknown_value int | list[int] | None

Value for unknown categories (for handle_unknown="use_default")

None

Raises:

Type Description
EncodingError

If method is invalid

Source code in src/dspu/ml/encoding.py
def __init__(
    self,
    method: Literal["label", "onehot", "ordinal", "frequency"] = "label",
    categories: list[str] | None = None,
    handle_unknown: Literal["error", "use_default", "ignore"] = "error",
    unknown_value: int | list[int] | None = None,
):
    """Initialize encoder.

    Args:
        method: Encoding method
        categories: Pre-defined category order (for ordinal encoding)
        handle_unknown: How to handle unknown categories:
            - "error": Raise error (default)
            - "use_default": Use unknown_value
            - "ignore": Skip/remove unknown values
        unknown_value: Value for unknown categories (for handle_unknown="use_default")

    Raises:
        EncodingError: If method is invalid
    """
    valid_methods = {"label", "onehot", "ordinal", "frequency"}
    if method not in valid_methods:
        raise EncodingError(f"Invalid method: {method}. Use one of {valid_methods}")

    valid_unknown_strategies = {"error", "use_default", "ignore"}
    if handle_unknown not in valid_unknown_strategies:
        raise EncodingError(
            f"Invalid handle_unknown: {handle_unknown}. Use one of {valid_unknown_strategies}"
        )

    self.method = method
    self.handle_unknown = handle_unknown
    self.unknown_value = unknown_value
    self._is_fitted = False

    # Attribute declarations
    self._categories: list[str] | None
    self._category_to_int: dict[str, int] | None
    self._category_frequencies: dict[str, float] | None = None

    # For ordinal encoding, use provided category order
    if method == "ordinal":
        if categories is None:
            raise EncodingError("Must provide categories for ordinal encoding")
        self._categories = categories
        self._category_to_int = {cat: i for i, cat in enumerate(categories)}
    else:
        self._categories = categories
        self._category_to_int = None

Functions

fit

fit(categories: list[str]) -> Encoder

Fit encoder by learning category mappings.

Parameters:

Name Type Description Default
categories list[str]

Training categories

required

Returns:

Type Description
Encoder

self (for method chaining)

Raises:

Type Description
EncodingError

If data is invalid

Example

encoder = Encoder(method="label")
encoder.fit(["A", "B", "A", "C"])

Source code in src/dspu/ml/encoding.py
def fit(self, categories: list[str]) -> "Encoder":
    """Fit encoder by learning category mappings.

    Args:
        categories: Training categories

    Returns:
        self (for method chaining)

    Raises:
        EncodingError: If data is invalid

    Example:
        >>> encoder = Encoder(method="label")
        >>> encoder.fit(["A", "B", "A", "C"])
    """
    if not categories:
        raise EncodingError("Cannot fit on empty data")

    if self.method == "ordinal":
        # Categories already set in __init__
        pass
    elif self.method in {"label", "onehot"}:
        # Learn unique categories (sorted for consistency)
        unique_cats = sorted(set(categories))
        self._categories = unique_cats
        self._category_to_int = {cat: i for i, cat in enumerate(unique_cats)}
    elif self.method == "frequency":
        # Compute frequency of each category
        counts = Counter(categories)
        total = len(categories)
        self._category_frequencies = {cat: count / total for cat, count in counts.items()}
        self._categories = list(counts.keys())

    self._is_fitted = True
    return self

transform

transform(categories: list[str]) -> list[Any]

Transform categories using fitted encoding.

Parameters:

Name Type Description Default
categories list[str]

Categories to encode

required

Returns:

Type Description
list[Any]

Encoded values (type depends on encoding method)

Raises:

Type Description
EncodingError

If encoder not fitted or unknown category encountered

Example

encoder = Encoder(method="label")
encoder.fit(["A", "B", "C"])
encoder.transform(["A", "C", "B"])
[0, 2, 1]

Source code in src/dspu/ml/encoding.py
def transform(self, categories: list[str]) -> list[Any]:
    """Transform categories using fitted encoding.

    Args:
        categories: Categories to encode

    Returns:
        Encoded values (type depends on encoding method)

    Raises:
        EncodingError: If encoder not fitted or unknown category encountered

    Example:
        >>> encoder = Encoder(method="label")
        >>> encoder.fit(["A", "B", "C"])
        >>> encoder.transform(["A", "C", "B"])
        [0, 2, 1]
    """
    if not self._is_fitted:
        raise EncodingError("Encoder not fitted. Call fit() first.")

    if not categories:
        return []

    encoded = []

    for cat in categories:
        if self.method == "label":
            encoded_val = self._encode_label(cat)
            if encoded_val is not None:
                encoded.append(encoded_val)

        elif self.method == "onehot":
            encoded_val = self._encode_onehot(cat)
            if encoded_val is not None:
                encoded.append(encoded_val)

        elif self.method == "ordinal":
            encoded_val = self._encode_ordinal(cat)
            if encoded_val is not None:
                encoded.append(encoded_val)

        elif self.method == "frequency":
            encoded_val = self._encode_frequency(cat)
            if encoded_val is not None:
                encoded.append(encoded_val)

    return encoded
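
How a category unseen during fit is treated depends on the handle_unknown strategy chosen in the constructor. A hedged sketch of the two non-raising strategies, based on the behaviour documented above:

from dspu.ml.encoding import Encoder

train = ["A", "B", "A", "C"]
test = ["A", "D", "B"]  # "D" was never seen during fit

# "ignore": unknown categories are dropped from the output
enc_ignore = Encoder(method="label", handle_unknown="ignore")
enc_ignore.fit(train)
print(enc_ignore.transform(test))

# "use_default": unknown categories are mapped to unknown_value
enc_default = Encoder(method="label", handle_unknown="use_default", unknown_value=-1)
enc_default.fit(train)
print(enc_default.transform(test))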

fit_transform

fit_transform(categories: list[str]) -> list[Any]

Fit encoder and transform categories in one step.

Parameters:

Name Type Description Default
categories list[str]

Training categories

required

Returns:

Type Description
list[Any]

Encoded values

Example

encoder = Encoder(method="label")
encoded = encoder.fit_transform(["A", "B", "A"])

Source code in src/dspu/ml/encoding.py
def fit_transform(self, categories: list[str]) -> list[Any]:
    """Fit encoder and transform categories in one step.

    Args:
        categories: Training categories

    Returns:
        Encoded values

    Example:
        >>> encoder = Encoder(method="label")
        >>> encoded = encoder.fit_transform(["A", "B", "A"])
    """
    self.fit(categories)
    return self.transform(categories)

inverse_transform

inverse_transform(encoded: list[int]) -> list[str]

Reverse encoding transformation.

Parameters:

Name Type Description Default
encoded list[int]

Encoded integers

required

Returns:

Type Description
list[str]

Original categories

Raises:

Type Description
EncodingError

If encoder not fitted or method doesn't support inverse

Example

encoder = Encoder(method="label")
encoder.fit(["A", "B", "C"])
encoded = [0, 2, 1]
encoder.inverse_transform(encoded)
['A', 'C', 'B']

Source code in src/dspu/ml/encoding.py
def inverse_transform(self, encoded: list[int]) -> list[str]:
    """Reverse encoding transformation.

    Args:
        encoded: Encoded integers

    Returns:
        Original categories

    Raises:
        EncodingError: If encoder not fitted or method doesn't support inverse

    Example:
        >>> encoder = Encoder(method="label")
        >>> encoder.fit(["A", "B", "C"])
        >>> encoded = [0, 2, 1]
        >>> encoder.inverse_transform(encoded)
        ['A', 'C', 'B']
    """
    if not self._is_fitted:
        raise EncodingError("Encoder not fitted. Call fit() first.")

    if self.method not in {"label", "ordinal"}:
        raise EncodingError(f"inverse_transform not supported for method={self.method}")

    # Type narrowing: after fit(), _category_to_int is guaranteed to be set
    assert self._category_to_int is not None
    # Create reverse mapping
    int_to_category = {v: k for k, v in self._category_to_int.items()}

    categories = []
    for code in encoded:
        if code in int_to_category:
            categories.append(int_to_category[code])
        else:
            raise EncodingError(f"Unknown code: {code}")

    return categories

get_feature_names

get_feature_names(input_feature: str = 'x') -> list[str]

Get feature names for one-hot encoding.

Parameters:

Name Type Description Default
input_feature str

Name of input feature

'x'

Returns:

Type Description
list[str]

List of feature names (one per category)

Raises:

Type Description
EncodingError

If method is not one-hot

Example

encoder = Encoder(method="onehot")
encoder.fit(["A", "B", "C"])
encoder.get_feature_names("category")
['category_A', 'category_B', 'category_C']

Source code in src/dspu/ml/encoding.py
def get_feature_names(self, input_feature: str = "x") -> list[str]:
    """Get feature names for one-hot encoding.

    Args:
        input_feature: Name of input feature

    Returns:
        List of feature names (one per category)

    Raises:
        EncodingError: If method is not one-hot

    Example:
        >>> encoder = Encoder(method="onehot")
        >>> encoder.fit(["A", "B", "C"])
        >>> encoder.get_feature_names("category")
        ['category_A', 'category_B', 'category_C']
    """
    if self.method != "onehot":
        raise EncodingError("get_feature_names only available for method='onehot'")

    if not self._is_fitted:
        raise EncodingError("Encoder not fitted. Call fit() first.")

    assert self._categories is not None  # Guaranteed after fit()
    return [f"{input_feature}_{cat}" for cat in self._categories]

save_state

save_state() -> dict[str, Any]

Save encoder state for serialization.

Returns:

Type Description
dict[str, Any]

Dictionary containing encoder configuration and mappings

Raises:

Type Description
EncodingError

If encoder not fitted

Example

encoder = Encoder(method="label")
encoder.fit(["A", "B"])
state = encoder.save_state()

Source code in src/dspu/ml/encoding.py
def save_state(self) -> dict[str, Any]:
    """Save encoder state for serialization.

    Returns:
        Dictionary containing encoder configuration and mappings

    Raises:
        EncodingError: If encoder not fitted

    Example:
        >>> encoder = Encoder(method="label")
        >>> encoder.fit(["A", "B"])
        >>> state = encoder.save_state()
    """
    if not self._is_fitted:
        raise EncodingError("Cannot save state of unfitted encoder")

    state: dict[str, Any] = {
        "method": self.method,
        "handle_unknown": self.handle_unknown,
        "unknown_value": self.unknown_value,
        "categories": self._categories,
    }

    if self.method in {"label", "onehot", "ordinal"}:
        state["category_to_int"] = self._category_to_int

    if self.method == "frequency":
        state["category_frequencies"] = self._category_frequencies

    return state

from_state classmethod

from_state(state: dict[str, Any]) -> Encoder

Load encoder from saved state.

Parameters:

Name Type Description Default
state dict[str, Any]

Dictionary from save_state()

required

Returns:

Type Description
Encoder

Configured encoder

Example

state = {"method": "label", "categories": ["A", "B"], ...}
encoder = Encoder.from_state(state)

Source code in src/dspu/ml/encoding.py
@classmethod
def from_state(cls, state: dict[str, Any]) -> "Encoder":
    """Load encoder from saved state.

    Args:
        state: Dictionary from save_state()

    Returns:
        Configured encoder

    Example:
        >>> state = {"method": "label", "categories": ["A", "B"], ...}
        >>> encoder = Encoder.from_state(state)
    """
    method = state["method"]
    handle_unknown = state["handle_unknown"]
    unknown_value = state["unknown_value"]

    if method == "ordinal":
        encoder = cls(
            method=method,
            categories=state["categories"],
            handle_unknown=handle_unknown,
            unknown_value=unknown_value,
        )
    else:
        encoder = cls(
            method=method,
            handle_unknown=handle_unknown,
            unknown_value=unknown_value,
        )
        encoder._categories = state["categories"]

    if method in {"label", "onehot", "ordinal"}:
        encoder._category_to_int = state["category_to_int"]

    if method == "frequency":
        encoder._category_frequencies = state["category_frequencies"]

    encoder._is_fitted = True
    return encoder

save_to_file

save_to_file(filepath: str) -> None

Save encoder state to JSON file.

Parameters:

Name Type Description Default
filepath str

Path to output file

required
Example

encoder.save_to_file("encoder.json")

Source code in src/dspu/ml/encoding.py
def save_to_file(self, filepath: str) -> None:
    """Save encoder state to JSON file.

    Args:
        filepath: Path to output file

    Example:
        >>> encoder.save_to_file("encoder.json")
    """
    state = self.save_state()
    with open(filepath, "w") as f:
        json.dump(state, f, indent=2)

load_from_file classmethod

load_from_file(filepath: str) -> Encoder

Load encoder from JSON file.

Parameters:

Name Type Description Default
filepath str

Path to state file

required

Returns:

Type Description
Encoder

Configured encoder

Example

encoder = Encoder.load_from_file("encoder.json")

Source code in src/dspu/ml/encoding.py
@classmethod
def load_from_file(cls, filepath: str) -> "Encoder":
    """Load encoder from JSON file.

    Args:
        filepath: Path to state file

    Returns:
        Configured encoder

    Example:
        >>> encoder = Encoder.load_from_file("encoder.json")
    """
    with open(filepath) as f:
        state = json.load(f)
    return cls.from_state(state)
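
The in-memory variants mirror the file helpers; a brief sketch round-tripping a fitted label encoder through save_state / from_state:

from dspu.ml.encoding import Encoder

encoder = Encoder(method="label")
encoder.fit(["cat", "dog", "bird"])

state = encoder.save_state()        # a plain dict, safe to pass to json.dump
restored = Encoder.from_state(state)

assert restored.transform(["dog"]) == encoder.transform(["dog"])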

Exceptions

dspu.ml.random.RandomError

Bases: DSPUError

Raised when random number generation fails.

dspu.ml.identifiers.IDError

Bases: DSPUError

Raised when ID generation fails.

dspu.ml.splits.SplitError

Bases: DSPUError

Raised when data splitting fails.

dspu.ml.stats.StatsError

Bases: DSPUError

Raised when statistical computation fails.

dspu.ml.scaling.ScalingError

Bases: DSPUError

Raised when feature scaling fails.

dspu.ml.encoding.EncodingError

Bases: DSPUError

Raised when categorical encoding fails.

Usage Examples

Reproducible ML Pipeline

from dspu.ml import SeedManager, DataSplitter, Scaler, make_classification_data

# Set seed
SeedManager.set_global_seed(42)

# Generate data
X, y = make_classification_data(n_samples=1000, n_features=10)

# Split
X_train, X_test, y_train, y_test = DataSplitter.train_test_split(
    X, y, test_size=0.2, stratify=y
)

# Scale
scaler = Scaler(method="standard")
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Save for production
scaler.save_to_file("scaler.json")

Data Splitting Strategies

from dspu.ml import DataSplitter

# Stratified K-fold (preserves class distribution)
folds = DataSplitter.stratified_kfold(y, n_splits=5)
for train_idx, val_idx in folds:
    X_train = [X[i] for i in train_idx]
    X_val = [X[i] for i in val_idx]
    # Train and validate

# Time series split (no future leakage)
splits = DataSplitter.time_series_split(X, n_splits=5)
for train_idx, val_idx in splits:
    assert max(train_idx) < min(val_idx)  # Chronological order

# Group split (prevent leakage by group)
X_train, X_test, y_train, y_test = DataSplitter.group_split(
    X, groups=patient_ids, y=y, test_size=0.25
)

A/B Testing

from dspu.ml import Stats

# Analyze A/B test
result = Stats.ab_test_uplift(
    group_a=[0.10, 0.11, 0.09, 0.10, 0.12],  # Control conversion rates
    group_b=[0.15, 0.16, 0.14, 0.15, 0.17],  # Treatment conversion rates
    n_bootstrap=1000
)

print(f"Uplift: {result['relative_uplift']*100:.1f}%")
print(f"95% CI: [{result['uplift_ci_lower']:.3f}, {result['uplift_ci_upper']:.3f}]")

if result['uplift_ci_lower'] > 0:
    print("Statistically significant improvement!")

See Also