"""Individual preprocessing step transformers for MALDI-TOF spectra.
Each transformer is a callable that operates on a DataFrame with
``mass`` and ``intensity`` columns and returns a transformed DataFrame.
Examples
--------
>>> from maldiamrkit.preprocessing.transformers import (
... ClipNegatives, SqrtTransform, SavitzkyGolaySmooth,
... SNIPBaseline, MzTrimmer, TICNormalizer,
... )
>>> steps = [ClipNegatives(), SqrtTransform(), SavitzkyGolaySmooth()]
>>> df = steps[0](raw_df)
"""
from __future__ import annotations
import logging
import warnings
import numpy as np
import pandas as pd
from pybaselines import Baseline
from scipy.ndimage import grey_opening, median_filter, uniform_filter1d
from scipy.signal import savgol_filter
from scipy.spatial import ConvexHull
logger = logging.getLogger(__name__)
[docs]
class ClipNegatives:
"""Clip negative intensity values to zero."""
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply clipping to the spectrum."""
df = df.copy()
df["intensity"] = df["intensity"].clip(lower=0)
return df
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
return {"name": "ClipNegatives"}
def __repr__(self) -> str:
return "ClipNegatives()"
[docs]
class SavitzkyGolaySmooth:
"""Savitzky-Golay smoothing filter.
Parameters
----------
window_length : int, default=21
Length of the filter window. Must be a positive odd integer
(per Savitzky & Golay 1964).
polyorder : int, default=2
Order of the polynomial used to fit the samples.
"""
[docs]
def __init__(self, window_length: int = 21, polyorder: int = 2):
self.window_length = window_length
self.polyorder = polyorder
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply Savitzky-Golay smoothing.
Raises
------
ValueError
If ``window_length`` exceeds the data length, or if
``window_length`` is not greater than ``polyorder``.
"""
n = len(df)
if self.window_length > n:
raise ValueError(
f"window_length ({self.window_length}) exceeds data length ({n})."
)
if self.window_length <= self.polyorder:
raise ValueError(
f"window_length ({self.window_length}) must be greater than "
f"polyorder ({self.polyorder})."
)
df = df.copy()
df["intensity"] = savgol_filter(
df["intensity"],
window_length=self.window_length,
polyorder=self.polyorder,
)
return df
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
return {
"name": "SavitzkyGolaySmooth",
"window_length": self.window_length,
"polyorder": self.polyorder,
}
def __repr__(self) -> str:
return (
f"SavitzkyGolaySmooth(window_length={self.window_length}, "
f"polyorder={self.polyorder})"
)
[docs]
class SNIPBaseline:
"""SNIP (Statistics-sensitive Non-linear Iterative Peak-clipping) baseline correction.
Parameters
----------
half_window : int, default=40
Half-window size for the SNIP algorithm.
"""
[docs]
def __init__(self, half_window: int = 40):
self.half_window = half_window
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply SNIP baseline correction to the spectrum."""
df = df.copy()
bkg = Baseline(x_data=df["mass"]).snip(
df["intensity"],
max_half_window=self.half_window,
decreasing=True,
smooth_half_window=0,
)[0]
intensity = df["intensity"] - bkg
intensity[intensity < 0] = 0
df["intensity"] = intensity
return df
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
return {"name": "SNIPBaseline", "half_window": self.half_window}
def __repr__(self) -> str:
return f"SNIPBaseline(half_window={self.half_window})"
[docs]
class TopHatBaseline:
"""Morphological top-hat baseline subtraction.
Estimates the baseline by morphological grey-level opening of the
intensity trace (erosion followed by dilation), then subtracts it
from the spectrum and clips negative values to zero.
Parameters
----------
half_window : int, default=100
Half-width of the structuring element in bins. The full
element size is ``2 * half_window + 1``. Must be a positive
integer.
Raises
------
ValueError
If ``half_window`` is not a positive integer or exceeds the
data length.
"""
[docs]
def __init__(self, half_window: int = 100):
self.half_window = half_window
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply top-hat baseline subtraction to the spectrum."""
if self.half_window < 1:
raise ValueError(
f"half_window ({self.half_window}) must be a positive integer."
)
n = len(df)
size = 2 * self.half_window + 1
if size > n:
raise ValueError(
f"Structuring element size ({size}) exceeds data length ({n})."
)
df = df.copy()
baseline = grey_opening(df["intensity"].to_numpy(), size=size)
df["intensity"] = np.maximum(0.0, df["intensity"].to_numpy() - baseline)
return df
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
return {"name": "TopHatBaseline", "half_window": self.half_window}
def __repr__(self) -> str:
return f"TopHatBaseline(half_window={self.half_window})"
[docs]
class ConvexHullBaseline:
"""Parameter-free baseline from the lower convex hull of the spectrum.
Computes the convex hull of the ``(mass, intensity)`` points,
extracts the lower hull (vertices traversed in ascending mass
with minimum intensity), linearly interpolates it onto the full
m/z axis, and subtracts the resulting baseline from the spectrum.
Negative residuals are clipped to zero.
Notes
-----
Requires at least three distinct points to form a hull. For
shorter inputs the baseline is taken as the per-point minimum
of the first and last intensities (degenerate "flat" hull).
"""
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply convex-hull baseline subtraction to the spectrum."""
df = df.copy()
mz = df["mass"].to_numpy(dtype=float)
intensity = df["intensity"].to_numpy(dtype=float)
n = len(mz)
if n < 3 or not np.all(np.isfinite(intensity)):
df["intensity"] = np.maximum(
0.0, intensity - np.minimum.accumulate(intensity)
)
return df
points = np.column_stack([mz, intensity])
try:
hull = ConvexHull(points)
except Exception:
flat = float(np.nanmin(intensity))
df["intensity"] = np.maximum(0.0, intensity - flat)
return df
verts = hull.vertices
n_v = len(verts)
mz_v = mz[verts]
int_v = intensity[verts]
left = int(np.lexsort((int_v, mz_v))[0])
right = int(np.lexsort((-int_v, -mz_v))[0])
lower = []
i = left
while True:
lower.append(verts[i])
if i == right:
break
i = (i + 1) % n_v
lower_mz = mz[lower]
lower_int = intensity[lower]
if not np.all(np.diff(lower_mz) > 0):
order = np.argsort(lower_mz)
lower_mz = lower_mz[order]
lower_int = lower_int[order]
lower_mz, unique_idx = np.unique(lower_mz, return_index=True)
lower_int = lower_int[unique_idx]
baseline = np.interp(mz, lower_mz, lower_int)
df["intensity"] = np.maximum(0.0, intensity - baseline)
return df
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
return {"name": "ConvexHullBaseline"}
def __repr__(self) -> str:
return "ConvexHullBaseline()"
[docs]
class MovingAverageSmooth:
"""Moving-average smoothing filter.
Applies a uniform (boxcar) moving average of length
``window_length`` using reflective boundary handling.
Parameters
----------
window_length : int, default=5
Length of the smoothing window. Must be an odd integer
greater than or equal to 3.
Raises
------
ValueError
If ``window_length`` is not an odd integer ``>= 3``, or if
it exceeds the data length.
"""
[docs]
def __init__(self, window_length: int = 5):
self.window_length = window_length
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply moving-average smoothing to the spectrum."""
if self.window_length < 3 or self.window_length % 2 == 0:
raise ValueError(
f"window_length ({self.window_length}) must be an odd integer >= 3."
)
n = len(df)
if self.window_length > n:
raise ValueError(
f"window_length ({self.window_length}) exceeds data length ({n})."
)
df = df.copy()
df["intensity"] = uniform_filter1d(
df["intensity"].to_numpy(dtype=float),
size=self.window_length,
mode="reflect",
)
return df
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
return {"name": "MovingAverageSmooth", "window_length": self.window_length}
def __repr__(self) -> str:
return f"MovingAverageSmooth(window_length={self.window_length})"
[docs]
class MzTrimmer:
"""Trim spectrum to a specified m/z range.
Parameters
----------
mz_min : int, default=2000
Lower m/z bound in Daltons.
mz_max : int, default=20000
Upper m/z bound in Daltons.
Raises
------
ValueError
If ``mz_min`` is greater than or equal to ``mz_max``.
"""
[docs]
def __init__(self, mz_min: int = 2000, mz_max: int = 20000):
if mz_min >= mz_max:
raise ValueError(f"mz_min ({mz_min}) must be less than mz_max ({mz_max}).")
self.mz_min = mz_min
self.mz_max = mz_max
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply m/z trimming to the spectrum."""
return df[df["mass"].between(self.mz_min, self.mz_max)].reset_index(drop=True)
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
return {
"name": "MzTrimmer",
"mz_min": self.mz_min,
"mz_max": self.mz_max,
}
def __repr__(self) -> str:
return f"MzTrimmer(mz_min={self.mz_min}, mz_max={self.mz_max})"
[docs]
class TICNormalizer:
"""Total Ion Current normalization (intensities sum to 1)."""
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply TIC normalization to the spectrum."""
df = df.copy()
total = df["intensity"].sum()
if total > 0:
df["intensity"] = df["intensity"] / total
else:
warnings.warn(
"TIC total is zero - spectrum has no signal. "
"This may indicate a failed acquisition.",
UserWarning,
stacklevel=2,
)
return df
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
return {"name": "TICNormalizer"}
def __repr__(self) -> str:
return "TICNormalizer()"
[docs]
class PQNNormalizer:
"""Probabilistic Quotient Normalization.
First normalizes by TIC, then divides by the median of the quotient
spectrum (sample / reference). If no reference is provided, the
reference is the median spectrum across the dataset.
Parameters
----------
reference : np.ndarray, list, or None, default=None
Reference intensity vector. If None, uses TIC normalization only
(the full PQN requires a reference from the dataset). Lists are
converted to arrays internally.
"""
[docs]
def __init__(self, reference: np.ndarray | list | None = None):
if isinstance(reference, list):
reference = np.asarray(reference)
self.reference = reference
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply PQN normalization to the spectrum."""
df = df.copy()
# Step 1: TIC normalization
total = df["intensity"].sum()
if total > 0:
df["intensity"] = df["intensity"] / total
# Step 2: quotient normalization if reference is available
if self.reference is not None:
if len(self.reference) != len(df):
raise ValueError(
f"PQNNormalizer reference length ({len(self.reference)}) differs "
f"from input length ({len(df)}). This produces invalid "
f"normalization due to misaligned m/z positions. Ensure the "
f"reference was computed on spectra with the same m/z grid."
)
ref = self.reference
mask = ref > 0
if mask.any():
quotients = df.loc[mask, "intensity"].values / ref[mask]
median_quotient = np.median(quotients)
if median_quotient > 0:
df["intensity"] = df["intensity"] / median_quotient
return df
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
d: dict = {"name": "PQNNormalizer"}
if self.reference is not None:
d["reference"] = self.reference.tolist()
return d
def __repr__(self) -> str:
return "PQNNormalizer()"
[docs]
class MzMultiTrimmer:
"""Keep only specific m/z ranges from the spectrum.
Parameters
----------
mz_ranges : list of tuple[float, float]
List of (mz_min, mz_max) ranges to keep.
Raises
------
ValueError
If ``mz_ranges`` is empty.
"""
[docs]
def __init__(self, mz_ranges: list[tuple[float, float]]):
if not mz_ranges:
raise ValueError("mz_ranges must not be empty.")
self.mz_ranges = mz_ranges
[docs]
def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
"""Apply m/z range subsetting to the spectrum."""
masks = [df["mass"].between(lo, hi) for lo, hi in self.mz_ranges]
combined_mask = masks[0]
for m in masks[1:]:
combined_mask = combined_mask | m
return df[combined_mask].reset_index(drop=True)
[docs]
def to_dict(self) -> dict:
"""Serialize transformer to a dictionary."""
return {
"name": "MzMultiTrimmer",
"mz_ranges": self.mz_ranges,
}
def __repr__(self) -> str:
return f"MzMultiTrimmer(mz_ranges={self.mz_ranges})"
# Registry mapping transformer names to classes (for deserialization)
TRANSFORMER_REGISTRY: dict[str, type] = {
"ClipNegatives": ClipNegatives,
"SqrtTransform": SqrtTransform,
"LogTransform": LogTransform,
"SavitzkyGolaySmooth": SavitzkyGolaySmooth,
"MovingAverageSmooth": MovingAverageSmooth,
"SNIPBaseline": SNIPBaseline,
"TopHatBaseline": TopHatBaseline,
"ConvexHullBaseline": ConvexHullBaseline,
"MedianBaseline": MedianBaseline,
"MzTrimmer": MzTrimmer,
"TICNormalizer": TICNormalizer,
"MedianNormalizer": MedianNormalizer,
"PQNNormalizer": PQNNormalizer,
"MzMultiTrimmer": MzMultiTrimmer,
}