Skip to content

Commit

Permalink
Merge pull request #36 from WangHong007/master
Browse files Browse the repository at this point in the history
Temp PR for stream normalization
  • Loading branch information
ypriverol authored Nov 23, 2023
2 parents c0cd8e4 + a7ef2a2 commit e999c58
Showing 1 changed file with 80 additions and 35 deletions.
115 changes: 80 additions & 35 deletions bin/peptide_normalization_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,48 @@ def extract_label_from_sdrf(sdrf_path: str, compression: bool) -> tuple:
return sdrf_df, label, sample_names, choice


def extract_label_from_parquet(parquet_path: str, batch_size: int = 100000) -> tuple:
    """Infer the labelling strategy and the sample list from a parquet file.

    The file is streamed through ``read_large_parquet`` in chunks of
    ``batch_size`` rows, and the distinct values of the ``sample_accession``
    and ``isotope_label_type`` columns are collected across all chunks.

    :param parquet_path: path of the parquet file to scan.
    :param batch_size: number of rows requested per chunk.
    :return: a tuple ``(label, samples, choice)``: ``label`` is ``"LFQ"``,
        ``"TMT"`` or ``"ITRAQ"``; ``samples`` is the list of distinct sample
        accessions (in arbitrary order); ``choice`` is the matching plex
        constant, or ``None`` for label-free data.
    :raises SystemExit: with status 1 when the labels match none of the
        supported experiment types.
    """
    import sys

    # Accumulate into sets so duplicates never pile up across chunks.
    sample_set, label_set = set(), set()
    for chunk in read_large_parquet(parquet_path, batch_size):
        sample_set.update(chunk["sample_accession"].unique().tolist())
        label_set.update(chunk["isotope_label_type"].unique().tolist())
    samples = list(sample_set)
    n_labels = len(label_set)

    # A single distinct label is treated as a label-free experiment.
    if n_labels == 1:
        return "LFQ", samples, None

    # Join once; the substring checks below all work on the same text.
    joined = ",".join(label_set)

    if "TMT" in joined or "tmt" in joined:
        # Channels that are only checked for when selecting the 16-plex set.
        tmt16_only = ("TMT134N", "TMT133C", "TMT133N", "TMT132C", "TMT132N")
        if n_labels > 11 or any(channel in label_set for channel in tmt16_only):
            choice = TMT16plex
        elif n_labels == 11 or "TMT131C" in label_set:
            choice = TMT11plex
        elif n_labels > 6:
            choice = TMT10plex
        else:
            choice = TMT6plex
        return "TMT", samples, choice

    if "ITRAQ" in joined or "itraq" in joined:
        choice = ITRAQ8plex if n_labels > 4 else ITRAQ4plex
        return "ITRAQ", samples, choice

    print("Warning: Only support label free, TMT and ITRAQ experiment!")
    # sys.exit is always available, unlike the site-injected exit() helper.
    sys.exit(1)



@click.command()
@click.option(
"-m", "--msstats", help="MsStats file import generated by quantms", default=None
Expand All @@ -78,7 +120,9 @@ def extract_label_from_sdrf(sdrf_path: str, compression: bool) -> tuple:
default=None,
)
@click.option(
"-s", "--sdrf", help="SDRF file import generated by quantms", required=True
"-s",
"--sdrf",
help="SDRF file import generated by quantms",
)
@click.option("--compress", help="Read all files compress", is_flag=True)
@click.option(
Expand Down Expand Up @@ -185,11 +229,11 @@ def peptide_normalization(
pd.set_option("display.max_columns", None)
print("Loading data..")
compression_method = "gzip" if compress else None
sdrf_df, label, sample_names, choice = extract_label_from_sdrf(
sdrf, compression_method
)

if parquet is None:
sdrf_df, label, sample_names, choice = extract_label_from_sdrf(
sdrf, compression_method
)
msstats_chunks = pd.read_csv(
msstats,
sep=",",
Expand All @@ -198,6 +242,7 @@ def peptide_normalization(
chunksize=chunksize,
)
else:
label, sample_names, choice = extract_label_from_parquet(parquet, batch_size=chunksize)
msstats_chunks = read_large_parquet(parquet, batch_size=chunksize)

# TODO: Stream processing to obtain strong proteins with more than 2 unique peptides
Expand Down Expand Up @@ -268,39 +313,39 @@ def peptide_normalization(
]
]

# Merge the SDRF with the resulting file
if label == "LFQ":
msstats_df[REFERENCE] = msstats_df[REFERENCE].apply(get_spectrum_prefix)
result_df = pd.merge(
msstats_df,
sdrf_df[["source name", REFERENCE]],
how="left",
on=[REFERENCE],
)
elif label == "TMT":
msstats_df[REFERENCE] = msstats_df[REFERENCE].apply(get_spectrum_prefix)
result_df = pd.merge(
msstats_df,
sdrf_df[["source name", REFERENCE, CHANNEL]],
how="left",
on=[REFERENCE, CHANNEL],
)
result_df = result_df[result_df["Condition"] != "Empty"]
result_df.rename(columns={"Charge": PEPTIDE_CHARGE}, inplace=True)
elif label == "ITRAQ":
msstats_df[REFERENCE] = msstats_df[REFERENCE].apply(get_spectrum_prefix)
result_df = pd.merge(
msstats_df,
sdrf_df[["source name", REFERENCE, CHANNEL]],
how="left",
on=[REFERENCE, CHANNEL],
)
result_df = result_df[result_df["Condition"] != "Empty"]
result_df.rename(columns={"Charge": PEPTIDE_CHARGE}, inplace=True)
if parquet is None:
result_df.rename(columns={"source name": SAMPLE_ID}, inplace=True)
# Merge the SDRF with the resulting file
if label == "LFQ":
msstats_df[REFERENCE] = msstats_df[REFERENCE].apply(get_spectrum_prefix)
result_df = pd.merge(
msstats_df,
sdrf_df[["source name", REFERENCE]],
how="left",
on=[REFERENCE],
)
elif label == "TMT":
msstats_df[REFERENCE] = msstats_df[REFERENCE].apply(get_spectrum_prefix)
result_df = pd.merge(
msstats_df,
sdrf_df[["source name", REFERENCE, CHANNEL]],
how="left",
on=[REFERENCE, CHANNEL],
)
result_df = result_df[result_df["Condition"] != "Empty"]
result_df.rename(columns={"Charge": PEPTIDE_CHARGE}, inplace=True)
elif label == "ITRAQ":
msstats_df[REFERENCE] = msstats_df[REFERENCE].apply(get_spectrum_prefix)
result_df = pd.merge(
msstats_df,
sdrf_df[["source name", REFERENCE, CHANNEL]],
how="left",
on=[REFERENCE, CHANNEL],
)
result_df = result_df[result_df["Condition"] != "Empty"]
result_df.rename(columns={"Charge": PEPTIDE_CHARGE}, inplace=True)
result_df.rename(columns={"source name": SAMPLE_ID}, inplace=True)
else:
result_df.drop("source name", inplace=True, axis=1)
result_df = msstats_df[msstats_df["Condition"] != "Empty"]
result_df[STUDY_ID] = result_df[SAMPLE_ID].str.split("-").str[0]

# Write CSVs by Sample ID
Expand Down

0 comments on commit e999c58

Please sign in to comment.