Skip to content

Commit

Permalink
Merge pull request #29 from hillerlab/fixed_partitioning
Browse files Browse the repository at this point in the history
Fixed partitioning
  • Loading branch information
kirilenkobm authored Sep 16, 2023
2 parents f63513f + 5115cf4 commit 5730aee
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 37 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Make Lastz Chains

[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
![version](https://img.shields.io/badge/version-2.0.3-blue)
![version](https://img.shields.io/badge/version-2.0.4-blue)
[![made-with-Nextflow](https://img.shields.io/badge/Made%20with-Nextflow-23aa62.svg)](https://www.nextflow.io/)

Portable Hillerlab solution for generating pairwise genome alignment chains.
Expand Down Expand Up @@ -59,9 +59,9 @@ pslSortAcc
axtChain
chainAntiRepeat
chainMergeSort
chainCleaner"
chainSort"
chainScore"
chainCleaner
chainSort
chainScore
chainNet
axtToPsl
chainFilter
Expand Down
10 changes: 5 additions & 5 deletions constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Constants:
DEFAULT_SEQ1_CHUNK = 175_000_000
DEFAULT_SEQ2_CHUNK = 50_000_000
DEFAULT_SEQ1_LAP = 0
DEFAULT_SEQ1_LIMIT = 4000 # what is it?
DEFAULT_SEQ1_LIMIT = 5000 # what is it?
DEFAULT_SEQ2_LAP = 10_000
DEFAULT_SEQ2_LIMIT = 10_000

Expand Down Expand Up @@ -58,7 +58,9 @@ class Constants:
QUERY_CHROM_SIZES_FILENAME = f"{QUERY_LABEL}.chrom.sizes"

# file and directory names
PART_BULK_FILENAME_PREFIX = "BULK"
LASTZ_OUT_BUCKET_PREFIX = "bucket_ref_"
LASTZ_OUT_BULK_PREFIX = "bucket_ref_bulk"
TEMP_LASTZ_DIRNAME = "temp_lastz_run"
TEMP_PSL_DIRNAME = "temp_lastz_psl_output"
TEMP_CAT_DIRNAME = "temp_concat_lastz_output"
Expand All @@ -70,11 +72,10 @@ class Constants:
CHAIN_RUN_OUT_DIRNAME = "chain"
PSL_SORT_TEMP_DIRNAME = "psl_sort_temp_dir"

REMOVED_SUSPECTS_BED_FNAME = "removed_suspects.bed"

FILL_CHAIN_DIRNAME = "temp_fill_chain"
FILLED_CHAINS_DIRNAME = "filled_chain_files"
FILLED_CHAINS_JOBS_DIRNAME = "fill_chain_chunks"
REMOVED_SUSPECTS_BED_FNAME = "removed_suspects.bed"

FILL_PREPARE_JOBLIST_NAME = "jobList_prepare.txt"
REPEAT_FILLER_JOBLIST_NAME = "repeat_filler_joblist.txt"
Expand All @@ -98,11 +99,9 @@ class NextflowConstants:
NF_DIRNAME = "parallelization"
NF_DIR = os.path.abspath(os.path.join(SCRIPT_LOCATION, NF_DIRNAME))
NF_SCRIPT_PATH = os.path.join(NF_DIR, "execute_joblist.nf")

LASTZ_STEP_LABEL = "lastz"
FILL_CHAIN_LABEL = "fill_chain"
CHAIN_RUN_LABEL = "chain_run"

JOB_MEMORY_REQ = 16 # GB
JOB_TIME_REQ = '24h'

Expand All @@ -126,3 +125,4 @@ class ToolNames:
class ScriptNames:
REPEAT_FILLER = "chain_gap_filler.py"
RUN_LASTZ = "run_lastz.py"
RUN_LASTZ_LAYER = "run_lastz_intermediate_layer.py"
7 changes: 3 additions & 4 deletions make_chains.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,14 @@ def cleanup(parameters: PipelineParameters, project_paths: ProjectPaths):
def run_pipeline(args):
# setup project dir, parameters and step manager
start_time = dt.now()

step_executables = StepExecutables(SCRIPT_LOCATION)
project_dir = OutputDirectoryManager(args).project_dir
setup_logger(os.path.join(project_dir, "run.log"))
log_version()
parameters = PipelineParameters(args)
project_paths = ProjectPaths(project_dir, SCRIPT_LOCATION, parameters)
step_executables = StepExecutables(SCRIPT_LOCATION)
step_manager = StepManager(project_paths, args)

setup_logger(project_paths.log_file)
log_version()
to_log(f"Making chains for {args.target_genome} and {args.query_genome} files, saving results to {project_dir}")
to_log(f"Pipeline started at {start_time}")

Expand Down
1 change: 1 addition & 0 deletions modules/step_executables.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def __init__(self, root_dir):
self.not_found = []

self.lastz_wrapper = self.__find_script(Constants.ScriptNames.RUN_LASTZ)
self.lastz_layer = self.__find_script(Constants.ScriptNames.RUN_LASTZ_LAYER)
self.repeat_filler = self.__find_script(Constants.ScriptNames.REPEAT_FILLER)

self.fa_to_two_bit = self.__find_binary(Constants.ToolNames.FA_TO_TWO_BIT)
Expand Down
3 changes: 1 addition & 2 deletions standalone_scripts/run_lastz.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def parse_args():
app.add_argument("output", help="Output file location")

app.add_argument("--output_format", choices=["psl", "axt"], help="Output format axt|psl")
app.add_argument("--gz", help="Compress output with gzip")
app.add_argument("--temp_dir",
help="Temp directory to save intermediate fasta files (if needed)\n"
"/tmp/ is default, however, params_json key TMPDIR can provide a value"
Expand Down Expand Up @@ -325,7 +324,7 @@ def main():
seq_2_sizes_path,
args.axt_to_psl,
v)
f = open(args.output, "w")
f = open(args.output, "a")
f.write(out_to_save)
f.close()
# else -> output is empty -> do not save an output file -> no need
Expand Down
120 changes: 120 additions & 0 deletions standalone_scripts/run_lastz_intermediate_layer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""Intermediate layer to run LASTZ wrapper.
Needed to process the case if many little scaffolds are to be aligned
within one cluster job."""
import argparse
import subprocess
import sys
import json
from itertools import product


def read_chrom_sizes(chrom_sizes_path):
    """Read a UCSC-style chrom.sizes file.

    Each non-empty line is expected to be tab-separated:
    <chrom>\t<size>[\t...]. Extra columns are ignored.

    Returns a dict mapping chromosome name -> integer chromosome size.
    """
    chrom_sizes = {}
    # context manager guarantees the handle is closed even if parsing fails
    with open(chrom_sizes_path, "r") as f:
        for line in f:
            line = line.rstrip()
            if not line:
                continue  # tolerate trailing/blank lines instead of IndexError
            line_data = line.split("\t")
            chrom = line_data[0]
            chrom_size = int(line_data[1])
            chrom_sizes[chrom] = chrom_size
    return chrom_sizes


def read_json_file(file_path):
    """Load and return the parsed contents of a JSON file."""
    # TODO: move to commons
    with open(file_path, "r") as handle:
        return json.load(handle)


def parse_args():
    """Parse command-line arguments.

    Recapitulates run_lastz.py parameters, plus the path to the
    run_lastz script itself, which this layer invokes once per
    target/query interval pair.

    Prints the help text and exits with status 0 when too few
    positional arguments are supplied.
    """
    app = argparse.ArgumentParser()
    app.add_argument("target", help="Target: single sequence file or .lst")
    app.add_argument("query", help="Query: single sequence file or .lst")
    app.add_argument("params_json", help="pipeline configuration file")
    app.add_argument("output", help="Output file location")
    app.add_argument("run_lastz_script", help="Path to the run_lastz script")

    app.add_argument("--output_format", choices=["psl", "axt"], help="Output format axt|psl")
    app.add_argument("--temp_dir",
                     help="Temp directory to save intermediate fasta files (if needed)\n"
                     "/tmp/ is default, however, params_json key TMPDIR can provide a value"
                     "the command line argument has a higher priority than DEF file"
                     )
    app.add_argument("--verbose",
                     "-v",
                     action="store_true",
                     dest="verbose",
                     help="Show verbosity messages")
    app.add_argument("--axt_to_psl",
                     default="axtToPsl",
                     # fixed help-text typo: "axtToPst" -> "axtToPsl"
                     help="If axtToPsl is not in the path, use this"
                     "argument to provide path to this binary, if needed"
                     )

    # 5 positional arguments are required, so argv needs at least 6 entries
    # (script name + 5). The previous `< 5` check let a 4-argument call fall
    # through to a raw argparse error instead of showing the help text.
    if len(sys.argv) < 6:
        app.print_help()
        sys.exit(0)
    args = app.parse_args()
    return args


def get_intervals_list(to_ali_arg, chrom_sizes):
    """For bulk partitions argument, create a list of chromosomes
    to feed into the LASTZ wrapper command.

    A regular partition argument is returned unchanged as a
    one-element list. A bulk argument of the form
    "BULK_<n>:<2bit_path>:<chrom>:<chrom>..." is expanded into one
    "<2bit_path>:<chrom>:0-<chrom_size>" entry per listed chromosome,
    with sizes taken from the chrom_sizes mapping.
    """
    if not to_ali_arg.startswith("BULK"):
        return [to_ali_arg]
    two_bit_path, *bulked_chroms = to_ali_arg.split(":")[1:]
    return [
        f"{two_bit_path}:{chrom}:0-{chrom_sizes[chrom]}"
        for chrom in bulked_chroms
    ]


def main():
    """Entry point: expand bulk partitions and run run_lastz.py per pair."""
    args = parse_args()
    pipeline_params = read_json_file(args.params_json)
    seq_1_sizes_path = pipeline_params["seq_1_len"]
    seq_2_sizes_path = pipeline_params["seq_2_len"]
    target_chrom_sizes = read_chrom_sizes(seq_1_sizes_path)
    query_chrom_sizes = read_chrom_sizes(seq_2_sizes_path)
    target_coordinates = get_intervals_list(args.target, target_chrom_sizes)
    query_coordinates = get_intervals_list(args.query, query_chrom_sizes)

    # For each combination of target and query partitions in this
    # particular command, create and run the respective run_lastz.py command.
    # Normally it is a single pair:
    # reference chr 1 from 0 to 100000 vs query chr 2 1000000 to 3000000,
    # but if some little scaffolds were aggregated into a BULK entry, the
    # bulk must be unfolded into per-chromosome intervals first.
    # NOTE(review): all pairs write to the same args.output path — this
    # presumably relies on run_lastz.py appending rather than truncating.
    for target_arg, query_arg in product(target_coordinates, query_coordinates):
        lastz_cmd = [
            args.run_lastz_script,
            target_arg,
            query_arg,
            args.params_json,
            args.output,
        ]
        # --output_format is optional: forward it only when provided,
        # otherwise subprocess would receive None and raise TypeError
        if args.output_format:
            lastz_cmd.extend(["--output_format", args.output_format])
        if args.temp_dir:
            lastz_cmd.extend(["--temp_dir", args.temp_dir])
        if args.axt_to_psl:
            lastz_cmd.extend(["--axt_to_psl", args.axt_to_psl])
        rc = subprocess.call(lastz_cmd)
        if rc != 0:
            # previously non-zero exit codes were silently ignored, which
            # could drop alignments without any signal to the cluster manager
            sys.exit(f"Error! Command failed with exit code {rc}: {' '.join(lastz_cmd)}")


if __name__ == "__main__":
    main()
22 changes: 20 additions & 2 deletions steps_implementations/lastz_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,37 @@

def locate_target_bucket(target_partition):
    """Extract the respective bucket for a given target partition.

    Bulk partitions ("BULK_<num>:...") map to a dedicated bulk bucket
    directory keyed by the bulk number; regular partitions
    ("<2bit>:<chrom>:<start>-<end>") map to the bucket directory name
    derived from their genomic interval.
    """
    fields = target_partition.split(":")
    if target_partition.startswith(Constants.PART_BULK_FILENAME_PREFIX):
        # many bulked chromosomes together
        bulk_num_str = fields[0].split("_")[1]
        return f"{Constants.LASTZ_OUT_BULK_PREFIX}_{bulk_num_str}"
    chrom = fields[1]
    start_str, end_str = fields[2].split("-")
    region = GenomicRegion(chrom, int(start_str), int(end_str))
    return region.to_bucket_dirname()


def _get_lastz_out_fname_part(partition):
    """Return the partition-derived part of a LASTZ output file name.

    For bulk partitions ("BULK_<num>:...") this is the bulk label itself;
    for regular partitions ("<2bit>:<chrom>:<start>-<end>") it is the
    chromosome name.
    """
    # use the shared constant instead of the hard-coded "BULK" literal,
    # resolving the TODO and matching locate_target_bucket above
    if partition.startswith(Constants.PART_BULK_FILENAME_PREFIX):
        return partition.split(":")[0]
    else:
        return partition.split(':')[-2]


def create_lastz_jobs(project_paths: ProjectPaths, executables: StepExecutables):
to_log("LASTZ: making jobs")
target_partitions = read_list_txt_file(project_paths.reference_partitions)
query_partitions = read_list_txt_file(project_paths.query_partitions)
jobs = []

for num, (target, query) in enumerate(product(target_partitions, query_partitions), 1):
output_filename = f"{target.split(':')[-2]}_{query.split(':')[-2]}__{num}.psl"
target_out_name_part = _get_lastz_out_fname_part(target)
query_out_name_part = _get_lastz_out_fname_part(query)

output_filename = f"{target_out_name_part}_{query_out_name_part}__{num}.psl"
output_bucket = locate_target_bucket(target)
output_file = os.path.join(
project_paths.lastz_output_dir,
Expand All @@ -42,11 +58,13 @@ def create_lastz_jobs(project_paths: ProjectPaths, executables: StepExecutables)
lastz_exec = os.path.abspath(executables.lastz_wrapper)
# standalone_scripts/run_lastz.py
args_list = [
lastz_exec,
# lastz_exec,
executables.lastz_layer,
target,
query,
project_paths.project_params_dump,
output_file,
executables.lastz_wrapper,
"--output_format",
"psl",
"--axt_to_psl",
Expand Down
Loading

0 comments on commit 5730aee

Please sign in to comment.