From 4fc969d3740d5bf016f7a4587dff46e64904c721 Mon Sep 17 00:00:00 2001
From: WangHong007 <88552471+WangHong007@users.noreply.github.com>
Date: Mon, 16 Oct 2023 10:18:39 +0800
Subject: [PATCH] Enable parquet streaming

---
 bin/peptide_normalization.py        |  53 +++++---
 bin/peptide_normalization_stream.py | 192 +++++++++++++++++-----------
 2 files changed, 155 insertions(+), 90 deletions(-)

diff --git a/bin/peptide_normalization.py b/bin/peptide_normalization.py
index 0e1320e..3394f25 100644
--- a/bin/peptide_normalization.py
+++ b/bin/peptide_normalization.py
@@ -9,20 +9,43 @@ from matplotlib.backends.backend_pdf import PdfPages
 
 from pandas import DataFrame
 
-from ibaq.ibaqpy_commons import (BIOREPLICATE, CHANNEL, CONDITION,
-                                 FEATURE_COLUMNS, FRACTION, FRAGMENT_ION,
-                                 INTENSITY, ISOTOPE_LABEL_TYPE, NORM_INTENSITY,
-                                 PEPTIDE_CANONICAL, PEPTIDE_CHARGE,
-                                 PEPTIDE_SEQUENCE, PROTEIN_NAME, REFERENCE,
-                                 RUN, SAMPLE_ID, SEARCH_ENGINE, STUDY_ID,
-                                 ITRAQ4plex, ITRAQ8plex, TMT6plex, TMT10plex,
-                                 TMT11plex, TMT16plex, get_canonical_peptide,
-                                 get_reference_name, get_study_accession,
-                                 parquet_map, parse_uniprot_accession,
-                                 plot_box_plot, plot_distributions,
-                                 remove_contaminants_decoys,
-                                 remove_extension_file, remove_protein_by_ids,
-                                 sum_peptidoform_intensities)
+from ibaq.ibaqpy_commons import (
+    BIOREPLICATE,
+    CHANNEL,
+    CONDITION,
+    FEATURE_COLUMNS,
+    FRACTION,
+    FRAGMENT_ION,
+    INTENSITY,
+    ISOTOPE_LABEL_TYPE,
+    NORM_INTENSITY,
+    PEPTIDE_CANONICAL,
+    PEPTIDE_CHARGE,
+    PEPTIDE_SEQUENCE,
+    PROTEIN_NAME,
+    REFERENCE,
+    RUN,
+    SAMPLE_ID,
+    SEARCH_ENGINE,
+    STUDY_ID,
+    ITRAQ4plex,
+    ITRAQ8plex,
+    TMT6plex,
+    TMT10plex,
+    TMT11plex,
+    TMT16plex,
+    get_canonical_peptide,
+    get_reference_name,
+    get_study_accession,
+    parquet_map,
+    parse_uniprot_accession,
+    plot_box_plot,
+    plot_distributions,
+    remove_contaminants_decoys,
+    remove_extension_file,
+    remove_protein_by_ids,
+    sum_peptidoform_intensities,
+)
 
 
 def print_dataset_size(dataset: DataFrame, message: str, verbose: bool) -> None:
@@ -318,7 +341,7 @@ def impute_peptide_intensities(dataset_df, field, class_field):
     "-m", "--msstats", help="MsStats file import generated by quantms", default=None
 )
 @click.option(
-    "-p", "--parquet", help="Parquet file import generated by quantms", default=None
+    "-p", "--parquet", help="Parquet file import generated by quantms.io", default=None
 )
 @click.option(
     "-s", "--sdrf", help="SDRF file import generated by quantms", default=None
 )
diff --git a/bin/peptide_normalization_stream.py b/bin/peptide_normalization_stream.py
index 7fda244..8c291ef 100644
--- a/bin/peptide_normalization_stream.py
+++ b/bin/peptide_normalization_stream.py
@@ -4,10 +4,17 @@ import random
 
 from matplotlib.backends.backend_pdf import PdfPages
-
+import pyarrow.parquet as pq
 from ibaq.ibaqpy_commons import *
 
 
+def read_large_parquet(parquet_path: str, batch_size: int = 100000):
+    parquet_file = pq.ParquetFile(parquet_path)
+    for batch in parquet_file.iter_batches(batch_size=batch_size):
+        batch_df = batch.to_pandas()
+        yield batch_df
+
+
 def parse_uniprot_accession(uniprot_id: str) -> str:
     """
     Parse the uniprot accession from the uniprot id in the form of
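The `read_large_parquet` generator added above is the heart of this change: instead of materializing the whole feature table in memory, it walks the file one record batch at a time. A minimal, self-contained sketch of the same pyarrow pattern (the file name and the caller loop below are illustrative, not part of this patch):

    import pyarrow.parquet as pq

    def read_large_parquet(parquet_path: str, batch_size: int = 100000):
        # Opening a ParquetFile reads only footer metadata, not row data.
        parquet_file = pq.ParquetFile(parquet_path)
        # iter_batches() yields pyarrow RecordBatch objects capped at
        # batch_size rows, so peak memory stays bounded by one batch.
        for batch in parquet_file.iter_batches(batch_size=batch_size):
            yield batch.to_pandas()

    # Hypothetical caller: stream an arbitrarily large feature file.
    for batch_df in read_large_parquet("features.parquet", batch_size=100000):
        print(batch_df.shape)

Because the function is a generator, downstream code can iterate over it exactly as it would over the chunk iterator returned by pandas.read_csv(chunksize=...), which is what lets the CLI changes below treat CSV and parquet inputs uniformly.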
@@ -106,8 +113,68 @@ def get_reference_name(reference_spectrum: str) -> str:
     return re.split(r"\.mzML|\.MZML|\.raw|\.RAW", reference_spectrum)[0]
 
 
+def extract_label_from_sdrf(sdrf_path: str, compression: str) -> tuple:
+    sdrf_df = pd.read_csv(sdrf_path, sep="\t", compression=compression)
+    sdrf_df[REFERENCE] = sdrf_df["comment[data file]"].apply(remove_extension_file)
+
+    # Determine label type
+    labels = set(sdrf_df["comment[label]"])
+    choice = None
+    if len(labels) == 1:
+        label = "LFQ"
+    elif "TMT" in ",".join(labels) or "tmt" in ",".join(labels):
+        if (
+            len(labels) > 11
+            or "TMT134N" in labels
+            or "TMT133C" in labels
+            or "TMT133N" in labels
+            or "TMT132C" in labels
+            or "TMT132N" in labels
+        ):
+            choice = TMT16plex
+        elif len(labels) == 11 or "TMT131C" in labels:
+            choice = TMT11plex
+        elif len(labels) > 6:
+            choice = TMT10plex
+        else:
+            choice = TMT6plex
+        choice_df = (
+            pd.DataFrame.from_dict(choice, orient="index", columns=[CHANNEL])
+            .reset_index()
+            .rename(columns={"index": "comment[label]"})
+        )
+        sdrf_df = sdrf_df.merge(choice_df, on="comment[label]", how="left")
+        label = "TMT"
+    elif "ITRAQ" in ",".join(labels) or "itraq" in ",".join(labels):
+        if len(labels) > 4:
+            choice = ITRAQ8plex
+        else:
+            choice = ITRAQ4plex
+        choice_df = (
+            pd.DataFrame.from_dict(choice, orient="index", columns=[CHANNEL])
+            .reset_index()
+            .rename(columns={"index": "comment[label]"})
+        )
+        sdrf_df = sdrf_df.merge(choice_df, on="comment[label]", how="left")
+        label = "ITRAQ"
+    else:
+        print("Warning: Only label-free, TMT and ITRAQ experiments are supported!")
+        exit(1)
+    sample_names = sdrf_df["source name"].unique().tolist()
+
+    return sdrf_df, label, sample_names, choice
+
+
 @click.command()
-@click.option("-m", "--msstats", help="MsStats file import generated by quantms")
+@click.option(
+    "-m", "--msstats", help="MsStats file import generated by quantms", default=None
+)
+@click.option(
+    "-p",
+    "--parquet",
+    help="Feature parquet import generated by quantms.io",
+    default=None,
+)
 @click.option(
     "-s", "--sdrf", help="SDRF file import generated by quantms", required=True
 )
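The refactor pulls all SDRF label detection into `extract_label_from_sdrf`, which returns the annotated SDRF table, the detected label type ("LFQ", "TMT" or "ITRAQ"), the sample names, and the channel map in one call. A usage sketch under assumed inputs (the SDRF path is hypothetical; the TMT*/ITRAQ* dicts come from ibaq.ibaqpy_commons):

    # compression=None for a plain .tsv, "gzip" for a gzipped SDRF.
    sdrf_df, label, sample_names, choice = extract_label_from_sdrf(
        "experiment.sdrf.tsv", compression=None
    )
    # For multiplexed data, `choice` maps label names such as "TMT126"
    # to channel names; for label-free data it stays None.
    print(label, len(sample_names))

Returning `choice` alongside the merged SDRF is what lets the parquet branch of the main loop map the CHANNEL column later without re-reading the SDRF file.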
"comment[label]"}) + if parquet is None: + msstats_chunks = pd.read_csv( + msstats, + sep=",", + compression=compression_method, + dtype={CONDITION: "category", ISOTOPE_LABEL_TYPE: "category"}, + chunksize=chunksize, ) - sdrf_df = sdrf_df.merge(choice, on="comment[label]", how="left") - label = "TMT" - elif "ITRAQ" in ",".join(labels) or "itraq" in ",".join(labels): - if len(labels) > 4: - choice = ITRAQ8plex - else: - choice = ITRAQ4plex - choice = ( - pd.DataFrame.from_dict(choice, orient="index", columns=[CHANNEL]) - .reset_index() - .rename(columns={"index": "comment[label]"}) - ) - sdrf_df = sdrf_df.merge(choice, on="comment[label]", how="left") - label = "ITRAQ" else: - print("Warning: Only support label free, TMT and ITRAQ experiment!") - exit(1) - sample_names = sdrf_df["source name"].unique().tolist() + msstats_chunks = read_large_parquet(parquet, batch_size=chunksize) # TODO: Stream processing to obtain strong proteins with more than 2 uniqe peptides print("IBAQPY WARNING: Writing files into ibaqpy_temp...") if not os.path.exists("ibaqpy_temp/"): os.mkdir("ibaqpy_temp/") - msstats_chunks = pd.read_csv( - msstats, - sep=",", - compression=compression_method, - dtype={CONDITION: "category", ISOTOPE_LABEL_TYPE: "category"}, - chunksize=chunksize, - ) unique_peptides = {} canonical_dict = {} group_intensities = {} quantile = {} for msstats_df in msstats_chunks: - msstats_df.rename( - columns={ - "ProteinName": PROTEIN_NAME, - "PeptideSequence": PEPTIDE_SEQUENCE, - "PrecursorCharge": PEPTIDE_CHARGE, - "Run": RUN, - "Condition": CONDITION, - "Intensity": INTENSITY, - }, - inplace=True, - ) + if parquet is None: + msstats_df.rename( + columns={ + "ProteinName": PROTEIN_NAME, + "PeptideSequence": PEPTIDE_SEQUENCE, + "PrecursorCharge": PEPTIDE_CHARGE, + "Run": RUN, + "Condition": CONDITION, + "Intensity": INTENSITY, + }, + inplace=True, + ) + else: + msstats_df = msstats_df[FEATURE_COLUMNS] + msstats_df = msstats_df.rename(columns=parquet_map) + msstats_df[PROTEIN_NAME] = msstats_df.apply( + lambda x: ",".join(x[PROTEIN_NAME]), axis=1 + ) + if label == "LFQ": + msstats_df.drop(CHANNEL, inplace=True, axis=1) + else: + msstats_df[CHANNEL] = msstats_df[CHANNEL].map(choice) + msstats_df = msstats_df[msstats_df["Condition"] != "Empty"] msstats_df = msstats_df[msstats_df[INTENSITY] > 0] if PEPTIDE_CANONICAL not in msstats_df.columns: modified_seqs = msstats_df[PEPTIDE_SEQUENCE].unique().tolist() @@ -359,9 +394,11 @@ def peptide_normalization( ) result_df = result_df[result_df["Condition"] != "Empty"] result_df.rename(columns={"Charge": PEPTIDE_CHARGE}, inplace=True) - - result_df.rename(columns={"source name": SAMPLE_ID}, inplace=True) - result_df[STUDY_ID] = result_df[SAMPLE_ID].apply(get_study_accession) + if parquet is None: + result_df.rename(columns={"source name": SAMPLE_ID}, inplace=True) + else: + result_df.drop("source name", inplace=True, axis=1) + result_df[STUDY_ID] = result_df[SAMPLE_ID].str.split("-").str[0] # Write CSVs by Sample ID for sample in sample_names: @@ -605,7 +642,12 @@ def normalization(dataset_df, label, sample, skip_normalization, nmethod): } for sample in sample_names: dataset_df = pd.read_csv(f"ibaqpy_temp/{sample}.csv", sep=",") - norm_df = normalization(dataset_df, label, sample, skip_normalization, nmethod) + if len(dataset_df) != 0: + norm_df = normalization( + dataset_df, label, sample, skip_normalization, nmethod + ) + else: + continue sample_peptides = norm_df[PEPTIDE_CANONICAL].unique().tolist() if remove_low_frequency_peptides: peptides_count = {