Commit

Enable parquet streaming

WangHong007 committed Oct 16, 2023
1 parent 3bf2089 commit 4fc969d
Showing 2 changed files with 155 additions and 90 deletions.
53 changes: 38 additions & 15 deletions bin/peptide_normalization.py
@@ -9,20 +9,43 @@
from matplotlib.backends.backend_pdf import PdfPages
from pandas import DataFrame

from ibaq.ibaqpy_commons import (BIOREPLICATE, CHANNEL, CONDITION,
FEATURE_COLUMNS, FRACTION, FRAGMENT_ION,
INTENSITY, ISOTOPE_LABEL_TYPE, NORM_INTENSITY,
PEPTIDE_CANONICAL, PEPTIDE_CHARGE,
PEPTIDE_SEQUENCE, PROTEIN_NAME, REFERENCE,
RUN, SAMPLE_ID, SEARCH_ENGINE, STUDY_ID,
ITRAQ4plex, ITRAQ8plex, TMT6plex, TMT10plex,
TMT11plex, TMT16plex, get_canonical_peptide,
get_reference_name, get_study_accession,
parquet_map, parse_uniprot_accession,
plot_box_plot, plot_distributions,
remove_contaminants_decoys,
remove_extension_file, remove_protein_by_ids,
sum_peptidoform_intensities)
from ibaq.ibaqpy_commons import (
BIOREPLICATE,
CHANNEL,
CONDITION,
FEATURE_COLUMNS,
FRACTION,
FRAGMENT_ION,
INTENSITY,
ISOTOPE_LABEL_TYPE,
NORM_INTENSITY,
PEPTIDE_CANONICAL,
PEPTIDE_CHARGE,
PEPTIDE_SEQUENCE,
PROTEIN_NAME,
REFERENCE,
RUN,
SAMPLE_ID,
SEARCH_ENGINE,
STUDY_ID,
ITRAQ4plex,
ITRAQ8plex,
TMT6plex,
TMT10plex,
TMT11plex,
TMT16plex,
get_canonical_peptide,
get_reference_name,
get_study_accession,
parquet_map,
parse_uniprot_accession,
plot_box_plot,
plot_distributions,
remove_contaminants_decoys,
remove_extension_file,
remove_protein_by_ids,
sum_peptidoform_intensities,
)


def print_dataset_size(dataset: DataFrame, message: str, verbose: bool) -> None:
@@ -318,7 +341,7 @@ def impute_peptide_intensities(dataset_df, field, class_field)
"-m", "--msstats", help="MsStats file import generated by quantms", default=None
)
@click.option(
"-p", "--parquet", help="Parquet file import generated by quantms", default=None
"-p", "--parquet", help="Parquet file import generated by quantms.io", default=None
)
@click.option(
"-s", "--sdrf", help="SDRF file import generated by quantms", default=None
192 changes: 117 additions & 75 deletions bin/peptide_normalization_stream.py
@@ -4,10 +4,17 @@
import random

from matplotlib.backends.backend_pdf import PdfPages

import pyarrow.parquet as pq
from ibaq.ibaqpy_commons import *


def read_large_parquet(parquet_path: str, batch_size: int = 100000):
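    """
    Stream a parquet file in batches of at most `batch_size` rows, yielding
    each batch as a pandas DataFrame so the whole file never has to fit in
    memory. Typical use (the path is a placeholder):

        for chunk in read_large_parquet("features.parquet", batch_size=50000):
            ...  # process one chunk at a time
    """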
parquet_file = pq.ParquetFile(parquet_path)
for batch in parquet_file.iter_batches(batch_size=batch_size):
batch_df = batch.to_pandas()
yield batch_df


def parse_uniprot_accession(uniprot_id: str) -> str:
"""
Parse the uniprot accession from the uniprot id in the form of
@@ -106,16 +113,76 @@ def get_reference_name(reference_spectrum: str) -> str:
return re.split(r"\.mzML|\.MZML|\.raw|\.RAW", reference_spectrum)[0]


def extract_label_from_sdrf(sdrf_path: str, compression: bool) -> tuple:
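    """
    Read an SDRF file and infer the experiment's labelling scheme.

    Returns a tuple (sdrf_df, label, sample_names, choice): `label` is one of
    "LFQ", "TMT" or "ITRAQ"; `sample_names` holds the unique values of the
    "source name" column; `choice` maps SDRF label values to channel names for
    the detected plex (None for label-free). For TMT/ITRAQ, `sdrf_df` gains a
    channel column derived from that map.
    """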
sdrf_df = pd.read_csv(sdrf_path, sep="\t", compression=compression)
sdrf_df[REFERENCE] = sdrf_df["comment[data file]"].apply(remove_extension_file)

# Determine label type
labels = set(sdrf_df["comment[label]"])
choice = None
if len(labels) == 1:
label = "LFQ"
elif "TMT" in ",".join(labels) or "tmt" in ",".join(labels):
if (
len(labels) > 11
or "TMT134N" in labels
or "TMT133C" in labels
or "TMT133N" in labels
or "TMT132C" in labels
or "TMT132N" in labels
):
choice = TMT16plex
elif len(labels) == 11 or "TMT131C" in labels:
choice = TMT11plex
elif len(labels) > 6:
choice = TMT10plex
else:
choice = TMT6plex
choice_df = (
pd.DataFrame.from_dict(choice, orient="index", columns=[CHANNEL])
.reset_index()
.rename(columns={"index": "comment[label]"})
)
sdrf_df = sdrf_df.merge(choice_df, on="comment[label]", how="left")
label = "TMT"
elif "ITRAQ" in ",".join(labels) or "itraq" in ",".join(labels):
if len(labels) > 4:
choice = ITRAQ8plex
else:
choice = ITRAQ4plex
choice_df = (
pd.DataFrame.from_dict(choice, orient="index", columns=[CHANNEL])
.reset_index()
.rename(columns={"index": "comment[label]"})
)
sdrf_df = sdrf_df.merge(choice_df, on="comment[label]", how="left")
label = "ITRAQ"
else:
print("Warning: Only support label free, TMT and ITRAQ experiment!")
exit(1)
sample_names = sdrf_df["source name"].unique().tolist()

return sdrf_df, label, sample_names, choice


@click.command()
@click.option("-m", "--msstats", help="MsStats file import generated by quantms")
@click.option(
"-m", "--msstats", help="MsStats file import generated by quantms", default=None
)
@click.option(
"-p",
"--parquet",
help="Feature parquet import generated by quantms.io",
default=None,
)
@click.option(
"-s", "--sdrf", help="SDRF file import generated by quantms", required=True
)
@click.option("--compress", help="Read all files compress", is_flag=True)
@click.option(
"--chunksize",
help="The number of rows of MSstats read using pandas streaming",
default=10 * 1024 * 1024,
default=1000000,
)
@click.option(
"--min_aa", help="Minimum number of amino acids to filter peptides", default=7
@@ -175,6 +242,7 @@ def get_reference_name(reference_spectrum: str) -> str:
)
def peptide_normalization(
msstats: str,
parquet: str,
sdrf: str,
remove_ids: str,
remove_decoy_contaminants: bool,
@@ -208,93 +276,60 @@ def peptide_normalization(
:return:
"""

if msstats is None or output is None:
if output is None:
print_help_msg(peptide_normalization)
exit(1)

pd.set_option("display.max_columns", None)
print("Loading data..")
compression_method = "gzip" if compress else None
sdrf_df, label, sample_names, choice = extract_label_from_sdrf(
sdrf, compression_method
)

# Read the sdrf file
sdrf_df = pd.read_csv(sdrf, sep="\t", compression=compression_method)
sdrf_df[REFERENCE] = sdrf_df["comment[data file]"].apply(remove_extension_file)
print(sdrf_df)

# Determine label type
labels = set(sdrf_df["comment[label]"])
with open(msstats, "r") as file:
header_row = file.readline().strip().split(",")
msstats_columns = header_row[1:]
if CHANNEL not in msstats_columns:
label = "LFQ"
elif "TMT" in ",".join(labels) or "tmt" in ",".join(labels):
if (
len(labels) > 11
or "TMT134N" in labels
or "TMT133C" in labels
or "TMT133N" in labels
or "TMT132C" in labels
or "TMT132N" in labels
):
choice = TMT16plex
elif len(labels) == 11 or "TMT131C" in labels:
choice = TMT11plex
elif len(labels) > 6:
choice = TMT10plex
else:
choice = TMT6plex
choice = (
pd.DataFrame.from_dict(choice, orient="index", columns=[CHANNEL])
.reset_index()
.rename(columns={"index": "comment[label]"})
if parquet is None:
msstats_chunks = pd.read_csv(
msstats,
sep=",",
compression=compression_method,
dtype={CONDITION: "category", ISOTOPE_LABEL_TYPE: "category"},
chunksize=chunksize,
)
sdrf_df = sdrf_df.merge(choice, on="comment[label]", how="left")
label = "TMT"
elif "ITRAQ" in ",".join(labels) or "itraq" in ",".join(labels):
if len(labels) > 4:
choice = ITRAQ8plex
else:
choice = ITRAQ4plex
choice = (
pd.DataFrame.from_dict(choice, orient="index", columns=[CHANNEL])
.reset_index()
.rename(columns={"index": "comment[label]"})
)
sdrf_df = sdrf_df.merge(choice, on="comment[label]", how="left")
label = "ITRAQ"
else:
print("Warning: Only support label free, TMT and ITRAQ experiment!")
exit(1)
sample_names = sdrf_df["source name"].unique().tolist()
msstats_chunks = read_large_parquet(parquet, batch_size=chunksize)

    # TODO: Stream processing to obtain strong proteins with more than 2 unique peptides
print("IBAQPY WARNING: Writing files into ibaqpy_temp...")
if not os.path.exists("ibaqpy_temp/"):
os.mkdir("ibaqpy_temp/")
msstats_chunks = pd.read_csv(
msstats,
sep=",",
compression=compression_method,
dtype={CONDITION: "category", ISOTOPE_LABEL_TYPE: "category"},
chunksize=chunksize,
)
unique_peptides = {}
canonical_dict = {}
group_intensities = {}
quantile = {}
for msstats_df in msstats_chunks:
msstats_df.rename(
columns={
"ProteinName": PROTEIN_NAME,
"PeptideSequence": PEPTIDE_SEQUENCE,
"PrecursorCharge": PEPTIDE_CHARGE,
"Run": RUN,
"Condition": CONDITION,
"Intensity": INTENSITY,
},
inplace=True,
)
if parquet is None:
msstats_df.rename(
columns={
"ProteinName": PROTEIN_NAME,
"PeptideSequence": PEPTIDE_SEQUENCE,
"PrecursorCharge": PEPTIDE_CHARGE,
"Run": RUN,
"Condition": CONDITION,
"Intensity": INTENSITY,
},
inplace=True,
)
else:
msstats_df = msstats_df[FEATURE_COLUMNS]
msstats_df = msstats_df.rename(columns=parquet_map)
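            # The parquet input stores protein accessions as a list-like
            # value; join them into the comma-separated string the rest of
            # the pipeline expects.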
msstats_df[PROTEIN_NAME] = msstats_df.apply(
lambda x: ",".join(x[PROTEIN_NAME]), axis=1
)
if label == "LFQ":
msstats_df.drop(CHANNEL, inplace=True, axis=1)
else:
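            # Translate SDRF label values (e.g. "TMT127N") to channel names
            # using the plex map returned by extract_label_from_sdrf.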
msstats_df[CHANNEL] = msstats_df[CHANNEL].map(choice)
msstats_df = msstats_df[msstats_df["Condition"] != "Empty"]
msstats_df = msstats_df[msstats_df[INTENSITY] > 0]
if PEPTIDE_CANONICAL not in msstats_df.columns:
modified_seqs = msstats_df[PEPTIDE_SEQUENCE].unique().tolist()
@@ -359,9 +394,11 @@ def peptide_normalization(
)
result_df = result_df[result_df["Condition"] != "Empty"]
result_df.rename(columns={"Charge": PEPTIDE_CHARGE}, inplace=True)

result_df.rename(columns={"source name": SAMPLE_ID}, inplace=True)
result_df[STUDY_ID] = result_df[SAMPLE_ID].apply(get_study_accession)
if parquet is None:
result_df.rename(columns={"source name": SAMPLE_ID}, inplace=True)
else:
result_df.drop("source name", inplace=True, axis=1)
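    # Derive the study accession by splitting on "-", assuming sample IDs of
    # the form "<study accession>-<suffix>" (e.g. "PXD004452-Sample-1" ->
    # "PXD004452"), as get_study_accession did before.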
result_df[STUDY_ID] = result_df[SAMPLE_ID].str.split("-").str[0]

# Write CSVs by Sample ID
for sample in sample_names:
@@ -605,7 +642,12 @@ def normalization(dataset_df, label, sample, skip_normalization, nmethod):
}
for sample in sample_names:
dataset_df = pd.read_csv(f"ibaqpy_temp/{sample}.csv", sep=",")
norm_df = normalization(dataset_df, label, sample, skip_normalization, nmethod)
if len(dataset_df) != 0:
norm_df = normalization(
dataset_df, label, sample, skip_normalization, nmethod
)
else:
continue
sample_peptides = norm_df[PEPTIDE_CANONICAL].unique().tolist()
if remove_low_frequency_peptides:
peptides_count = {
