Source code for maldiamrkit.drift.monitor

"""DriftMonitor: orchestrator for temporal drift analysis over a MaldiSet."""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any

import numpy as np
import pandas as pd

from maldiamrkit.visualization.exploratory_plots import _reduce_dimensions

from .pca_drift import _pca_drift_timeseries
from .peak_drift import (
    _effect_size_timeseries,
    _peak_stability_timeseries,
    _top_peaks_set,
)
from .reference import _compute_reference_spectrum, _reference_similarity_timeseries

if TYPE_CHECKING:
    from maldiamrkit.dataset import MaldiSet
    from maldiamrkit.differential import DifferentialAnalysis


[docs] class DriftMonitor: """Monitor spectral drift over time using baseline-anchored metrics. Establishes a baseline from the earliest timestamps, then quantifies drift for later time windows via three complementary views: - reference similarity (distance to baseline median spectrum) - PCA centroid trajectory (baseline-fitted PCA space) - peak-selection stability (Jaccard overlap of top-k discriminative peaks per window vs. baseline) and Cohen's d tracking of specific peaks Output is data + plots only; no automated alerts. Parameters ---------- time_column : str Metadata column containing timestamps (parsed via :func:`pandas.to_datetime`). window : str or pd.Timedelta, default="30D" Time window size for :class:`pandas.Grouper` (e.g. ``"30D"``, ``"7D"``). baseline_end : str, pd.Timestamp, or None, default=None End of the baseline period (inclusive). If ``None``, defaults to the timestamp at the 20th percentile of sorted timestamps. metric : str, default="cosine" Distance metric for reference similarity (see :func:`maldiamrkit.similarity.spectral_distance`). n_components : int, default=2 PCA components for PCA-drift monitoring. min_samples : int, default=5 Skip time windows with fewer spectra than this (and, for peak-stability / effect-size monitoring, fewer than this many samples in either the R or S class). """
[docs] def __init__( self, time_column: str, window: str | pd.Timedelta = "30D", baseline_end: str | pd.Timestamp | None = None, metric: str = "cosine", n_components: int = 2, min_samples: int = 5, ) -> None: self.time_column = time_column self.window = window self.baseline_end = baseline_end self.metric = metric self.n_components = n_components self.min_samples = min_samples self._reference: np.ndarray | None = None self._reducer: Any = None self._baseline_end_ts: pd.Timestamp | None = None
[docs] def fit(self, maldi_set: MaldiSet) -> DriftMonitor: """Establish the baseline reference and PCA space. Parameters ---------- maldi_set : MaldiSet Dataset exposing ``.X`` and ``.meta`` with ``time_column``. Returns ------- DriftMonitor ``self``, for chaining. """ X, ts = self._extract_X_and_timestamps(maldi_set) baseline_end = self._resolve_baseline_end(ts) baseline_mask = ts <= baseline_end n_baseline = int(baseline_mask.sum()) if n_baseline < 2: raise ValueError( f"Baseline contains only {n_baseline} sample(s); need at least 2 " "to fit a reference and PCA." ) X_base = X.loc[baseline_mask[baseline_mask].index] self._reference = _compute_reference_spectrum(X_base) n_components = min(self.n_components, X_base.shape[0], X_base.shape[1]) _, self._reducer = _reduce_dimensions( X_base, method="pca", n_components=n_components ) self._baseline_end_ts = baseline_end return self
@property def reference_(self) -> np.ndarray: """Baseline reference spectrum (read-only).""" self._check_fitted() assert self._reference is not None return self._reference.copy() @property def baseline_end_(self) -> pd.Timestamp: """Timestamp used as the (inclusive) baseline cut-off.""" self._check_fitted() assert self._baseline_end_ts is not None return self._baseline_end_ts
[docs] def monitor(self, maldi_set: MaldiSet) -> pd.DataFrame: """Reference-similarity timeseries. Only spectra with ``timestamp > baseline_end_`` are monitored; the baseline is reserved as a reference and excluded from windows to avoid a self-reference artefact in the first window. Returns a DataFrame with columns ``window_start``, ``window_end``, ``n_spectra``, ``distance_to_reference``. """ self._check_fitted() assert self._reference is not None X, ts = self._extract_X_and_timestamps(maldi_set) X, ts = self._drop_baseline(X, ts) df, n_skipped = _reference_similarity_timeseries( X, ts, self._reference, window=self.window, metric=self.metric, min_samples=self.min_samples, ) self._warn_skipped(n_skipped, "monitor") return df
[docs] def monitor_pca(self, maldi_set: MaldiSet) -> pd.DataFrame: """PCA centroid + dispersion timeseries. Only post-baseline spectra (``timestamp > baseline_end_``) are included. Returns a DataFrame with columns ``window_start``, ``window_end``, ``centroid_pc1``, ``centroid_pc2``, ``dispersion``, ``n_spectra``. """ self._check_fitted() X, ts = self._extract_X_and_timestamps(maldi_set) X, ts = self._drop_baseline(X, ts) df, n_skipped = _pca_drift_timeseries( X, ts, self._reducer, window=self.window, min_samples=self.min_samples, ) self._warn_skipped(n_skipped, "monitor_pca") return df
[docs] def monitor_peak_stability( self, maldi_set: MaldiSet, differential_analysis: DifferentialAnalysis, antibiotic: str | None = None, n_top: int = 20, ) -> pd.DataFrame: """Peak-selection stability (Jaccard) timeseries. ``differential_analysis`` must already have been ``.run()``; its top-``n_top`` peaks define the baseline peak set. Only post-baseline spectra (``timestamp > baseline_end_``) are included in the monitored windows. Returns a DataFrame with columns ``window_start``, ``stability_score``, ``n_spectra``. """ self._check_fitted() X, ts = self._extract_X_and_timestamps(maldi_set) X, ts = self._drop_baseline(X, ts) y = maldi_set.get_y_single(antibiotic) y_numeric = pd.to_numeric(y, errors="coerce") valid = y_numeric.notna() X = X.loc[X.index.intersection(valid[valid].index)] ts = ts.loc[X.index] y_numeric = y_numeric.loc[X.index].astype(int) baseline_peaks = _top_peaks_set(differential_analysis, n_top=n_top) df, n_skipped = _peak_stability_timeseries( X, y_numeric, ts, baseline_peaks=baseline_peaks, window=self.window, n_top=n_top, min_samples=self.min_samples, ) self._warn_skipped(n_skipped, "monitor_peak_stability") return df
[docs] def monitor_effect_sizes( self, maldi_set: MaldiSet, peaks: list[str], antibiotic: str | None = None, ) -> pd.DataFrame: """Per-peak Cohen's d timeseries. Only post-baseline spectra (``timestamp > baseline_end_``) are included. Returns a DataFrame with ``window_start`` plus one column per requested peak (the peak's ``mz_bin`` label as a string). """ self._check_fitted() X, ts = self._extract_X_and_timestamps(maldi_set) X, ts = self._drop_baseline(X, ts) y = maldi_set.get_y_single(antibiotic) y_numeric = pd.to_numeric(y, errors="coerce") valid = y_numeric.notna() X = X.loc[X.index.intersection(valid[valid].index)] ts = ts.loc[X.index] y_numeric = y_numeric.loc[X.index].astype(int) df, n_skipped = _effect_size_timeseries( X, y_numeric, ts, peaks=peaks, window=self.window, min_samples=self.min_samples, ) self._warn_skipped(n_skipped, "monitor_effect_sizes") return df
def _drop_baseline( self, X: pd.DataFrame, ts: pd.Series ) -> tuple[pd.DataFrame, pd.Series]: """Return ``(X, ts)`` restricted to ``timestamp > baseline_end_``.""" assert self._baseline_end_ts is not None post_mask = ts > self._baseline_end_ts kept = post_mask[post_mask].index return X.loc[kept], ts.loc[kept] def _extract_X_and_timestamps( self, maldi_set: MaldiSet ) -> tuple[pd.DataFrame, pd.Series]: """Return aligned ``(X, parsed_timestamps)`` from a MaldiSet.""" X = maldi_set.X if self.time_column not in maldi_set.meta.columns: raise ValueError( f"'{self.time_column}' not found in maldi_set.meta columns." ) raw = maldi_set.meta.loc[X.index, self.time_column] try: ts = pd.to_datetime(raw, errors="raise") except (ValueError, TypeError) as exc: raise ValueError( f"Failed to parse metadata column '{self.time_column}' as " f"timestamps: {exc}" ) from exc if ts.isna().any(): n_bad = int(ts.isna().sum()) raise ValueError( f"'{self.time_column}' contains {n_bad} unparseable timestamp(s)." ) ts.index = X.index return X, ts def _resolve_baseline_end(self, ts: pd.Series) -> pd.Timestamp: """Return the configured or defaulted baseline end timestamp. When ``baseline_end`` is not set, returns the 20th-percentile timestamp using the ``"higher"`` interpolation method so the baseline contains ``ceil(0.2 * n)`` samples (at least one). """ if self.baseline_end is not None: return pd.Timestamp(self.baseline_end) ordered = ts.sort_values() n = len(ordered) idx = max(int(np.ceil(0.2 * n)) - 1, 0) return pd.Timestamp(ordered.iloc[idx]) def _check_fitted(self) -> None: if self._reference is None or self._reducer is None: raise RuntimeError("DriftMonitor.fit() must be called first.") def _warn_skipped(self, n: int, name: str) -> None: if n > 0: warnings.warn( f"DriftMonitor.{name}: skipped {n} window(s) with fewer than " f"min_samples={self.min_samples} spectra " "(or, for peak-stability / effect-size monitors, fewer than " f"{self.min_samples} samples in either class).", UserWarning, stacklevel=2, )