"""Spectral distance metrics and registry."""
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING, Callable
import numpy as np
import pandas as pd
if TYPE_CHECKING:
from maldiamrkit.spectrum import MaldiSpectrum
def _extract_mz_intensity(
spec: MaldiSpectrum | pd.DataFrame | np.ndarray,
) -> tuple[np.ndarray | None, np.ndarray]:
"""Normalize input to ``(mz_array | None, intensity_array)``.
Parameters
----------
spec : MaldiSpectrum, DataFrame, or ndarray
Spectrum input. For :class:`MaldiSpectrum` or a DataFrame with
``mass`` and ``intensity`` columns the m/z axis is returned. For a
plain 1-D array (binned vector) ``mz`` is ``None``.
Returns
-------
mz : ndarray or None
m/z values, or ``None`` for binned vectors.
intensity : ndarray
Intensity values.
"""
if hasattr(spec, "get_data"):
df = spec.get_data(prefer="preprocessed")
return np.asarray(df["mass"]), np.asarray(df["intensity"])
if isinstance(spec, pd.DataFrame):
if "mass" in spec.columns and "intensity" in spec.columns:
return np.asarray(spec["mass"]), np.asarray(spec["intensity"])
return None, np.asarray(spec.iloc[0])
arr = np.asarray(spec, dtype=float)
return None, arr
def _wasserstein_distance(
spec_a: MaldiSpectrum | pd.DataFrame | np.ndarray,
spec_b: MaldiSpectrum | pd.DataFrame | np.ndarray,
) -> float:
"""Earth-mover distance between two raw spectra.
Uses m/z positions as the support and intensities as weights.
Any negative intensity values (which would break scipy's
non-negative weight precondition) are clipped to zero.
"""
from scipy.stats import wasserstein_distance as _wd
mz_a, int_a = _extract_mz_intensity(spec_a)
mz_b, int_b = _extract_mz_intensity(spec_b)
if mz_a is None or mz_b is None:
raise TypeError(
"Wasserstein distance requires raw spectra with m/z values, "
"not binned vectors."
)
int_a = np.clip(np.asarray(int_a, dtype=float), 0.0, None)
int_b = np.clip(np.asarray(int_b, dtype=float), 0.0, None)
return float(_wd(mz_a, mz_b, int_a, int_b))
def _dtw_distance(
spec_a: MaldiSpectrum | pd.DataFrame | np.ndarray,
spec_b: MaldiSpectrum | pd.DataFrame | np.ndarray,
) -> float:
"""Dynamic time-warping distance between two raw spectra.
Both spectra are interpolated onto a common m/z grid before computing
DTW. Pre-processed or trimmed input is recommended for performance.
"""
from tslearn.metrics import dtw
mz_a, int_a = _extract_mz_intensity(spec_a)
mz_b, int_b = _extract_mz_intensity(spec_b)
if mz_a is None or mz_b is None:
raise TypeError(
"DTW distance requires raw spectra with m/z values, not binned vectors."
)
# Interpolate to a common grid spanning the union range.
lo = min(mz_a[0], mz_b[0])
hi = max(mz_a[-1], mz_b[-1])
n_points = max(len(mz_a), len(mz_b))
common_mz = np.linspace(lo, hi, n_points)
int_a_interp = np.interp(common_mz, mz_a, int_a).reshape(-1, 1)
int_b_interp = np.interp(common_mz, mz_b, int_b).reshape(-1, 1)
return float(dtw(int_a_interp, int_b_interp))
def _cosine_distance(
spec_a: MaldiSpectrum | pd.DataFrame | np.ndarray,
spec_b: MaldiSpectrum | pd.DataFrame | np.ndarray,
) -> float:
"""Cosine distance (``1 - cosine_similarity``) for binned vectors."""
_, a = _extract_mz_intensity(spec_a)
_, b = _extract_mz_intensity(spec_b)
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
_tiny = np.finfo(float).tiny
if norm_a < _tiny or norm_b < _tiny:
return 1.0
cos_sim = np.dot(a, b) / (norm_a * norm_b)
return float(np.clip(1.0 - cos_sim, 0.0, 2.0))
def _spectral_contrast_angle(
spec_a: MaldiSpectrum | pd.DataFrame | np.ndarray,
spec_b: MaldiSpectrum | pd.DataFrame | np.ndarray,
) -> float:
"""Spectral contrast angle distance for binned vectors.
Defined as ``(2 / pi) * arccos(cosine_similarity)``. Ranges from
0 (identical) to 1 (orthogonal).
"""
_, a = _extract_mz_intensity(spec_a)
_, b = _extract_mz_intensity(spec_b)
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
_tiny = np.finfo(float).tiny
if norm_a < _tiny or norm_b < _tiny:
return 1.0
cos_sim = float(np.clip(np.dot(a, b) / (norm_a * norm_b), -1.0, 1.0))
return float((2.0 / np.pi) * np.arccos(cos_sim))
def _pearson_correlation(
spec_a: MaldiSpectrum | pd.DataFrame | np.ndarray,
spec_b: MaldiSpectrum | pd.DataFrame | np.ndarray,
) -> float:
"""Pearson-correlation distance (``1 - r``) for binned vectors.
Returns a value in ``[0, 2]``: 0 for perfectly correlated spectra,
1 for uncorrelated, and 2 for perfectly anti-correlated.
"""
_, a = _extract_mz_intensity(spec_a)
_, b = _extract_mz_intensity(spec_b)
corr = np.corrcoef(a, b)[0, 1]
if np.isnan(corr):
return 1.0
return float(1.0 - corr)
[docs]
class SpectralMetric(str, Enum):
"""Supported spectral distance/similarity metrics.
Attributes
----------
wasserstein : str
Earth mover's (Wasserstein-1) distance on raw spectra.
dtw : str
Dynamic time warping distance on raw spectra.
cosine : str
Cosine distance on binned intensity vectors.
spectral_contrast_angle : str
Spectral contrast angle on binned intensity vectors.
pearson : str
1 - Pearson correlation on binned intensity vectors.
"""
wasserstein = "wasserstein"
dtw = "dtw"
cosine = "cosine"
spectral_contrast_angle = "spectral_contrast_angle"
pearson = "pearson"
METRIC_REGISTRY: dict[str, Callable] = {
"wasserstein": _wasserstein_distance,
"dtw": _dtw_distance,
"cosine": _cosine_distance,
"spectral_contrast_angle": _spectral_contrast_angle,
"pearson": _pearson_correlation,
}
[docs]
def spectral_distance(
spec_a: MaldiSpectrum | pd.DataFrame | np.ndarray,
spec_b: MaldiSpectrum | pd.DataFrame | np.ndarray,
metric: str | SpectralMetric = SpectralMetric.wasserstein,
) -> float:
"""Compute distance between two spectra.
Parameters
----------
spec_a, spec_b : MaldiSpectrum, DataFrame, or ndarray
For non-binned metrics (``"wasserstein"``, ``"dtw"``):
:class:`~maldiamrkit.spectrum.MaldiSpectrum` or DataFrame with
``mass`` and ``intensity`` columns.
For binned metrics (``"cosine"``, ``"spectral_contrast_angle"``,
``"pearson"``): 1-D intensity arrays.
metric : str or SpectralMetric, default="wasserstein"
Key in :data:`METRIC_REGISTRY`.
Returns
-------
float
Distance (or ``1 - similarity`` for correlation-based metrics).
Raises
------
ValueError
If *metric* is not in :data:`METRIC_REGISTRY`.
"""
metric = SpectralMetric(metric)
return METRIC_REGISTRY[metric](spec_a, spec_b)