import logging
import os
import time
from alphabase.spectral_library.flat import SpecLibFlat
from alphadia.constants.keys import ConfigKeys
from alphadia.constants.settings import FIGURES_FOLDER_NAME
from alphadia.exceptions import GenericUserError
from alphadia.raw_data import DiaData
from alphadia.raw_data.bruker import TimsTOFTranspose
from alphadia.reporting import reporting
from alphadia.workflow.config import Config
from alphadia.workflow.managers.calibration_manager import CalibrationManager
from alphadia.workflow.managers.optimization_manager import OptimizationManager
from alphadia.workflow.managers.raw_file_manager import RawFileManager
from alphadia.workflow.managers.timing_manager import TimingManager
from alphadia.workflow.peptidecentric.ng.ng_mapper import (
dia_data_to_ng,
set_ng_thread_count,
)
logger = logging.getLogger()
QUANT_FOLDER_NAME = "quant"
[docs]
class WorkflowBase:
"""Base class for all workflows. This class is responsible for creating the workflow folder.
It also initializes the calibration_manager and fdr_manager for the workflow.
"""
RAW_FILE_MANAGER_PKL_NAME = "raw_file_manager.pkl"
CALIBRATION_MANAGER_PKL_NAME = "calibration_manager.pkl"
OPTIMIZATION_MANAGER_PKL_NAME = "optimization_manager.pkl"
TIMING_MANAGER_PKL_NAME = "timing_manager.pkl"
FDR_MANAGER_PKL_NAME = "fdr_manager.pkl"
[docs]
def __init__(
self,
instance_name: str,
config: Config,
quant_path: str = None,
) -> None:
"""
Parameters
----------
instance_name: str
Name for the particular workflow instance, e.g. the name of the raw file
config: Config
Configuration for the workflow.
quant_path: str
path to directory holding quant folders, relevant for distributed searches
"""
self.instance_name: str = instance_name
quant_path_ = quant_path or os.path.join(
config[ConfigKeys.OUTPUT_DIRECTORY], QUANT_FOLDER_NAME
)
logger.info(f"Quantification results path: {quant_path_}")
self._path = os.path.join(quant_path_, self.instance_name)
self._figure_path: str = (
os.path.join(self.path, FIGURES_FOLDER_NAME)
if config[ConfigKeys.GENERAL][ConfigKeys.GENERAL.SAVE_FIGURES]
else None
)
self._config: Config = config
self.reporter: reporting.Pipeline | None = None
self._dia_data: DiaData | None = None
self._spectral_library: SpecLibFlat | None = None
self._calibration_manager: CalibrationManager | None = None
self._optimization_manager: OptimizationManager | None = None
self._timing_manager: TimingManager | None = None
for path in [self._figure_path, self.path]:
if path and not os.path.exists(path):
logger.info(f"Creating folder {path}")
os.makedirs(
path,
exist_ok=True,
)
[docs]
def load(
self,
dia_data_path: str,
spectral_library: SpecLibFlat,
) -> None:
self.reporter = reporting.Pipeline(
backends=[
reporting.LogBackend(),
reporting.JSONLBackend(path=self.path),
reporting.FigureBackend(path=self.path),
]
)
self.reporter.context.__enter__()
self.reporter.log_event("section_start", {"name": "Initialize Workflow"})
# load the raw data
self.reporter.log_event("loading_data", {"progress": 0})
raw_file_manager = RawFileManager(
self.config,
path=os.path.join(self.path, self.RAW_FILE_MANAGER_PKL_NAME),
reporter=self.reporter,
)
time_start = time.time()
self._dia_data = raw_file_manager.get_dia_data_object(dia_data_path)
self.reporter.log_string(
f"Creating DIA data object took: {time.time() - time_start}"
)
if self._config["search"]["extraction_backend"] == "rust":
time_start = time.time()
if isinstance(self._dia_data, TimsTOFTranspose):
raise GenericUserError(
"NOT_SUPPORTED_BY_NG",
"Rust backend does not support TimsTOF data yet. Please use extraction_backend='python'.",
)
# needs to be the first call to alphadia-search-rs
set_ng_thread_count(self.config["general"]["thread_count"])
dia_data_ng = dia_data_to_ng(self._dia_data)
# TODO: remove these asserts
assert self.dia_data.cycle.shape[1] == dia_data_ng.cycle.shape[1]
assert all(self.dia_data.rt_values == dia_data_ng.rt_values)
self._dia_data = dia_data_ng
self.reporter.log_string(
f"Creating DIA data NG object took: {time.time() - time_start}"
)
raw_file_manager.save()
self.reporter.log_event("loading_data", {"progress": 1})
self._spectral_library: SpecLibFlat = spectral_library.copy()
self._calibration_manager = CalibrationManager(
path=os.path.join(self.path, self.CALIBRATION_MANAGER_PKL_NAME),
load_from_file=self.config["general"]["reuse_calibration"],
has_ms1=self._dia_data.has_ms1,
has_mobility=self._dia_data.has_mobility,
reporter=self.reporter,
)
self._optimization_manager = OptimizationManager(
self.config,
gradient_length=self.dia_data.rt_values.max(),
path=os.path.join(self.path, self.OPTIMIZATION_MANAGER_PKL_NAME),
load_from_file=self.config["general"]["reuse_calibration"],
figure_path=self._figure_path,
reporter=self.reporter,
)
self.reporter.log_event("section_stop", {})
@property
def path(self) -> str:
"""Path to the workflow folder, e.g. `first_search/quant/raw_file_xyz.raw`"""
return self._path
@property
def config(self) -> Config:
"""Configuration for the workflow."""
return self._config
@property
def calibration_manager(self) -> CalibrationManager:
"""Calibration manager for the workflow. Owns the RT, IM, MZ calibration and the calibration data"""
return self._calibration_manager
@property
def optimization_manager(self) -> OptimizationManager:
"""Optimization manager for the workflow. Owns the optimization data"""
return self._optimization_manager
@property
def timing_manager(self) -> TimingManager:
"""Optimization manager for the workflow. Owns the timing data"""
return self._timing_manager
@property
def spectral_library(self) -> SpecLibFlat | None:
"""Spectral library for the workflow. Owns the spectral library data"""
return self._spectral_library
@property
def dia_data(
self,
) -> DiaData:
"""DIA data for the workflow. Owns the DIA data"""
return self._dia_data