"""File reading utilities for MALDI-TOF spectrum data."""
from __future__ import annotations
import csv
import itertools
import logging
from pathlib import Path
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
def sniff_delimiter(path: str | Path, sample_lines: int = 10) -> str:
"""
Detect the delimiter used in a text file.
Parameters
----------
path : str or Path
Path to the file to analyze.
sample_lines : int, default=10
Number of lines to sample for delimiter detection.
Returns
-------
str
Detected delimiter character.
Raises
------
csv.Error
If the file is empty and the delimiter cannot be detected.
"""
with open(path, "r", newline="") as f:
lines = list(itertools.islice(f, sample_lines))
if not lines:
raise csv.Error("File is empty, cannot detect delimiter")
dialect = csv.Sniffer().sniff("".join(lines), delimiters=",;\t ")
return dialect.delimiter
_BRUKER_REQUIRED_PARAMS = {"TD", "DELAY", "DW", "ML1", "ML2", "ML3", "BYTORDA"}
_ACQUS_PARAM_TYPES: dict[str, type] = {
"TD": int,
"DELAY": int,
"DW": float,
"ML1": float,
"ML2": float,
"ML3": float,
"BYTORDA": int,
}
def _parse_acqus(acqus_path: Path) -> dict[str, int | float]:
"""
Parse a Bruker JCAMP-DX ``acqus`` file for calibration parameters.
Parameters
----------
acqus_path : Path
Path to the ``acqus`` file.
Returns
-------
dict[str, int | float]
Dictionary with keys ``TD``, ``DELAY``, ``DW``, ``ML1``,
``ML2``, ``ML3``, ``BYTORDA``.
Raises
------
ValueError
If any required parameter is missing from the file.
"""
params: dict[str, int | float] = {}
with open(acqus_path, "rb") as f:
for raw_line in f:
line = raw_line.decode("utf-8", errors="replace").strip()
if not line.startswith("##$"):
continue
try:
key_part, value_part = line.split("= ", 1)
except ValueError:
continue
key = key_part[3:] # strip '##$'
if key in _ACQUS_PARAM_TYPES:
params[key] = _ACQUS_PARAM_TYPES[key](value_part)
missing = _BRUKER_REQUIRED_PARAMS - params.keys()
if missing:
raise ValueError(
f"Missing required parameters in {acqus_path}: {sorted(missing)}"
)
return params
def _tof_to_mass(ml1: float, ml2: float, ml3: float, tof: np.ndarray) -> np.ndarray:
"""
Convert time-of-flight values to m/z using Bruker calibration.
Implements the quadratic TOF calibration equation used by Bruker
flexAnalysis (reference: MARISMa ``SpectrumObject.tof2mass``).
The calibration solves ``a*x^2 + b*x + c = 0`` where ``a = ML3``,
``b = sqrt(1e12 / ML1)``, ``c = ML2 - TOF``, and
``m/z = ((-b + sqrt(b^2 - 4ac)) / (2a))^2``. When ``ML3 = 0``
(linear calibration), this degenerates to ``m/z = (c / b)^2``.
The ``-b + sqrt(...)`` root is selected to match the MARISMa
reference implementation; this yields physical (positive) m/z for
the typical case where ``ML3 >= 0`` and ``ML1 > 0``.
Parameters
----------
ml1, ml2, ml3 : float
Bruker calibration constants from the ``acqus`` file.
tof : np.ndarray
Time-of-flight array.
Returns
-------
np.ndarray
Mass-to-charge (m/z) array.
Raises
------
ValueError
If ``ml1`` is not positive, or if the quadratic discriminant
is negative (invalid calibration constants).
"""
if ml1 <= 0:
raise ValueError(
f"Bruker calibration constant ML1 must be positive, got {ml1}. "
f"Check the acqus file for corrupt or missing calibration data."
)
a = ml3
b = np.sqrt(1e12 / ml1)
c = ml2 - tof
if a == 0:
return (c * c) / (b * b)
discriminant = b * b - 4 * a * c
if np.any(discriminant < 0):
n_neg = np.sum(discriminant < 0)
raise ValueError(
f"Bruker calibration produced negative discriminant for "
f"{n_neg}/{len(tof)} TOF values (ML1={ml1}, ML2={ml2}, "
f"ML3={ml3}). This indicates invalid calibration constants."
)
return ((-b + np.sqrt(discriminant)) / (2 * a)) ** 2
def _read_bruker_binary(path: Path, byte_order: int, n_points: int) -> np.ndarray:
"""
Read a Bruker binary file (``fid`` or ``1r``) as an int32 array.
Parameters
----------
path : Path
Path to the binary file.
byte_order : int
Byte order flag from ``BYTORDA``: 0 = little-endian,
1 = big-endian.
n_points : int
Expected number of data points (``TD``).
Returns
-------
np.ndarray
Intensity array with negatives clipped to zero.
"""
dtype = np.dtype("<i4") if byte_order == 0 else np.dtype(">i4")
data = np.fromfile(path, dtype=dtype)
data = data[:n_points]
data = data.astype(np.float64)
data[data < 0] = 0
return data
def _find_bruker_acqus(path: Path) -> Path | None:
"""
Find the ``acqus`` file within a Bruker directory.
Searches progressively deeper: ``path/acqus``, ``path/*/acqus``,
``path/*/*/acqus``.
Parameters
----------
path : Path
Root directory to search.
Returns
-------
Path or None
Path to the ``acqus`` file, or ``None`` if not found.
"""
direct = path / "acqus"
if direct.is_file():
return direct
for depth in ("*", "*/*", "*/*/*"):
matches = sorted(path.glob(f"{depth}/acqus"))
if matches:
return matches[0]
return None
def _read_bruker(path: Path, *, source: str = "1r") -> pd.DataFrame:
"""
Read a Bruker MALDI-TOF spectrum directory.
Parameters
----------
path : Path
Path to a Bruker data directory (containing ``acqus`` and
``fid``/``pdata`` either directly or in a subdirectory).
source : str, default="1r"
Which binary to read: ``"1r"`` for the processed spectrum
from ``pdata/1/1r``, or ``"fid"`` for the raw free induction
decay.
Returns
-------
pd.DataFrame
DataFrame with columns ``['mass', 'intensity']``.
Raises
------
FileNotFoundError
If the ``acqus`` file or the requested binary cannot be found.
ValueError
If required calibration parameters are missing.
"""
acqus_path = _find_bruker_acqus(path)
if acqus_path is None:
raise FileNotFoundError(
f"No 'acqus' file found in {path} or its subdirectories"
)
acqus_dir = acqus_path.parent
params = _parse_acqus(acqus_path)
td = int(params["TD"])
delay = int(params["DELAY"])
dw = float(params["DW"])
ml1 = float(params["ML1"])
ml2 = float(params["ML2"])
ml3 = float(params["ML3"])
byte_order = int(params["BYTORDA"])
tof = delay + np.arange(td) * dw
mass = _tof_to_mass(ml1, ml2, ml3, tof)
if source == "1r":
binary_path = acqus_dir / "pdata" / "1" / "1r"
elif source == "fid":
binary_path = acqus_dir / "fid"
else:
raise ValueError(f"Unknown source '{source}', expected '1r' or 'fid'")
if not binary_path.is_file():
raise FileNotFoundError(f"Binary file not found: {binary_path}")
intensity = _read_bruker_binary(binary_path, byte_order, td)
return pd.DataFrame({"mass": mass, "intensity": intensity})
[docs]
def read_spectrum(path: str | Path, *, bruker_source: str = "1r") -> pd.DataFrame:
"""
Read a raw spectrum file into a DataFrame.
Supports text-based formats (``.txt``, ``.csv``, ``.tsv``) with
automatic delimiter detection and Bruker binary directories
(containing ``fid``/``acqus`` files). Unrecognised extensions
are treated as text files with automatic delimiter detection.
Parameters
----------
path : str or Path
Path to the spectrum file, or to a Bruker data directory.
bruker_source : str, default="1r"
For Bruker directories, which binary to read: ``"1r"``
(processed) or ``"fid"`` (raw).
Returns
-------
pd.DataFrame
DataFrame with columns ``['mass', 'intensity']``.
Examples
--------
>>> from maldiamrkit.io import read_spectrum
>>> df = read_spectrum("spectrum.txt")
>>> df.head()
mass intensity
0 2000 1234
1 2001 1456
Read a Bruker directory:
>>> df = read_spectrum("path/to/bruker_dir")
>>> df = read_spectrum("path/to/bruker_dir", bruker_source="fid")
"""
path = Path(path)
if path.is_dir():
return _read_bruker(path, source=bruker_source)
# Default: text-based format (txt, csv, tsv, or any other extension)
try:
delim = sniff_delimiter(path)
except csv.Error:
delim = r"\s+"
df = pd.read_csv(
path, sep=delim, comment="#", header=None, names=["mass", "intensity"]
)
df = _coerce_numeric(df)
# If sniffed delimiter produced no valid rows, retry with whitespace regex.
if df.empty and delim != r"\s+":
df = pd.read_csv(
path, sep=r"\s+", comment="#", header=None, names=["mass", "intensity"]
)
df = _coerce_numeric(df)
if df.empty:
raise ValueError(f"No valid numeric data in {path}")
return df
def _coerce_numeric(df: pd.DataFrame) -> pd.DataFrame:
"""Coerce mass/intensity to numeric and drop unparseable rows."""
df = df.copy()
df["mass"] = pd.to_numeric(df["mass"], errors="coerce")
df["intensity"] = pd.to_numeric(df["intensity"], errors="coerce")
return df.dropna(subset=["mass", "intensity"]).reset_index(drop=True)