Source code for alphadia.outputtransform.outputaccumulator

"""
Output Accumulator
==================
This module contains classes to accumulate the information from the output folders of the alphadia pipeline
in a linear fashion. This is hugely useful when we have a large number of output folders and we want to accumulate the information from
all of them in a single object/Library which can be a challenge to do in a single go due to memory constraints.
The module is designed as broadcast-subscriber pattern where the AccumulationBroadcaster class loops over the output folders creating a
speclibBase object from each output folder and then broadcasts the information to the subscribers.

Classes
-------
BaseAccumulator
    Base class for accumulator classes, which are used to subscribe on the linear accumulation of a list of output folders.
    it has two methods update and post_process.

AccumulationBroadcaster
    Class that loops over output folders in a linear fashion to prevent having all the output folders in memory at the same time.

TransferLearningAccumulator
    Class that accumulates the information from the output folders for fine-tuning by selecting the top keep_top precursors and their fragments from all the output folders.


"""

import logging
import multiprocessing
import os
import threading

import numba as nb
import numpy as np
import pandas as pd
from alphabase.spectral_library import base
from alphabase.spectral_library.flat import SpecLibFlat
from tqdm import tqdm

from alphadia.constants.keys import CalibCols, SearchStepFiles

logger = logging.getLogger()


[docs] def build_speclibflat_from_quant( folder: str, mandatory_precursor_columns: list[str] | None = None, optional_precursor_columns: list[str] | None = None, charged_frag_types: list[str] | None = None, ) -> SpecLibFlat: """ Build a SpecLibFlat object from quantification output data stored in a folder for transfer learning. Parameters ---------- folder : str The output folder to be parsed. mandatory_precursor_columns : list[str], optional The columns to be selected from the precursor dataframe optional_precursor_columns : list[str], optional Additional optional columns to include if present Returns ------- SpecLibFlat A spectral library object containing the parsed data """ speclib = SpecLibFlat() if mandatory_precursor_columns is None: mandatory_precursor_columns = [ "precursor_idx", "sequence", "flat_frag_start_idx", "flat_frag_stop_idx", "charge", CalibCols.RT_LIBRARY, CalibCols.RT_OBSERVED, CalibCols.MOBILITY_LIBRARY, CalibCols.MOBILITY_OBSERVED, CalibCols.MZ_LIBRARY, CalibCols.MZ_OBSERVED, "proteins", "genes", "mods", "mod_sites", "proba", "decoy", ] if optional_precursor_columns is None: optional_precursor_columns = [ CalibCols.RT_CALIBRATED, CalibCols.MZ_CALIBRATED, ] psm_df = pd.read_parquet(os.path.join(folder, SearchStepFiles.PSM_FILE_NAME)) frag_df = pd.read_parquet( os.path.join(folder, SearchStepFiles.FRAG_TRANSFER_FILE_NAME) ) if not set(mandatory_precursor_columns).issubset(psm_df.columns): raise ValueError( f"mandatory_precursor_columns must be a subset of psm_df.columns didnt find {set(mandatory_precursor_columns) - set(psm_df.columns)}" ) available_columns = sorted( list( set(mandatory_precursor_columns) | (set(optional_precursor_columns) & set(psm_df.columns)) ) ) psm_df = psm_df[available_columns] psm_df["raw_name"] = os.path.basename(folder) psm_df["decoy"] = psm_df["decoy"].astype(int) psm_df = psm_df[psm_df["decoy"] == 0].reset_index(drop=True) speclib._precursor_df = psm_df.copy() speclib._precursor_df["mods"] = speclib._precursor_df["mods"].astype(str) speclib._precursor_df["mod_sites"] = speclib._precursor_df["mod_sites"].astype(str) speclib._precursor_df["mods"] = speclib._precursor_df["mods"].replace("nan", "") speclib._precursor_df["mod_sites"] = speclib._precursor_df["mod_sites"].replace( "nan", "" ) speclib.calc_precursor_mz() for col in ["rt", "mz", "mobility"]: if f"{col}_observed" in psm_df.columns: values = psm_df[f"{col}_observed"] elif f"{col}_calibrated" in psm_df.columns: values = psm_df[f"{col}_calibrated"] else: values = psm_df[f"{col}_library"] speclib._precursor_df[col] = values frag_df = frag_df[ frag_df["precursor_idx"].isin(speclib._precursor_df["precursor_idx"]) ] speclib._fragment_df = frag_df[ [ "mz", "intensity", "precursor_idx", "frag_idx", "correlation", "number", "type", "charge", "loss_type", "position", ] ].copy() return speclib.to_speclib_base( charged_frag_types=charged_frag_types, flat_columns=["intensity", "correlation"], )
[docs] class BaseAccumulator: """ Base class for accumulator classes, which are used to subscribe on the linear accumulation of a list of output folders. """
[docs] def update(self, info: base.SpecLibBase) -> None: """ Called when a new output folder is obtained. Parameters ---------- info : SpecLibBase The information from the output folder. """ raise NotImplementedError("Subclasses must implement the update method")
[docs] def post_process(self) -> None: """ Called after all output folders have been processed. """ raise NotImplementedError("Subclasses must implement the post_process method")
[docs] def error_callback(e): logger.error(e, exc_info=True)
[docs] class AccumulationBroadcaster: """ Class that loops over output folders in a linear fashion to only have one folder in memory at a time. And broadcasts the output of each folder to the subscribers. """
[docs] def __init__( self, folder_list: list, number_of_processes: int, processing_kwargs: dict ): self._folder_list = folder_list self._number_of_processes = number_of_processes self._subscribers = [] self._lock = threading.Lock() # Lock to prevent two processes trying to update the same subscriber at the same time self._processing_kwargs = processing_kwargs
[docs] def subscribe(self, subscriber: BaseAccumulator): self._subscribers.append(subscriber)
def _update_subscriber( self, subscriber: BaseAccumulator, speclibase: base.SpecLibBase ): subscriber.update(speclibase) def _broadcast(self, result): speclibBase = result with self._lock: for sub in self._subscribers: self._update_subscriber(sub, speclibBase) def _post_process(self): for sub in self._subscribers: sub.post_process()
[docs] def run(self): with multiprocessing.Pool(processes=self._number_of_processes) as pool: for folder in self._folder_list: _ = pool.apply_async( build_speclibflat_from_quant, (folder,), self._processing_kwargs, callback=self._broadcast, error_callback=error_callback, ) pool.close() pool.join() self._post_process()
@nb.jit(nopython=True) def _get_top_indices_from_freq( number_of_readings_per_precursor: np.ndarray, keep_top: int, len_of_precursor_df: int, ): """ Get the indices of the top keep_top elements in the array number_of_readings_per_precursor. Parameters ---------- number_of_readings_per_precursor : np.array The array of number of readings per precursor. keep_top : int The number of top elements to keep. len_of_precursor_df : int The length of the precursor_df. Returns ------- np.array The indices of the top keep_top elements in the array number_of_readings_per_precursor. """ indices = np.zeros(len_of_precursor_df, dtype=np.bool_) i = 0 for n in number_of_readings_per_precursor: to_keep = min(n, keep_top) indices[i : i + to_keep] = np.ones(to_keep, dtype=np.bool_) i += n return indices
[docs] class TransferLearningAccumulator(BaseAccumulator):
[docs] def __init__( self, keep_top: int = 3, norm_delta_max: bool = True, precursor_correlation_cutoff: float = 0.5, fragment_correlation_ratio: float = 0.75, ): """ TransferLearningAccumulator is used to accumulate the information from the output folders for fine-tuning by selecting the top keep_top precursors and their fragments from all the output folders. The current measure of score is the probA Parameters ---------- keep_top : int, optional The number of top precursors to keep, by default 3 norm_w_calib : bool, optional If true, advanced normalization of retention times will be performed. Retention times are normalized using calibrated deviation from the library at the start of the gradient and max normalization at the end of the gradient. If false, max normalization will be performed, by default True precursor_correlation_cutoff : float, optional Only precursors with a median fragment correlation above this cutoff will be used for MS2 learning, by default 0.5 fragment_correlation_ratio : float, optional The cutoff for the fragment correlation relative to the median fragment correlation for a precursor, by default 0.75 """ self._keep_top = keep_top self.consensus_speclibase = None self._norm_delta_max = norm_delta_max self._precursor_correlation_cutoff = precursor_correlation_cutoff self._fragment_correlation_ratio = fragment_correlation_ratio
[docs] def update(self, speclibase: base.SpecLibBase): """ Update the consensus_speclibase with the information from the speclibase. Parameters ---------- speclibase : SpecLibBase The information from the output folder. """ speclibase.hash_precursor_df() if self.consensus_speclibase is None: self.consensus_speclibase = speclibase else: # Append in basespeclib and modify to work do the same for additional dataframe self.consensus_speclibase.append( speclibase, dfs_to_append=["_precursor_df"] + [df for df in speclibase.available_dense_fragment_dfs()], ) # Sort by modseqhash and proba in ascending order self.consensus_speclibase._precursor_df = ( self.consensus_speclibase._precursor_df.sort_values( ["mod_seq_hash", "proba", "precursor_idx"], # last sort to break ties ascending=[True, True, True], ) ) # Select the top keep_top precursors # First get the number of readings per precursor such as mod_seq_hash _ maps to number of rows with the same mod_seq_hash number_of_readings_per_precursor = self.consensus_speclibase._precursor_df[ "mod_seq_hash" ].value_counts(sort=False) keepIndices = _get_top_indices_from_freq( number_of_readings_per_precursor.values, self._keep_top, self.consensus_speclibase._precursor_df.shape[0], ) assert ( len(keepIndices) == self.consensus_speclibase._precursor_df.shape[0] ), f"keepIndices length {len(keepIndices)} must be equal to the length of the precursor_df {self.consensus_speclibase._precursor_df.shape[0]}" self.consensus_speclibase._precursor_df = ( self.consensus_speclibase._precursor_df.iloc[keepIndices] ) # Drop unused fragments self.consensus_speclibase.remove_unused_fragments()
[docs] def post_process(self): """ Post process the consensus_speclibase by normalizing retention times. """ norm_delta_max = self._norm_delta_max if ( CalibCols.RT_CALIBRATED not in self.consensus_speclibase.precursor_df.columns ): logger.warning( f"Column '{CalibCols.RT_CALIBRATED}' not found in the precursor_df, delta-max normalization will not be performed" ) norm_delta_max = False logger.info("Performing quality control for transfer learning.") logger.info(f"Normalize by delta: {norm_delta_max}") logger.info( f"Precursor correlation cutoff: {self._precursor_correlation_cutoff}" ) logger.info(f"Fragment correlation cutoff: {self._fragment_correlation_ratio}") if norm_delta_max: self.consensus_speclibase = normalize_rt_delta_max( self.consensus_speclibase ) else: self.consensus_speclibase = normalize_rt_max(self.consensus_speclibase) self.consensus_speclibase = ms2_quality_control( self.consensus_speclibase, self._precursor_correlation_cutoff, self._fragment_correlation_ratio, )
[docs] def normalize_rt_max(spec_lib_base: base.SpecLibBase) -> base.SpecLibBase: """ Normalize the retention times of the precursors in the SpecLibBase object using max normalization. Parameters ---------- spec_lib_base : SpecLibBase The SpecLibBase object to be normalized. Returns ------- SpecLibBase The SpecLibBase object with the retention times normalized using max normalization. """ spec_lib_base.precursor_df["rt_norm"] = ( spec_lib_base.precursor_df[CalibCols.RT_OBSERVED] / spec_lib_base.precursor_df[CalibCols.RT_OBSERVED].max() ) return spec_lib_base
[docs] def normalize_rt_delta_max(spec_lib_base: base.SpecLibBase) -> base.SpecLibBase: """ Normalize the retention times of the precursors in the SpecLibBase object using delta max normalization. Parameters ---------- spec_lib_base : SpecLibBase The SpecLibBase object to be normalized. Returns ------- SpecLibBase The SpecLibBase object with the retention times normalized using delta max normalization. """ # instead of a simple max normalization, we want to use a weighted average of the two normalizations # At the start of the retention time we will normalize using the calibrated deviation from the library # At the end of the retention time we will normalize using the max normalization precursor_df = spec_lib_base.precursor_df # calculate max normalization max_norm = precursor_df[CalibCols.RT_OBSERVED].values / np.max( precursor_df[CalibCols.RT_OBSERVED].values ) # calculate calibrated normalization deviation_from_calib = ( precursor_df[CalibCols.RT_OBSERVED].values - precursor_df[CalibCols.RT_CALIBRATED].values ) / precursor_df[CalibCols.RT_CALIBRATED].values calibrated_norm = precursor_df[CalibCols.RT_LIBRARY].values * ( 1 + deviation_from_calib ) calibrated_norm = calibrated_norm / calibrated_norm.max() # use max norm as weight and combine the two normalizations spec_lib_base.precursor_df["rt_norm"] = ( 1 - max_norm ) * calibrated_norm + max_norm * max_norm return spec_lib_base
[docs] def ms2_quality_control( spec_lib_base: base.SpecLibBase, precursor_correlation_cutoff: float = 0.5, fragment_correlation_ratio: float = 0.75, ): """ Perform quality control for transfer learning by filtering out precursors with low median fragment correlation and fragments with low correlation. Parameters ---------- spec_lib_base : SpecLibBase The SpecLibBase object to be normalized. precursor_correlation_cutoff : float Only precursors with a median fragment correlation above this cutoff will be used for MS2 learning. Default is 0.5. fragment_correlation_ratio : float The cutoff for the fragment correlation relative to the median fragment correlation for a precursor. Default is 0.75. Returns ------- SpecLibBase The SpecLibBase object with the precursors and fragments that pass the quality control filters. """ use_for_ms2 = np.zeros(len(spec_lib_base.precursor_df), dtype=bool) precursor_df = spec_lib_base.precursor_df fragment_intensity_df = spec_lib_base.fragment_intensity_df fragment_correlation_df = spec_lib_base._fragment_correlation_df for i, (start_idx, stop_idx) in tqdm( enumerate( zip( precursor_df["frag_start_idx"], precursor_df["frag_stop_idx"], strict=True, ) ) ): # get XIC correlations and intensities for the precursor fragment_correlation_view = fragment_correlation_df.iloc[start_idx:stop_idx] flat_correlation = fragment_correlation_view.values.flatten() fragment_intensity_view = fragment_intensity_df.iloc[start_idx:stop_idx] flat_intensity = fragment_intensity_view.values.flatten() # calculate the median correlation for the precursor intensity_mask = flat_intensity > 0.0 median_correlation = ( np.median(flat_correlation[intensity_mask]) if intensity_mask.any() else 0.0 ) # use the precursor for MS2 learning if the median correlation is above the cutoff use_for_ms2[i] = median_correlation > precursor_correlation_cutoff # Fix: Use iloc to modify the original DataFrame instead of the view spec_lib_base.fragment_intensity_df.iloc[start_idx:stop_idx] = ( fragment_intensity_view.values * ( fragment_correlation_view > median_correlation * fragment_correlation_ratio ) ) spec_lib_base.precursor_df["use_for_ms2"] = use_for_ms2 return spec_lib_base