From bdbdd7e8e67a5f411c698e540730a522d3e26107 Mon Sep 17 00:00:00 2001 From: pdimens Date: Tue, 10 Dec 2024 10:24:44 -0500 Subject: [PATCH 1/7] move lrez to main harpy env, rm from container/conda --- harpy/downsample.py | 26 ++++- harpy/snakefiles/downsample.smk | 132 ++++++++++++++++++++++++++ harpy/snakefiles/sv_leviathan.smk | 4 +- harpy/snakefiles/sv_leviathan_pop.smk | 4 +- resources/harpy.yaml | 1 + resources/meta.yaml | 1 + 6 files changed, 162 insertions(+), 6 deletions(-) create mode 100644 harpy/snakefiles/downsample.smk diff --git a/harpy/downsample.py b/harpy/downsample.py index 62beccb40..74a466e16 100644 --- a/harpy/downsample.py +++ b/harpy/downsample.py @@ -23,7 +23,8 @@ }, ] } - +# TODO SAM TOLERATE +# TODO NOT GZIPPED TOLERANT @click.command(no_args_is_help = True, context_settings=dict(allow_interspersed_args=False), epilog = "Documentation: https://pdimens.github.io/harpy/workflows/downsample") @click.option('-d', '--downsample', type = click.IntRange(min = 1), help = 'Downsampling amount') @click.option('-i', '--invalid', default = "keep", show_default = True, type=click.Choice( ["keep","drop"]), help = "Strategy to handle invalid/missing barcodes") @@ -70,7 +71,8 @@ def downsample(input, prefix, downsample, invalid, bx_tag, random_seed, threads, rng = random.Random(random_seed).sample if random_seed else random.Random().sample logfile = open(f"{prefix}.log", "w") sys.stderr.write = logfile.write - + #TODO THIS DOESNT NEED TO HAPPEN ANYMORE + # JUST PARSE THE BARCODES FROM THE FASTQ or BAM AND PULL READS ON THE SECOND PASS with harpy_pulsebar(quiet, f"sorting by {bx_tag} tags") as progress: progress.add_task(f"sorting by {bx_tag} tags", total = None) if len(input) == 2: @@ -133,6 +135,26 @@ def downsample(input, prefix, downsample, invalid, bx_tag, random_seed, threads, logfile.close() + barcodes = set() +if len(input) == 1: + infile = pysam.AlignmentFile(input[0], "rb", check_sq=False) +else: + infile = pysam.FastqFile(input[0], mode = 'rb') + infile_R = pysam. 
+ with harpy_pulsebar(quiet, f"parsing {bx_tag} tags") as progress: + progress.add_task(f"parsing {bx_tag} tags", total = None) + for record in infile: + try: + barcode = record.get_tag(bx_tag) + if isinstance(barcode, int): + pass # an int from an MI-tharype tag + elif invalid_pattern.search(barcode): + continue + except KeyError: + continue + barcodes.add(barcode) + + # store records with the same barcode in an array # then subsample according to the downsample fraction and write to output diff --git a/harpy/snakefiles/downsample.smk b/harpy/snakefiles/downsample.smk new file mode 100644 index 000000000..d1def1233 --- /dev/null +++ b/harpy/snakefiles/downsample.smk @@ -0,0 +1,132 @@ +import os +import re +import sys +import gzip +import pysam +import random +import logging + +onstart: + logger.logger.addHandler(logging.FileHandler(config["snakemake_log"])) +onsuccess: + os.remove(logger.logfile) +onerror: + os.remove(logger.logfile) + +outdir = config["output_directory"] +envdir = os.path.join(os.getcwd(), outdir, "workflow", "envs") +inputs = config["inputs"] +invalids = config["invalid_strategy"] +is_fastq = True if len(inputs) == 2 else False +random_seed = config.get("random_seed", None) +rng = random.Random(random_seed) if random_seed else random.Random() + +if filetype == "fq": + # determine if a file is gzipped + try: + with gzip.open(inputs[0], 'rt') as f: + f.read(10) + is_gzip = True + except gzip.BadGzipFile: + is_gzip = False +else: + is_gzip = False + +rule bam_convert: + input: + inputs + output: + temp(f"{outdir}/input.bam") + threads: + workflow.cores + shell: + """ + samtools import -@ {threads} -T "*" {input} + """ + +rule extract_barcodes: + input: + f"{outdir}/input.bam" if is_fastq else inputs[0] + output: + f"{outdir}/sampled_barcodes.txt" + threads: + workflow.cores + params: + inv_prop = invalid_proportion, + downsample_amt = downsample + run: + invalid_pattern = re.compile(r'[AaBbCcDd]00') + barcodes = set() + with pysam.AlignmentFile(sorted_bam, "rb", check_sq=False) as infile" + for record in infile: + try: + barcode = record.get_tag(bx_tag) + if invalid_pattern.search(barcode): + # invalid barcode retention + if rng.random() > params.inv_prop: + continue + except KeyError: + continue + barcodes.add(barcode) + n_bc = len(barcodes) + if n_bc < params.downsample_amt: + raise ValueError(f"The input has fewer barcodes ({n_bc}) than the requested downsampling amount ({params.downsample_amt})") + else: + with open(output[0], "w") as bc_out: + _= [bc_out.write(f"{i}\n" for i in rng.sample(sorted(barcodes), params.downsample_amt))] + + +rule index_fastq_bx_tag: + input: + #TODO FASTQ DICT FOR WILDCARDS + output: + + params: + "--gzip" if is_gzip else "" + threads: + workflow.cores + shell: + "LRez index fastq -t {threads} -f {input} -o {output} {params}" + +rule downsample_fastq: + input: + #TODO FASTQ DICT + bc_index = "{fastq}.bci", + bc_list = f"{outdir}/sampled_barcodes.txt" + output: + XXXXXXXXXX + params: + "--gzip" if is_gzip else "" + threads: + workflow.cores + shell: + "LRez query fastq -t {threads} -f {input.fastq} -i {input.bc_index} -l {input.bc_list} {params} > {output}" + +rule index_bam_bx_tag: + input: + in_bam + output: + f"{in_bam}.bci" + threads: + workflow.cores + shell: + "LRez index bam --offsets -t {threads} -b {input} -o {output} --gzip" + +rule downsample_bam: + input: + bam = in_bam, + bc_index = f"{in_bam}.bci", + bc_list = f"{outdir}/sampled_barcodes.txt" + output: + in_bam[:-4] + ".downsample.bam" + threads: + workflow.cores + shell: + 
"LRez query bam -t {threads} -bam {input.bam} -i {input.bc_index} -l {input.bc_list} > {output}" + +rule workflow_summary: + input: + in_bam[:-4] + ".downsample.bam" if not is_fastq else [], + out_fq if is_fastq else [] + #TODO THINK OF FASTQ OUTPUT NAMES + diff --git a/harpy/snakefiles/sv_leviathan.smk b/harpy/snakefiles/sv_leviathan.smk index 4aa54074e..929b0e5a4 100644 --- a/harpy/snakefiles/sv_leviathan.smk +++ b/harpy/snakefiles/sv_leviathan.smk @@ -61,8 +61,8 @@ rule index_barcode: temp(outdir + "/lrezIndexed/{sample}.bci") threads: max(10, workflow.cores) - conda: - f"{envdir}/variants.yaml" + container: + None shell: "LRez index bam --threads {threads} -p -b {input.bam} -o {output}" diff --git a/harpy/snakefiles/sv_leviathan_pop.smk b/harpy/snakefiles/sv_leviathan_pop.smk index 8680fa3fd..60adf24a2 100644 --- a/harpy/snakefiles/sv_leviathan_pop.smk +++ b/harpy/snakefiles/sv_leviathan_pop.smk @@ -108,8 +108,8 @@ rule index_barcode: temp(outdir + "/lrez_index/{population}.bci") threads: max(10, workflow.cores) - conda: - f"{envdir}/variants.yaml" + container: + None shell: "LRez index bam -p -b {input.bam} -o {output} --threads {threads}" diff --git a/resources/harpy.yaml b/resources/harpy.yaml index 14652b913..fd7a6db3a 100644 --- a/resources/harpy.yaml +++ b/resources/harpy.yaml @@ -6,6 +6,7 @@ dependencies: - apptainer - bcftools =1.20 - conda >24.7 + - lrez - pysam - python - rich-click diff --git a/resources/meta.yaml b/resources/meta.yaml index f7b81569d..3c33871fd 100644 --- a/resources/meta.yaml +++ b/resources/meta.yaml @@ -36,6 +36,7 @@ requirements: - apptainer - bcftools =1.20 - conda >24.7 + - lrez - pysam - python - rich-click From 7d47a12a45ec4edec5230cbf69d614067a97e60c Mon Sep 17 00:00:00 2001 From: pdimens Date: Tue, 10 Dec 2024 12:35:51 -0500 Subject: [PATCH 2/7] simplify logic --- harpy/snakefiles/downsample.smk | 65 ++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/harpy/snakefiles/downsample.smk b/harpy/snakefiles/downsample.smk index d1def1233..afa7f2e15 100644 --- a/harpy/snakefiles/downsample.smk +++ b/harpy/snakefiles/downsample.smk @@ -17,7 +17,8 @@ outdir = config["output_directory"] envdir = os.path.join(os.getcwd(), outdir, "workflow", "envs") inputs = config["inputs"] invalids = config["invalid_strategy"] -is_fastq = True if len(inputs) == 2 else False +infiles = dict(zip(inputs, inputs)) +is_fastq = True if len(inputs) == 2 else False random_seed = config.get("random_seed", None) rng = random.Random(random_seed) if random_seed else random.Random() @@ -75,58 +76,62 @@ rule extract_barcodes: with open(output[0], "w") as bc_out: _= [bc_out.write(f"{i}\n" for i in rng.sample(sorted(barcodes), params.downsample_amt))] - -rule index_fastq_bx_tag: +rule index_barcodes: input: - #TODO FASTQ DICT FOR WILDCARDS + lambda wc: infiles[wc.inputfile] output: - - params: - "--gzip" if is_gzip else "" + "{inputfile}.bci" threads: workflow.cores - shell: - "LRez index fastq -t {threads} -f {input} -o {output} {params}" + run: + if is_fastq: + gz_arg = "--gzip" if is_gzip else "" + shell(f"LRez index fastq -t {threads} -f {input} -o {output} {gz_arg}") + else: + shell(f"LRez index bam --offsets -t {threads} -b {input} -o {output}") -rule downsample_fastq: +rule downsample: input: - #TODO FASTQ DICT - bc_index = "{fastq}.bci", + file = infiles[0], + bc_index = infiles[0] + ".bci", bc_list = f"{outdir}/sampled_barcodes.txt" output: - XXXXXXXXXX - params: - "--gzip" if is_gzip else "" + f"{outdir}/downsample.bam" threads: 
workflow.cores shell: - "LRez query fastq -t {threads} -f {input.fastq} -i {input.bc_index} -l {input.bc_list} {params} > {output}" + "LRez query bam -t {threads} -bam {input.file} -i {input.bc_index} -l {input.bc_list} > {output}" -rule index_bam_bx_tag: +rule downsample_read_1: input: - in_bam + file = inputs[0], + bc_index = inputs[0] + ".bci", + bc_list = f"{outdir}/sampled_barcodes.txt" output: - f"{in_bam}.bci" + f"{outdir}/downsample.R1.fq.gz" + params: + "--gzip" if is_gzip else "" threads: workflow.cores shell: - "LRez index bam --offsets -t {threads} -b {input} -o {output} --gzip" - -rule downsample_bam: + "LRez query fastq -t {threads} -f {input.file} -i {input.bc_index} -l {input.bc_list} {params} > {output}" + +rule downsample_read_2: input: - bam = in_bam, - bc_index = f"{in_bam}.bci", + file = inputs[1], + bc_index = inputs[1] + ".bci", bc_list = f"{outdir}/sampled_barcodes.txt" output: - in_bam[:-4] + ".downsample.bam" + f"{outdir}/downsample.R2.fq.gz" + params: + "--gzip" if is_gzip else "" threads: workflow.cores shell: - "LRez query bam -t {threads} -bam {input.bam} -i {input.bc_index} -l {input.bc_list} > {output}" + "LRez query fastq -t {threads} -f {input.file} -i {input.bc_index} -l {input.bc_list} {params} > {output}" + rule workflow_summary: input: - in_bam[:-4] + ".downsample.bam" if not is_fastq else [], - out_fq if is_fastq else [] - #TODO THINK OF FASTQ OUTPUT NAMES - + f"{outdir}/downsample.bam" if not is_fastq else [], + collect(f"{outdir}/downsample.R" + "{FR}.fq.gz", FR = [1,2]) if is_fastq else [] From 9bc80f1c5f5e2eb8f2abee0ee8925b5af85e1817 Mon Sep 17 00:00:00 2001 From: pdimens Date: Tue, 10 Dec 2024 13:39:38 -0500 Subject: [PATCH 3/7] split into a function --- harpy/bin/extract_bxtags.py | 52 ++++++++ harpy/downsample.py | 219 ++++++++------------------------ harpy/snakefiles/downsample.smk | 57 ++++----- 3 files changed, 133 insertions(+), 195 deletions(-) create mode 100644 harpy/bin/extract_bxtags.py diff --git a/harpy/bin/extract_bxtags.py b/harpy/bin/extract_bxtags.py new file mode 100644 index 000000000..7d9215b88 --- /dev/null +++ b/harpy/bin/extract_bxtags.py @@ -0,0 +1,52 @@ +#! /usr/bin/env python + +import re +import sys +import pysam +import random +import argparse + +parser = argparse.ArgumentParser( + prog = 'extract_bxtags.py', + description = 'Extracts all the barcodes present in a SAM/BAM file. Can optionally subsample the barcodes. 
Use --invalid to specify a proportion of invalid haplotagging to output.', + usage = "extract_bxtags.py [-i] -b BC -d 15000 input.bam > output.txt", + ) +parser.add_argument('-i','--invalid', type= float, default=1, help = "Proportion of invalid barcodes to include in subsampling/output (default: %(default)s)") +parser.add_argument('-d','--downsample', type= int, help = "Number of barcodes to downsample to") +parser.add_argument('-b','--bx-tag', type= str, default="BX", help = "2-character tag with the barcodes (alphanumeric, default: %(default)s)") +parser.add_argument('input', type= str, help = "Input BAM file") +if len(sys.argv) == 1: + parser.print_help(sys.stderr) + sys.exit(1) + +args = parser.parse_args() +if args.invalid < 0 or args.invalid > 0: + parser.error("--invalid must be between 0 and 1") + +bx_tag = args.bx_tag.upper() +inv_prop = args.invalid +rng = random.Random(args.random_seed) if args.random_seed else random.Random() +invalid_pattern = re.compile(r'[AaBbCcDd]00') +barcodes = set() + +with pysam.AlignmentFile(args.input, "rb", check_sq=False) as infile: + for record in infile: + try: + barcode = record.get_tag(bx_tag) + if invalid_pattern.search(barcode): + # invalid barcode retention + if rng.random() > inv_prop: + continue + except KeyError: + continue + barcodes.add(barcode) + if args.downsample: + n_bc = len(barcodes) + if n_bc < args.downsample: + raise ValueError(f"The input has fewer barcodes ({n_bc}) than the requested downsampling amount ({args.downsample})") + else: + for i in rng.sample(sorted(barcodes), args.downsample): + sys.stdout.write(f"{i}\n") + else: + for i in barcodes: + sys.stdout.write(f"{i}\n") \ No newline at end of file diff --git a/harpy/downsample.py b/harpy/downsample.py index 74a466e16..5c3270730 100644 --- a/harpy/downsample.py +++ b/harpy/downsample.py @@ -3,19 +3,19 @@ import os import re import sys -import pysam -import random -import subprocess -from pathlib import Path +import yaml +from rich import box +from rich.table import Table import rich_click as click -from ._misc import harpy_pulsebar - +from ._cli_types_generic import SnakemakeParams +from ._launch import launch_snakemake +from ._misc import fetch_rule, snakemake_log docstring = { "harpy downsample": [ { "name": "Parameters", - "options": sorted(["--downsample", "--invalid", "--bx-tag", "--random-seed", "--prefix"]), + "options": sorted(["--downsample", "--invalid", "--random-seed", "--prefix"]), }, { "name": "Workflow Controls", @@ -23,33 +23,31 @@ }, ] } -# TODO SAM TOLERATE -# TODO NOT GZIPPED TOLERANT @click.command(no_args_is_help = True, context_settings=dict(allow_interspersed_args=False), epilog = "Documentation: https://pdimens.github.io/harpy/workflows/downsample") @click.option('-d', '--downsample', type = click.IntRange(min = 1), help = 'Downsampling amount') -@click.option('-i', '--invalid', default = "keep", show_default = True, type=click.Choice( ["keep","drop"]), help = "Strategy to handle invalid/missing barcodes") -@click.option('-b', '--bx-tag', type = str, default = "BX", show_default=True, help = "The header tag with the barcode") +@click.option('-i', '--invalid', default = 1, show_default = True, type=click.FloatRange(min=0,max=1), help = "Proportion of invalid barcodes to sample") @click.option('-p', '--prefix', type = click.Path(exists = False), default = "downsampled", show_default = True, help = 'Prefix for output file(s)') @click.option('--random-seed', type = click.IntRange(min = 1), help = "Random seed for sampling") @click.option('-t', 
'--threads', default = 4, show_default = True, type = click.IntRange(min = 1, max_open = True), help = 'Number of threads to use') @click.option('--quiet', is_flag = True, show_default = True, default = False, help = 'Don\'t show output text while running') +@click.option('--setup-only', is_flag = True, hidden = True, show_default = True, default = False, help = 'Setup the workflow and exit') +@click.option('--snakemake', type = SnakemakeParams(), help = 'Additional Snakemake parameters, in quotes') @click.argument('input', required=True, type=click.Path(exists=True, readable=True, dir_okay=False), nargs=-1) -def downsample(input, prefix, downsample, invalid, bx_tag, random_seed, threads, quiet): +def downsample(input, prefix, downsample, invalid, random_seed, setup_only, snakemake, threads, quiet): """ Downsample data by barcode - Downsamples FASTQ or BAM file(s) by barcode to keep all reads - from `-d` barcodes. + Downsamples FASTQ or BAM file(s) by barcode in the `BX` tag to keep all reads + from `-d` barcodes. Input can be: - one BAM file - two FASTQ files (R1 and R2 of a paired-end read set) - Use `--invalid` to specify how to handle invalid barcodes: - - `keep`: output all invalid/missing barcodes - - `drop`: don't output any invalid/missing barcodes + Use `--invalid` to specify the proportion of invalid barcodes to consider for sampling, where `0` will remove all + invalid barcodes from the barcode pool, `1` will add all invalid barcodes to the barcode pool, and a number in between + (e.g. `0.25`) will add approximately that proprotion of invalid barcodes into the barcode pool that gets subsampled down + to `-d` barcodes. """ # validate input files as either 1 bam or 2 fastq - if len(bx_tag) != 2 or not bx_tag.isalnum(): - raise click.BadParameter(f'\'{bx_tag}\' is not a valid SAM tag. Tags for --bx-tag must be alphanumeric and exactly 2 characters, e.g. "BX"') if len(input) > 2: raise click.BadParameter('inputs must be 1 BAM file or 2 FASTQ files.') if len(input) == 1: @@ -59,155 +57,44 @@ def downsample(input, prefix, downsample, invalid, bx_tag, random_seed, threads, if input[0] == input[1]: raise click.BadParameter('the two input files cannot be identical') re_ext = re.compile(r"\.(fq|fastq)(?:\.gz)?$", re.IGNORECASE) - badfastq = [] for i in input: if not re.search(re_ext, i): - badfastq.append(i) - if badfastq: - raise click.BadParameter('inputs must be 1 BAM (.bam) file or 2 FASTQ (.fastq|.fq) files. 
The FASTQ files can be gzipped.') - - output_bam = f"{prefix}.bam" - bx_tag = bx_tag.upper() - rng = random.Random(random_seed).sample if random_seed else random.Random().sample - logfile = open(f"{prefix}.log", "w") - sys.stderr.write = logfile.write - #TODO THIS DOESNT NEED TO HAPPEN ANYMORE - # JUST PARSE THE BARCODES FROM THE FASTQ or BAM AND PULL READS ON THE SECOND PASS - with harpy_pulsebar(quiet, f"sorting by {bx_tag} tags") as progress: - progress.add_task(f"sorting by {bx_tag} tags", total = None) - if len(input) == 2: - samtools_import = subprocess.Popen(f"samtools import -@ {threads} -T * {input[0]} {input[1]}".split(), stdout=subprocess.PIPE, stderr=logfile) - subprocess.run(f"samtools sort -@ {threads} -o bx_sorted.bam -t {bx_tag}".split(), stdin = samtools_import.stdout, stderr=logfile) - else: - subprocess.run(f"samtools sort -@ {threads} -o bx_sorted.bam -t {bx_tag} {input[0]}".split(), stderr=logfile) - sorted_bam = Path("bx_sorted.bam").resolve().as_posix() - invalid_pattern = re.compile(r'[AaBbCcDd]00') - - # read input file, get list of valid barcodes, subsample valid barcodes, read input again, output only sampled barcodes - with ( - pysam.AlignmentFile(sorted_bam, "rb", check_sq=False) as infile, - harpy_pulsebar(quiet, f"parsing {bx_tag} tags") as progress - ): - progress.add_task(f"parsing {bx_tag} tags", total = None) - barcodes = set() - for record in infile: - try: - barcode = record.get_tag(bx_tag) - if isinstance(barcode, int): - pass # an int from an MI-tharype tag - elif invalid_pattern.search(barcode): - continue - except KeyError: - continue - barcodes.add(barcode) - n_bc = len(barcodes) - if downsample > n_bc: - sys.stderr.write(f"The number of intended barcodes to downsample to ({downsample}) is greater than the number of barcodes in the input file ({n_bc}). Please choose a smaller number to downsample to.\n") - sys.exit(1) - barcodes = rng(sorted(barcodes), downsample) - with ( - pysam.AlignmentFile(sorted_bam, "rb", check_sq=False) as infile, - pysam.AlignmentFile(output_bam, "wb", template=infile) as outfile, - harpy_pulsebar(quiet, f"downsampling {bx_tag} tags") as progress - ): - progress.add_task(f"downsampling {bx_tag} tags", total = None) - for record in infile: - try: - barcode = record.get_tag(bx_tag) - if isinstance(barcode, int): - pass # an int from an MI-type tag - elif invalid_pattern.search(barcode): - if invalid == "keep": - outfile.write(record) - continue - except KeyError: - if invalid == "keep": - outfile.write(record) - continue - if barcode in barcodes: - outfile.write(record) - - # convert back - os.remove(sorted_bam) - if len(input) == 2: - subprocess.run(f'samtools fastq -T * -c 6 -1 {prefix}.R1.fq.gz -2 {prefix}.R2.fq.gz {output_bam}'.split(), stderr=logfile) - os.remove(output_bam) - logfile.close() - - - barcodes = set() -if len(input) == 1: - infile = pysam.AlignmentFile(input[0], "rb", check_sq=False) -else: - infile = pysam.FastqFile(input[0], mode = 'rb') - infile_R = pysam. - with harpy_pulsebar(quiet, f"parsing {bx_tag} tags") as progress: - progress.add_task(f"parsing {bx_tag} tags", total = None) - for record in infile: - try: - barcode = record.get_tag(bx_tag) - if isinstance(barcode, int): - pass # an int from an MI-tharype tag - elif invalid_pattern.search(barcode): - continue - except KeyError: - continue - barcodes.add(barcode) + raise click.BadParameter('inputs must be 1 BAM (.bam) file or 2 FASTQ (.fastq|.fq) files. 
The FASTQ files can be gzipped.') + workflow = "downsample" + output_dir = output_dir.rstrip("/") + workflowdir = os.path.join(output_dir, 'workflow') + command = f'snakemake --rerun-incomplete --show-failed-logs --rerun-triggers input mtime params --nolock --cores {threads} --directory . ' + command += f"--snakefile {workflowdir}/{workflow}.smk " + command += f"--configfile {workflowdir}/config.yaml " + if snakemake: + command += snakemake + os.makedirs(workflowdir, exist_ok=True) + os.makedirs(f"{output_dir}/logs/snakemake", exist_ok = True) + fetch_rule(workflowdir, f"{workflow}.smk") + sm_log = snakemake_log(output_dir, workflow) + configs = { + "workflow": workflow, + "snakemake_log" : sm_log, + "output_directory" : output_dir, + "prefix" : prefix, + "downsample" : downsample, + "invalid_proportion" : invalid, + **({"random_seed" : random_seed} if random_seed else {}), + "workflow_call" : command.rstrip(), + "inputs": [i.as_posix() for i in input] + } + with open(os.path.join(workflowdir, 'config.yaml'), "w", encoding="utf-8") as config: + yaml.dump(configs, config, default_flow_style= False, sort_keys=False, width=float('inf')) + if setup_only: + sys.exit(0) -# store records with the same barcode in an array -# then subsample according to the downsample fraction and write to output -# "within": -# with ( -# pysam.AlignmentFile(sorted_bam, "rb", check_sq=False) as infile, -# pysam.AlignmentFile(output_bam, "wb", template=infile) as outfile, -# harpy_pulsebar(quiet, f"downsampling within {bx_tag} tags") as progress -# ): -# progress.add_task(f"downsampling within {bx_tag} tags", total = None) -# record_store_F = [] -# record_store_R = [] -# last_barcode = None -# for record in infile: -# try: -# barcode = record.get_tag(bx_tag) -# if isinstance(barcode, int): -# pass -# elif invalid_pattern.search(barcode): -# if invalid == "keep": -# outfile.write(record) -# elif invalid == "downsample": -# if rng.uniform(0, 1) <= downsample: -# outfile.write(record) -# continue -# except KeyError: -# if invalid == "keep": -# outfile.write(record) -# elif invalid == "downsample": -# if rng.uniform(0, 1) <= downsample: -# outfile.write(record) -# continue -# -# if last_barcode and barcode != last_barcode: -# # subsample records to file and reset record stores -# for i,j in zip(record_store_F, record_store_R): -# if rng.uniform(0, 1) <= downsample: -# if i: -# outfile.write(i) -# if j: -# outfile.write(j) -# record_store_F = [] -# record_store_R = [] -# # otherwise proceed to append record to stores -# if record.is_forward and record.is_paired: -# record_store_F.append(record) -# elif record.is_reverse and record.is_paired: -# record_store_R.append(record) -# elif record.is_forward and not record.is_paired: -# record_store_F.append(record) -# record_store_R.append(None) -# elif record.is_reverse and not record.is_paired: -# record_store_F.append(None) -# record_store_R.append(record) -# # update what the last barcode is -# last_barcode = barcode + start_text = Table(show_header=False,pad_edge=False, show_edge=False, padding = (0,0), box=box.SIMPLE) + start_text.add_column("detail", justify="left", style="light_steel_blue", no_wrap=True) + start_text.add_column("value", justify="left") + start_text.add_row("Output Folder:", output_dir + "/") + start_text.add_row("Downsample to:", f"{downsample} barcodes") + start_text.add_row("Invalid Proportion:", f"{invalid}") + start_text.add_row("Workflow Log:", sm_log.replace(f"{output_dir}/", "") + "[dim].gz") + launch_snakemake(command, workflow, start_text, 
output_dir, sm_log, quiet, f"workflow/{workflow}.summary") diff --git a/harpy/snakefiles/downsample.smk b/harpy/snakefiles/downsample.smk index afa7f2e15..80d077005 100644 --- a/harpy/snakefiles/downsample.smk +++ b/harpy/snakefiles/downsample.smk @@ -1,6 +1,5 @@ import os import re -import sys import gzip import pysam import random @@ -14,15 +13,15 @@ onerror: os.remove(logger.logfile) outdir = config["output_directory"] -envdir = os.path.join(os.getcwd(), outdir, "workflow", "envs") inputs = config["inputs"] -invalids = config["invalid_strategy"] +invalids = config["invalid_proportion"] infiles = dict(zip(inputs, inputs)) +random_seed = config.get("random_seed", None) +downsample = config["downsample"] is_fastq = True if len(inputs) == 2 else False -random_seed = config.get("random_seed", None) -rng = random.Random(random_seed) if random_seed else random.Random() +rng = random.Random(random_seed) if random_seed else random.Random() -if filetype == "fq": +if is_fastq: # determine if a file is gzipped try: with gzip.open(inputs[0], 'rt') as f: @@ -50,31 +49,16 @@ rule extract_barcodes: f"{outdir}/input.bam" if is_fastq else inputs[0] output: f"{outdir}/sampled_barcodes.txt" + log: + f"{outdir}/logs/sampled_barcodes.log" threads: workflow.cores params: - inv_prop = invalid_proportion, - downsample_amt = downsample - run: - invalid_pattern = re.compile(r'[AaBbCcDd]00') - barcodes = set() - with pysam.AlignmentFile(sorted_bam, "rb", check_sq=False) as infile" - for record in infile: - try: - barcode = record.get_tag(bx_tag) - if invalid_pattern.search(barcode): - # invalid barcode retention - if rng.random() > params.inv_prop: - continue - except KeyError: - continue - barcodes.add(barcode) - n_bc = len(barcodes) - if n_bc < params.downsample_amt: - raise ValueError(f"The input has fewer barcodes ({n_bc}) than the requested downsampling amount ({params.downsample_amt})") - else: - with open(output[0], "w") as bc_out: - _= [bc_out.write(f"{i}\n" for i in rng.sample(sorted(barcodes), params.downsample_amt))] + inv_prop = f"-i {invalid_proportion}", + downsample_amt = f"-d {downsample}", + bx_tag = "-b BX" + shell: + "extract_bxtags.py {params} {input} > {output} 2> {log}" rule index_barcodes: input: @@ -130,8 +114,23 @@ rule downsample_read_2: shell: "LRez query fastq -t {threads} -f {input.file} -i {input.bc_index} -l {input.bc_list} {params} > {output}" - rule workflow_summary: input: f"{outdir}/downsample.bam" if not is_fastq else [], collect(f"{outdir}/downsample.R" + "{FR}.fq.gz", FR = [1,2]) if is_fastq else [] + run: + summary = ["The harpy downsample workflow ran using these parameters:"] + summary.append(f"The provided input file(s):\n" + "\n\t".join(inputs)) + convs = "The FASTQ files were converted into a BAM file with:\n" + convs += "\tsamtools import -T * fastq1 fastq2" + if is_fastq: + summary.append(convs) + summary.append("Barcodes were extract and sampled using a custom python script") + lrez = "The inputs were indexed and downsampled using LRez:\n" + if is_fastq: + lrez += "\tLRez query fastq -f fastq -i index.bci -l barcodes.txt" + else: + lrez += "\tLRez query bam -b bam -i index.bci -l barcodes.txt" + summary.append(lrez) + with open(outdir + "/workflow/downsample.summary", "w") as f: + f.write("\n\n".join(summary)) From c3d0bdc815f3781f57c99d3fadbdb03945ff8c28 Mon Sep 17 00:00:00 2001 From: pdimens Date: Tue, 10 Dec 2024 13:45:07 -0500 Subject: [PATCH 4/7] add prefix support --- harpy/snakefiles/downsample.smk | 31 +++++++++++++++++-------------- 1 file changed, 17 
insertions(+), 14 deletions(-) diff --git a/harpy/snakefiles/downsample.smk b/harpy/snakefiles/downsample.smk index 80d077005..36496d2be 100644 --- a/harpy/snakefiles/downsample.smk +++ b/harpy/snakefiles/downsample.smk @@ -12,14 +12,15 @@ onsuccess: onerror: os.remove(logger.logfile) -outdir = config["output_directory"] -inputs = config["inputs"] -invalids = config["invalid_proportion"] -infiles = dict(zip(inputs, inputs)) -random_seed = config.get("random_seed", None) -downsample = config["downsample"] -is_fastq = True if len(inputs) == 2 else False -rng = random.Random(random_seed) if random_seed else random.Random() +outdir = config["output_directory"] +inputs = config["inputs"] +invalids = config["invalid_proportion"] +random_seed = config.get("random_seed", None) +downsample = config["downsample"] +prefix = config["prefix"] +infiles = dict(zip(inputs, inputs)) +is_fastq = True if len(inputs) == 2 else False +rng = random.Random(random_seed) if random_seed else random.Random() if is_fastq: # determine if a file is gzipped @@ -80,7 +81,7 @@ rule downsample: bc_index = infiles[0] + ".bci", bc_list = f"{outdir}/sampled_barcodes.txt" output: - f"{outdir}/downsample.bam" + f"{outdir}/{prefix}.bam" threads: workflow.cores shell: @@ -92,7 +93,7 @@ rule downsample_read_1: bc_index = inputs[0] + ".bci", bc_list = f"{outdir}/sampled_barcodes.txt" output: - f"{outdir}/downsample.R1.fq.gz" + f"{outdir}/{prefix}.R1.fq.gz" params: "--gzip" if is_gzip else "" threads: @@ -106,7 +107,7 @@ rule downsample_read_2: bc_index = inputs[1] + ".bci", bc_list = f"{outdir}/sampled_barcodes.txt" output: - f"{outdir}/downsample.R2.fq.gz" + f"{outdir}/{prefix}.R2.fq.gz" params: "--gzip" if is_gzip else "" threads: @@ -116,8 +117,8 @@ rule downsample_read_2: rule workflow_summary: input: - f"{outdir}/downsample.bam" if not is_fastq else [], - collect(f"{outdir}/downsample.R" + "{FR}.fq.gz", FR = [1,2]) if is_fastq else [] + f"{outdir}/{prefix}.bam" if not is_fastq else [], + collect(f"{outdir}/{prefix}.R" + "{FR}.fq.gz", FR = [1,2]) if is_fastq else [] run: summary = ["The harpy downsample workflow ran using these parameters:"] summary.append(f"The provided input file(s):\n" + "\n\t".join(inputs)) @@ -125,7 +126,9 @@ rule workflow_summary: convs += "\tsamtools import -T * fastq1 fastq2" if is_fastq: summary.append(convs) - summary.append("Barcodes were extract and sampled using a custom python script") + extraction = "Barcodes were extracted and sampled using:\n" + extraction += f"\textract_bxtags.py -i {invalids} -b BX -d {downsample} input.bam" + summary.append(extraction) lrez = "The inputs were indexed and downsampled using LRez:\n" if is_fastq: lrez += "\tLRez query fastq -f fastq -i index.bci -l barcodes.txt" From 63e9b21d68f3a7b742d1f71aecfe96d6516e029b Mon Sep 17 00:00:00 2001 From: pdimens Date: Tue, 10 Dec 2024 15:52:10 -0500 Subject: [PATCH 5/7] presumably the finished version --- .deprecated/align-minimap.smk | 345 -------------------------------- .deprecated/hpc.py | 248 ----------------------- .deprecated/phase.bak.smk | 323 ------------------------------ .github/workflows/tests.yml | 2 +- harpy/bin/extract_bxtags.py | 7 +- harpy/downsample.py | 16 +- harpy/snakefiles/downsample.smk | 56 ++++-- 7 files changed, 48 insertions(+), 949 deletions(-) delete mode 100644 .deprecated/align-minimap.smk delete mode 100644 .deprecated/hpc.py delete mode 100644 .deprecated/phase.bak.smk mode change 100644 => 100755 harpy/bin/extract_bxtags.py diff --git a/.deprecated/align-minimap.smk 
b/.deprecated/align-minimap.smk deleted file mode 100644 index 54e9752f7..000000000 --- a/.deprecated/align-minimap.smk +++ /dev/null @@ -1,345 +0,0 @@ -containerized: "docker://pdimens/harpy:latest" - -import os -import re -import sys -from rich.panel import Panel -from rich import print as rprint - -outdir = config["output_directory"] -envdir = os.path.join(os.getcwd(), ".harpy_envs") -genomefile = config["inputs"]["genome"] -fqlist = config["inputs"]["fastq"] -extra = config.get("extra", "") -bn = os.path.basename(genomefile) -genome_zip = True if bn.lower().endswith(".gz") else False -bn_idx = f"{bn}.gzi" if genome_zip else f"{bn}.fai" -envdir = os.getcwd() + "/.harpy_envs" -windowsize = config["depth_windowsize"] -skipreports = config["skipreports"] -windowsize = config["depth_windowsize"] -molecule_distance = config["molecule_distance"] - -wildcard_constraints: - sample = "[a-zA-Z0-9._-]+" - -onerror: - print("") - rprint( - Panel( - f"The workflow has terminated due to an error. See the log file below for more details.", - title = "[bold]harpy align minimap", - title_align = "left", - border_style = "red" - ), - file = sys.stderr - ) - -onsuccess: - print("") - rprint( - Panel( - f"The workflow has finished successfully! Find the results in [bold]{outdir}[/bold]", - title = "[bold]harpy align minimap", - title_align = "left", - border_style = "green" - ), - file = sys.stderr - ) - -bn_r = r"([_\.][12]|[_\.][FR]|[_\.]R[12](?:\_00[0-9])*)?\.((fastq|fq)(\.gz)?)$" -samplenames = {re.sub(bn_r, "", os.path.basename(i), flags = re.IGNORECASE) for i in fqlist} -d = dict(zip(samplenames, samplenames)) - -def get_fq(wildcards): - # returns a list of fastq files for read 1 based on *wildcards.sample* e.g. - r = re.compile(fr".*/({re.escape(wildcards.sample)}){bn_r}", flags = re.IGNORECASE) - return sorted(list(filter(r.match, fqlist))[:2]) - -rule process_genome: - input: - genomefile - output: - f"Genome/{bn}" - container: - None - message: - "Copying {input} to Genome/" - shell: - """ - if (file {input} | grep -q compressed ) ;then - # is regular gzipped, needs to be BGzipped - zcat {input} | bgzip -c > {output} - else - cp -f {input} {output} - fi - """ - -rule index_genome: - input: - f"Genome/{bn}" - output: - fai = f"Genome/{bn}.fai", - gzi = f"Genome/{bn}.gzi" if genome_zip else [] - log: - f"Genome/{bn}.faidx.log" - params: - genome_zip - container: - None - message: - "Indexing {input}" - shell: - """ - if [ "{params}" = "True" ]; then - samtools faidx --gzi-idx {output.gzi} --fai-idx {output.fai} {input} 2> {log} - else - samtools faidx --fai-idx {output.fai} {input} 2> {log} - fi - """ - -rule genome_index: - input: - f"Genome/{bn}" - output: - temp(f"Genome/{bn}.mmi") - log: - f"Genome/{bn}.mmi.log" - conda: - f"{envdir}/align.yaml" - message: - "Indexing {input}" - shell: - "minimap2 -x sr -d {output} {input} 2> {log}" - -rule align: - input: - fastq = get_fq, - genome = f"Genome/{bn}.mmi" - output: - pipe(outdir + "/samples/{sample}/{sample}.raw.sam") - log: - outdir + "/logs/{sample}.minimap.log" - params: - samps = lambda wc: d[wc.get("sample")], - extra = extra - benchmark: - ".Benchmark/Mapping/minimap/align.{sample}.txt" - threads: - min(10, workflow.cores) - 2 - conda: - f"{envdir}/align.yaml" - message: - "Aligning sequences: {wildcards.sample}" - shell: - """ - minimap2 -ax sr -t {threads} -y --sam-hit-only -R \"@RG\\tID:{wildcards.sample}\\tSM:{wildcards.sample}\" {input.genome} {input.fastq} > {output} 2> {log} - """ - -rule quality_filter: - input: - outdir + 
"/samples/{sample}/{sample}.raw.sam" - output: - temp(outdir + "/samples/{sample}/{sample}.mm2.sam") - params: - quality = config["quality"] - container: - None - message: - "Quality filtering alignments: {wildcards.sample}" - shell: - "samtools view -h -F 4 -q {params.quality} {input} > {output}" - -rule collate: - input: - outdir + "/samples/{sample}/{sample}.mm2.sam" - output: - temp(outdir + "/samples/{sample}/{sample}.collate.bam") - container: - None - message: - "Collating alignments: {wildcards.sample}" - shell: - "samtools collate -o {output} {input} 2> /dev/null" - -rule fix_mates: - input: - outdir + "/samples/{sample}/{sample}.collate.bam" - output: - temp(outdir + "/samples/{sample}/{sample}.fixmate.bam") - container: - None - message: - "Fixing mates in alignments: {wildcards.sample}" - shell: - "samtools fixmate -m {input} {output} 2> /dev/null" - -rule sort_alignments: - input: - sam = outdir + "/samples/{sample}/{sample}.fixmate.bam", - genome = f"Genome/{bn}", - genome_samidx = f"Genome/{bn_idx}" - output: - bam = temp(outdir + "/samples/{sample}/{sample}.sort.bam"), - bai = temp(outdir + "/samples/{sample}/{sample}.sort.bam.bai") - log: - outdir + "/logs/{sample}.minimap.sort.log" - params: - quality = config["quality"], - tmpdir = lambda wc: outdir + "/." + d[wc.sample] - container: - None - message: - "Sorting alignments: {wildcards.sample}" - shell: - """ - samtools sort -T {params.tmpdir} --reference {input.genome} -O bam -l 0 -m {resources.mem_mb}M --write-index -o {output.bam}##idx##{output.bai} {input.sam} 2> {log} - rm -rf {params.tmpdir} - """ - -rule mark_duplicates: - input: - outdir + "/samples/{sample}/{sample}.sort.bam" - output: - temp(outdir + "/samples/{sample}/{sample}.markdup.bam") - log: - outdir + "/logs/{sample}.markdup.log" - threads: - 2 - container: - None - message: - "Marking duplicates in alignments alignment: {wildcards.sample}" - shell: - "samtools markdup -@ {threads} -S --barcode-tag BX -f {log} {input} {output} 2> /dev/null" - -rule index_markdups: - input: - outdir + "/samples/{sample}/{sample}.markdup.bam" - output: - temp(outdir + "/samples/{sample}/{sample}.markdup.bam.bai") - container: - None - message: - "Indexing duplicate-marked alignments: {wildcards.sample}" - shell: - "samtools index {input}" - -rule assign_molecules: - input: - bam = outdir + "/samples/{sample}/{sample}.markdup.bam", - bai = outdir + "/samples/{sample}/{sample}.markdup.bam.bai" - output: - bam = outdir + "/{sample}.bam", - bai = outdir + "/{sample}.bam.bai" - params: - molecule_distance - conda: - f"{envdir}/qc.yaml" - message: - "Assigning barcodes to molecules: {wildcards.sample}" - shell: - "assign_mi.py -o {output.bam} -c {params} {input.bam}" - -rule alignment_bxstats: - input: - bam = outdir + "/{sample}.bam", - bai = outdir + "/{sample}.bam.bai" - output: - outdir + "/reports/data/bxstats/{sample}.bxstats.gz" - params: - sample = lambda wc: d[wc.sample] - conda: - f"{envdir}/qc.yaml" - message: - "Calculating barcode alignment statistics: {wildcards.sample}" - shell: - "bx_stats.py -o {output} {input.bam}" - -rule alignment_coverage: - input: - bam = outdir + "/{sample}.bam", - bai = outdir + "/{sample}.bam.bai" - output: - outdir + "/reports/data/coverage/{sample}.cov.gz" - params: - windowsize - container: - None - message: - "Calculating genomic coverage: {wildcards.sample}" - shell: - "samtools depth -a {input.bam} | depth_windows.py {params} | gzip > {output}" - -rule alignment_report: - input: - outdir + 
"/reports/data/bxstats/{sample}.bxstats.gz", - outdir + "/reports/data/coverage/{sample}.cov.gz" - output: - outdir + "/reports/{sample}.html" - params: - molecule_distance - conda: - f"{envdir}/r.yaml" - message: - "Summarizing barcoded alignments: {wildcards.sample}" - script: - "report/align_stats.Rmd" - -rule general_alignment_stats: - input: - bam = outdir + "/{sample}.bam", - bai = outdir + "/{sample}.bam.bai" - output: - stats = temp(outdir + "/reports/data/samtools_stats/{sample}.stats"), - flagstat = temp(outdir + "/reports/data/samtools_flagstat/{sample}.flagstat") - container: - None - message: - "Calculating alignment stats: {wildcards.sample}" - shell: - """ - samtools stats -d {input.bam} > {output.stats} - samtools flagstat {input.bam} > {output.flagstat} - """ - -rule samtools_reports: - input: - collect(outdir + "/reports/data/samtools_{ext}/{sample}.{ext}", sample = samplenames, ext = ["stats", "flagstat"]) - output: - outdir + "/reports/minimap.stats.html" - params: - outdir - conda: - f"{envdir}/qc.yaml" - message: - "Summarizing samtools stats and flagstat" - shell: - """ - multiqc {params}/reports/data/samtools_stats {params}/reports/data/samtools_flagstat --force --quiet --title "Basic Alignment Statistics" --comment "This report aggregates samtools stats and samtools flagstats results for all alignments." --no-data-dir --filename {output} 2> /dev/null - """ - -rule workflow_summary: - default_target: True - input: - bams = collect(outdir + "/{sample}.{ext}", sample = samplenames, ext = ["bam","bam.bai"]), - samtools = outdir + "/reports/minimap.stats.html" if not skip_reports else [] , - bx_reports = collect(outdir + "/reports/{sample}.html", sample = samplenames) if not skip_reports else [] - params: - quality = config["quality"], - extra = extra - - run: - with open(outdir + "/workflow/align.minimap.summary", "w") as f: - _ = f.write("The harpy align module ran using these parameters:\n\n") - _ = f.write(f"The provided genome: {bn}\n") - _ = f.write("Sequencing were aligned with Minimap2 using:\n") - _ = f.write(f" minimap2 -y {params.extra} --sam-hit-only -R \"@RG\\tID:SAMPLE\\tSM:SAMPLE\" genome.mmi forward_reads reverse_reads |\n") - _ = f.write(f" samtools view -h -F 4 -q {params.quality} |\n") - _ = f.write("Duplicates in the alignments were marked following:\n") - _ = f.write(" samtools collate \n") - _ = f.write(" samtools fixmate\n") - _ = f.write(" samtools sort -m 2000M\n") - _ = f.write(" samtools markdup -S --barcode-tag BX\n") - _ = f.write("\nThe Snakemake workflow was called via command line:\n") - _ = f.write(" " + str(config["workflow_call"]) + "\n") \ No newline at end of file diff --git a/.deprecated/hpc.py b/.deprecated/hpc.py deleted file mode 100644 index c8deb1965..000000000 --- a/.deprecated/hpc.py +++ /dev/null @@ -1,248 +0,0 @@ -"""Harpy module to create HPC profiles for running snakemake on a cluster""" - -import os -import rich_click as click -from rich.markdown import Markdown -from ._printing import print_notice - -@click.group(options_metavar='', context_settings={"help_option_names" : ["-h", "--help"]}) -def hpc(): - """ - Profile templates for cluster job submissions - - If running Harpy on an HPC system, you can leverage Snakemake - to handle all the job submissions on your behalf. This command creates templates - for common HPC schedulers so you can run Harpy on a cluster with minimal friction. - The subcommands create a `config.yaml` in an `hpc/` directory. 
You will also need - to install the associated Snakemake executor plugin for HPC job submission to work. - """ - -docstring = { - "harpy hpc": [ - { - "name": "Submission Systems", - "commands": ["generic", "googlebatch", "htcondor", "lsf", "slurm"], - }, - ] -} - -@click.command() -def generic(): - """Configuration for a generic scheduler""" - outfile = "hpc/generic/config.yaml" - os.makedirs("hpc/generic", exist_ok=True) - if os.path.exists(outfile): - click.echo(f"{outfile} exists, overwriting.") - print_notice(Markdown(""" -Using a scheduler requires installing an additional Snakemake plugin. If you haven't already, install the `generic-cluster` plugin with: - -```bash -mamba install bioconda::snakemake-executor-plugin-cluster-generic -``` - """)) - with open(outfile, "w", encoding = "utf-8") as yml: - yml.write("__use_yte__: true\n") - yml.write("executor: cluster-generic\n") - yml.write("default-resources:\n") - yml.write(" mem_mb: attempt * 2000\n") - yml.write("jobs: 50\n") - yml.write("latency-wait: 60\n") - yml.write("retries: 1\n") - yml.write("# command for submitting jobs\n") - yml.write("cluster-generic-submit-cmd VALUE\n") - yml.write("# [optional] command for retrieving job status\n") - yml.write("cluster-generic-status-cmd VALUE\n") - yml.write("# [optional] command for cancelling jobs-- expected to take one or more jobids as arguments\n") - yml.write("cluster-generic-cancel-cmd VALUE\n") - yml.write("# [optional] number of jobids to pass to cancel_cmd and if more are given, cancel_cmd will be called multiple times\n") - yml.write("cluster-generic-cancel-nargs 20\n") - yml.write("# [optional] command for sidecar process\n") - yml.write("cluster-generic-sidecar-cmd VALUE\n") - yml.write("\n# This section is for advanced copying into a scratch directory #\n") - yml.write("## requires snakemake-storage-plugin-fs, which can be installed via conda\n") - yml.write("#default-storage-provider: fs\n") - yml.write("#local-storage-prefix: /home2/$USER\n") - yml.write("#shared-fs-usage:\n") - yml.write("# - persistence\n") - yml.write("# - software-deployment\n") - yml.write("# - sources\n") - yml.write("# - source-cache\n") - -@click.command() -def lsf(): - """Configuration for LSF""" - outfile = "hpc/lsf/config.yaml" - os.makedirs("hpc/lsf", exist_ok=True) - if os.path.exists(outfile): - click.echo(f"{outfile} exists, overwriting.") - print_notice(Markdown(""" -Using a scheduler requires installing an additional Snakemake plugin. 
If you haven't already, install the `lsf` plugin with: - -```bash -mamba install bioconda::snakemake-executor-plugin-lsf -``` - """)) - with open(outfile, "w", encoding = "utf-8") as yml: - yml.write("__use_yte__: true\n") - yml.write("executor: lsf\n") - yml.write("default-resources:\n") - yml.write(" lsf_queue: \n") - yml.write(" walltime: 60 # minutes per job\n") - yml.write(" mem_mb: attempt * 2000\n") - yml.write(" # other args to pass to bsub\n") - yml.write(" lsf_extra: \n") - yml.write("jobs: 50\n") - yml.write("latency-wait: 60\n") - yml.write("retries: 1\n") - yml.write("\n# This section is for advanced copying into a scratch directory #\n") - yml.write("## requires snakemake-storage-plugin-fs, which can be installed via conda\n") - yml.write("#default-storage-provider: fs\n") - yml.write("#local-storage-prefix: /home2/$USER\n") - yml.write("#shared-fs-usage:\n") - yml.write("# - persistence\n") - yml.write("# - software-deployment\n") - yml.write("# - sources\n") - yml.write("# - source-cache\n") - -@click.command() -def htcondor(): - """Configuration for HTCondor""" - os.makedirs("hpc/htcondor", exist_ok=True) - outfile = "hpc/htcondor/config.yaml" - if os.path.exists(outfile): - click.echo(f"{outfile} exists, overwriting.") - print_notice(Markdown(""" -Using a scheduler requires installing an additional Snakemake plugin. If you haven't already, install the `htcondor` plugin with: - -```bash -mamba install bioconda::snakemake-executor-plugin-htcondor -``` - """)) - with open(outfile, "w", encoding = "utf-8") as yml: - yml.write("__use_yte__: true\n") - yml.write("executor: htcondor\n") - yml.write("default-resources:\n") - yml.write(" getenv: True\n") - yml.write(" rank: \n") - yml.write(" max-retries: 1\n") - yml.write(" request_memory: attempt * 2000\n") - yml.write("jobs: 50\n") - yml.write("latency-wait: 60\n") - yml.write("retries: 1\n") - yml.write("\n# This section is for advanced copying into a scratch directory #\n") - yml.write("## requires snakemake-storage-plugin-fs, which can be installed via conda\n") - yml.write("#default-storage-provider: fs\n") - yml.write("#local-storage-prefix: /home2/$USER\n") - yml.write("#shared-fs-usage:\n") - yml.write("# - persistence\n") - yml.write("# - software-deployment\n") - yml.write("# - sources\n") - yml.write("# - source-cache\n") - -@click.command() -def slurm(): - """Configuration for SLURM""" - os.makedirs("hpc/slurm", exist_ok=True) - outfile = "hpc/slurm/config.yaml" - if os.path.exists(outfile): - click.echo(f"{outfile} exists, overwriting.") - print_notice(Markdown(""" -Using a scheduler requires installing an additional Snakemake plugin. 
If you haven't already, install the `slurm` plugin with: - -```bash -mamba install bioconda::snakemake-executor-plugin-slurm -``` - """)) - with open(outfile, "w", encoding = "utf-8") as yml: - yml.write("__use_yte__: true\n") - yml.write("executor: slurm\n") - yml.write("default-resources:\n") - yml.write(" slurm_account: $USER\n") - yml.write(" slurm_partition: regular\n") - yml.write(" mem_mb: attempt * 2000\n") - yml.write(" runtime: 10\n") - yml.write("jobs: 50\n") - yml.write("latency-wait: 60\n") - yml.write("retries: 1\n") - yml.write("\n# This section is for advanced copying into a scratch directory #\n") - yml.write("## requires snakemake-storage-plugin-fs, which can be installed via conda\n") - yml.write("#default-storage-provider: fs\n") - yml.write("#local-storage-prefix: /home2/$USER\n") - yml.write("#shared-fs-usage:\n") - yml.write("# - persistence\n") - yml.write("# - software-deployment\n") - yml.write("# - sources\n") - yml.write("# - source-cache\n") - -@click.command() -def googlebatch(): - """Configuration for Google Batch""" - os.makedirs("hpc/googlebatch", exist_ok=True) - outfile = "hpc/googlebatch/config.yaml" - if os.path.exists(outfile): - click.echo(f"{outfile} exists, overwriting.") - print_notice(Markdown(""" -Using a scheduler requires installing an additional Snakemake plugin. If you haven't already, install the `googlebatch` plugin with: - -```bash -mamba install bioconda::snakemake-executor-plugin-googlebatch -``` - """)) - with open(outfile, "w", encoding = "utf-8") as yml: - yml.write("__use_yte__: true\n") - yml.write("executor: googlebatch\n") - yml.write("jobs: 50\n") - yml.write("latency-wait: 45\n") - yml.write("retries: 1\n") - yml.write("default-resources:\n") - yml.write("## YOU MAY NOT NEED ALL OF THESE! 
##\n") - yml.write("# The name of the Google Project\n") - yml.write(" googlebatch_project: Harpy\n") - yml.write("# The name of the Google Project region (e.g., 'us-central1')\n") - yml.write(" googlebatch_region: 'us-central1'\n") - yml.write("# Retry count\n") - yml.write(" googlebatch_retry_count: 1\n") - yml.write("# Maximum run duration, string (e.g., '3600s')\n") - yml.write(" googlebatch_max_run_duration: '3600s'\n") - yml.write("# Memory in MiB\n") - yml.write(" googlebatch_memory: attempt * 2000\n") - yml.write("# The default number of work tasks (these are NOT MPI ranks)\n") - yml.write(" googlebatch_work_tasks: 50\n") - yml.write("# The default number of work tasks per node (NOT MPI ranks)\n") - yml.write(" googlebatch_work_tasks_per_node: 10\n") - yml.write("# Milliseconds per cpu-second\n") - yml.write(" googlebatch_cpu_milli: 1000\n") - yml.write("# A custom container for use with Google Batch COS\n") - yml.write(" googlebatch_container: VALUE\n") - yml.write("# A docker registry password for COS if credentials are required\n") - yml.write(" googlebatch_docker_password: VALUE\n") - yml.write("# A docker registry username for COS if credentials are required\n") - yml.write(" googlebatch_docker_username: VALUE\n") - yml.write("# Google Cloud machine type or VM (mpitune on c2 and c2d family)\n") - yml.write(" googlebatch_machine_type: 'c2-standard-4'\n") - yml.write("# Comma separated key value pairs to label job (e.g., model=a3,stage=test)\n") - yml.write(" googlebatch_labels: VALUE\n") - yml.write("# Google Cloud image family (defaults to hpc-centos-7)\n") - yml.write(" googlebatch_image_family: 'hpc-centos-7'\n") - yml.write("# Selected image project\n") - yml.write(" googlebatch_image_project: 'cloud-hpc-image-public'\n") - yml.write("# Boot disk size (GB)\n") - yml.write(" googlebatch_boot_disk_gb: VALUE\n") - yml.write("# The URL of an existing network resource\n") - yml.write(" googlebatch_network: VALUE\n") - yml.write("# The URL of an existing subnetwork resource\n") - yml.write(" googlebatch_subnetwork: VALUE\n") - yml.write("# Boot disk type. 
(e.g., gcloud compute disk-types list)\n") - yml.write(" googlebatch_boot_disk_type: VALUE\n") - yml.write("# Boot disk image (e.g., batch-debian, bath-centos)\n") - yml.write(" googlebatch_boot_disk_image: VALUE\n") - yml.write("# Mount path for Google bucket (if defined)\n") - yml.write(" googlebatch_mount_path: '/mnt/share'\n") - yml.write("# One or more snippets to add to the Google Batch task setup\n") - yml.write(" googlebatch_snippets: VALUE\n") - -hpc.add_command(slurm) -hpc.add_command(htcondor) -hpc.add_command(lsf) -hpc.add_command(generic) -hpc.add_command(googlebatch) diff --git a/.deprecated/phase.bak.smk b/.deprecated/phase.bak.smk deleted file mode 100644 index cace77c90..000000000 --- a/.deprecated/phase.bak.smk +++ /dev/null @@ -1,323 +0,0 @@ -containerized: "docker://pdimens/harpy:latest" - -from rich import print as rprint -from rich.panel import Panel -import sys - -##TODO MANUAL PRUNING OF SWITCH ERRORS -# https://github.com/vibansal/HapCUT2/blob/master/outputformat.md - -bam_dir = config["seq_directory"] -samplenames = config["samplenames"] -variantfile = config["variantfile"] -pruning = config["prune"] -molecule_distance = config["molecule_distance"] -extra = config.get("extra", "") -outdir = config["output_directory"] -envdir = os.path.join(os.getcwd(), ".harpy_envs") - -if config["noBX"]: - fragfile = outdir + "/extractHairs/{sample}.unlinked.frags" -else: - fragfile = outdir + "/linkFragments/{sample}.linked.frags" - -linkarg = "--10x 0" if config["noBX"] else "--10x 1" -skipreports = config["skip_reports"] - -try: - indelarg = "--indels 1 --ref " + config["indels"] -except: - indelarg = "" - -wildcard_constraints: - sample = "[a-zA-Z0-9._-]+" - -onerror: - print("") - rprint( - Panel( - f"The workflow has terminated due to an error. See the log file below for more details.", - title = "[bold]harpy phase", - title_align = "left", - border_style = "red" - ), - file = sys.stderr - ) - -onsuccess: - print("") - rprint( - Panel( - f"The workflow has finished successfully! 
Find the results in [bold]{outdir}/[/bold]", - title = "[bold]harpy phase", - title_align = "left", - border_style = "green" - ), - file = sys.stderr - ) - -rule split_by_samplehet: - input: - vcf = variantfile, - bam = bam_dir + "/{sample}.bam" - output: - outdir + "/workflow/input/vcf/{sample}.het.vcf" - benchmark: - ".Benchmark/Phase/splithet.{sample}.txt" - container: - None - message: - "Extracting heterozygous variants: {wildcards.sample}" - shell: - """ - bcftools view -s {wildcards.sample} {input.vcf} | - awk '/^#/;/CHROM/ {{OFS="\\t"}}; !/^#/ && $10~/^0\\/1/' > {output} - """ - -rule split_by_sample: - input: - vcf = variantfile, - bam = bam_dir + "/{sample}.bam" - output: - outdir + "/workflow/input/vcf/{sample}.vcf" - benchmark: - ".Benchmark/Phase/split.{sample}.txt" - container: - None - message: - "Extracting variants: {wildcards.sample}" - shell: - """ - bcftools view -s {wildcards.sample} {input.vcf} | - awk '/^#/;/CHROM/ {{OFS="\\t"}}; !/^#/ && $10~/^0\\/0/ {{$10="0|0:"substr($10,5);print $0}}; !/^#/ && $10~/^0\\/1/; !/^#/ && $10~/^1\\/1/ {{$10="1|1:"substr($10,5);print $0}}; !/^#/ {{print $0}}' > {output} - """ - -rule index_alignment: - input: - bam_dir + "/{sample}.bam" - output: - bam_dir + "/{sample}.bam.bai" - container: - None - message: - "Indexing alignment: {wildcards.sample}" - shell: - "samtools index {input} {output} 2> /dev/null" - -rule extract_hairs: - input: - vcf = outdir + "/workflow/input/vcf/{sample}.het.vcf", - bam = bam_dir + "/{sample}.bam", - bai = bam_dir + "/{sample}.bam.bai" - output: - outdir + "/extractHairs/{sample}.unlinked.frags" - log: - outdir + "/extractHairs/logs/{sample}.unlinked.log" - params: - indels = indelarg, - bx = linkarg - conda: - f"{envdir}/phase.yaml" - benchmark: - ".Benchmark/Phase/extracthairs.{sample}.txt" - message: - "Converting to compact fragment format: {wildcards.sample}" - shell: - "extractHAIRS {params} --nf 1 --bam {input.bam} --VCF {input.vcf} --out {output} 2> {log}" - -rule link_fragments: - input: - bam = bam_dir + "/{sample}.bam", - vcf = outdir + "/workflow/input/vcf/{sample}.het.vcf", - fragments = outdir + "/extractHairs/{sample}.unlinked.frags" - output: - outdir + "/linkFragments/{sample}.linked.frags" - log: - outdir + "/linkFragments/logs/{sample}.linked.log" - params: - d = molecule_distance - conda: - f"{envdir}/phase.yaml" - benchmark: - ".Benchmark/Phase/linkfrag.{sample}.txt" - message: - "Linking fragments: {wildcards.sample}" - shell: - "LinkFragments.py --bam {input.bam} --VCF {input.vcf} --fragments {input.fragments} --out {output} -d {params} > {log} 2>&1" - -rule phase_blocks: - input: - vcf = outdir + "/workflow/input/vcf/{sample}.het.vcf", - fragments = fragfile - output: - blocks = outdir + "/phaseBlocks/{sample}.blocks", - vcf = outdir + "/phaseBlocks/{sample}.blocks.phased.VCF" - log: - outdir + "/phaseBlocks/logs/{sample}.blocks.phased.log" - params: - prune = f"--threshold {pruning}" if pruning > 0 else "--no_prune 1", - extra = extra - conda: - f"{envdir}/phase.yaml" - benchmark: - ".Benchmark/Phase/phase.{sample}.txt" - message: - "Creating phased haplotype blocks: {wildcards.sample}" - shell: - "HAPCUT2 --fragments {input.fragments} --vcf {input.vcf} {params} --out {output.blocks} --nf 1 --error_analysis_mode 1 --call_homozygous 1 --outvcf 1 > {log} 2>&1" - -rule create_annotations: - input: - outdir + "/phaseBlocks/{sample}.blocks.phased.VCF" - output: - outdir + "/annotations/{sample}.annot.gz" - container: - None - message: - "Creating annotation files: {wildcards.sample}" - 
shell: - "bcftools query -f \"%CHROM\\t%POS[\\t%GT\\t%PS\\t%PQ\\t%PD]\\n\" {input} | bgzip -c > {output}" - -rule index_annotations: - input: - outdir + "/annotations/{sample}.annot.gz" - output: - outdir + "/annotations/{sample}.annot.gz.tbi" - container: - None - message: - "Indexing {wildcards.sample}.annot.gz" - shell: - "tabix -b 2 -e 2 {input}" - -rule headerfile: - output: - outdir + "/workflow/input/header.names" - message: - "Creating additional header file" - run: - with open(output[0], "w") as fout: - _ = fout.write('##INFO=\n') - _ = fout.write('##FORMAT=\n') - _ = fout.write('##FORMAT=\n') - _ = fout.write('##FORMAT=\n') - _ = fout.write('##FORMAT=') - -rule merge_annotations: - input: - annot = outdir + "/annotations/{sample}.annot.gz", - idx = outdir + "/annotations/{sample}.annot.gz.tbi", - orig = outdir + "/workflow/input/vcf/{sample}.vcf", - headers = outdir + "/workflow/input/header.names" - output: - bcf = outdir + "/annotations_merge/{sample}.phased.annot.bcf", - idx = outdir + "/annotations_merge/{sample}.phased.annot.bcf.csi" - threads: - 2 - benchmark: - ".Benchmark/Phase/mergeAnno.{sample}.txt" - container: - None - message: - "Merging annotations: {wildcards.sample}" - shell: - """ - bcftools annotate -h {input.headers} -a {input.annot} {input.orig} -c CHROM,POS,FMT/GT,FMT/PS,FMT/PQ,FMT/PD -m +HAPCUT | - awk '!/ 1 else "false" - threads: - workflow.cores - container: - None - message: - "Combining results into {output.bcf}" if len(samplenames) > 1 else "Copying results to {output.bcf}" - shell: - """ - if [ "{params}" = true ]; then - bcftools merge --threads {threads} -Ob -o {output.bcf} --write-index {input.bcf} - else - cp {input.bcf} {output.bcf} - cp {input.idx} {output.idx} - fi - """ - -rule summarize_blocks: - input: - collect(outdir + "/phaseBlocks/{sample}.blocks", sample = samplenames) - output: - outdir + "/reports/blocks.summary.gz" - params: - outdir + "/reports/blocks.summary" - container: - None - message: - "Summarizing phasing results" - shell: - """ - echo -e "sample\\tcontig\\tn_snp\\tpos_start\\tblock_length" > {params} - for i in {input}; do - parse_phaseblocks.py -i $i >> {params} - done - gzip {params} - """ - -rule phase_report: - input: - outdir + "/reports/blocks.summary.gz" - output: - outdir + "/reports/phase.html" - conda: - f"{envdir}/r.yaml" - message: - "Summarizing phasing results" - script: - "report/hapcut.Rmd" - -rule workflow_summary: - default_target: True - input: - vcf = outdir + "/variants.phased.bcf", - reports = outdir + "/reports/phase.html" if not skip_reports else [] - params: - prune = f"--threshold {pruning}" if pruning > 0 else "--no_prune 1", - extra = extra - message: - "Creating record of relevant runtime parameters" - run: - with open(outdir + "/workflow/phase.summary", "w") as f: - _ = f.write("The harpy phase module ran using these parameters:\n\n") - _ = f.write(f"The provided variant file: {variantfile}\n") - _ = f.write(f"The directory with alignments: {bam_dir}\n") - _ = f.write("The variant file was split by sample and preprocessed using:\n") - _ = f.write(""" bcftools view -s SAMPLE | awk '/^#/;/CHROM/ OFS="\\t"; !/^#/ && $10~/^0\\/1/'\n\n""") - _ = f.write("Phasing was performed using the components of HapCut2:\n") - _ = f.write(" extractHAIRS " + linkarg + " --nf 1 --bam sample.bam --VCF sample.vcf --out sample.unlinked.frags\n") - _ = f.write(" LinkFragments.py --bam sample.bam --VCF sample.vcf --fragments sample.unlinked.frags --out sample.linked.frags -d " + f"{molecule_distance}" + "\n") - _ = 
f.write(" HAPCUT2 --fragments sample.linked.frags --vcf sample.vcf --out sample.blocks --nf 1 --error_analysis_mode 1 --call_homozygous 1 --outvcf 1" + f" {params[0]} {params[1]}" + "\n\n") - _ = f.write("Variant annotation was performed using:\n") - _ = f.write(" bcftools query -f \"%CHROM\\t%POS[\\t%GT\\t%PS\\t%PQ\\t%PD]\\n\" sample.vcf | bgzip -c\n") - _ = f.write(" bcftools annotate -h header.file -a sample.annot sample.bcf -c CHROM,POS,FMT/GX,FMT/PS,FMT/PQ,FMT/PD -m +HAPCUT |\n") - _ = f.write(" awk '!/\n') - _ = f.write(' ##FORMAT=\n') - _ = f.write(' ##FORMAT=\n') - _ = f.write(' ##FORMAT=\n') - _ = f.write(' ##FORMAT=\n') - _ = f.write("\nThe Snakemake workflow was called via command line:\n") - _ = f.write(" " + str(config["workflow_call"]) + "\n") \ No newline at end of file diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 00953809b..ab7432fc9 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -833,7 +833,7 @@ jobs: run: harpy downsample -d 1 --random-seed 699 --quiet test/bam/sample1.bam - name: harpy downsample fastq shell: micromamba-shell {0} - run: harpy downsample -d 1 --quiet test/fastq/sample1.* + run: harpy downsample -d 1 --quiet test/fastq/sample1.*gz - name: harpy hpc shell: micromamba-shell {0} run: | diff --git a/harpy/bin/extract_bxtags.py b/harpy/bin/extract_bxtags.py old mode 100644 new mode 100755 index 7d9215b88..fd3e2c586 --- a/harpy/bin/extract_bxtags.py +++ b/harpy/bin/extract_bxtags.py @@ -13,6 +13,7 @@ ) parser.add_argument('-i','--invalid', type= float, default=1, help = "Proportion of invalid barcodes to include in subsampling/output (default: %(default)s)") parser.add_argument('-d','--downsample', type= int, help = "Number of barcodes to downsample to") +parser.add_argument('-r','--random-seed', type= int, help = "Random seed for sampling") parser.add_argument('-b','--bx-tag', type= str, default="BX", help = "2-character tag with the barcodes (alphanumeric, default: %(default)s)") parser.add_argument('input', type= str, help = "Input BAM file") if len(sys.argv) == 1: @@ -20,7 +21,7 @@ sys.exit(1) args = parser.parse_args() -if args.invalid < 0 or args.invalid > 0: +if args.invalid < 0 or args.invalid > 1: parser.error("--invalid must be between 0 and 1") bx_tag = args.bx_tag.upper() @@ -28,8 +29,8 @@ rng = random.Random(args.random_seed) if args.random_seed else random.Random() invalid_pattern = re.compile(r'[AaBbCcDd]00') barcodes = set() - -with pysam.AlignmentFile(args.input, "rb", check_sq=False) as infile: +mode = "r" if args.input.lower().endswith("sam") else "rb" +with pysam.AlignmentFile(args.input, mode, check_sq=False) as infile: for record in infile: try: barcode = record.get_tag(bx_tag) diff --git a/harpy/downsample.py b/harpy/downsample.py index 5c3270730..a0c4bd5c3 100644 --- a/harpy/downsample.py +++ b/harpy/downsample.py @@ -5,6 +5,7 @@ import sys import yaml from rich import box +from pathlib import Path from rich.table import Table import rich_click as click from ._cli_types_generic import SnakemakeParams @@ -19,7 +20,7 @@ }, { "name": "Workflow Controls", - "options": ["--quiet", "--snakemake", "--threads", "--help"], + "options": ["--output-dir", "--quiet", "--snakemake", "--threads", "--help"], }, ] } @@ -28,12 +29,13 @@ @click.option('-i', '--invalid', default = 1, show_default = True, type=click.FloatRange(min=0,max=1), help = "Proportion of invalid barcodes to sample") @click.option('-p', '--prefix', type = click.Path(exists = False), default = "downsampled", show_default = 
 @click.option('-p', '--prefix', type = click.Path(exists = False), default = "downsampled", show_default = True, help = 'Prefix for output file(s)')
 @click.option('--random-seed', type = click.IntRange(min = 1), help = "Random seed for sampling")
+@click.option('-o', '--output-dir', type = click.Path(exists = False), default = "Downsample", show_default=True, help = 'Output directory name')
 @click.option('-t', '--threads', default = 4, show_default = True, type = click.IntRange(min = 1, max_open = True), help = 'Number of threads to use')
 @click.option('--quiet', is_flag = True, show_default = True, default = False, help = 'Don\'t show output text while running')
 @click.option('--setup-only', is_flag = True, hidden = True, show_default = True, default = False, help = 'Setup the workflow and exit')
 @click.option('--snakemake', type = SnakemakeParams(), help = 'Additional Snakemake parameters, in quotes')
 @click.argument('input', required=True, type=click.Path(exists=True, readable=True, dir_okay=False), nargs=-1)
-def downsample(input, prefix, downsample, invalid, random_seed, setup_only, snakemake, threads, quiet):
+def downsample(input, invalid, output_dir, prefix, downsample, random_seed, setup_only, snakemake, threads, quiet):
     """
     Downsample data by barcode

@@ -42,10 +44,10 @@ def downsample(input, prefix, downsample, invalid, random_seed, setup_only, snak
     - one BAM file
     - two FASTQ files (R1 and R2 of a paired-end read set)

-    Use `--invalid` to specify the proportion of invalid barcodes to consider for sampling, where `0` will remove all
-    invalid barcodes from the barcode pool, `1` will add all invalid barcodes to the barcode pool, and a number in between
-    (e.g. `0.25`) will add approximately that proprotion of invalid barcodes into the barcode pool that gets subsampled down
-    to `-d` barcodes.
+    Use `--invalid` to specify the proportion of invalid barcodes to consider for sampling:
+    - `0` removes all invalid barcodes from the barcode pool
+    - `1` adds all invalid barcodes to the barcode pool
+    - 0<`i`<1 (e.g. `0.25`) keeps that proportion of invalid barcodes in the barcode pool
     """
     # validate input files as either 1 bam or 2 fastq
     if len(input) > 2:
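The `--invalid` docstring above describes a probabilistic filter rather than an exact count. A minimal sketch of that rule, assuming the same `[AaBbCcDd]00` invalid-barcode pattern and a `random.Random` instance as in `extract_bxtags.py` earlier in this series; the barcode strings and seed below are made up for illustration:

```python
import random
import re

# Invalid haplotagging barcodes contain a "00" segment (e.g. A00...).
invalid_pattern = re.compile(r"[AaBbCcDd]00")
rng = random.Random(699)  # hypothetical fixed seed for a reproducible example

def keep_barcode(barcode: str, invalid_proportion: float) -> bool:
    """Return True if a barcode should enter the sampling pool."""
    if not invalid_pattern.search(barcode):
        return True                           # valid barcodes always stay
    return rng.random() <= invalid_proportion  # invalid ones stay with probability --invalid

# keep every valid barcode and roughly a quarter of the invalid ones
pool = {bc for bc in ("A01C22B13D91", "A00C22B13D91") if keep_barcode(bc, 0.25)}
```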
@@ -82,7 +84,7 @@ def downsample(input, prefix, downsample, invalid, random_seed, setup_only, snak
         "invalid_proportion" : invalid,
         **({"random_seed" : random_seed} if random_seed else {}),
         "workflow_call" : command.rstrip(),
-        "inputs": [i.as_posix() for i in input]
+        "inputs": [Path(i).resolve().as_posix() for i in input]
     }
     with open(os.path.join(workflowdir, 'config.yaml'), "w", encoding="utf-8") as config:
         yaml.dump(configs, config, default_flow_style= False, sort_keys=False, width=float('inf'))
diff --git a/harpy/snakefiles/downsample.smk b/harpy/snakefiles/downsample.smk
index 36496d2be..08809bb8e 100644
--- a/harpy/snakefiles/downsample.smk
+++ b/harpy/snakefiles/downsample.smk
@@ -2,7 +2,6 @@ import os
 import re
 import gzip
 import pysam
-import random
 import logging

 onstart:
@@ -20,10 +19,8 @@ downsample = config["downsample"]
 prefix = config["prefix"]
 infiles = dict(zip(inputs, inputs))
 is_fastq = True if len(inputs) == 2 else False
-rng = random.Random(random_seed) if random_seed else random.Random()

 if is_fastq:
-    # determine if a file is gzipped
     try:
         with gzip.open(inputs[0], 'rt') as f:
             f.read(10)
@@ -42,10 +39,10 @@ rule bam_convert:
         workflow.cores
     shell:
         """
-        samtools import -@ {threads} -T "*" {input}
+        samtools import -@ {threads} -T "*" {input} -O BAM > {output}
         """

-rule extract_barcodes:
+rule sample_barcodes:
     input:
         f"{outdir}/input.bam" if is_fastq else inputs[0]
     output:
@@ -55,9 +52,10 @@
     threads:
         workflow.cores
     params:
-        inv_prop = f"-i {invalid_proportion}",
+        inv_prop = f"-i {invalids}",
         downsample_amt = f"-d {downsample}",
-        bx_tag = "-b BX"
+        bx_tag = "-b BX",
+        random_seed = f"-r {random_seed}" if random_seed else ""
     shell:
         "extract_bxtags.py {params} {input} > {output} 2> {log}"

@@ -65,32 +63,42 @@ rule index_barcodes:
     input:
         lambda wc: infiles[wc.inputfile]
     output:
-        "{inputfile}.bci"
+        bci = temp("{inputfile}.bci"),
+        bai = temp("{inputfile}.bai") if not is_fastq else [],
+        gzi = temp("{inputfile}i") if is_fastq and is_gzip else []
     threads:
         workflow.cores
     run:
         if is_fastq:
             gz_arg = "--gzip" if is_gzip else ""
-            shell(f"LRez index fastq -t {threads} -f {input} -o {output} {gz_arg}")
+            shell(f"LRez index fastq -t {threads} -f {input} -o {output.bci} {gz_arg}")
         else:
-            shell(f"LRez index bam --offsets -t {threads} -b {input} -o {output}")
+            shell(f"samtools index {input}")
+            shell(f"LRez index bam --offsets -t {threads} -b {input} -o {output.bci}")

 rule downsample:
     input:
-        file = infiles[0],
-        bc_index = infiles[0] + ".bci",
+        bam = inputs[0],
+        bai = inputs[0] + ".bai",
+        bc_index = inputs[0] + ".bci",
         bc_list = f"{outdir}/sampled_barcodes.txt"
     output:
-        f"{outdir}/{prefix}.bam"
+        sam = temp(f"{outdir}/{prefix}.sam"),
+        bam = f"{outdir}/{prefix}.bam"
     threads:
         workflow.cores
     shell:
-        "LRez query bam -t {threads} -bam {input.file} -i {input.bc_index} -l {input.bc_list} > {output}"
+        """
+        samtools view -H {input.bam} > {output.sam}
+        LRez query bam -t {threads} -b {input.bam} -i {input.bc_index} -l {input.bc_list} >> {output.sam}
+        samtools view -O BAM {output.sam} > {output.bam}
+        """

 rule downsample_read_1:
     input:
-        file = inputs[0],
+        fastq = inputs[0],
         bc_index = inputs[0] + ".bci",
+        fq_index = inputs[0] + "i",
         bc_list = f"{outdir}/sampled_barcodes.txt"
     output:
         f"{outdir}/{prefix}.R1.fq.gz"
     params:
     threads:
         workflow.cores
     shell:
-        "LRez query fastq -t {threads} -f {input.file} -i {input.bc_index} -l {input.bc_list} {params} > {output}"
-
+        "LRez query fastq -t {threads} -f {input.fastq} -i {input.bc_index} -l {input.bc_list} {params} | bgzip > {output}"
+
 rule downsample_read_2:
     input:
-        file = inputs[1],
-        bc_index = inputs[1] + ".bci",
+        file = inputs[-1],
+        bc_index = inputs[-1] + ".bci",
+        fq_index = inputs[-1] + "i",
         bc_list = f"{outdir}/sampled_barcodes.txt"
     output:
         f"{outdir}/{prefix}.R2.fq.gz"
     params:
     threads:
         workflow.cores
     shell:
-        "LRez query fastq -t {threads} -f {input.file} -i {input.bc_index} -l {input.bc_list} {params} > {output}"
+        "LRez query fastq -t {threads} -f {input.file} -i {input.bc_index} -l {input.bc_list} {params} | bgzip > {output}"

 rule workflow_summary:
+    default_target: True
     input:
         f"{outdir}/{prefix}.bam" if not is_fastq else [],
         collect(f"{outdir}/{prefix}.R" + "{FR}.fq.gz", FR = [1,2]) if is_fastq else []
+    params:
+        random_seed = f"-r {random_seed}" if random_seed else ""
     run:
         summary = ["The harpy downsample workflow ran using these parameters:"]
-        summary.append(f"The provided input file(s):\n" + "\n\t".join(inputs))
+        summary.append(f"The provided input file(s):\n\t" + "\n\t".join(inputs))
         convs = "The FASTQ files were converted into a BAM file with:\n"
         convs += "\tsamtools import -T * fastq1 fastq2"
         if is_fastq:
             summary.append(convs)
         extraction = "Barcodes were extracted and sampled using:\n"
-        extraction += f"\textract_bxtags.py -i {invalids} -b BX -d {downsample} input.bam"
+        extraction += f"\textract_bxtags.py -i {invalids} -b BX -d {downsample} {params.random_seed} input.bam"
         summary.append(extraction)
         lrez = "The inputs were indexed and downsampled using LRez:\n"
         if is_fastq:

From 22d030ed2ca938c5041db53b2e32a11cef1ede67 Mon Sep 17 00:00:00 2001
From: pdimens
Date: Tue, 10 Dec 2024 15:53:37 -0500
Subject: [PATCH 6/7] apply prefix to barcodes

---
 harpy/snakefiles/downsample.smk | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/harpy/snakefiles/downsample.smk b/harpy/snakefiles/downsample.smk
index 08809bb8e..1cbe7563d 100644
--- a/harpy/snakefiles/downsample.smk
+++ b/harpy/snakefiles/downsample.smk
@@ -46,7 +46,7 @@ rule sample_barcodes:
     input:
         f"{outdir}/input.bam" if is_fastq else inputs[0]
     output:
-        f"{outdir}/sampled_barcodes.txt"
+        f"{outdir}/{prefix}.barcodes"
     log:
         f"{outdir}/logs/sampled_barcodes.log"
     threads:
@@ -81,7 +81,7 @@ rule downsample:
         bam = inputs[0],
         bai = inputs[0] + ".bai",
         bc_index = inputs[0] + ".bci",
-        bc_list = f"{outdir}/sampled_barcodes.txt"
+        bc_list = f"{outdir}/{prefix}.barcodes"
     output:
         sam = temp(f"{outdir}/{prefix}.sam"),
         bam = f"{outdir}/{prefix}.bam"
@@ -99,7 +99,7 @@ rule downsample_read_1:
         fastq = inputs[0],
         bc_index = inputs[0] + ".bci",
         fq_index = inputs[0] + "i",
-        bc_list = f"{outdir}/sampled_barcodes.txt"
+        bc_list = f"{outdir}/{prefix}.barcodes"
     output:
         f"{outdir}/{prefix}.R1.fq.gz"
     params:
@@ -114,7 +114,7 @@ rule downsample_read_2:
         file = inputs[-1],
         bc_index = inputs[-1] + ".bci",
         fq_index = inputs[-1] + "i",
-        bc_list = f"{outdir}/sampled_barcodes.txt"
+        bc_list = f"{outdir}/{prefix}.barcodes"
     output:
         f"{outdir}/{prefix}.R2.fq.gz"
     params:
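Both patches above funnel the downsampling through the sampled barcode list (now `{prefix}.barcodes`): barcodes are drawn first, then reads are pulled by barcode on a second pass. LRez does that second pass with a barcode index for speed; as a rough, editorial illustration of the same idea (not the workflow's actual mechanism), a plain-pysam sketch with hypothetical file names might look like:

```python
import pysam

# Hypothetical inputs: the sampled barcode list and the original alignment file.
with open("downsampled.barcodes") as bc_file:
    keep = {line.strip() for line in bc_file if line.strip()}

with pysam.AlignmentFile("input.bam", "rb", check_sq=False) as bam, \
     pysam.AlignmentFile("downsampled.bam", "wb", template=bam) as out:
    for record in bam:
        try:
            # keep a read only if its BX barcode was among those sampled
            if record.get_tag("BX") in keep:
                out.write(record)
        except KeyError:
            continue  # read carries no BX tag
```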
From 4efd612316bda7ce3d56cbd3f57af82088eae420 Mon Sep 17 00:00:00 2001
From: pdimens
Date: Wed, 11 Dec 2024 09:40:32 -0500
Subject: [PATCH 7/7] add NXX calc

---
 harpy/reports/hapcut.Rmd | 44 +++++++++++++++++++++++++---------------
 1 file changed, 28 insertions(+), 16 deletions(-)

diff --git a/harpy/reports/hapcut.Rmd b/harpy/reports/hapcut.Rmd
index 15a4129a7..5015b6624 100644
--- a/harpy/reports/hapcut.Rmd
+++ b/harpy/reports/hapcut.Rmd
@@ -51,6 +51,14 @@ df <- df[df$block_length > 0, c(1,2,3,4,6,5)]
 levels(df$sample) <- basename(levels(df$sample))
 ```

+```{r nxx, results = F}
+NX <- function(lengths, X=50){
+  lengths <- as.numeric(sort(lengths, decreasing=TRUE))
+  index <- which(cumsum(lengths) / sum(lengths) >= X/100)[1]
+  return(lengths[index])
+}
+```
+
 # General Stats
 ## fileheader
 ### details desc {.no-title}
@@ -60,11 +68,9 @@ The data presented here are the results of phasing genotypes into haplotypes usi
 which summarized information across all samples using the `.blocks` files HapCut2 generates.
 The VCF files HapCut2 also generates were not used as they lack some of the information
 in the `.blocks` files that were collated in this report.

-This page shows general and per-contig information. You may click the plots to
-expand them in your browser. Clicking them again will exit from the zoomed pop-up
-window. The `Per-Sample Stats` tab in the navigation bar above will show you
-statistics relating to haplotypes on the per-sample level. Haplotype blocks with a size
-of `0` were removed from the data.
+This page shows general and per-contig information. The `Per-Sample Stats` tab in
+the navigation bar above will show you statistics relating to haplotypes on the
+per-sample level. Haplotype blocks with a size of `0` were removed from the data.

 ## Haplotyping details {data-height=100}
 ```{r process_stats}
@@ -72,9 +78,10 @@ overall <- df %>% summarise(
   total_snps = sum(n_snp),
   mean_snps = round(mean(n_snp), digits = 0),
   median_snps = median(n_snp),
-  mean_blocksize = round(mean(block_length), digits = 0),
-  median_blocksize = median(block_length),
-  max_blocksize = max(block_length)
+  max_blocksize = max(block_length),
+  N50 = NX(block_length, 50),
+  N75 = NX(block_length, 75),
+  N90 = NX(block_length, 90)
 )
 ```

@@ -92,14 +99,19 @@ valueBox(scales::comma(overall$mean_snps[1]), caption = "Avg SNPs/Haplotype", co
 ```{r}
 valueBox(scales::comma(overall$median_snps[1]), caption = "Median SNPs/Haplotype", color = "info")
 ```
-### avg blocksize
+### N50
+```{r}
+valueBox(scales::comma(overall$N50[1]), caption = "N50", color = "info")
+```
+
+### N75
 ```{r}
-valueBox(scales::comma(overall$mean_blocksize[1]), caption = "Avg Haplotype Length", color = "info")
+valueBox(scales::comma(overall$N75[1]), caption = "N75", color = "info")
 ```
-### median blocksize
+### N90
 ```{r}
-valueBox(scales::comma(overall$median_blocksize[1]), caption = "Median Haplotype Length", color = "info")
+valueBox(scales::comma(overall$N90[1]), caption = "N90", color = "info")
 ```

 ### longest haplotype
@@ -152,7 +164,7 @@ percontig <- df %>% group_by(contig) %>% summarise(
   mean_snps = round(mean(n_snp), digits = 0),
   median_snps = median(n_snp),
   mean_blocksize = round(mean(block_length), digits = 0),
-  median_blocksize = median(block_length),
+  N50 = NX(block_length, 50),
   max_blocksize = max(block_length)
 )

@@ -165,7 +177,7 @@ DT::datatable(
     buttons = list(list(extend = "csv",filename = "phasing_per_contig")),
     scrollX = TRUE
   ),
-  colnames = c("Contig", "Total Haplotypes", "Mean SNPs", "Median SNPs", "Mean Haplotype Length", "Median Haplotype Length", "Largest Haplotype"),
+  colnames = c("Contig", "Total Haplotypes", "Mean SNPs", "Median SNPs", "Mean Haplotype Length", "Haplotype N50", "Largest Haplotype"),
   fillContainer = T
 )
 ```

@@ -248,7 +260,7 @@ persample <- df %>% group_by(sample) %>% summarise(
   mean_snps = round(mean(n_snp), digits = 0),
   median_snps = median(n_snp),
   mean_blocksize = round(mean(block_length), digits = 0),
-  median_blocksize = median(block_length),
+  N50 = NX(block_length, 50),
   max_blocksize = max(block_length)
 )

@@ -261,7 +273,7 @@ DT::datatable(
     buttons = list(list(extend = "csv",filename = "phasing_per_sample")),
     scrollX = TRUE
   ),
-  colnames = c("Sample", "Haplotypes", "Mean SNPs", "Median SNPs", "Mean Haplotype Length", "Median Haplotype Length", "Largest Haplotype"),
+  colnames = c("Sample", "Haplotypes", "Mean SNPs", "Median SNPs", "Mean Haplotype Length", "Haplotype N50", "Largest Haplotype"),
   fillContainer = T
 )
 ```
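The `NX()` helper added to `hapcut.Rmd` above is the standard NXX statistic applied to haplotype block lengths. A minimal Python transcription of the same calculation, with a small worked example (the block lengths are invented for illustration):

```python
def NX(lengths, X=50):
    """Length L such that blocks of length >= L contain at least X% of the total
    phased length; mirrors the R NX() helper in the patch above."""
    lengths = sorted(lengths, reverse=True)
    total = sum(lengths)
    running = 0
    for length in lengths:
        running += length
        if running / total >= X / 100:
            return length

# Blocks of 80, 70, 50, 40, 30 and 10 kbp sum to 280 kbp. Cumulative sums are
# 80, 150, 200, ...: 50% of 280 (140) is first reached at the 70 kbp block, and
# 75% (210) at the 40 kbp block.
assert NX([80, 70, 50, 40, 30, 10], 50) == 70
assert NX([80, 70, 50, 40, 30, 10], 75) == 40
```

Reporting N50/N75/N90 instead of mean and median block length follows the same convention used for assembly contiguity: at least X% of the phased sequence lies in haplotype blocks at least that long.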