Source code for alphadia.search.scoring.scoring

"""Main Implementation of Candidate Scoring System."""

import logging

import numpy as np
import pandas as pd
from alpharaw.utils.pjit import pjit
from alpharaw.utils.pjit import set_threads as set_pjit_threads

from alphadia.constants.keys import CalibCols
from alphadia.raw_data import DiaData
from alphadia.search.jitclasses.fragment_container import FragmentContainer
from alphadia.search.scoring.config import CandidateScoringConfig
from alphadia.search.scoring.containers.score_group import ScoreGroupContainer
from alphadia.search.scoring.output import OutputPsmDF
from alphadia.search.scoring.quadrupole import SimpleQuadrupole
from alphadia.search.scoring.utils import (
    calculate_score_groups,
    merge_missing_columns,
)
from alphadia.utils import (
    USE_NUMBA_CACHING,
    get_isotope_columns,
)
from alphadia.validation.schemas import (
    candidates_schema,
    features_schema,
    fragment_features_schema,
    fragments_flat_schema,
    precursors_flat_schema,
)

logger = logging.getLogger()

DEFAULT_FEATURE_COLUMNS = [
    "base_width_mobility",
    "base_width_rt",
    CalibCols.RT_OBSERVED,
    CalibCols.MOBILITY_OBSERVED,
    "mono_ms1_intensity",
    "top_ms1_intensity",
    "sum_ms1_intensity",
    "weighted_ms1_intensity",
    "weighted_mass_deviation",
    "weighted_mass_error",
    CalibCols.MZ_OBSERVED,
    "mono_ms1_height",
    "top_ms1_height",
    "sum_ms1_height",
    "weighted_ms1_height",
    "isotope_intensity_correlation",
    "isotope_height_correlation",
    "n_observations",
    "intensity_correlation",
    "height_correlation",
    "intensity_fraction",
    "height_fraction",
    "intensity_fraction_weighted",
    "height_fraction_weighted",
    "mean_observation_score",
    "sum_b_ion_intensity",
    "sum_y_ion_intensity",
    "diff_b_y_ion_intensity",
    "f_masked",
    "fragment_scan_correlation",
    "template_scan_correlation",
    "fragment_frame_correlation",
    "top3_frame_correlation",
    "template_frame_correlation",
    "top3_b_ion_correlation",
    "n_b_ions",
    "top3_y_ion_correlation",
    "n_y_ions",
    "cycle_fwhm",
    "mobility_fwhm",
    "delta_frame_peak",
    "top_3_ms2_mass_error",
    "mean_ms2_mass_error",
    "n_overlapping",
    "mean_overlapping_intensity",
    "mean_overlapping_mass_error",
]

DEFAULT_CANDIDATE_COLUMNS = [
    "elution_group_idx",
    "scan_center",
    "scan_start",
    "scan_stop",
    "frame_center",
    "frame_start",
    "frame_stop",
]

DEFAULT_PRECURSOR_COLUMNS = [
    CalibCols.RT_LIBRARY,
    CalibCols.MOBILITY_LIBRARY,
    CalibCols.MZ_LIBRARY,
    "charge",
    "decoy",
    "channel",
    "flat_frag_start_idx",
    "flat_frag_stop_idx",
    "proteins",
    "genes",
    "sequence",
    "mods",
    "mod_sites",
]


def _get_isotope_column_names(colnames):
    return [f"i_{i}" for i in get_isotope_columns(colnames)]


@pjit(cache=USE_NUMBA_CACHING)
def _process_score_groups(
    i,  # pjit decorator changes the passed argument from an iterable to single index
    sg_container: ScoreGroupContainer,
    psm_proto_df,
    fragment_container,
    jit_data,
    config,
    quadrupole_calibration,
    debug,
):
    """
    Helper function.
    Is decorated with `pjit` to enable parallel execution of HybridElutionGroup.process.
    """

    sg_container[i].process(
        psm_proto_df,
        fragment_container,
        jit_data,
        config,
        quadrupole_calibration,
        debug,
    )


[docs] class CandidateScoring: """Calculate features for each precursor candidate used in scoring."""
[docs] def __init__( self, *, dia_data: DiaData, precursors_flat: pd.DataFrame, fragments_flat: pd.DataFrame, rt_column: str, mobility_column: str, precursor_mz_column: str, fragment_mz_column: str, config: CandidateScoringConfig | None = None, quadrupole_calibration: SimpleQuadrupole | None = None, ): """Initialize candidate scoring step. The features can then be used for scoring, calibration and quantification. Parameters ---------- dia_data : DiaData DIA data object. precursors_flat : pd.DataFrame A DataFrame containing precursor information. The DataFrame will be validated by using the `alphadia.validation.schemas.precursors_flat` schema. fragments_flat : pd.DataFrame A DataFrame containing fragment information. The DataFrame will be validated by using the `alphadia.validation.schemas.fragments_flat` schema. rt_column : str The name of the column in `precursors_flat` containing the RT information. This property needs to be changed to `rt_calibrated` if the data has been calibrated. mobility_column : str The name of the column in `precursors_flat` containing the mobility information. This property needs to be changed to `mobility_calibrated` if the data has been calibrated. precursor_mz_column : str The name of the column in `precursors_flat` containing the precursor m/z information. This property needs to be changed to `mz_calibrated` if the data has been calibrated. fragment_mz_column : str The name of the column in `fragments_flat` containing the fragment m/z information. This property needs to be changed to `mz_calibrated` if the data has been calibrated. config : CandidateScoringConfig, default = None A Numba jit compatible object containing the configuration for the candidate scoring. If None, the default configuration will be used. quadrupole_calibration : SimpleQuadrupole, default=None An object containing the quadrupole calibration information. If None, an uncalibrated quadrupole will be used. The object musst have a `jit` method which returns a Numba JIT compiled instance of the calibration function. """ self._dia_data: DiaData = dia_data precursors_flat_schema.validate(precursors_flat, warn_on_critical_values=True) self.precursors_flat_df = precursors_flat fragments_flat_schema.validate(fragments_flat, warn_on_critical_values=True) self.fragments_flat = fragments_flat # check if a valid quadrupole calibration is provided if quadrupole_calibration is None: self.quadrupole_calibration = SimpleQuadrupole(dia_data.cycle) else: self.quadrupole_calibration = quadrupole_calibration # check if a valid config is provided if config is None: self.config = CandidateScoringConfig() else: self.config = config self.rt_column = rt_column self.mobility_column = mobility_column self.precursor_mz_column = precursor_mz_column self.fragment_mz_column = fragment_mz_column
@property def dia_data(self) -> DiaData: """Get the raw mass spec data as a DiaData object.""" return self._dia_data @property def precursors_flat_df(self) -> pd.DataFrame: """Get the DataFrame containing precursor information.""" return self._precursors_flat_df @precursors_flat_df.setter def precursors_flat_df(self, precursors_flat_df) -> None: precursors_flat_schema.validate( precursors_flat_df, warn_on_critical_values=True ) self._precursors_flat_df = precursors_flat_df.sort_values(by="precursor_idx") @property def fragments_flat_df(self) -> pd.DataFrame: """Get the DataFrame containing fragment information.""" return self._fragments_flat @fragments_flat_df.setter def fragments_flat_df(self, fragments_flat: pd.DataFrame) -> None: fragments_flat_schema.validate(fragments_flat, warn_on_critical_values=True) self._fragments_flat = fragments_flat @property def quadrupole_calibration(self) -> SimpleQuadrupole: """Get the quadrupole calibration object.""" return self._quadrupole_calibration @quadrupole_calibration.setter def quadrupole_calibration(self, quadrupole_calibration: SimpleQuadrupole) -> None: if not hasattr(quadrupole_calibration, "jit"): raise AttributeError("quadrupole_calibration must have a jit method") self._quadrupole_calibration = quadrupole_calibration @property def config(self) -> CandidateScoringConfig: """Get the configuration object.""" return self._config @config.setter def config(self, config: CandidateScoringConfig) -> None: config.validate() self._config = config
[docs] def assemble_score_group_container( self, candidates_df: pd.DataFrame ) -> ScoreGroupContainer: """Assemble the Numba JIT compatible score group container from a candidate dataframe. If not present, the `rank` column will be added to the candidate dataframe. Then score groups are calculated using :func:`.calculate_score_groups` function. If configured in :attr:`.CandidateScoringConfig.score_grouped`, all channels will be grouped into a single score group. Otherwise, each channel will be scored separately. The candidate dataframe is validated using the :func:`.validate.candidates` schema. Parameters ---------- candidates_df : pd.DataFrame A DataFrame containing the candidates. Returns ------- score_group_container : ScoreGroupContainer A Numba JIT compatible score group container. """ precursor_columns = [ "channel", "flat_frag_start_idx", "flat_frag_stop_idx", "charge", "decoy", "channel", self.precursor_mz_column, ] + _get_isotope_column_names(self.precursors_flat_df.columns) candidates_df = merge_missing_columns( candidates_df, self.precursors_flat_df, precursor_columns, on=["precursor_idx"], how="left", ) # check if channel column is present if "channel" not in candidates_df.columns: candidates_df["channel"] = np.zeros(len(candidates_df), dtype=np.uint8) # check if monoisotopic abundance column is present if "i_0" not in candidates_df.columns: candidates_df["i_0"] = np.ones(len(candidates_df), dtype=np.float32) # calculate score groups candidates_df = calculate_score_groups( candidates_df, group_channels=self.config.score_grouped ) # validate dataframe schema and prepare jitclass compatible dtypes candidates_schema.validate(candidates_df, warn_on_critical_values=True) score_group_container = ScoreGroupContainer() score_group_container.build_from_df( candidates_df["elution_group_idx"].values, candidates_df["score_group_idx"].values, candidates_df["precursor_idx"].values, candidates_df["channel"].values, candidates_df["rank"].values, candidates_df["flat_frag_start_idx"].values, candidates_df["flat_frag_stop_idx"].values, candidates_df["scan_start"].values, candidates_df["scan_stop"].values, candidates_df["scan_center"].values, candidates_df["frame_start"].values, candidates_df["frame_stop"].values, candidates_df["frame_center"].values, candidates_df["charge"].values, candidates_df[self.precursor_mz_column].values, candidates_df[_get_isotope_column_names(candidates_df.columns)].values, ) return score_group_container
[docs] def assemble_fragments(self) -> FragmentContainer: """Assemble the Numba JIT compatible fragment container from a fragment dataframe. If not present, the `cardinality` column will be added to the fragment dataframe and set to 1. Then the fragment dataframe is validated using the :func:`.validate.fragments_flat` schema. Returns ------- fragment_container : fragments.FragmentContainer A Numba JIT compatible fragment container. """ # set cardinality to 1 if not present if "cardinality" not in self.fragments_flat.columns: logger.warning( "Fragment cardinality column not found in fragment dataframe. Setting cardinality to 1." ) self.fragments_flat["cardinality"] = np.ones( len(self.fragments_flat), dtype=np.uint8 ) # validate dataframe schema and prepare jitclass compatible dtypes fragments_flat_schema.validate( self.fragments_flat, warn_on_critical_values=True ) return FragmentContainer( self.fragments_flat[CalibCols.MZ_LIBRARY].values, self.fragments_flat[self.fragment_mz_column].values, self.fragments_flat["intensity"].values, self.fragments_flat["type"].values, self.fragments_flat["loss_type"].values, self.fragments_flat["charge"].values, self.fragments_flat["number"].values, self.fragments_flat["position"].values, self.fragments_flat["cardinality"].values, )
[docs] def collect_candidates( self, candidates_df: pd.DataFrame, psm_proto_df: OutputPsmDF, feature_columns: list[str] | None = None, candidate_columns: list[str] | None = None, precursor_df_columns: list[str] | None = None, ) -> pd.DataFrame: """Collect the features from the score group container and return a DataFrame. Parameters ---------- candidates_df : pd.DataFrame A DataFrame containing the features for each candidate. psm_proto_df : OutputPsmDF A Numba JIT compatible OutputPsmDF object containing the features for each candidate. feature_columns : list[str], default=None The columns to use for the features. If None, the `DEFAULT_FEATURE_COLUMNS` will be used candidate_columns : list[str], default=None The columns to use for the candidates. If None, the `DEFAULT_CANDIDATE_COLUMNS` will be used precursor_df_columns : list[str], default=None The columns to use for the precursor DataFrame. If None, the DEFAULT_PRECURSOR_COLUMNS will be used. Returns ------- candidates_psm_df : pd.DataFrame A DataFrame containing the features for each candidate. """ if feature_columns is None: feature_columns = DEFAULT_FEATURE_COLUMNS.copy() if candidate_columns is None: candidate_columns = DEFAULT_CANDIDATE_COLUMNS.copy() if precursor_df_columns is None: precursor_df_columns = DEFAULT_PRECURSOR_COLUMNS.copy() precursor_idx, rank, features = psm_proto_df.to_precursor_df() candidates_psm_df = pd.DataFrame(features, columns=feature_columns) candidates_psm_df["precursor_idx"] = precursor_idx candidates_psm_df["rank"] = rank candidates_psm_df = self.merge_candidate_data( candidates_psm_df, candidates_df, candidate_columns, ) candidates_psm_df = self.merge_precursor_data( candidates_psm_df, self.precursors_flat_df, self.rt_column, self.mobility_column, self.precursor_mz_column, precursor_df_columns, ) # calculate delta_rt candidates_psm_df["delta_rt"] = ( candidates_psm_df[CalibCols.RT_OBSERVED] - candidates_psm_df[self.rt_column] ) # calculate number of certain amino acids in sequence # TODO unused? candidates_psm_df["n_K"] = candidates_psm_df["sequence"].str.count("K") candidates_psm_df["n_R"] = candidates_psm_df["sequence"].str.count("R") candidates_psm_df["n_P"] = candidates_psm_df["sequence"].str.count("P") return candidates_psm_df
[docs] @staticmethod def merge_candidate_data( df: pd.DataFrame, candidates_df: pd.DataFrame, candidate_columns: list[str] | None = None, ): """Merge `candidate_columns` from `candidates_df` into `df`.""" if candidate_columns is None: candidate_columns = DEFAULT_CANDIDATE_COLUMNS.copy() candidate_columns += ["score"] if "score" in candidates_df.columns else [] return merge_missing_columns( df, candidates_df, candidate_columns, on=["precursor_idx", "rank"], how="left", )
[docs] @staticmethod def merge_precursor_data( df: pd.DataFrame, precursors_flat_df: pd.DataFrame, rt_column: str, mobility_column: str, precursor_mz_column: str, precursor_df_columns: list[str] | None = None, ): """Merge `rt_column`, `mobility_column`, `precursor_mz_column`, `precursor_df_columns` from `precursors_flat_df` into `df`.""" if precursor_df_columns is None: precursor_df_columns = DEFAULT_PRECURSOR_COLUMNS.copy() precursor_df_columns = precursor_df_columns + _get_isotope_column_names( precursors_flat_df.columns ) for col in [rt_column, mobility_column, precursor_mz_column]: if col not in precursor_df_columns: precursor_df_columns.append(col) return merge_missing_columns( df, precursors_flat_df, precursor_df_columns, on=["precursor_idx"], how="left", )
[docs] def collect_fragments( self, candidates_df: pd.DataFrame, psm_proto_df ) -> pd.DataFrame: """Collect the fragment-level features from the score group container and return a DataFrame. Parameters ---------- score_group_container : ScoreGroupContainer A Numba JIT compatible score group container. candidates_df : pd.DataFrame A DataFrame containing the features for each candidate. Returns ------- fragment_psm_df : pd.DataFrame A DataFrame containing the features for each fragment. """ colnames = [ "precursor_idx", "rank", CalibCols.MZ_LIBRARY, "mz", CalibCols.MZ_OBSERVED, "height", "intensity", "mass_error", "correlation", "position", "number", "type", "charge", "loss_type", ] df = pd.DataFrame( { key: value for value, key in zip( psm_proto_df.to_fragment_df(), colnames, strict=True ) } ) # join precursor columns precursor_df_columns = [ "elution_group_idx", "decoy", ] df = merge_missing_columns( df, self.precursors_flat_df, precursor_df_columns, on=["precursor_idx"], how="left", ) return df
def __call__( self, candidates_df, thread_count=10, debug=False, include_decoy_fragment_features=False, ): """Calculate features for each precursor candidate used for scoring. Parameters ---------- candidates_df : pd.DataFrame A DataFrame containing the candidates. thread_count : int, default=10 The number of threads to use for parallel processing. debug : bool, default=False Process only the first 10 elution groups and display full debug information. include_decoy_fragment_features : bool, default=False Include fragment features for decoy candidates. Returns ------- candidate_features_df : pd.DataFrame A DataFrame containing the features for each candidate. fragment_features_df : pd.DataFrame A DataFrame containing the features for each fragment. """ logger.info("Starting candidate scoring") fragment_container = self.assemble_fragments() candidates_schema.validate(candidates_df, warn_on_critical_values=True) score_group_container = self.assemble_score_group_container(candidates_df) n_candidates = score_group_container.get_candidate_count() psm_proto_df = OutputPsmDF(n_candidates, self.config.top_k_fragments) iterator_len = len(score_group_container) if debug: logger.info("Debug mode enabled. Processing only the first 10 score groups") thread_count = 1 iterator_len = min(10, iterator_len) set_pjit_threads(thread_count) _process_score_groups( range(iterator_len), # type: ignore # noqa: PGH003 # function is wrapped by pjit -> will be turned into single index and passed to the method score_group_container, psm_proto_df, fragment_container, self.dia_data.to_jitclass(), self.config.to_jitclass(), self.quadrupole_calibration.jit, debug, ) logger.info("Finished candidate processing") logger.info("Collecting candidate features") candidate_features_df = self.collect_candidates(candidates_df, psm_proto_df) features_schema.validate(candidate_features_df, warn_on_critical_values=True) logger.info("Collecting fragment features") fragment_features_df = self.collect_fragments(candidates_df, psm_proto_df) fragment_features_schema.validate( fragment_features_df, warn_on_critical_values=True ) logger.info("Finished candidate scoring") del score_group_container del fragment_container return candidate_features_df, fragment_features_df