"""Module performing False Discovery Rate (FDR) control."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
from alphadia.exceptions import TooFewPSMError
from alphadia.fdr.plotting import plot_fdr
from alphadia.fdr.utils import manage_torch_threads, train_test_split_
from alphadia.fragcomp.fragcomp import FragmentCompetition
if TYPE_CHECKING:
from alphadia.fdr.classifiers import Classifier
max_dia_cycle_shape = 2
logger = logging.getLogger()
@manage_torch_threads(max_threads=2)
def perform_fdr( # noqa: C901, PLR0913, PLR0915 # too complex, Too many statements, too many arguments
classifier: Classifier,
available_columns: list[str],
df_target: pd.DataFrame,
df_decoy: pd.DataFrame,
*,
competitive: bool = False,
group_channels: bool = True,
figure_path: str | None = None,
df_fragments: pd.DataFrame | None = None,
dia_cycle: np.ndarray | None = None,
fdr_heuristic: float = 0.1,
random_state: int | None = None,
) -> pd.DataFrame:
"""Performs FDR calculation on a dataframe of PSMs.
Currently, it does not scale above 2 threads also for large problems, so thread number is limited to 2.
Parameters
----------
classifier : Classifier
A classifier that implements the fit and predict_proba methods
available_columns : list[str]
A list of column names that are available for the classifier
df_target : pd.DataFrame
A dataframe of target PSMs
df_decoy : pd.DataFrame
A dataframe of decoy PSMs
competitive : bool
Whether to perform competitive FDR calculation where only the highest scoring PSM in a target-decoy pair is retained
group_channels : bool
Whether to group PSMs by channel before performing competitive FDR calculation
figure_path : str, default=None
The path to save the FDR plot to
df_fragments : pd.DataFrame, default=None
The fragment dataframe.
dia_cycle : np.ndarray, default=None
The DIA cycle. Required if df_fragments is provided.
fdr_heuristic : float, default=0.1
The FDR heuristic to use for the initial selection of PSMs before fragment competition
random_state : int, optional
The random state for train-test split reproducibility.
Returns
-------
psm_df : pd.DataFrame
A dataframe of PSMs with q-values and probabilities.
The columns `qval` and `proba` are added to the input dataframes.
"""
target_len, decoy_len = len(df_target), len(df_decoy)
df_target.dropna(subset=available_columns, inplace=True)
df_decoy.dropna(subset=available_columns, inplace=True)
target_dropped, decoy_dropped = (
target_len - len(df_target),
decoy_len - len(df_decoy),
)
if target_dropped > 0:
logger.warning(f"dropped {target_dropped} target PSMs due to missing features")
if decoy_dropped > 0:
logger.warning(f"dropped {decoy_dropped} decoy PSMs due to missing features")
if (
np.abs(len(df_target) - len(df_decoy)) / ((len(df_target) + len(df_decoy)) / 2)
> 0.1 # noqa: PLR2004
):
logger.warning(
f"FDR calculation for {len(df_target)} target and {len(df_decoy)} decoy PSMs"
)
logger.warning(
"FDR calculation may be inaccurate as there is more than 10% difference in the number of target and decoy PSMs"
)
if random_state is not None:
logger.info(f"Using random state {random_state} for FDR calculation")
X_target = df_target[available_columns].to_numpy()
X_decoy = df_decoy[available_columns].to_numpy()
y_target = np.zeros(len(X_target))
y_decoy = np.ones(len(X_decoy))
X = np.concatenate([X_target, X_decoy])
y = np.concatenate([y_target, y_decoy])
try:
X_train, X_test, y_train, y_test, idxs_train, idxs_test = train_test_split_(
X, y, test_size=0.2, random_state=random_state
)
except TooFewPSMError:
logger.warning(
"Too few PSMs for FDR classification, assigning qval=1.0 and proba=1.0 to all PSMs."
)
psm_df = pd.concat([df_target, df_decoy])
psm_df["qval"] = 1.0
psm_df["proba"] = 1.0
return psm_df
classifier.fit(X_train, y_train)
psm_df = pd.concat([df_target, df_decoy])
psm_df["_decoy"] = y
if competitive:
group_columns = (
["elution_group_idx", "channel"]
if group_channels
else ["elution_group_idx"]
)
else:
group_columns = ["precursor_idx"]
predicted_proba = classifier.predict_proba(X)[:, 1]
psm_df["proba"] = predicted_proba
psm_df.sort_values(
["proba", "precursor_idx"], ascending=True, inplace=True
) # last sort to break ties
psm_df = get_q_values(psm_df, "proba", "_decoy")
if dia_cycle is not None and dia_cycle.shape[2] <= max_dia_cycle_shape:
# use a FDR of 10% as starting point
# if there are no PSMs with a FDR < 10% use all PSMs
start_idx = psm_df["qval"].searchsorted(fdr_heuristic, side="left")
if start_idx == 0:
start_idx = len(psm_df)
# make sure fragments are not reused
if df_fragments is not None:
if dia_cycle is None:
raise ValueError(
"dia_cycle must be provided if df_fragments is provided"
)
fragment_competition = FragmentCompetition()
psm_df = fragment_competition(
psm_df.iloc[:start_idx], df_fragments, dia_cycle
)
psm_df = keep_best(psm_df, group_columns=group_columns)
psm_df = get_q_values(psm_df, "proba", "_decoy")
if figure_path is not None:
plot_fdr(
y_train,
y_test,
predicted_proba[idxs_train],
predicted_proba[idxs_test],
psm_df["qval"],
figure_path=figure_path,
)
return psm_df
[docs]
def keep_best(
df: pd.DataFrame,
score_column: str = "proba",
group_columns: list[str] | None = None,
) -> pd.DataFrame:
"""Keep the best PSM for each group of PSMs with the same precursor_idx.
This function is used to select the best candidate PSM for each precursor.
if the group_columns is set to ['channel', 'elution_group_idx'] then its used for target decoy competition.
Parameters
----------
df : pd.DataFrame
The dataframe containing the PSMs.
score_column : str
The name of the column containing the score to use for the selection.
group_columns : list[str], default=['channel', 'precursor_idx']
The columns to use for the grouping.
Returns
-------
pd.DataFrame
The dataframe containing the best PSM for each group.
"""
if group_columns is None:
group_columns = ["channel", "precursor_idx"]
df = df.reset_index(drop=True)
df = df.sort_values(
[score_column, *group_columns], ascending=True
) # last sort to break ties
df = df.groupby(group_columns).head(1)
return df.sort_index().reset_index(drop=True)
def _fdr_to_q_values(fdr_values: np.ndarray) -> np.ndarray:
"""Converts FDR values to q-values.
Takes a ascending sorted array of FDR values and converts them to q-values.
for every element the lowest FDR where it would be accepted is used as q-value.
Parameters
----------
fdr_values : np.ndarray
The FDR values to convert.
Returns
-------
np.ndarray
The q-values.
"""
fdr_values_flipped = np.flip(fdr_values)
q_values_flipped = np.minimum.accumulate(fdr_values_flipped)
return np.flip(q_values_flipped)
[docs]
def get_q_values(
df: pd.DataFrame,
score_column: str = "proba",
decoy_column: str = "_decoy",
qval_column: str = "qval",
extra_sort_columns: list[str] | None = None,
) -> pd.DataFrame:
"""Calculates q-values for a dataframe containing PSMs.
Parameters
----------
df : pd.DataFrame
The dataframe containing the PSMs.
score_column : str, default='proba'
The name of the column containing the score to use for the selection.
Ascending sorted values are expected.
decoy_column : str, default='_decoy'
The name of the column containing the decoy information.
Decoys are expected to be 1 and targets 0.
qval_column : str, default='qval'
The name of the column to store the q-values in.
extra_sort_columns : list[str], default=['precursor_idx']
Additional columns to sort by after score_column and decoy_column to break ties.
Returns
-------
pd.DataFrame
The dataframe containing the q-values in column qval.
"""
if extra_sort_columns is None:
extra_sort_columns = ["precursor_idx"]
df = df.sort_values(
[score_column, decoy_column, *extra_sort_columns], ascending=True
) # last sort to break ties
target_values = 1 - df[decoy_column].to_numpy()
decoy_cumsum = np.cumsum(df[decoy_column].to_numpy())
target_cumsum = np.cumsum(target_values)
fdr_values = (
decoy_cumsum / target_cumsum
) # TODO: RuntimeWarning: divide by zero encountered in divide
df[qval_column] = _fdr_to_q_values(fdr_values)
return df