# Source code for alphadia.workflow.managers.raw_file_manager

"""Manager handling the raw data file and its statistics."""

import logging
import os

import numpy as np

from alphadia.raw_data import DiaData
from alphadia.raw_data.alpharaw_wrapper import AlphaRawBase, MzML, Sciex, Thermo
from alphadia.raw_data.bruker import TimsTOFTranspose
from alphadia.workflow.config import Config
from alphadia.workflow.managers.base import BaseManager

# getLogger() is called without a name, so this binds the root logger
# rather than a logger specific to this module.
logger = logging.getLogger()


class RawFileManager(BaseManager):
    """Handles raw file loading and contains information on the raw file.

    The statistics computed for the most recently loaded file are kept in
    ``self.stats``.
    """

    def __init__(
        self,
        config: None | Config = None,
        path: None | str = None,
        load_from_file: bool = False,
        **kwargs,
    ):
        """Set up the manager and report its initialization.

        Parameters
        ----------
        config : None | Config
            Configuration object. ``config["general"]["thread_count"]`` is
            read when a ``.raw`` file is loaded.

        path : None | str
            Passed through to ``BaseManager.__init__``.

        load_from_file : bool
            Passed through to ``BaseManager.__init__``.

        **kwargs
            Passed through to ``BaseManager.__init__``.

        """
        # Has to be assigned before super().__init__ runs, otherwise values
        # loaded there would be overwritten by this empty dict.
        self.stats = {}

        super().__init__(path=path, load_from_file=load_from_file, **kwargs)

        self._config: Config = config

        # The dia_data object is deliberately NOT kept as an instance
        # attribute, to avoid the saved manager file becoming too large.

        manager_name = self.__class__.__name__
        self.reporter.log_string(f"Initializing {manager_name}")
        self.reporter.log_event("initializing", {"name": f"{manager_name}"})

    def get_dia_data_object(self, dia_data_path: str) -> DiaData:
        """Load a DIA data file with the reader matching its file extension.

        The extension is compared case-insensitively. After loading, the raw
        data type is reported as a metric and the file statistics are
        computed and logged.

        Parameters
        ----------
        dia_data_path : str
            Path to the DIA data file

        Returns
        -------
        DiaData
            Object containing the DIA data

        Raises
        ------
        ValueError
            If the file extension is not one of ``.d``, ``.hdf``, ``.raw``,
            ``.mzml`` or ``.wiff``.

        """
        file_extension = os.path.splitext(dia_data_path)[1]

        # extension -> (reported raw data type, factory building the reader).
        # Factories are lambdas so that only the selected reader is built and
        # the config is only consulted for Thermo files.
        readers = {
            ".d": ("bruker", lambda: TimsTOFTranspose(dia_data_path)),
            ".hdf": ("alpharaw", lambda: AlphaRawBase(dia_data_path)),
            ".raw": (
                "thermo",
                lambda: Thermo(
                    dia_data_path,
                    process_count=self._config["general"]["thread_count"],
                ),
            ),
            ".mzml": ("mzml", lambda: MzML(dia_data_path)),
            ".wiff": ("sciex", lambda: Sciex(dia_data_path)),
        }

        selected = readers.get(file_extension.lower())
        if selected is None:
            raise ValueError(
                f"Unknown file extension {file_extension} for file at {dia_data_path}"
            )

        raw_data_type, build_reader = selected
        dia_data = build_reader()

        self.reporter.log_metric("raw_data_type", raw_data_type)
        self._calc_stats(dia_data)
        self._log_stats()

        return dia_data

    def _calc_stats(self, dia_data: DiaData):
        """Compute summary statistics of the DIA data and store them in ``self.stats``.

        Parameters
        ----------
        dia_data : DiaData
            Loaded DIA data; its ``rt_values`` and ``cycle`` attributes are read.

        """
        rt_values = dia_data.rt_values
        cycle = dia_data.cycle

        rt_lowest = rt_values.min()
        rt_highest = rt_values.max()

        # The second axis of the cycle array gives the number of scans per cycle.
        scans_per_cycle = cycle.shape[1]

        # Sampling every `scans_per_cycle`-th retention time yields one value
        # per cycle; the mean spacing of these is the cycle duration.
        mean_cycle_duration = np.diff(rt_values[::scans_per_cycle]).mean()
        complete_cycles = len(rt_values) // scans_per_cycle

        # Only strictly positive cycle entries contribute to the MS2 range.
        cycle_entries = cycle.flatten()
        positive_entries = cycle_entries[cycle_entries > 0]
        ms2_lowest = positive_entries.min()
        ms2_highest = positive_entries.max()

        # Assigned in one go at the very end, so a failure above leaves the
        # previously stored statistics untouched.
        self.stats = {
            "rt_limit_min": rt_lowest,
            "rt_limit_max": rt_highest,
            "cycle_length": scans_per_cycle,
            "cycle_duration": mean_cycle_duration,
            "cycle_number": complete_cycles,
            "ms2_range_min": ms2_lowest,
            "ms2_range_max": ms2_highest,
        }

    def _log_stats(self):
        """Write the statistics stored in ``self.stats`` to the log, one line each."""
        stats = self.stats

        rt_end = stats["rt_limit_max"]
        rt_start = stats["rt_limit_min"]
        rt_duration = rt_end - rt_start

        logger.info(f"{'RT (min)':<20}: {rt_start/60:.1f} - {rt_end/60:.1f}")
        logger.info(f"{'RT duration (sec)':<20}: {rt_duration:.1f}")
        logger.info(f"{'RT duration (min)':<20}: {rt_duration/60:.1f}")

        logger.info(f"{'Cycle len (scans)':<20}: {stats['cycle_length']:.0f}")
        logger.info(f"{'Cycle len (sec)':<20}: {stats['cycle_duration']:.2f}")
        logger.info(f"{'Number of cycles':<20}: {stats['cycle_number']:.0f}")

        logger.info(
            f"{'MS2 range (m/z)':<20}: {stats['ms2_range_min']:.1f} - {stats['ms2_range_max']:.1f}"
        )