Source code for maldiamrkit.differential.analysis

"""DifferentialAnalysis: per-bin differential peak testing between R and S groups."""

from __future__ import annotations

from enum import Enum
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd

from .stats import (
    _compute_effect_size,
    _compute_fold_change,
    _correct_pvalues,
    _mann_whitney_test,
    _t_test,
)

if TYPE_CHECKING:
    from maldiamrkit.dataset import MaldiSet
    from maldiamrkit.detection import MaldiPeakDetector


MzRange = tuple[float, float]


[docs] class StatisticalTest(str, Enum): """Supported statistical tests for :meth:`~maldiamrkit.differential.DifferentialAnalysis.run`. Attributes ---------- mann_whitney : str Two-sided Mann-Whitney U test (non-parametric). t_test : str Welch's two-sample t-test (unequal variances). """ mann_whitney = "mann_whitney" t_test = "t_test"
[docs] class CorrectionMethod(str, Enum): """Supported multiple-testing corrections for :meth:`~maldiamrkit.differential.DifferentialAnalysis.run`. Attributes ---------- fdr_bh : str Benjamini-Hochberg false discovery rate. fdr_by : str Benjamini-Yekutieli false discovery rate. bonferroni : str Bonferroni family-wise correction. """ fdr_bh = "fdr_bh" fdr_by = "fdr_by" bonferroni = "bonferroni"
def _normalize_mz_ranges( mz_ranges: MzRange | list[MzRange], ) -> list[MzRange]: """Normalize input into a list of ``(low, high)`` tuples with ``low <= high``. Accepts a single ``(low, high)`` tuple or a list of such tuples. """ if ( isinstance(mz_ranges, tuple) and len(mz_ranges) == 2 and not isinstance(mz_ranges[0], (list, tuple)) ): ranges: list[MzRange] = [mz_ranges] else: ranges = [tuple(r) for r in mz_ranges] # type: ignore[arg-type] out: list[MzRange] = [] for r in ranges: if len(r) != 2: raise ValueError(f"Each m/z range must be a (low, high) tuple; got {r!r}.") low, high = float(r[0]), float(r[1]) if low > high: low, high = high, low out.append((low, high)) return out def _mz_range_mask(columns: pd.Index, mz_ranges: MzRange | list[MzRange]) -> np.ndarray: """Boolean mask of columns whose numeric m/z label falls in any range. Non-numeric column labels are excluded (mask entry is ``False``). """ ranges = _normalize_mz_ranges(mz_ranges) mz_values = pd.to_numeric(pd.Index(columns), errors="coerce").to_numpy() mask = np.zeros(len(mz_values), dtype=bool) valid = ~np.isnan(mz_values) for low, high in ranges: mask |= valid & (mz_values >= low) & (mz_values <= high) return mask
[docs] class DifferentialAnalysis: """Identify discriminative m/z peaks between resistant and susceptible groups. Given a binned feature matrix and binary labels (0 = susceptible, 1 = resistant), the analysis iterates over each m/z bin and computes a statistical test, a log2 fold change, and Cohen's d effect size comparing the two groups. Multiple-testing correction is applied across bins. Parameters ---------- X : pd.DataFrame Feature matrix of shape ``(n_samples, n_features)``. Column names are m/z bin identifiers (numeric or string). y : pd.Series or ndarray Binary labels aligned with ``X`` rows: ``0`` = susceptible, ``1`` = resistant. Any sample with a missing / NaN label is dropped before analysis. Attributes ---------- X : pd.DataFrame Feature matrix (possibly subset to rows with non-missing labels). y : pd.Series Labels aligned with ``X``. results : pd.DataFrame or None Populated by :meth:`run` with columns ``mz_bin``, ``mean_r``, ``mean_s``, ``fold_change``, ``p_value``, ``adjusted_p_value``, ``effect_size``. Examples -------- >>> analysis = DifferentialAnalysis(X, y).run() >>> analysis.top_peaks(n=10) >>> analysis.significant_peaks(fc_threshold=1.0, p_threshold=0.05) """
[docs] def __init__(self, X: pd.DataFrame, y: pd.Series | np.ndarray) -> None: if not isinstance(X, pd.DataFrame): raise TypeError("X must be a pandas DataFrame.") if isinstance(y, pd.Series): y_series = y.copy() if not y_series.index.equals(X.index): y_series = y_series.reindex(X.index) else: y_arr = np.asarray(y) if y_arr.shape[0] != X.shape[0]: raise ValueError( f"Length of y ({y_arr.shape[0]}) does not match " f"number of rows in X ({X.shape[0]})." ) y_series = pd.Series(y_arr, index=X.index) mask = y_series.notna() if not mask.all(): y_series = y_series[mask] X = X.loc[y_series.index] y_numeric = pd.to_numeric(y_series, errors="coerce") if y_numeric.isna().any(): raise ValueError("y contains values that cannot be cast to numeric labels.") unique = np.unique(np.asarray(y_numeric.values)) if not bool(np.all(np.isin(unique, [0, 1]))): raise ValueError( f"y must contain only binary labels 0 and 1; got {unique.tolist()}." ) self.X: pd.DataFrame = X self.y: pd.Series = y_numeric.astype(int) self._results: pd.DataFrame | None = None
[docs] @classmethod def from_maldi_set( cls, maldi_set: MaldiSet, antibiotic: str | None = None ) -> DifferentialAnalysis: """Build a :class:`DifferentialAnalysis` from a :class:`MaldiSet`. Extracts the feature matrix via ``maldi_set.X`` and labels via ``maldi_set.get_y_single(antibiotic)``. Parameters ---------- maldi_set : MaldiSet Dataset providing ``X`` and ``get_y_single``. antibiotic : str or None, default=None Antibiotic label to analyse. If ``None``, the first configured antibiotic is used. Returns ------- DifferentialAnalysis Unrun analysis (call :meth:`run` next). """ X = maldi_set.X y = maldi_set.get_y_single(antibiotic) return cls(X, y)
[docs] def run( self, test: str | StatisticalTest = StatisticalTest.mann_whitney, correction: str | CorrectionMethod = CorrectionMethod.fdr_bh, mz_ranges: MzRange | list[MzRange] | None = None, peak_detector: MaldiPeakDetector | None = None, ) -> DifferentialAnalysis: """Run per-bin statistical analysis. For each kept column of ``X``, splits samples by label, computes the requested test statistic and p-value, the log2 fold change of group means, and Cohen's d. Multiple-testing correction is then applied across the kept bins and the result is stored in :attr:`results`. Pre-test filters reduce the number of hypotheses - this is often decisive on small datasets where a full 1k-10k bin scan would exceed FDR power. Parameters ---------- test : {"mann_whitney", "t_test"} or StatisticalTest Statistical test to apply per bin. correction : {"fdr_bh", "fdr_by", "bonferroni"} or CorrectionMethod Multiple-testing correction. mz_ranges : tuple, list of tuples, or None, default=None Restrict analysis to bins whose m/z value falls within the given range(s). Pass a single ``(low, high)`` tuple or a list of such tuples for a union of intervals. Endpoints are inclusive. Column labels are coerced to ``float`` for range comparison; non-numeric columns are excluded. ``None`` disables the filter. peak_detector : MaldiPeakDetector or None, default=None Restrict analysis to bins that are peaks in at least one sample according to the provided detector. The detector's ``fit_transform`` is run on the (range-filtered) feature matrix and any bin that is non-zero in at least one row is kept. ``None`` disables the filter. Returns ------- DifferentialAnalysis ``self``, for method chaining. Raises ------ ValueError If ``y`` does not contain both classes, or if the combined filters leave no bins to test. """ test = StatisticalTest(test) correction = CorrectionMethod(correction) y_arr = np.asarray(self.y.values) r_mask = y_arr == 1 s_mask = y_arr == 0 if not bool(r_mask.any()) or not bool(s_mask.any()): raise ValueError( "DifferentialAnalysis requires at least one sample in each class " "(0 = susceptible, 1 = resistant)." ) keep_mask = self._build_feature_mask(mz_ranges, peak_detector) if not bool(keep_mask.any()): raise ValueError( "No bins remain after applying 'mz_ranges' / 'peak_detector' filters." ) kept_columns = self.X.columns[keep_mask] X_values = self.X.values[:, keep_mask].astype(float, copy=False) X_r = X_values[r_mask] X_s = X_values[s_mask] mean_r = X_r.mean(axis=0) mean_s = X_s.mean(axis=0) fold_change = _compute_fold_change(mean_r, mean_s) if test == StatisticalTest.mann_whitney: test_fn = _mann_whitney_test else: test_fn = _t_test n_features = X_values.shape[1] p_values = np.empty(n_features, dtype=float) effect_sizes = np.empty(n_features, dtype=float) for j in range(n_features): col_r = X_r[:, j] col_s = X_s[:, j] _, p_values[j] = test_fn(col_r, col_s) effect_sizes[j] = _compute_effect_size(col_r, col_s) adjusted = _correct_pvalues(p_values, method=correction.value) self._results = pd.DataFrame( { "mz_bin": kept_columns.to_numpy(), "mean_r": mean_r, "mean_s": mean_s, "fold_change": fold_change, "p_value": p_values, "adjusted_p_value": adjusted, "effect_size": effect_sizes, } ) return self
def _build_feature_mask( self, mz_ranges: MzRange | list[MzRange] | None, peak_detector: MaldiPeakDetector | None, ) -> np.ndarray: """Combine m/z range and peak-detector masks into one boolean array. Returns an array of shape ``(n_features,)`` with ``True`` for columns of :attr:`X` that should be included in the test. """ n_features = self.X.shape[1] mask = np.ones(n_features, dtype=bool) if mz_ranges is not None: mask &= _mz_range_mask(self.X.columns, mz_ranges) if peak_detector is not None: if not bool(mask.any()): return mask sub = self.X.loc[:, mask] detected = peak_detector.fit_transform(sub) peak_mask_sub = (detected.to_numpy() != 0).any(axis=0) full_peak_mask = np.zeros(n_features, dtype=bool) full_peak_mask[np.flatnonzero(mask)] = peak_mask_sub mask &= full_peak_mask return mask @property def results(self) -> pd.DataFrame: """Per-bin results table. Returns ------- pd.DataFrame Columns: ``mz_bin``, ``mean_r``, ``mean_s``, ``fold_change``, ``p_value``, ``adjusted_p_value``, ``effect_size``. Raises ------ RuntimeError If :meth:`run` has not been called yet. """ if self._results is None: raise RuntimeError( "DifferentialAnalysis has not been run yet. Call .run() first." ) return self._results.copy()
[docs] def top_peaks(self, n: int = 20) -> pd.DataFrame: """Return the top ``n`` peaks sorted by adjusted p-value ascending. Parameters ---------- n : int, default=20 Number of peaks to return. Returns ------- pd.DataFrame Sub-table with the ``n`` lowest adjusted p-values. """ df = self.results return df.sort_values( "adjusted_p_value", ascending=True, kind="mergesort" ).head(n)
[docs] def significant_peaks( self, fc_threshold: float = 1.0, p_threshold: float = 0.05 ) -> pd.DataFrame: """Return peaks passing both fold-change and adjusted p-value filters. Parameters ---------- fc_threshold : float, default=1.0 Absolute log2 fold-change threshold (inclusive). p_threshold : float, default=0.05 Adjusted p-value threshold (inclusive). Returns ------- pd.DataFrame Peaks where ``|fold_change| >= fc_threshold`` AND ``adjusted_p_value <= p_threshold``. """ df = self.results mask = (df["fold_change"].abs() >= fc_threshold) & ( df["adjusted_p_value"] <= p_threshold ) return df.loc[mask].reset_index(drop=True)
[docs] @staticmethod def compare_drugs( analyses: dict[str, DifferentialAnalysis], fc_threshold: float = 1.0, p_threshold: float = 0.05, ) -> pd.DataFrame: """Build a boolean significance matrix across multiple drug analyses. Parameters ---------- analyses : dict[str, DifferentialAnalysis] Mapping from drug name to a fitted :class:`DifferentialAnalysis`. fc_threshold : float, default=1.0 Absolute log2 fold-change threshold for significance. p_threshold : float, default=0.05 Adjusted p-value threshold for significance. Returns ------- pd.DataFrame Boolean matrix indexed by the union of significant m/z bins across all drugs; columns are drug names; ``True`` indicates the peak is significant for that drug. Raises ------ ValueError If *analyses* is empty. """ if not analyses: raise ValueError("'analyses' must contain at least one entry.") per_drug: dict[str, pd.Series] = {} for drug, analysis in analyses.items(): sig = analysis.significant_peaks( fc_threshold=fc_threshold, p_threshold=p_threshold ) per_drug[drug] = pd.Series( True, index=pd.Index(sig["mz_bin"].to_numpy(), name="mz_bin") ) if not per_drug: return pd.DataFrame() union_index = pd.Index([], name="mz_bin") for series in per_drug.values(): union_index = union_index.union(series.index) data = { drug: series.reindex(union_index, fill_value=False).astype(bool).values for drug, series in per_drug.items() } return pd.DataFrame(data, index=union_index)