Source code for alphadia.workflow.managers.fdr_manager

import logging
import os
from collections import defaultdict
from copy import deepcopy
from typing import Literal

import numpy as np
import pandas as pd
import torch
import xxhash

import alphadia
from alphadia.fdr import fdr
from alphadia.fdr.classifiers import Classifier
from alphadia.workflow.config import Config
from alphadia.workflow.managers.base import BaseManager

logger = logging.getLogger()


[docs] def get_group_columns(competitive: bool, group_channels: bool) -> list[str]: """ Determine the group columns based on competitiveness and channel grouping. competitive : bool If True, group candidates eluting at the same time by grouping them under the same 'elution_group_idx'. group_channels : bool If True and 'competitive' is also True, further groups candidates by 'channel'. Returns ------- list A list of column names to be used for grouping in the analysis. If competitive, this could be either ['elution_group_idx', 'channel'] or ['elution_group_idx'] depending on the `group_channels` flag. If not competitive, the list will always be ['precursor_idx']. """ if competitive: group_columns = ( ["elution_group_idx", "channel"] if group_channels else ["elution_group_idx"] ) else: group_columns = ["precursor_idx"] return group_columns
[docs] def column_hash(columns): columns.sort() return xxhash.xxh64_hexdigest("".join(columns))
[docs] class FDRManager(BaseManager):
[docs] def __init__( self, feature_columns: list, classifier_base: Classifier, config: Config, dia_cycle: None | np.ndarray = None, path: None | str = None, load_from_file: bool = True, random_state: int | None = None, **kwargs, ): """Contains, updates and applies classifiers for target-decoy competition-based false discovery rate (FDR) estimation. Parameters ---------- feature_columns: list List of feature columns to use for the classifier classifier_base: object Base classifier object to use for the FDR estimation config: Config The workflow configuration object dia_cycle: None | np.ndarray DIA cycle information, if applicable. If None, no DIA cycle information is used. path : str, optional Path to the manager pickle on disk. load_from_file : bool, optional If True, the manager will be loaded from file if it exists. random_state: int, optional Random state for reproducibility. """ super().__init__(path=path, load_from_file=load_from_file, **kwargs) self.reporter.log_string(f"Initializing {self.__class__.__name__}") self.reporter.log_event("initializing", {"name": f"{self.__class__.__name__}"}) if not self.is_loaded_from_file: self.feature_columns = feature_columns self.classifier_store = defaultdict(list) self.classifier_base = classifier_base self._current_version = -1 self.load_classifier_store() self._compete_for_fragments = config["search"]["compete_for_fragments"] self._dia_cycle = dia_cycle self._np_rng = ( None if random_state is None else np.random.default_rng(random_state) )
[docs] def fit_predict( self, features_df: pd.DataFrame, decoy_strategy: Literal["precursor", "precursor_channel_wise", "channel"], competitive: bool, df_fragments: pd.DataFrame | None = None, decoy_channel: int = -1, version: int = -1, ): """Fit the classifier and perform FDR estimation. Parameters ---------- features_df: pd.DataFrame DataFrame containing the features to use for the classifier. Must contain the columns specified in self.feature_columns. decoy_strategy: Literal["precursor", "precursor_channel_wise", "channel"] The decoy strategy. competitive: bool Whether competitive scoring should be used. df_fragments: None | pd.DataFrame Dataframe containing the fragments to use for the classifier. If None, no fragments are used. decoy_channel: int Channel to use for decoy competition if decoy_strategy is "channel". Defaults to -1, which means no decoy channel is used. version: int Version of the classifier to use. If -1, uses the latest version. Defaults to -1. Notes ----- The classifier_hash must be identical for every call of fit_predict for self._current_version to give the right index in self.classifier_store. """ available_columns = list( set(features_df.columns).intersection(set(self.feature_columns)) ) self._check_valid_input( available_columns, decoy_channel, decoy_strategy, features_df ) if ( decoy_strategy == "precursor" or decoy_strategy == "precursor_channel_wise" ) and decoy_channel > -1: self.reporter.log_string( "decoy_channel is ignored if decoy_strategy is 'precursor' or 'precursor_channel_wise'.", verbosity="warning", ) decoy_channel = -1 self.reporter.log_string( f"performing {decoy_strategy} FDR with {len(available_columns)} features" ) self.reporter.log_string(f"Decoy channel: {decoy_channel}") self.reporter.log_string(f"competitive: {competitive}") classifier = self.get_classifier(available_columns, version) random_state = ( None if self._np_rng is None else self._np_rng.integers(0, 1_000_000) ) if decoy_strategy == "precursor": psm_df = fdr.perform_fdr( classifier, available_columns, features_df[features_df["decoy"] == 0].copy(), features_df[features_df["decoy"] == 1].copy(), competitive=competitive, group_channels=True, # TODO move this logic to perform_fdr(): df_fragments=df_fragments if self._compete_for_fragments else None, dia_cycle=self._dia_cycle, figure_path=self.figure_path, random_state=random_state, ) elif decoy_strategy == "precursor_channel_wise": channels = features_df["channel"].unique() psm_df_list = [] for channel in channels: channel_df = features_df[ features_df["channel"].isin([channel, decoy_channel]) ].copy() psm_df_list.append( fdr.perform_fdr( classifier, available_columns, channel_df[channel_df["decoy"] == 0].copy(), channel_df[channel_df["decoy"] == 1].copy(), competitive=competitive, group_channels=True, df_fragments=df_fragments if self._compete_for_fragments else None, dia_cycle=self._dia_cycle, figure_path=self.figure_path, random_state=random_state, ) ) psm_df = pd.concat(psm_df_list) elif decoy_strategy == "channel": channels = list(set(features_df["channel"].unique()) - set([decoy_channel])) psm_df_list = [] for channel in channels: channel_df = features_df[ features_df["channel"].isin([channel, decoy_channel]) ].copy() psm_df_list.append( fdr.perform_fdr( classifier, available_columns, channel_df[channel_df["channel"] != decoy_channel].copy(), channel_df[channel_df["channel"] == decoy_channel].copy(), competitive=competitive, group_channels=False, figure_path=self.figure_path, random_state=random_state, ) ) psm_df = pd.concat(psm_df_list) psm_df.loc[psm_df["channel"] == decoy_channel, "decoy"] = 1 else: raise ValueError(f"Invalid decoy_strategy: {decoy_strategy}") self._current_version += 1 self.classifier_store[column_hash(available_columns)].append(classifier) self.save() return psm_df
def _check_valid_input( self, available_columns: list[str], decoy_channel: int, decoy_strategy: str, features_df: pd.DataFrame, ): """Checks if the input features_df is valid for the FDR estimation. Raises ValueError if the input is not valid. """ if len(available_columns) == 0: raise ValueError("No feature columns found in features_df") strategy_requires_decoy_column = ( decoy_strategy == "precursor" or decoy_strategy == "precursor_channel_wise" ) if strategy_requires_decoy_column and "decoy" not in features_df.columns: raise ValueError("Column 'decoy' not found in features_df") strategy_requires_channel_column = ( decoy_strategy == "precursor_channel_wise" or decoy_strategy == "channel" ) if strategy_requires_channel_column and "channel" not in features_df.columns: raise ValueError("Column 'channel' not found in features_df") if decoy_strategy == "channel" and decoy_channel == -1: raise ValueError("decoy_channel must be set if decoy_type is channel") if ( decoy_strategy == "channel" and decoy_channel > -1 and decoy_channel not in features_df["channel"].unique() ): raise ValueError(f"decoy_channel {decoy_channel} not found in features_df")
[docs] def save_classifier_store( self, path: None | str = None, version: int = -1 ): # TODO: unused? """Saves the classifier store to disk. Parameters ---------- path: None | str Where to save the classifier. Saves to alphadia/constants/classifier if None. version: int Version of the classifier to save. Takes the last classifier if -1 (default) """ if path is None: path = os.path.join( os.path.dirname(alphadia.__file__), "constants", "classifier" ) logger.info(f"Saving classifier store to {path}") for classifier_hash, classifier_list in self.classifier_store.items(): torch.save( classifier_list[version].to_state_dict(), os.path.join(path, f"{classifier_hash}.pth"), )
[docs] def load_classifier_store(self, path: None | str = None): """Loads the classifier store from disk. Parameters ---------- path: None | str Location of the classifier to load. Loads from alphadia/constants/classifier if None. """ if path is None: path = os.path.join( os.path.dirname(alphadia.__file__), "constants", "classifier" ) logger.info(f"Loading classifier store from {path}") for file in os.listdir(path): if file.endswith(".pth"): classifier_hash = file.split(".")[0] if classifier_hash not in self.classifier_store: classifier = deepcopy(self.classifier_base) classifier.from_state_dict( torch.load(os.path.join(path, file), weights_only=False) ) self.classifier_store[classifier_hash].append(classifier)
[docs] def get_classifier(self, available_columns: list, version: int = -1) -> Classifier: """Gets the classifier for a given set of feature columns and version. If the classifier is not found in the store, gets the base classifier instead. Parameters ---------- available_columns: list List of feature columns version: int Version of the classifier to get Returns ---------- object Classifier object """ classifier_hash = column_hash(available_columns) if classifier_hash in self.classifier_store: classifier = self.classifier_store[classifier_hash][version] else: classifier = self.classifier_base return deepcopy(classifier)
@property def current_version(self): return self._current_version