import logging
import os
from typing import Literal
import pandas as pd
from alphabase.peptide import precursor
from alphadia.constants.keys import (
INTERNAL_TO_OUTPUT_MAPPING,
InferenceStrategy,
)
from alphadia.outputtransform import grouping
from alphadia.workflow.config import MULTIPLEXING_CHANNELS_DELIM
# Module-wide logger. Calling getLogger() without a name returns the root logger.
logger = logging.getLogger()
# File formats accepted by read_df / write_df; also quoted in their error messages.
supported_formats = ["parquet", "tsv"]
[docs]
def read_df(path_no_format, file_format="parquet"):
    """Load a dataframe from disk using the chosen file format.

    Parameters
    ----------
    path_no_format: str
        Path of the file to read, without the file extension
    file_format: str, default = 'parquet'
        File format used for loading the file. Available options: ['parquet', 'tsv']

    Returns
    -------
    pd.DataFrame
        Dataframe loaded from disk

    Raises
    ------
    FileNotFoundError
        If ``{path_no_format}.{file_format}`` does not exist
    ValueError
        If the file exists but ``file_format`` is neither 'parquet' nor 'tsv'
    """
    full_path = f"{path_no_format}.{file_format}"

    # The existence check runs before the format check, so a missing file is
    # always reported as FileNotFoundError, whatever the requested format.
    if not os.path.exists(full_path):
        raise FileNotFoundError(f"Can't load file as file was not found: {full_path}")

    logger.info(f"Reading {full_path} from disk")

    if file_format == "parquet":
        return pd.read_parquet(full_path)
    if file_format == "tsv":
        return pd.read_csv(full_path, sep="\t")

    raise ValueError(
        f"Provided unknown file format: {file_format}, supported_formats: {supported_formats}"
    )
[docs]
def apply_output_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Rename internal columns to their output names and drop everything else.

    Columns are renamed according to INTERNAL_TO_OUTPUT_MAPPING. After renaming,
    only columns whose name is one of the mapping's output names are kept, so
    the result contains nothing but defined output columns.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe with internal column names

    Returns
    -------
    pd.DataFrame
        Dataframe with output column names, restricted to mapped columns
    """
    renamed = df.rename(columns=INTERNAL_TO_OUTPUT_MAPPING)

    # Set of valid output names (the mapping's values) for fast membership tests.
    allowed_names = set(INTERNAL_TO_OUTPUT_MAPPING.values())

    # Preserve the column order of the renamed frame while filtering.
    kept_columns = []
    for column_name in renamed.columns:
        if column_name in allowed_names:
            kept_columns.append(column_name)

    return renamed[kept_columns]
[docs]
def write_df(
    df: pd.DataFrame, path_no_format: str, file_format: str = "parquet"
) -> None:
    """Write a dataframe to disk using the chosen file format.

    Parameters
    ----------
    df: pd.DataFrame
        Dataframe to save to disk
    path_no_format: str
        Destination path without the file extension
    file_format: str, default = 'parquet'
        File format used for saving the file. Available options: ['parquet', 'tsv']

    Raises
    ------
    ValueError
        If ``file_format`` is not listed in ``supported_formats``
    """
    is_supported = file_format in supported_formats
    if not is_supported:
        raise ValueError(
            f"Provided unknown file format: {file_format}, supported_formats: {supported_formats}"
        )

    target_path = f"{path_no_format}.{file_format}"
    logger.info(f"Saving {target_path} to disk")

    if file_format == "tsv":
        # Tab-separated text; floats are written with six decimal places.
        df.to_csv(target_path, sep="\t", index=False, float_format="%.6f")
    elif file_format == "parquet":
        df.to_parquet(target_path, index=False)
[docs]
def merge_quant_levels_to_psm(
    psm_df: pd.DataFrame,
    lfq_results: dict[str, pd.DataFrame],
    quantlevel_configs: list,
) -> pd.DataFrame:
    """Attach the quantification results of every level to the precursor table.

    For each configured level, the wide quantification table (all columns other
    than the level's id column are melted into a ``run`` column) is reshaped to
    long format and left-merged onto the precursor table using the level's id
    column and ``run`` as keys. Levels with a missing or empty result table
    are skipped.

    Parameters
    ----------
    psm_df : pd.DataFrame
        Precursor table to merge quantification data into
    lfq_results : dict[str, pd.DataFrame]
        Quantification results keyed by level name
    quantlevel_configs : list
        Config objects providing ``level_name``, ``quant_level`` and
        ``intensity_column`` for each quantification level

    Returns
    -------
    pd.DataFrame
        Precursor table with one intensity column added per merged level
    """
    merged = psm_df

    for level in quantlevel_configs:
        level_df = lfq_results.get(level.level_name)
        has_data = level_df is not None and not level_df.empty

        if has_data:
            # wide (one column per run) -> long (one row per id/run pair)
            long_df = level_df.melt(
                id_vars=level.quant_level,
                var_name="run",
                value_name=level.intensity_column,
            )
            merge_keys = [level.quant_level, "run"]
            merged = merged.merge(long_df, on=merge_keys, how="left")

    return merged
[docs]
def log_protein_fdr_summary(psm_df: pd.DataFrame) -> None:
    """Log how many protein groups and precursors are present among targets.

    Only rows with ``decoy == 0`` are counted.

    Parameters
    ----------
    psm_df : pd.DataFrame
        Precursor table with protein grouping and FDR filtering applied.
        Must contain the columns ``decoy``, ``pg`` and ``precursor_idx``.
    """
    targets = psm_df[psm_df["decoy"] == 0]
    n_protein_groups = targets["pg"].nunique()
    n_precursors = targets["precursor_idx"].nunique()

    summary_lines = (
        "================ Protein FDR =================",
        "Unique protein groups in output",
        f" 1% protein FDR: {n_protein_groups:,}",
        "",
        "Unique precursor in output",
        f" 1% protein FDR: {n_precursors:,}",
        "================================================",
    )
    for line in summary_lines:
        logger.info(line)
[docs]
def load_psm_files_from_folders(
    folder_list: list[str], psm_file_name: str
) -> list[pd.DataFrame]:
    """Collect the PSM parquet files found in a list of run folders.

    Loading is best-effort: folders without a PSM file, or whose file cannot
    be read, are reported with a warning and skipped.

    Parameters
    ----------
    folder_list : list[str]
        Folders expected to contain a PSM file each
    psm_file_name : str
        Name of the PSM file (without the ``.parquet`` extension)

    Returns
    -------
    list[pd.DataFrame]
        One dataframe per successfully loaded folder, in input order
    """
    loaded_frames = []

    for run_folder in folder_list:
        # The last path component of the folder is used as the run name in logs.
        run_name = os.path.basename(run_folder)
        parquet_path = os.path.join(run_folder, f"{psm_file_name}.parquet")
        logger.info(f"Building output for {run_name}")

        if not os.path.exists(parquet_path):
            logger.warning(f"no psm file found for {run_name}, skipping")
            continue

        try:
            loaded_frames.append(pd.read_parquet(parquet_path))
        except Exception as read_error:
            # Deliberately broad: one unreadable file must not abort the rest.
            logger.warning(f"Error reading psm file for {run_name}")
            logger.warning(read_error)

    return loaded_frames
# TODO: remove this function in the future, shouldn't be necessary if well typed & tested
[docs]
def prepare_psm_dataframe(psm_df: pd.DataFrame) -> pd.DataFrame:
    """Normalise the modification columns and hash the precursors.

    Missing values in ``mods`` and ``mod_sites`` are replaced by empty strings
    and both columns are cast to ``str``. These assignments are made on the
    passed dataframe itself. The frame is then handed to
    ``precursor.hash_precursor_df``.

    Parameters
    ----------
    psm_df : pd.DataFrame
        Raw PSM dataframe containing ``mods`` and ``mod_sites`` columns

    Returns
    -------
    pd.DataFrame
        Result of ``precursor.hash_precursor_df`` on the cleaned dataframe
    """
    for mod_column in ("mods", "mod_sites"):
        psm_df[mod_column] = psm_df[mod_column].fillna("").astype(str)

    return precursor.hash_precursor_df(psm_df)
[docs]
def apply_protein_inference(
    psm_df: pd.DataFrame,
    inference_strategy: Literal["library", "maximum_parsimony", "heuristic"],
    group_level: str,
) -> pd.DataFrame:
    """Assign protein groups to the PSM dataframe with the chosen strategy.

    With the library strategy, the ``group_level`` column is copied into ``pg``
    and ``pg_master`` on the passed dataframe. The other two strategies
    delegate to ``grouping.perform_grouping``, with ``group=False`` for maximum
    parsimony and ``group=True`` for heuristic.

    Parameters
    ----------
    psm_df : pd.DataFrame
        PSM dataframe
    inference_strategy : Literal["library", "maximum_parsimony", "heuristic"]
        Inference strategy: 'library', 'maximum_parsimony', or 'heuristic'
    group_level : str
        Grouping level: 'proteins' or 'genes'

    Returns
    -------
    pd.DataFrame
        PSM dataframe with protein grouping applied

    Raises
    ------
    ValueError
        If ``inference_strategy`` is not one of the known strategies
    """
    if inference_strategy == InferenceStrategy.LIBRARY:
        logger.info(
            "Inference strategy: library. Using library grouping for protein inference"
        )
        # The library annotation is used as-is for both group columns.
        for target_column in ("pg", "pg_master"):
            psm_df[target_column] = psm_df[group_level]
        return psm_df

    if inference_strategy == InferenceStrategy.MAXIMUM_PARSIMONY:
        message = "Inference strategy: maximum_parsimony. Using maximum parsimony for protein inference"
        use_grouping = False
    elif inference_strategy == InferenceStrategy.HEURISTIC:
        message = "Inference strategy: heuristic. Using maximum parsimony with grouping for protein inference"
        use_grouping = True
    else:
        raise ValueError(
            f"Unknown inference strategy: {inference_strategy}. Valid options are {InferenceStrategy.get_values()}"
        )

    logger.info(message)
    return grouping.perform_grouping(
        psm_df, genes_or_proteins=group_level, group=use_grouping
    )
[docs]
def get_channels_from_config(config: dict) -> list[int]:
    """Extract and compute channel list from configuration.

    The channels come from ``config["search"]["channel_filter"]``; an empty
    filter means the single default channel 0. If multiplexing is enabled, the
    result is restricted to the channels that are also listed in
    ``config["multiplexing"]["target_channels"]``.

    Both settings are strings of channel numbers separated by
    MULTIPLEXING_CHANNELS_DELIM. Whitespace around entries and empty entries
    are ignored.

    Parameters
    ----------
    config : dict
        Configuration dictionary containing search and multiplexing settings

    Returns
    -------
    list[int]
        Sorted list of channel integers

    Raises
    ------
    ValueError
        If a selected channel entry cannot be converted to an integer
    """

    def _channel_tokens(raw: str) -> set[str]:
        """Split a delimited channel string into stripped, non-empty tokens."""
        return {
            token.strip()
            for token in raw.split(MULTIPLEXING_CHANNELS_DELIM)
            if token.strip()
        }

    channel_filter = config["search"]["channel_filter"]
    if channel_filter == "":
        # Keep the default channel as a string token: an integer 0 would never
        # match the string tokens of target_channels in the intersection below.
        selected = {"0"}
    else:
        selected = _channel_tokens(channel_filter)

    if config["multiplexing"]["enabled"]:
        selected &= _channel_tokens(config["multiplexing"]["target_channels"])

    return sorted(int(token) for token in selected)