Source code for maldiamrkit.preprocessing.merging
"""Spectral replicate merging for MALDI-TOF spectra.
Clinical workflows often acquire multiple replicates per isolate.
This module provides functions to merge replicates into a single
consensus spectrum and detect outlier replicates.
Examples
--------
>>> from maldiamrkit import MaldiSpectrum
>>> from maldiamrkit.preprocessing.merging import (
... merge_replicates, detect_outlier_replicates,
... )
>>> reps = [MaldiSpectrum(f"rep{i}.txt") for i in range(3)]
>>> merged = merge_replicates(reps, method="mean")
>>> keep = detect_outlier_replicates(reps)
>>> clean = [s for s, ok in zip(reps, keep) if ok]
"""
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
[docs]
class MergingMethod(str, Enum):
"""Supported replicate merging methods.
Attributes
----------
mean : str
Arithmetic mean (optionally weighted).
median : str
Element-wise median.
"""
mean = "mean"
median = "median"
if TYPE_CHECKING:
from maldiamrkit.spectrum import MaldiSpectrum
def _to_common_grid(
spectra: list[pd.DataFrame],
) -> tuple[np.ndarray, np.ndarray]:
"""Interpolate spectra onto a common m/z grid.
Parameters
----------
spectra : list of pd.DataFrame
Each with ``mass`` and ``intensity`` columns.
Returns
-------
common_mz : np.ndarray
Sorted union of all m/z values.
matrix : np.ndarray
Intensity matrix of shape ``(n_spectra, len(common_mz))``.
"""
# Fast path: skip interpolation when all spectra share the same grid
grids = [s["mass"].values for s in spectra]
if all(np.array_equal(grids[0], g) for g in grids[1:]):
common_mz = grids[0]
matrix = np.array([s["intensity"].values for s in spectra])
return common_mz, matrix
common_mz = np.unique(np.concatenate(grids))
matrix = np.empty((len(spectra), len(common_mz)))
for i, s in enumerate(spectra):
matrix[i] = np.interp(common_mz, s["mass"].values, s["intensity"].values)
return common_mz, matrix
[docs]
def merge_replicates(
spectra: list[MaldiSpectrum],
method: str | MergingMethod = MergingMethod.mean,
weights: np.ndarray | list[float] | None = None,
) -> pd.DataFrame:
"""Merge replicate spectra into a single consensus spectrum.
Parameters
----------
spectra : list of MaldiSpectrum
Replicate spectra to merge.
method : str, default="mean"
Merging strategy:
- ``"mean"``: arithmetic mean (or weighted mean if ``weights``
is provided).
- ``"median"``: element-wise median (``weights`` is ignored).
weights : array-like of float, optional
Per-replicate weights for the ``"mean"`` method (e.g. SNR
values). Ignored when ``method="median"``. Must have the
same length as ``spectra``.
Returns
-------
pd.DataFrame
Merged spectrum with ``mass`` and ``intensity`` columns.
Raises
------
ValueError
If *spectra* is empty, *method* is invalid, or *weights*
length does not match *spectra*.
"""
if not spectra:
raise ValueError("spectra must not be empty.")
method = MergingMethod(method)
dfs = [s.raw for s in spectra]
if len(dfs) == 1:
return dfs[0].copy()
common_mz, matrix = _to_common_grid(dfs)
if method == "mean":
if weights is not None:
w = np.asarray(weights, dtype=float)
if len(w) != len(spectra):
raise ValueError(
f"weights length ({len(w)}) must match "
f"spectra length ({len(spectra)})."
)
merged = np.average(matrix, axis=0, weights=w)
else:
merged = np.mean(matrix, axis=0)
else: # median
merged = np.median(matrix, axis=0)
return pd.DataFrame({"mass": common_mz, "intensity": merged})
[docs]
def detect_outlier_replicates(
spectra: list[MaldiSpectrum],
threshold: float = 3.0,
) -> np.ndarray:
"""Identify outlier replicates using correlation with the median spectrum.
Computes the Pearson correlation of each replicate against the
element-wise median spectrum. Replicates whose correlation falls
below ``median(corrs) - threshold * MAD(corrs)`` are flagged as
outliers.
Parameters
----------
spectra : list of MaldiSpectrum
Replicate spectra.
threshold : float, default=3.0
Number of MAD units below the median correlation to flag a
replicate as an outlier.
Returns
-------
np.ndarray
Boolean array of length ``len(spectra)``. ``True`` means the
replicate is kept; ``False`` means it is an outlier.
Raises
------
ValueError
If *spectra* has fewer than 3 elements (need at least 3 to
estimate spread).
"""
if len(spectra) < 3:
raise ValueError(
f"Need at least 3 replicates for outlier detection, got {len(spectra)}."
)
dfs = [s.raw for s in spectra]
common_mz, matrix = _to_common_grid(dfs)
median_spectrum = np.median(matrix, axis=0)
corrs = np.array([np.corrcoef(row, median_spectrum)[0, 1] for row in matrix])
med_corr = np.median(corrs)
mad_corr = np.median(np.abs(corrs - med_corr))
if mad_corr == 0:
# All correlations identical - no outliers
return np.ones(len(spectra), dtype=bool)
# The 1.4826 MAD-to-sigma factor assumes normally distributed
# correlations (Rousseeuw & Croux 1993). Pearson correlations are
# bounded in [-1, 1], so clamp the cutoff to avoid an unreachable
# threshold that would mask all outliers.
cutoff = max(med_corr - threshold * 1.4826 * mad_corr, -1.0)
return corrs >= cutoff