diff --git a/LICENSE b/LICENSE index 3550843..6151443 100644 --- a/LICENSE +++ b/LICENSE @@ -1,5 +1,5 @@ Falco - Cloud based single-cell RNA-Seq pipeline -Copyright (c) 2016, Victor Chang Cardiac Research Institute +Copyright (c) 2016-2019, Victor Chang Cardiac Research Institute This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by diff --git a/README.md b/README.md index edd05e1..6baf7bb 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,15 @@ Falco # _Falco_: A quick and flexible single-cell RNA-seq processing framework on the cloud -Authors: Andrian Yang, Michael Troup, Peijie Lin, Joshua W. K. Ho +Authors: Andrian Yang, Michael Troup, Peijie Lin, Abhinav Kishore, Benjamin Phipps, Joshua W. K. Ho Contact: j.ho@victorchang.edu.au -Copyright © 2016, Victor Chang Cardiac Research Institute +Copyright © 2016-2019, Victor Chang Cardiac Research Institute ## Synopsis _Falco_ is a software bundle that enables bioinformatic analysis of large-scale transcriptomic data by utilising public cloud -infrastructure. The framework is suited to single cell RNA feature quantification analysis. +infrastructure. The framework currently provides support for single-cell RNA feature quantification, alignment and transcript assembly analyses. ## Motivation Computational analysis in this field has many challenges, including processing large volumes of data in the order of diff --git a/alignment_job.config b/alignment_job.config new file mode 100644 index 0000000..a15b1b3 --- /dev/null +++ b/alignment_job.config @@ -0,0 +1,19 @@ +[job_config] +name = FASTQ alignment +action_on_failure = CONTINUE +alignment_script = run_pipeline_alignment.py +alignment_script_s3_location = s3://[YOUR-BUCKET]/scripts +alignment_script_local_location = source/spark_runner +upload_alignment_script = True + +[spark_config] +driver_memory = 30g +executor_memory = 30g + +[script_arguments] +input_location = s3://[YOUR-BUCKET]/... +output_location = s3://[YOUR-BUCKET]/... +# Options for the aligner tool are STAR or HISAT2 +aligner_tool = STAR +aligner_extra_args = +region = us-west-2 \ No newline at end of file diff --git a/assembly_job.config b/assembly_job.config new file mode 100644 index 0000000..00e4ebe --- /dev/null +++ b/assembly_job.config @@ -0,0 +1,27 @@ +[job_config] +name = Transcript Assembly +action_on_failure = CONTINUE +assembly_script = run_pipeline_assembly3.py +assembly_script_s3_location = s3://[YOUR-BUCKET]/scripts +assembly_script_local_location = source/spark_runner +upload_assembly_script = True + +[spark_config] +driver_memory = 60g +executor_memory = 30g + +[script_arguments] +input_location = s3://[YOUR-BUCKET]/... +output_location = s3://[YOUR-BUCKET]/... +annotation_file = +enable_tiling = True +enable_analysis = True +# Options for the aligner tool are STAR or HISAT2 +aligner_tool = STAR +aligner_extra_args = +# Options for the assembler tool are StringTie or Scallop +assembler_tool = StringTie +assembler_use_reference = +assembler_extra_args = +assembler_merge_extra_args = +region = us-west-2 diff --git a/emr_cluster.config b/emr_cluster.config index 21c5f88..2180423 100644 --- a/emr_cluster.config +++ b/emr_cluster.config @@ -1,5 +1,5 @@ [EMR] -release_label = emr-4.6.0 +release_label = emr-5.7.0 name = Falco cluster log_uri = s3://[YOUR-BUCKET]/...
bootstrap_scripts = install_software.sh, copy_reference.sh @@ -25,3 +25,6 @@ vpc_subnet = master_security_group = slave_security_group = service_access_security_group = +# If a custom AMI ID is specified, it is recommended to remove install_software.sh from the bootstrap scripts, as the custom AMI should already have that software installed. +custom_ami_id = +ebs_root_volume_size = \ No newline at end of file diff --git a/launch_cluster.py b/launch_cluster.py index 85b31cf..fe3ea5d 100644 --- a/launch_cluster.py +++ b/launch_cluster.py @@ -8,7 +8,7 @@ global emr_configuration, emr_applications, cluster_config, optional_instance_config emr_configuration = "emr_cluster.config" emr_applications = ["Hadoop", "Spark", "Ganglia"] -cluster_config = "" # ""source/cluster_creator/cluster_config.json" +cluster_config = "source/cluster_creator/cluster_config.json" optional_instance_config = {"vpc_subnet": "Ec2SubnetId", "master_security_group": "EmrManagedMasterSecurityGroup", "slave_security_group": "EmrManagedSlaveSecurityGroup", @@ -29,6 +29,14 @@ def check_configuration(config): "core_instance_type", "core_instance_count"]): return False + release_version = config["EMR"]["release_label"].split("-")[-1].split(".") + major_release_version = int(release_version[0]) + minor_release_version = int(release_version[1]) + if config["EMR_nodes"].get("custom_ami_id", "").strip() != "" \ + and not (major_release_version > 5 or (major_release_version == 5 and minor_release_version >= 7)): + print("\033[31mERROR: \033[0mCustom AMI can only be used with EMR release >= 5.7") + return False + return True @@ -119,6 +127,12 @@ def build_command(config): emr_arguments["JobFlowRole"] = config["EMR_nodes"]["instance_profile"] emr_arguments["ServiceRole"] = config["EMR_nodes"]["service_role"] + if config["EMR_nodes"].get("custom_ami_id", "").strip() != "": + emr_arguments["CustomAmiId"] = config["EMR_nodes"]["custom_ami_id"].strip() + + if config["EMR_nodes"].get("ebs_root_volume_size", "").strip() != "": + emr_arguments["EbsRootVolumeSize"] = int(config["EMR_nodes"]["ebs_root_volume_size"]) + return emr_arguments if __name__ == "__main__": diff --git a/source/ami_creator/create-image.sh b/source/ami_creator/create-image.sh new file mode 100755 index 0000000..36c3137 --- /dev/null +++ b/source/ami_creator/create-image.sh @@ -0,0 +1,6 @@ +#!/bin/bash +id=$( head -1 ids.txt ) +aws ec2 create-image --instance-id $id \ + --name "Falco custom AMI" \ + --description "Custom AMI for Falco framework with tools pre-installed" \ + --query 'ImageId' > custom_ami_id.txt diff --git a/source/ami_creator/ec2-ami-spec.json b/source/ami_creator/ec2-ami-spec.json new file mode 100644 index 0000000..994c1c7 --- /dev/null +++ b/source/ami_creator/ec2-ami-spec.json @@ -0,0 +1,12 @@ +{ + "ImageId": "ami-6df1e514", + "KeyName": "[PRIVATE_KEY]", + "InstanceType": "[INSTANCE_TYPE]", + "Placement": { + "AvailabilityZone": "[AVAILABILITY_ZONE]" + }, + "IamInstanceProfile": { + "Arn": "arn:aws:iam::[AWS_ACCOUNT_ID]:instance-profile/EMR_EC2_DefaultRole" + }, + "UserData":"" +} diff --git a/source/ami_creator/launch-ec2.sh b/source/ami_creator/launch-ec2.sh new file mode 100755 index 0000000..86543da --- /dev/null +++ b/source/ami_creator/launch-ec2.sh @@ -0,0 +1,180 @@ +#!/bin/bash + +count=1 +spot_price=5 +launch_spec_json_file=ec2-ami-spec.json +user_data_file=user-data-ami.sh +tmp_json_file=tmp.json + +#colours; need to use -e option with echo +#red='\e[0;31m' +#cyan='\e[0;36m' +#green='\e[0;32m' +#yellow='\e[1;33m' +#purple='\e[0;35m' +#nc='\e[0m' #no colour +red=$( tput setaf 1 ) +cyan=$( tput setaf 6 ) +green=$( tput setaf 2 )
+yellow=$( tput setaf 3 ) +purple=$( tput setaf 5 ) +nc=$( tput sgr0 ) + +#program usage +usage() { + echo -e "${red}program exited due to invalid usage${nc}" + echo -e "${yellow}usage:${purple} $0 ${cyan}--instance-type \ + [[--instance-type ]...] \ + [--user-data ] [--dry-run]${nc}" + echo -e "${yellow}Example:${nc}" + echo -e "$0 --instance-type r3.large 1 --user-data some-file.sh --dry-run" + echo -e "${yellow}Valid instance types:${cyan}" + echo -e "${nc}" + exit 1 +} + +#checks latest return status +#accepts one argument - name of program call +check_status() { + if [ $? -ne 0 ] ; then + echo -e "${red}program exited due to unsuccessful excecution: \ + ${cyan}${1}${nc}" + exit 1 + fi +} + +#checks program usage: assumes 1 parameter: modify as necessary +#if [ $# -lt 3 ] ; then +# usage +#fi + +#function to exit program with message +exit_msg() { + echo -e "${red}Exiting program: ${cyan}${1}${nc}" + exit 1 +} + +start=`date +%s` #seconds since epoc + +# remove old instance id file +ids_file=ids.txt +if [[ -f $ids_file ]] ; then + rm $ids_file +fi + +# file to hold instance public ip addresses +ips_file=ips.txt +if [[ -f $ips_file ]] ; then + rm $ips_file +fi + +# file to hold spot instance request ids +spot_request_ids_file=spot-ids.txt +if [[ -f $spot_request_ids_file ]] ; then + rm $spot_request_ids_file +fi + +# create base64 code for user data +# NOTE difference in base64 between mac platform & other linux +# mac base64 uses -b option iso -w option +op="-w" +v=$( man base64 | grep '\-w' ) +[ $? -ne 0 ] && op="-b" +user_data=$( base64 $op 0 $user_data_file ) +check_status "creating user data" +# update the base64 user-data string in the .json file +awk -F":" -v user_data=$user_data -v json_file=$tmp_json_file '{ + if ($1 ~ /UserData/) + print $1 ":\"" user_data "\"" > json_file + else + print $0 > json_file +} END { +close (json_file) +}' $launch_spec_json_file +check_status "replacing user data" +mv $tmp_json_file $launch_spec_json_file + +#create the instances that were requested +aws ec2 request-spot-instances \ + --spot-price $spot_price \ + --instance-count $count \ + --type "one-time" \ + --launch-specification file://$launch_spec_json_file \ + --output text \ + --query 'SpotInstanceRequests[*].SpotInstanceRequestId' > \ + $spot_request_ids_file + +if [[ $? 
-ne 0 ]] ; then + exit_msg "spot request command failed" +fi +echo -e "${yellow}Waiting for spot requests to be fulfilled${nc}" +while (true) ; do + sleep 10 + fulfilled=0 + for request_id in `cat $spot_request_ids_file` ; do + # the effect of this look is to count the # of fulfilled + # spot requests + fulfilled=$((`aws ec2 describe-spot-instance-requests \ + --spot-instance-request-ids $request_id \ + --output text \ + --query "SpotInstanceRequests[*].Status.Code" \ + |grep "fulfilled"|wc -l|awk '{print $1}'` + \ + $fulfilled)) + done + if [[ $fulfilled -eq $count ]] ; then + # record instance_ids + for request_id in `cat $spot_request_ids_file` ; do + aws ec2 describe-spot-instance-requests \ + --spot-instance-request-ids $request_id \ + --output text \ + --query "SpotInstanceRequests[*].InstanceId" >> \ + $ids_file + done + break + fi +done +echo -e "${green}All spot requests have been fulfilled${nc}" +all_done=true + +#wait a minute for the instances to start running +sleep 60 +#start an infinite loop to check when instances are running +while true; do + all_done=true + #check the run state of each instance id that was created + if [[ -f $ips_file ]] ; then + rm -f $ips_file + fi + for id in `cat $ids_file`; do + #check the instance reachability status - when "passed" + #should be ok to use + instance_details_name=`aws ec2 describe-instance-status \ + --output text \ + --instance-ids $id --query \ + 'InstanceStatuses[0].InstanceStatus.Details[0].Name'` + instance_details_status=`aws ec2 describe-instance-status \ + --output text \ + --instance-ids $id --query \ + 'InstanceStatuses[0].InstanceStatus.Details[0].Status'` + if ! [[ ("$instance_details_name" == "reachability") && + ("$instance_details_status" == "passed") ]] ; then + all_done=false + #this instance is not ready + break + fi + ipaddr=`aws ec2 describe-instances --instance-ids --output text $id --query \ + 'Reservations[0].Instances[0].PublicDnsName'` + inst_type=`aws ec2 describe-instances --instance-ids --output text $id --query \ + 'Reservations[0].Instances[0].InstanceType'` + echo $ipaddr >> $ips_file + done + if ! $all_done ; then + sleep 10 + else + break + fi +done + +finish=`date +%s` #seconds since epoc +echo -e "${yellow}time: ${cyan}$(( $finish - $start ))${nc}" +cat $ips_file diff --git a/source/ami_creator/ssh.sh b/source/ami_creator/ssh.sh new file mode 100755 index 0000000..50e2898 --- /dev/null +++ b/source/ami_creator/ssh.sh @@ -0,0 +1,3 @@ +#!/bin/bash +ip=$( head -1 ips.txt ) +ssh -i [PRIVATE_KEY] ec2-user@$ip diff --git a/source/ami_creator/user-data-ami.sh b/source/ami_creator/user-data-ami.sh new file mode 100755 index 0000000..933e088 --- /dev/null +++ b/source/ami_creator/user-data-ami.sh @@ -0,0 +1,160 @@ +#!/bin/bash +############################################################################### +# designed for use with aws instance with ssd (called from --user-data option # +# mounts ssd to /ssd & creates directories for DNA variant calling pipeline # +# # +############################################################################### + +s3_software_install=s3://[YOUR-BUCKET]/... 
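+# Assumption: s3_software_install points at the S3 prefix holding the pre-downloaded tool
+# archives (for example, those fetched by source/cluster_creator/prepare_install_files.sh);
+# everything under this prefix is copied into /app by the "aws s3 cp ... --recursive" step below.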
+aws_region=us-west-2 + +create_dir() { + dir=$1 + # allow for case where image already has dir + sudo mkdir -p $dir + check_status "mkdir -p $dir" + sudo chmod a+rwx $dir + check_status "chmod $dir" + sudo chgrp ec2-user $dir + check_status "chgrp $dir" + sudo chown ec2-user $dir + check_status "chown $dir" +} + +function unzip_files() { + # unzip any .gz files in current directory or any subdirectories + # determine if there are any .gz files; note that without this test, the xargs command would fail with a null input + zip_files=$( find -L . -name "*.gz" -print0 ) + if [ "$zip_files" != "" ] ; then + # unzip all the .gz files using as many processors as possible + find -L . -name "*.gz" -print0 | xargs -P0 -0 gunzip + fi +} + +#copy data to newly mounted drive +mount_dir=/ +create_dir $mount_dir/app + +set -e +set -o pipefail + +# update system software +sudo yum update -y --skip-broken + +pushd /app > /dev/null + +# copy install software +aws s3 cp $s3_software_install . --recursive --region=$aws_region + +# give rights to ec2-user +for f in * ; do + sudo chgrp ec2-user $f + sudo chown ec2-user $f +done + +# Install STAR and its' dependencies +sudo yum install make gcc-c++ glibc-static -y + +# STAR +tar -xzf STAR*.tar.gz +star_path=$( find . -name "STAR"|grep -E "/Linux_x86_64/" ) +# symbolic link to the STAR directory (rather than to the executable itself) +ln -s ${star_path%STAR} STAR + +sudo yum install python-devel numpy python-matplotlib -y + +# Install subread (featureCount) +tar -xzf subread*.tar.gz +fc=$( find -name "featureCounts"|grep bin ) +sr_path=${fc%featureCounts} +ln -s $sr_path subread + +# Install HISAT2 +unzip hisat2*.zip +hisat_dir=$( find . -maxdepth 1 -type d -name "hisat2*") +ln -s $hisat_dir hisat + +# Install HTSeq +sudo yum install python27-devel python27-numpy python27-matplotlib python27-Cython -y +sudo pip install pysam +sudo pip install htseq + +# Install samtools +sudo yum install zlib-devel ncurses-devel ncurses bzip2-devel xz-devel -y +tar -xjf samtools*.tar.bz2 +sam_dir=$( find . -maxdepth 1 -type d -name "samtools*" ) +pushd $sam_dir > /dev/null +make +sudo make install +popd > /dev/null +ln -s $sam_dir samtools + +# Install htslib +hts_dir=$( find $sam_dir -maxdepth 1 -type d -name "htslib-*" ) +pushd $hts_dir > /dev/null +make +sudo make install +popd > /dev/null + +# Install picard_tools +# Note: latest version of picard_tools come as a jar file. We do not need to do anything. +mkdir picard-tool +mv picard.jar picard-tool/ + +# Install stringtie +tar -xzf stringtie*.tar.gz +stringtie_dir=$( find . -maxdepth 1 -type d -name "stringtie*") +ln -s $stringtie_dir stringtie + +# Install scallop +tar -xzf scallop-*_linux_x86_64.tar.gz +scallop_dir=$( find . -maxdepth 1 -type d -name "scallop*" ) +ln -s $scallop_dir scallop + +# Install gffcompare +tar -xzf gffcompare*.tar.gz +gffcompare_dir=$( find . -maxdepth 1 -type d -name "gffcompare*") +ln -s $gffcompare_dir gffcompare + +# INSTALL CUSTOM PRE-PROCESSING TOOLS BELOW + +# trim galore +tg=trim_galore +unzip trim_galore*.zip +tg_path=$( find . -name $tg ) +ln -s $tg_path $tg + +# trimmomatic +unzip Trimmomatic*.zip +tm=$( find . -name trimmomatic*.jar ) +ln -s $tm ${tm##*/} +# hardcoded +ln -s Trimmomatic-0.36/adapters/NexteraPE-PE.fa NexteraPE-PE.fa + +# prinseq +ps=prinseq-lite.pl +tar -xzf prinseq-lite*.tar.gz +ps_path=$( find . 
-name "$ps" ) +ln -s $ps_path $ps + +# install cutadapt +sudo pip install cutadapt + +# ------------------------------------------------------------- +# no longer in /app +popd > /dev/null + +mkdir /mnt/output + +# Install python dependencies for framework +sudo yum install python35 -y +sudo pip install pandas boto3 ascii_graph pysam +sudo python3 -m pip install pandas boto3 ascii_graph pysam + +# Install java8 +sudo yum install java-1.8.0-openjdk.x86_64 -y + +# install htop +sudo yum install htop -y + + diff --git a/source/cluster_creator/cluster_config.json b/source/cluster_creator/cluster_config.json new file mode 100644 index 0000000..ad766fe --- /dev/null +++ b/source/cluster_creator/cluster_config.json @@ -0,0 +1,13 @@ +[ + { + "Classification": "spark-env", + "Configurations": [ + { + "Classification": "export", + "Properties": { + "PYSPARK_PYTHON": "/usr/bin/python3" + } + } + ] + } +] \ No newline at end of file diff --git a/source/cluster_creator/copy_reference.sh b/source/cluster_creator/copy_reference.sh index 38a3388..160db1d 100755 --- a/source/cluster_creator/copy_reference.sh +++ b/source/cluster_creator/copy_reference.sh @@ -21,3 +21,4 @@ if [ "$zip_files" != "" ] ; then fi popd +mkdir /mnt/output diff --git a/source/cluster_creator/install_software.sh b/source/cluster_creator/install_software.sh index 4a54375..a15c754 100644 --- a/source/cluster_creator/install_software.sh +++ b/source/cluster_creator/install_software.sh @@ -6,17 +6,16 @@ set -e set -o pipefail -sudo yum update -y +#sudo yum update -y --skip-broken -mkdir /mnt/app -pushd /mnt/app > /dev/null +sudo mkdir /app +sudo chown hadoop /app # We need this as making directory in / will require sudo usage and thus the owner won't be hadoop anymore +pushd /app > /dev/null aws s3 cp $1 . --recursive # Install STAR and its' dependencies -sudo yum install make -y -sudo yum install gcc-c++ -y -sudo yum install glibc-static -y +sudo yum install make gcc-c++ glibc-static -y # STAR tar -xzf STAR*.tar.gz @@ -24,8 +23,6 @@ star_path=$( find . -name "STAR"|grep -E "/Linux_x86_64/" ) # symbolic link to the STAR directory (rather than to the executable itself) ln -s ${star_path%STAR} STAR -sudo yum install python-devel numpy python-matplotlib -y - # Install subread (featureCount) tar -xzf subread*.tar.gz fc=$( find -name "featureCounts"|grep bin ) @@ -38,11 +35,11 @@ hisat_dir=$( find . -maxdepth 1 -type d -name "hisat2*") ln -s $hisat_dir hisat # Install HTSeq -sudo yum install python27-devel python27-numpy python27-matplotlib -y sudo pip install pysam sudo pip install htseq # Install samtools +sudo yum install zlib-devel ncurses-devel ncurses bzip2-devel xz-devel -y --skip-broken tar -xjf samtools*.tar.bz2 sam_dir=$( find . -maxdepth 1 -type d -name "samtools*" ) pushd $sam_dir > /dev/null @@ -56,14 +53,29 @@ hts_dir=$( find $sam_dir -maxdepth 1 -type d -name "htslib-*" ) pushd $hts_dir > /dev/null make sudo make install - popd > /dev/null # Install picard_tools -unzip picard-tools*.zip -pic_jar=$( find . -name picard.jar ) -pic_path=${pic_jar%picard.jar} -ln -s $pic_path picard-tools +# Note: latest version of picard_tools come as a jar file. We do not need to do anything. +mkdir picard-tool +mv picard.jar picard-tool/ + +# Install stringtie +tar -xzf stringtie*.tar.gz +stringtie_dir=$( find . -maxdepth 1 -type d -name "stringtie*") +ln -s $stringtie_dir stringtie + +# Install scallop +tar -xzf scallop-*_linux_x86_64.tar.gz +scallop_dir=$( find . 
-maxdepth 1 -type d -name "scallop*" ) +ln -s $scallop_dir scallop + +# Install gffcompare +tar -xzf gffcompare*.tar.gz +gffcompare_dir=$( find . -maxdepth 1 -type d -name "gffcompare*") +ln -s $gffcompare_dir gffcompare + +# INSTALL CUSTOM PRE-PROCESSING TOOLS BELOW # trim galore tg=trim_galore @@ -82,23 +94,16 @@ tar -xzf prinseq-lite*.tar.gz ps_path=$( find . -name "$ps" ) ln -s $ps_path $ps +# install cutadapt +sudo pip install cutadapt + # ------------------------------------------------------------- # no longer in /mnt/app popd > /dev/null -mkdir /mnt/output - # Install python dependencies for framework -sudo yum install python27-Cython -y -sudo pip install pandas -sudo pip install boto3 -sudo python3 -m pip install boto3 - -# Install java8 -sudo yum install java-1.8.0-openjdk.x86_64 -y - -# install cutadapt -sudo pip install cutadapt +sudo pip install pandas boto3 ascii_graph pysam +sudo python3 -m pip install pandas boto3 ascii_graph pysam # install htop sudo yum install htop -y diff --git a/source/cluster_creator/prepare_install_files.sh b/source/cluster_creator/prepare_install_files.sh index 99d99de..b68cdc8 100755 --- a/source/cluster_creator/prepare_install_files.sh +++ b/source/cluster_creator/prepare_install_files.sh @@ -20,25 +20,34 @@ mkdir $tmp cd $tmp # STAR -wget -O STAR-2.5.2a.tar.gz https://github.com/alexdobin/STAR/archive/2.5.2a.tar.gz +wget -O STAR-2.5.4b.tar.gz https://github.com/alexdobin/STAR/archive/2.5.4b.tar.gz # HISAT2 -wget -O hisat2-2.0.4.zip ftp://ftp.ccb.jhu.edu/pub/infphilo/hisat2/downloads/hisat2-2.0.4-Linux_x86_64.zip +wget -O hisat2-2.1.0.zip ftp://ftp.ccb.jhu.edu/pub/infphilo/hisat2/downloads/hisat2-2.1.0-Linux_x86_64.zip -# subread -wget -O subread-1.5.0-p3-Linux-x86_64.tar.gz https://sourceforge.net/projects/subread/files/subread-1.5.0-p3/subread-1.5.0-p3-Linux-x86_64.tar.gz/download +# subread +wget -O subread-1.6.0-Linux-x86_64.tar.gz https://sourceforge.net/projects/subread/files/subread-1.6.0/subread-1.6.0-Linux-x86_64.tar.gz/download + +# stringtie +wget -O stringtie-1.3.3b.tar.gz http://ccb.jhu.edu/software/stringtie/dl/stringtie-1.3.3b.Linux_x86_64.tar.gz + +# Scallop +wget https://github.com/Kingsford-Group/scallop/releases/download/v0.10.2/scallop-0.10.2_linux_x86_64.tar.gz + +# gffcompare +wget -O gffcompare.tar.gz http://ccb.jhu.edu/software/stringtie/dl/gffcompare-0.10.1.Linux_x86_64.tar.gz # picard tools -wget https://github.com/broadinstitute/picard/releases/download/2.4.1/picard-tools-2.4.1.zip +wget -O picard.jar https://github.com/broadinstitute/picard/releases/download/2.17.10/picard.jar # sam tools -wget -O samtools-1.3.1.tar.bz2 https://sourceforge.net/projects/samtools/files/samtools/1.3.1/samtools-1.3.1.tar.bz2/download +wget -O samtools-1.7.tar.bz2 https://github.com/samtools/samtools/releases/download/1.7/samtools-1.7.tar.bz2 # prinseq wget -O prinseq-lite-0.20.4.tar.gz https://sourceforge.net/projects/prinseq/files/standalone/prinseq-lite-0.20.4.tar.gz/download # trim galore -wget http://www.bioinformatics.babraham.ac.uk/projects/trim_galore/trim_galore_v0.4.1.zip +wget -O trim_galore_v0.4.5.zip https://github.com/FelixKrueger/TrimGalore/archive/0.4.5.zip # trimmomatic wget http://www.usadellab.org/cms/uploads/supplementary/Trimmomatic/Trimmomatic-0.36.zip diff --git a/source/preprocessing/user_prinseq_cutadapt.sh b/source/preprocessing/user_prinseq_cutadapt.sh index dc76622..d8bad45 100755 --- a/source/preprocessing/user_prinseq_cutadapt.sh +++ b/source/preprocessing/user_prinseq_cutadapt.sh @@ -9,8 +9,8 @@ # this file 
includes example code that could be used for paired-end data -prinseq=/mnt/app/prinseq-lite.pl -trim_galore=/mnt/app/trim_galore +prinseq=/app/prinseq-lite.pl +trim_galore=/app/trim_galore # function to deal with terminal errors exit_msg() { diff --git a/source/preprocessing/user_trimmomatic.sh b/source/preprocessing/user_trimmomatic.sh index fc21250..a295294 100755 --- a/source/preprocessing/user_trimmomatic.sh +++ b/source/preprocessing/user_trimmomatic.sh @@ -24,7 +24,7 @@ fq_2="$2" [ -f $fq_1 ] || exit_msg "Unable to locate read 1 file: $fq_1" [ -f $fq_2 ] || exit_msg "Unable to locate read 1 file: $fq_2" -trimmomatic="/mnt/app/trimmomatic-0.36.jar" +trimmomatic="/app/trimmomatic-0.36.jar" fa="NexteraPE-PE.fa" [ -f $trimmomatic ] || exit_msg "Couldn't find trimmomatic: $trimmomatic" [ -f $fa ] || exit_msg "Couldn't find fa file: $fa" diff --git a/source/spark_runner/run_pipeline_alignment.py b/source/spark_runner/run_pipeline_alignment.py new file mode 100644 index 0000000..c362e58 --- /dev/null +++ b/source/spark_runner/run_pipeline_alignment.py @@ -0,0 +1,358 @@ +import argparse +import sys +from operator import add +import os +import shlex +import shutil +from subprocess import Popen, PIPE +from pyspark import SparkContext, SparkConf +import pyspark.serializers +import subprocess +import boto3 +import re + +global parser_result + +if sys.version > "3.4": + pyspark.serializers.protocol = 4 + +APPLICATION_FOLDER = "/app" +GENOME_REFERENCES_FOLDER = "/mnt/ref" +TEMP_OUTPUT_FOLDER = "/mnt/output" +HDFS_TEMP_OUTPUT_FOLDER = "/tmp/sam_chunks" + + +################################# +# File splitting +################################# + + +def split_interleaved_file(file_prefix, file_content, output_dir): + """ + Unpacks an interleaved file into the standard FASTQ format + :param file_prefix: the prefix of the file name + :param file_content: the lines of content from the input file + :param output_dir: the location to store the unpacked files + :return: a tuple with first element being a list of output file names + (1 for se, 2 for pe); 2nd element a boolean flag - True if pe data, + False otherwise + """ + fastq_line_count_se = 4 + fastq_line_count_pe = 8 + paired_reads = False + output_file_names = [] + + file_prefix = output_dir + "/" + file_prefix + output_file = file_prefix + "_1.fq" + output_file_names.append(output_file) + output_file_writer = open(output_file, 'w') + + count = 0 + for line in file_content.strip().split("\n"): + # In the first line, check if it's paired or not + if count == 0 and len(line.strip().split("\t")) == fastq_line_count_pe: + paired_reads = True + output_file_pair = file_prefix + "_2.fq" + output_file_names.append(output_file_pair) + output_pair_writer = open(output_file_pair, 'w') + + if paired_reads: + parts = line.strip().split("\t") + + if len(parts) != fastq_line_count_pe: + continue + + read_one = parts[:fastq_line_count_se] + read_two = parts[fastq_line_count_se:] + output_file_writer.write("\n".join(read_one) + "\n") + output_pair_writer.write("\n".join(read_two) + "\n") + else: + output_file_writer.writelines(line.strip().replace("\t", "\n") + "\n") + + count += 1 + + output_file_writer.close() + if paired_reads: + output_pair_writer.close() + + return output_file_names, paired_reads + +################################# +# Aligner +################################# + + +def align_reads_star(sample_name, file_names, alignment_output_dir): + # If paired read flag is required + # paired_read = True if len(file_names) == 2 else False + + 
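+    # Sketch of the command built below, assuming the default /app and /mnt/ref layout, a
+    # paired-end chunk named sample_part0 and no aligner_extra_args:
+    #   /app/STAR/STAR --runThreadN 4 --genomeDir /mnt/ref/star_index \
+    #     --readFilesIn .../sample_part0_1.fq .../sample_part0_2.fq \
+    #     --outFileNamePrefix .../alignment_sample_part0/aligner_output/ --outSAMtype BAM Unsorted
+    # Each Spark task therefore emits one unsorted BAM chunk, which fuse_alignment() later
+    # concatenates per sample with "samtools cat".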
print("Aligning reads...") + aligner_args = "{app_folder}/STAR/STAR --runThreadN 4 {aligner_extra_args} --genomeDir {index_folder} " \ + "--readFilesIn {fastq_file_names} --outFileNamePrefix {output_folder} --outSAMtype BAM Unsorted".\ + format(app_folder=APPLICATION_FOLDER, + aligner_extra_args="" if parser_result.aligner_extra_args is None else parser_result.aligner_extra_args, + index_folder=GENOME_REFERENCES_FOLDER + "/star_index", + fastq_file_names=" ".join(file_names), + output_folder=alignment_output_dir + "/") + print("Command: " + aligner_args) + aligner_process = Popen(shlex.split(aligner_args), stdout=PIPE, stderr=PIPE) + aligner_out, aligner_error = aligner_process.communicate() + + if aligner_process.returncode != 0: + raise ValueError("STAR failed to complete (Non-zero return code)!\n" + "STAR stdout: {std_out} \nSTAR stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) + + if aligner_error.decode("utf8").strip() != "" or not os.path.isfile(alignment_output_dir + "/Log.final.out"): + raise ValueError("STAR failed to complete (No output file is found)!\n" + "STAR stdout: {std_out} \nSTAR stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) + + print('Completed reads alignment') + + bam_file_name_output = "Aligned.out.bam" + + return bam_file_name_output + + +def align_reads_hisat(sample_name, file_names, alignment_output_dir): + # If paired read flag is required + paired_read = True if len(file_names) == 2 else False + + print("Aligning reads...") + if paired_read: + fastq_file_args = "-1 {} -2 {}".format(*file_names) + else: + fastq_file_args = "-U {}".format(*file_names) + + aligner_args = "{app_folder}/hisat/hisat2 -p 4 --tmo {aligner_extra_args} -x {index_folder}/hisat2.index " \ + "{fastq_file_names} -S {output_folder}/output.sam".\ + format(app_folder=APPLICATION_FOLDER, + aligner_extra_args="" if parser_result.aligner_extra_args is None else parser_result.aligner_extra_args, + index_folder=GENOME_REFERENCES_FOLDER + "/hisat_index", + fastq_file_names=fastq_file_args, + output_folder=alignment_output_dir) + print("Command: " + aligner_args) + aligner_process = Popen(shlex.split(aligner_args), stdout=PIPE, stderr=PIPE) + aligner_out, aligner_error = aligner_process.communicate() + + if aligner_process.returncode != 0: + raise ValueError("HISAT2 failed to complete (Non-zero return code)!\n" + "HISAT2 stdout: {std_out} \nHISAT2 stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) + print('Completed reads alignment') + + samtools_args = "{app_folder}/samtools/samtools view -@ 4 -o {output_folder}/output.bam {output_folder}/output.sam". 
\ + format(app_folder=APPLICATION_FOLDER, + output_folder=alignment_output_dir) + print("Command: " + samtools_args) + samtools_process = Popen(shlex.split(samtools_args), stdout=PIPE, stderr=PIPE) + samtools_out, samtools_error = samtools_process.communicate() + + if samtools_process.returncode != 0: + raise ValueError("Samtools failed to complete (Non-zero return code)!\n" + "Samtools stdout: {std_out} \nSamtools stderr: {std_err}".format( + std_out=samtools_out.decode("utf8"), std_err=samtools_error.decode("utf8"))) + + sam_file_name_output = "output.bam" + + return sam_file_name_output + + +def align_reads_subread(sample_name, file_names, alignment_output_dir): + # If paired read flag is required + paired_read = True if len(file_names) == 2 else False + + print("Aligning reads...") + print("Aligning with subread") + if paired_read: + fastq_file_args = "-r {} -R {}".format(*file_names) + else: + fastq_file_args = "-r {}".format(*file_names) + + aligner_args = "{app_folder}/subread/subread-align -T 4 -t 0 --SAMoutput {aligner_extra_args} " \ + "-i {index_folder}/genome {fastq_file_names} -o {output_folder}/output.bam".\ + format(app_folder=APPLICATION_FOLDER, + aligner_extra_args="" if parser_result.aligner_extra_args is None else parser_result.aligner_extra_args, + index_folder=GENOME_REFERENCES_FOLDER + "/subread_index", + fastq_file_names=fastq_file_args, + output_folder=alignment_output_dir) + print("Command: " + aligner_args) + aligner_process = Popen(shlex.split(aligner_args), stdout=PIPE, stderr=PIPE) + aligner_out, aligner_error = aligner_process.communicate() + + if aligner_process.returncode != 0: + raise ValueError("Subread failed to complete (Non-zero return code)!\n" + "Subread stdout: {std_out} \nSubread stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) + print('Completed reads alignment') + + sam_file_name_output = "output.bam" + + return sam_file_name_output + + +################################# +# Main functions +################################# + + +def alignment_step(keyval): + # Input: file_name, file_content as key,val + # Output: [sample_name, file_name] as [key,val] + global parser_result + + prefix_regex = r"(.*_part[0-9]*)\." + + file_name, file_content = keyval + prefix_match = re.findall(prefix_regex, file_name.rstrip("/").split("/")[-1]) + + if len(prefix_match) != 1: + raise ValueError("Filename can not be resolved (invalid, pattern mismatch): {}".format(file_name)) + + prefix = prefix_match[0] + sample_name = prefix.rsplit("_part", 1)[0] + + alignment_dir = TEMP_OUTPUT_FOLDER + "/alignment_" + prefix + + try: + os.mkdir(alignment_dir) + except: + print('Alignment directory {} exist.'.format(alignment_dir)) + + print("Recreating FASTQ file(s)") + split_file_names, paired_reads = split_interleaved_file(prefix, file_content, alignment_dir) + print("Recreating FASTQ file(s) complete. 
Files recreated: {}".format(",".join(split_file_names))) + + alignment_output_dir = alignment_dir + "/aligner_output" + + try: + os.mkdir(alignment_output_dir) + except: + print('Alignment output directory {} exist.'.format(alignment_output_dir)) + + if parser_result.aligner.lower() == "star": + aligned_sam_output = align_reads_star(sample_name, split_file_names, alignment_output_dir) + elif parser_result.aligner.lower() == "hisat" or parser_result.aligner.lower() == "hisat2": + aligned_sam_output = align_reads_hisat(sample_name, split_file_names, alignment_output_dir) + elif parser_result.aligner.lower() == "subread": + aligned_sam_output = align_reads_subread(sample_name, split_file_names, alignment_output_dir) + else: + print("Aligner specified is not yet supported. Defaulting to STAR") + aligned_sam_output = align_reads_star(sample_name, split_file_names, alignment_output_dir) + + aligned_output_filepath = "{}/{}".format(alignment_output_dir.rstrip("/"), aligned_sam_output) + aligned_output_hdfs_filepath = "{}/{}".format(HDFS_TEMP_OUTPUT_FOLDER, prefix) + + subprocess.call(["hdfs", "dfs", "-rm", aligned_output_hdfs_filepath]) + subprocess.call(["hdfs", "dfs", "-put", aligned_output_filepath, aligned_output_hdfs_filepath]) + + shutil.rmtree(alignment_dir, ignore_errors=True) + return sample_name, [prefix] + + +def fuse_alignment(keyval): + # Input: sample_name, [file_name,...] as key, val + # Output: sample_name + global parser_result + + key, file_lists = keyval + fuse_alignment_dir = TEMP_OUTPUT_FOLDER.rstrip("/") + "/" + key + + ordered_file_lists = sorted([(f, int(f.rsplit("part", 1)[-1])) for f in file_lists], key=lambda x:x[-1]) + print(ordered_file_lists) + + try: + os.mkdir(fuse_alignment_dir) + except: + print('Fuse alignment directory {} exist.'.format(fuse_alignment_dir)) + + fuse_alignment_file = key + ".bam" + + previous_file_path = "" + for index, file_name_pair in enumerate(ordered_file_lists): + file_name, number = file_name_pair + local_file_path = fuse_alignment_dir + "/" + file_name + ".bam" + subprocess.call(["hdfs", "dfs", "-get", HDFS_TEMP_OUTPUT_FOLDER.rstrip("/") + "/" + file_name, local_file_path]) + + if index != 0: + new_merged_file_path = "{}/temp_{}.bam".format(fuse_alignment_dir, index) + subprocess.call(["samtools", "cat", "-o", new_merged_file_path, previous_file_path, local_file_path]) + + os.remove(previous_file_path) + os.remove(local_file_path) + previous_file_path = new_merged_file_path + else: + previous_file_path = local_file_path + + subprocess.call(["hdfs", "dfs", "-rm", HDFS_TEMP_OUTPUT_FOLDER.rstrip("/") + "/" + file_name]) + + if parser_result.output_dir.startswith("s3://"): # From S3 + s3_client = boto3.client('s3', region_name=parser_result.aws_region) + print("uploading to S3") + output_bucket, key_prefix = parser_result.output_dir.strip().strip("/")[5:].split("/", 1) + s3_client.upload_file(previous_file_path, output_bucket, key_prefix + "/" + fuse_alignment_file) + else: + print("outputting to HDFS") + subprocess.call(["hdfs", "dfs", "-mkdir", "-p", parser_result.output_dir.rstrip("/")]) + subprocess.call(["hdfs", "dfs", "-put", previous_file_path, parser_result.output_dir.rstrip("/") + "/" + + fuse_alignment_file]) + + os.remove(previous_file_path) + return key + +if __name__ == "__main__": + global parser_result + + parser = argparse.ArgumentParser(description='Spark-based RNA-seq Pipeline Alignment') + parser.add_argument('--input', '-i', action="store", dest="input_dir", help="Input directory - HDFS or S3") + 
parser.add_argument('--output', '-o', action="store", dest="output_dir", help="Output directory - HDFS or S3") + parser.add_argument('--aligner_tools', '-at', action="store", dest="aligner", nargs='?', + help="Aligner to be used (STAR|HISAT2|Subread)", default="STAR") + parser.add_argument('--aligner_extra_args', '-s', action="store", dest="aligner_extra_args", nargs='?', + help="Extra argument to be passed to alignment tool", default="") + parser.add_argument('--region', '-r', action="store", dest="aws_region", help="AWS region") + + parser_result = parser.parse_args() + + split_num = 0 + + conf = SparkConf().setAppName("Spark-based RNA-seq Pipeline Alignment") + sc = SparkContext(conf=conf) + + if parser_result.input_dir.startswith("s3://"): # From S3 + s3_client = boto3.client('s3', region_name=parser_result.aws_region) + # Get number of input files + s3_paginator = s3_client.get_paginator('list_objects') + input_bucket, key_prefix = parser_result.input_dir[5:].strip().split("/", 1) + + input_file_num = 0 + + for result in s3_paginator.paginate(Bucket=input_bucket, Prefix=key_prefix): + for file in result.get("Contents"): + input_file_num += 1 + + if input_file_num == 0: + raise ValueError("Input directory is invalid or empty!") + + split_num = input_file_num + else: # From HDFS + hdfs_process = Popen(shlex.split("hdfs dfs -count {}".format(parser_result.input_dir)), + stdout=PIPE, stderr=PIPE) + hdfs_out, hdfs_error = hdfs_process.communicate() + + if hdfs_error: + raise ValueError("Input directory is invalid or empty!") + + dir_count, file_count, size, path = hdfs_out.strip().split() + + split_num = int(file_count) + + subprocess.call(["hdfs", "dfs", "-mkdir", "-p", HDFS_TEMP_OUTPUT_FOLDER]) + + input_files = sc.wholeTextFiles(parser_result.input_dir, split_num) + + aligned_files = input_files.map(alignment_step) + aligned_file_lists = aligned_files.reduceByKey(add) + aligned_samples = aligned_file_lists.map(fuse_alignment) + aligned_samples.collect() diff --git a/source/spark_runner/run_pipeline_assembly.py b/source/spark_runner/run_pipeline_assembly.py new file mode 100644 index 0000000..f713e6d --- /dev/null +++ b/source/spark_runner/run_pipeline_assembly.py @@ -0,0 +1,693 @@ +import argparse +import datetime +import os +import sys +import shlex +import shutil +from subprocess import Popen, PIPE +from operator import add +from pyspark import SparkContext, SparkConf +import pyspark.serializers +import subprocess +import boto3 +import re +import pysam + +global parser_result + +if sys.version > "3.4": + pyspark.serializers.protocol = 4 + +APPLICATION_FOLDER = "/app" +GENOME_REFERENCES_FOLDER = "/mnt/ref" +TEMP_OUTPUT_FOLDER = "/mnt/output" + +BIN_BP_SIZE = 320000 +BIN_BP_OFFSET = 64000 # Note: offset must never be greater than (BIN_BP_SIZE/2)+1! 
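+# Worked example of the binning arithmetic in bin_reads(): with BIN_BP_SIZE = 320000 and
+# BIN_BP_OFFSET = 64000, BIN_BP_SIZE_W_OFFSET below is 256000. A read spanning positions
+# 300000-300150 is assigned, with tiling enabled, to bins (300000-64000)//256000 = 0 and
+# 300150//256000 = 1 (i.e. both overlapping bins); with tiling disabled it falls only in
+# bin 300000//320000 = 0.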
+BIN_BP_SIZE_W_OFFSET = BIN_BP_SIZE-BIN_BP_OFFSET +MINIMUM_READS_IN_BIN = 50 +CHROMOSOME_PATTERN = r"^chr[0-9XY]+" + + +################################# +# File splitting +################################# + + +def split_interleaved_file(file_prefix, file_content, output_dir): + """ + Unpacks an interleaved file into the standard FASTQ format + :param file_prefix: the prefix of the file name + :param file_content: the lines of content from the input file + :param output_dir: the location to store the unpacked files + :return: a tuple with first element being a list of output file names + (1 for se, 2 for pe); 2nd element a boolean flag - True if pe data, + False otherwise + """ + fastq_line_count_se = 4 + fastq_line_count_pe = 8 + paired_reads = False + output_file_names = [] + + file_prefix = output_dir + "/" + file_prefix + output_file = file_prefix + "_1.fq" + output_file_names.append(output_file) + output_file_writer = open(output_file, 'w') + + count = 0 + for line in file_content.strip().split("\n"): + # In the first line, check if it's paired or not + if count == 0 and len(line.strip().split("\t")) == fastq_line_count_pe: + paired_reads = True + output_file_pair = file_prefix + "_2.fq" + output_file_names.append(output_file_pair) + output_pair_writer = open(output_file_pair, 'w') + + if paired_reads: + parts = line.strip().split("\t") + + if len(parts) != fastq_line_count_pe: + continue + + read_one = parts[:fastq_line_count_se] + read_two = parts[fastq_line_count_se:] + output_file_writer.write("\n".join(read_one) + "\n") + output_pair_writer.write("\n".join(read_two) + "\n") + else: + output_file_writer.writelines(line.strip().replace("\t", "\n") + "\n") + + count += 1 + + output_file_writer.close() + if paired_reads: + output_pair_writer.close() + + return output_file_names, paired_reads + + +################################# +# Aligner +################################# + + +def align_reads_star(file_names, alignment_output_dir): + # If paired read flag is required + # paired_read = True if len(file_names) == 2 else False + + print("Aligning reads...") + aligner_args = "{app_folder}/STAR/STAR --runThreadN 4 --outSAMstrandField intronMotif {aligner_extra_args} " \ + "--genomeDir {index_folder} --readFilesIn {fastq_file_names} --outFileNamePrefix {output_folder}". \ + format(app_folder=APPLICATION_FOLDER, + aligner_extra_args="" if parser_result.aligner_extra_args is None else parser_result.aligner_extra_args, + index_folder=GENOME_REFERENCES_FOLDER + "/star_index", + fastq_file_names=" ".join(file_names), + output_folder=alignment_output_dir + "/") + print("Command: " + aligner_args) + aligner_process = Popen(shlex.split(aligner_args), stdout=PIPE, stderr=PIPE) + aligner_out, aligner_error = aligner_process.communicate() + + if aligner_process.returncode != 0: + raise ValueError( + "STAR failed to complete (Non-zero return code)!\n" + "STAR stdout: {std_out} \n" + "STAR stderr: {std_err}". + format(std_out=aligner_out.decode("utf8"), std_err=aligner_error.decode("utf8")) + ) + + if aligner_error.decode("utf8").strip() != "" or not os.path.isfile(alignment_output_dir + "/Log.final.out"): + raise ValueError("STAR failed to complete (No output file is found)!\n" + "STAR stdout: {std_out} \n" + "STAR stderr: {std_err}". 
+ format(std_out=aligner_out.decode("utf8"), std_err=aligner_error.decode("utf8"))) + + print('Completed reads alignment') + + sam_file_name_output = "Aligned.out.sam" + return sam_file_name_output + + +def align_reads_hisat(file_names, alignment_output_dir): + # If paired read flag is required + paired_read = True if len(file_names) == 2 else False + + print("Aligning reads...") + if paired_read: + fastq_file_args = "-1 {} -2 {}".format(*file_names) + else: + fastq_file_args = "-U {}".format(*file_names) + + aligner_args = "{app_folder}/hisat/hisat2 -p 4 --no-unal --no-mixed {aligner_extra_args} " \ + "-x {index_folder}/hisat2.index {fastq_file_names} -S {output_folder}/output.sam". \ + format(app_folder=APPLICATION_FOLDER, + aligner_extra_args="" if parser_result.aligner_extra_args is None else parser_result.aligner_extra_args, + index_folder=GENOME_REFERENCES_FOLDER + "/hisat_index", + fastq_file_names=fastq_file_args, + output_folder=alignment_output_dir) + print("Command: " + aligner_args) + aligner_process = Popen(shlex.split(aligner_args), stdout=PIPE, stderr=PIPE) + aligner_out, aligner_error = aligner_process.communicate() + + if aligner_process.returncode != 0: + raise ValueError( + "HISAT2 failed to complete (Non-zero return code)!\n" + "HISAT2 stdout: {std_out} \n" + "HISAT2 stderr: {std_err}". + format(std_out=aligner_out.decode("utf8"), std_err=aligner_error.decode("utf8")) + ) + print('Completed reads alignment') + + sam_file_name_output = "output.sam" + return sam_file_name_output + + +def align_reads_subread(file_names, alignment_output_dir): + # If paired read flag is required + paired_read = True if len(file_names) == 2 else False + + print("Aligning reads...") + if paired_read: + fastq_file_args = "-r {} -R {}".format(*file_names) + else: + fastq_file_args = "-r {}".format(*file_names) + + aligner_args = "{app_folder}/subread/subread-align -T 4 -t 0 --SAMoutput -u {aligner_extra_args} " \ + "-i {index_folder}/genome {fastq_file_names} -o {output_folder}/output.sam". 
\ + format(app_folder=APPLICATION_FOLDER, + aligner_extra_args="" if parser_result.aligner_extra_args is None else parser_result.aligner_extra_args, + index_folder=GENOME_REFERENCES_FOLDER + "/subread_index", + fastq_file_names=fastq_file_args, + output_folder=alignment_output_dir) + print("Command: " + aligner_args) + aligner_process = Popen(shlex.split(aligner_args), stdout=PIPE, stderr=PIPE) + aligner_out, aligner_error = aligner_process.communicate() + + if aligner_process.returncode != 0: + raise ValueError( + "Subread failed to complete (Non-zero return code)!\n" + "Subread stdout: {std_out} \n" + "Subread stderr: {std_err}".format( + std_out=aligner_out.decode("utf8"), std_err=aligner_error.decode("utf8")) + ) + print('Completed reads alignment') + + sam_file_name_output = "output.sam" + return sam_file_name_output + + +################################# +# Binning +################################# + + +def bin_reads(aligned_output_filepath, paired_reads): + binned_reads = [] + + def process_read(read_one, read_two=None): + if read_one.reference_id in chromosome_list_broadcast.value: + read_start = read_one.reference_start + read_end = read_one.reference_end + read_one.query_sequence = "" + read_one.query_qualities = "" + + if read_two: + if read_two.reference_start < read_start: + read_start = read_two.reference_start + + if read_two.reference_end > read_end: + read_end = read_two.reference_end + + read_two.query_sequence = "" + read_two.query_qualities = "" + + if parser_result.enable_tiling: + start_bin_number = max(read_start-BIN_BP_OFFSET, 0) // BIN_BP_SIZE_W_OFFSET + end_bin_number = read_end // BIN_BP_SIZE_W_OFFSET + else: + start_bin_number = read_start // BIN_BP_SIZE + end_bin_number = read_end // BIN_BP_SIZE + + for bin_number in range(start_bin_number, end_bin_number+1): + binned_reads.append(((read_one.reference_id, bin_number), [read_one.tostring() + "\n"])) + if read_two: + binned_reads.append(((read_two.reference_id, bin_number), [read_two.tostring() + "\n"])) + + with pysam.AlignmentFile(aligned_output_filepath) as alignment_file: + if paired_reads: + previous_read = None + for index, read in enumerate(alignment_file): + if index % 2 == 1: + # For improperly paired reads (across multiple chromosome), we will treat them like single reads + if previous_read.reference_id != read.reference_id: + process_read(previous_read) + process_read(read) + else: + process_read(previous_read, read) + else: + previous_read = read + else: + for read in alignment_file: + process_read(read) + + return binned_reads + + +################################# +# Assembly +################################# + + +def assemble_transcripts_stringtie(bin_id, aligned_output_filepath, assembler_output_dir, reference_gtf_filepath=None): + print("Assembling reads...") + assembler_command = "{app_folder}/stringtie/stringtie -p 8 -l {prefix} {assembler_extra_args} {reference_gtf} " \ + "{aligned_file}".\ + format(app_folder=APPLICATION_FOLDER, + prefix=bin_id, + assembler_extra_args=parser_result.assembler_extra_args, + reference_gtf="-G " + reference_gtf_filepath if reference_gtf_filepath else "", + aligned_file=aligned_output_filepath) + print("Command: " + assembler_command) + assembler_process = Popen(shlex.split(assembler_command), stdout=PIPE, stderr=PIPE) + assembler_out, assembler_error = assembler_process.communicate() + + if assembler_process.returncode != 0: + raise ValueError('Stringtie failed to complete (non-zero return):\n' + 'Stringtie stdout: {std_out}\n' + 'Stringtie stderr: {std_err}'. 
+ format(std_out=assembler_out.decode("utf8"), std_err=assembler_error.decode("utf8"))) + + if assembler_error.decode("utf8").strip() != "": + raise ValueError('Stringtie failed to complete (error):\n' + 'Stringtie stdout: {std_out}\n' + 'Stringtie stderr: {std_err}'. + format(std_out=assembler_out.decode("utf8"), std_err=assembler_error.decode("utf8"))) + + return assembler_out.decode("utf8") + + +def assemble_transcripts_scallop(bin_id, aligned_output_filepath, assembler_output_dir): + print("Assembling reads...") + output_gtf = assembler_output_dir + "/output.gtf" + assembler_command = "{app_folder}/scallop/scallop -i {sam_file} -o {out_gtf} {assembler_extra_args} --verbose 0".\ + format(app_folder=APPLICATION_FOLDER, + sam_file=aligned_output_filepath, + out_gtf=output_gtf, + assembler_extra_args=parser_result.assembler_extra_args.strip()) + print("Command: " + assembler_command) + assembler_process = Popen(shlex.split(assembler_command), stdout=PIPE, stderr=PIPE) + assembler_out, assembler_error = assembler_process.communicate() + + if assembler_process.returncode != 0: + raise ValueError('scallop failed to complete (non-zero return):\n' + 'Scallop stdout: {std_out}\n' + 'Scallop stderr: {std_err}'. + format(std_out=assembler_out.decode("utf8"), std_err=assembler_error.decode("utf8"))) + + if not os.path.isfile(output_gtf): + raise ValueError('Scallop failed to complete (no output file):\n' + 'Scallop stdout: {std_out}\n' + 'Scallop stderr: {std_err}'. + format(std_out=assembler_out.decode("utf8"), std_err=assembler_error.decode("utf8"))) + + # We need to give each GTF entry a unique ID for the final merging using the region + annotation_output = "" + with open(output_gtf) as gtf: + for line in gtf: + if not line.startswith("#"): + line = line.replace('"gene', '"{}'.format(bin_id)) + annotation_output += line + + return annotation_output + + +################################# +# Augmenting Annotation +################################# + + +def merge_reference_annotation(assembled_transcript): + assembled_transcript_path = "all_transcripts.gtf" + merged_transcript_path = "reference.gtf" + + # Create a GTF file to pass to StringTie. + with open(assembled_transcript_path, 'w') as gtf_file: + gtf_file.writelines(assembled_transcript) + assembled_transcript.clear() + + # Merge the reference annotation and transcripts from all bins. + merge_command = "{app_folder}/stringtie/stringtie -p 32 {assembler_extra_args} --merge {assembled_transcript} " \ + "-G {genome_ref_folder}/{annotation_file} -o {merged_transcript}".\ + format(app_folder=APPLICATION_FOLDER, + assembler_extra_args=parser_result.assembler_merge_extra_args, + assembled_transcript=assembled_transcript_path, + genome_ref_folder=GENOME_REFERENCES_FOLDER + "/genome_ref", + annotation_file=parser_result.annotation_file, + merged_transcript=merged_transcript_path) + print("Command: " + merge_command) + merge_process = Popen(shlex.split(merge_command), stdout=PIPE, stderr=PIPE) + merge_out, merge_error = merge_process.communicate() + + if merge_process.returncode != 0: + raise ValueError('Stringtie (merge) failed to complete (non-zero return):\n' + 'Stringtie stdout: {std_out}\n' + 'Stringtie stderr: {std_err}\n'. + format(std_out=merge_out.decode("utf8"), std_err=merge_error.decode("utf8"))) + + if merge_error.decode("utf8").strip() != "" or not os.path.isfile(merged_transcript_path): + raise ValueError('Stringtie (merge) failed to complete (error):\n' + 'Stringtie stdout: {std_out}\n' + 'Stringtie stderr: {std_err}\n'. 
+ format(std_out=merge_out.decode("utf8"), std_err=merge_error.decode("utf8"))) + + return assembled_transcript_path, merged_transcript_path + + +def extract_reference_gtf(chromosome_name, assembler_output_dir): + gtf_reference_extracted_path = assembler_output_dir + "/" + chromosome_name + "_annotation.gtf" + + gtf_extract_command = """awk '$1 == "{chr_name}"' {genome_ref_folder}/{annotation_file}""".\ + format(chr_name=chromosome_name, + genome_ref_folder=GENOME_REFERENCES_FOLDER + "/genome_ref", + annotation_file=parser_result.annotation_file) + print("Command: " + gtf_extract_command) + + with open(gtf_reference_extracted_path, "w") as output_bam: + gtf_extract_process = Popen(shlex.split(gtf_extract_command), stdout=output_bam, stderr=PIPE) + gtf_extract_out, gtf_extract_error = gtf_extract_process.communicate() + + if gtf_extract_process.returncode != 0 or gtf_extract_error.decode("utf8").strip() != "": + print("Extraction failed! Using full annotation file instead.\n" + "Stdout: {std_out}\n" + "Stderr: {std_err}".format(std_out=gtf_extract_out.decode("utf8"), + std_err=gtf_extract_error.decode("utf8"))) + + gtf_reference_extracted_path = "{genome_ref_folder}/{annotation_file}". \ + format(genome_ref_folder=GENOME_REFERENCES_FOLDER + "/genome_ref", + annotation_file=parser_result.annotation_file) + + # No reference data exist for the given chromosome. Will use de-novo argument instead. + if os.stat(gtf_reference_extracted_path).st_size == 0: + gtf_reference_extracted_path = None + + return gtf_reference_extracted_path + + +def analyse_transcripts(merged_transcript_path, analysis_output_dir): + """ + Analyses the accuracy and precision of a generated GTF, with + respect to a reference genome. Uses GFFCompare. + :param merged_transcript_path: updated GTF + :param analysis_output_dir: output directory + :return: file path + """ + # Merge the reference annotation and transcripts from all bins. + gffcompare_command = "{app_folder}/gffcompare/gffcompare -T -r {genome_ref_folder}/{annotation_file} " \ + "-o {analysis_output_dir}/gffcomp {merged_transcript}".\ + format(app_folder=APPLICATION_FOLDER, + genome_ref_folder=GENOME_REFERENCES_FOLDER + "/genome_ref", + annotation_file=parser_result.annotation_file, + analysis_output_dir=analysis_output_dir, + merged_transcript=merged_transcript_path) + print("Command: " + gffcompare_command) + gffcompare_process = Popen(shlex.split(gffcompare_command), stdout=PIPE, stderr=PIPE) + gffcompare_out, gffcompare_error = gffcompare_process.communicate() + + if gffcompare_process.returncode != 0: + raise ValueError('GFFcompare failed to complete (non-zero return):\n' + 'GFFcompare stdout: {std_out}\n' + 'GFFcompare stderr: {std_err}\n'. + format(std_out=gffcompare_out.decode("utf8"),std_err=gffcompare_error.decode("utf8"))) + + gffcompare_stats_output = "gffcomp.stats" + if gffcompare_error.decode("utf8").strip() != "" or not os.path.isfile(analysis_output_dir + "/" + + gffcompare_stats_output): + raise ValueError('GFFcompare failed to complete (error):\n' + 'GFFcompare stdout: {std_out}\n' + 'GFFcompare stderr: {std_err}\n'. 
+ format(std_out=gffcompare_out.decode("utf8"), std_err=gffcompare_error.decode("utf8"))) + + return gffcompare_stats_output + +################################# +# Main functions +################################# + + +def create_sam_header(): + alignment_dir = TEMP_OUTPUT_FOLDER + "/sam_header" + + try: + os.mkdir(alignment_dir) + except: + print('Alignment directory {} exist.'.format(alignment_dir)) + + file_name = os.path.join(alignment_dir, "empty.sam") + if os.path.isfile(file_name): + os.remove(file_name) + + os.mknod(file_name) + + if parser_result.aligner.lower() == "star": + aligned_sam_output = align_reads_star([file_name], alignment_dir) + elif parser_result.aligner.lower() == "hisat" or parser_result.aligner.lower() == "hisat2": + aligned_sam_output = align_reads_hisat([file_name], alignment_dir) + elif parser_result.aligner.lower() == "subread": + aligned_sam_output = align_reads_subread([file_name], alignment_dir) + else: + print("Aligner specified is not yet supported. Defaulting to STAR") + aligned_sam_output = align_reads_star([file_name], alignment_dir) + + aligned_output_filepath = os.path.join(alignment_dir, aligned_sam_output) + with pysam.AlignmentFile(aligned_output_filepath) as f: + sam_header = str(f.header) + + chromosome_list = {} + for index, reference_name in enumerate(f.header.references): + if re.search(CHROMOSOME_PATTERN, reference_name) is not None: + chromosome_list[index] = reference_name + + shutil.rmtree(alignment_dir, ignore_errors=True) + return sam_header, chromosome_list + + +def alignment_bin_step(keyval): + # Input: file_name, file_content as key,val + # Output: [sample_name\tgene, count] as [key,val] + global parser_result + + prefix_regex = r"(.*_part[0-9]*)\." + + file_name, file_content = keyval + prefix_match = re.findall(prefix_regex, file_name.rstrip("/").split("/")[-1]) + + if len(prefix_match) != 1: + raise ValueError("Filename can not be resolved (invalid, pattern mismatch): {}".format(file_name)) + + prefix = prefix_match[0] + + alignment_dir = TEMP_OUTPUT_FOLDER + "/alignment_" + prefix + try: + os.mkdir(alignment_dir) + except: + print('Alignment directory {} exist.'.format(alignment_dir)) + + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Recreating FASTQ file(s)") + split_file_names, paired_reads = split_interleaved_file(prefix, file_content, alignment_dir) + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + + "Recreating FASTQ file(s) complete. Files recreated: {}".format(",".join(split_file_names))) + + alignment_output_dir = alignment_dir + "/aligner_output" + + try: + os.mkdir(alignment_output_dir) + except: + print('Alignment output directory {} exist.'.format(alignment_output_dir)) + + if parser_result.aligner.lower() == "star": + aligned_sam_output = align_reads_star(split_file_names, alignment_output_dir) + elif parser_result.aligner.lower() == "hisat" or parser_result.aligner.lower() == "hisat2": + aligned_sam_output = align_reads_hisat(split_file_names, alignment_output_dir) + elif parser_result.aligner.lower() == "subread": + aligned_sam_output = align_reads_subread(split_file_names, alignment_output_dir) + else: + print("Aligner specified is not yet supported. 
Defaulting to STAR") + aligned_sam_output = align_reads_star(split_file_names, alignment_output_dir) + + aligned_output_filepath = os.path.join(alignment_output_dir, aligned_sam_output) + + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Binning reads") + binned_reads = bin_reads(aligned_output_filepath, paired_reads) + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Binning reads done") + + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Alignment of reads for {} done.".format(prefix)) + shutil.rmtree(alignment_dir, ignore_errors=True) + return binned_reads + + +def assemble_transcripts(keyval): + """ + Applies Stringtie to formulate transcripts. + :param region_reads: + :return: (chromosome_region, gtf_output) tuple + """ + bin_id, read_list = keyval + chromosome_id, bin_number = bin_id + chromosome_name = chromosome_list_broadcast.value[chromosome_id] + + output = [] + + if len(read_list) >= MINIMUM_READS_IN_BIN: + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Starting transcript assembly for {}.".format(bin_id)) + sample_prefix = "{}_{}".format(chromosome_name, bin_number) + assembler_output_dir = TEMP_OUTPUT_FOLDER + '/assembly_' + sample_prefix + try: + os.makedirs(assembler_output_dir) + except: + print("Assembler output directory already exists for %s." % sample_prefix) + + aligned_output_file = assembler_output_dir + '/output.sam' + aligned_sorted_output_file = assembler_output_dir + '/output.sorted.bam' + + with open(aligned_output_file, 'w') as sam_outfile: + sam_outfile.write(sam_header_broadcast.value.strip() + "\n") + sam_outfile.writelines(read_list) + + read_list.clear() + + print('{0:%Y-%m-%d %H:%M:%S}'.format( + datetime.datetime.now()) + ":" + "Sorting and converting sam file.".format(bin_id)) + pysam.sort("-@", '4', "-m", "1G", "-o", aligned_sorted_output_file, aligned_output_file) + + print('{0:%Y-%m-%d %H:%M:%S}'.format( + datetime.datetime.now()) + ":" + "Running stringtie.".format(bin_id)) + + if parser_result.assembler.lower() == "stringtie": + # If we are performing genome-guided assembly, build a smaller GTF file. 
+ reference_gtf_filepath = None + if parser_result.assembler_use_reference: + reference_gtf_filepath = extract_reference_gtf(chromosome_name, assembler_output_dir) + + gtf_output = assemble_transcripts_stringtie(sample_prefix, aligned_sorted_output_file, assembler_output_dir, + reference_gtf_filepath) + elif parser_result.assembler.lower() == "scallop": + gtf_output = assemble_transcripts_scallop(sample_prefix, aligned_sorted_output_file, assembler_output_dir) + + shutil.rmtree(assembler_output_dir, ignore_errors=True) + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Assembly of transcript for {} done.".format(bin_id)) + output.append((bin_id, gtf_output.strip()+"\n")) + + return output + + +if __name__ == "__main__": + global parser_result + + parser = argparse.ArgumentParser(description='Spark-based RNA-seq Pipeline') + parser.add_argument('--input', '-i', action="store", dest="input_dir", help="Input directory - HDFS or S3") + parser.add_argument('--output', '-o', action="store", dest="output_dir", help="Output directory - HDFS or S3") + parser.add_argument('--annotation', '-a', action="store", dest="annotation_file", + help="Name of annotation file to be used") + parser.add_argument('--enable-tiling', '-et', action="store_true", dest="enable_tiling", + help="Enable tiling of genome bins") + parser.add_argument('--enable-analysis', '-ea', action="store_true", dest="enable_analysis", + help="Generate stats on updated GTF w.r.t. reference GTF") + parser.add_argument('--aligner-tools', '-at', action="store", dest="aligner", nargs='?', + help="Aligner to be used (STAR|HISAT2|Subread)", default="STAR") + parser.add_argument('--aligner-extra-args', '-s', action="store", dest="aligner_extra_args", nargs='?', + help="Extra argument to be passed to alignment tool", default="") + parser.add_argument('--assembler-tools', '-as', action="store", dest="assembler", nargs='?', + help="Assembler tools to be used (StringTie|Scallop)", default="StringTie") + parser.add_argument('--assembler-extra-args', '-ag', action="store", dest="assembler_extra_args", nargs='?', + help="Extra arguments to be passed to the assembler tool", default="") + parser.add_argument('--assembler-use-reference', '-aur', action="store_true", dest="assembler_use_reference", + help="Use annotation in initial transcript assembly - only applicable to StringTie") + parser.add_argument('--assembler-merge-extra-args', '-am', action="store", dest="assembler_merge_extra_args", + nargs='?', help="Extra arguments to be passed to the assembler merge tool", default="") + parser.add_argument('--region', '-r', action="store", dest="aws_region", help="AWS region") + + parser_result = parser.parse_args() + + split_num = 0 + + conf = SparkConf().setAppName("Spark-based RNA-seq Pipeline for Read Assembly") + sc = SparkContext(conf=conf) + + if parser_result.input_dir.startswith("s3://"): # From S3 + + s3_client = boto3.client('s3', region_name=parser_result.aws_region) + # Get number of input files + s3_paginator = s3_client.get_paginator('list_objects') + input_bucket, key_prefix = parser_result.input_dir[5:].strip().split("/", 1) + + input_file_num = 0 + + for result in s3_paginator.paginate(Bucket=input_bucket, Prefix=key_prefix): + for file in result.get("Contents"): + input_file_num += 1 + + if input_file_num == 0: + raise ValueError("Input directory is invalid or empty!") + + split_num = input_file_num + else: # From HDFS + hdfs_process = Popen(shlex.split("hdfs dfs -count {}".format(parser_result.input_dir)), + 
stdout=PIPE, stderr=PIPE) + hdfs_out, hdfs_error = hdfs_process.communicate() + + if hdfs_error: + raise ValueError("Input directory is invalid or empty!") + + dir_count, file_count, size, path = hdfs_out.strip().split() + + split_num = int(file_count) + + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Create sam header.") + sam_header, chromosome_list = create_sam_header() + sam_header_broadcast = sc.broadcast(sam_header) + chromosome_list_broadcast = sc.broadcast(chromosome_list) + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Sam header done.") + + input_files = sc.wholeTextFiles(parser_result.input_dir, split_num) + read_binned = input_files.flatMap(alignment_bin_step).reduceByKey(add, numPartitions=split_num*2) + + binned_transcripts = read_binned.flatMap(assemble_transcripts).persist() + binned_transcripts2 = binned_transcripts.sortByKey().values().collect() + + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Starting to merge back to reference.") + assembled_transcript, merged_transcript = merge_reference_annotation(binned_transcripts2) + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Merging back to reference completed.") + + if parser_result.input_dir.startswith("s3://"): # From S3 + output_bucket, key_prefix = parser_result.output_dir.strip().strip("/")[5:].split("/", 1) + s3_client.upload_file(assembled_transcript, output_bucket, key_prefix + "/" + assembled_transcript) + s3_client.upload_file(merged_transcript, output_bucket, key_prefix + "/" + merged_transcript) + else: + subprocess.call(["hdfs", "dfs", "-mkdir", "-p", parser_result.output_dir.rstrip("/")]) + subprocess.call(["hdfs", "dfs", "-put", assembled_transcript, parser_result.output_dir.rstrip("/") + "/" + + assembled_transcript]) + subprocess.call(["hdfs", "dfs", "-put", merged_transcript, parser_result.output_dir.rstrip("/") + "/" + + merged_transcript]) + + if parser_result.enable_analysis: + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Starting analysis of GTF.") + analysis_output_dir = TEMP_OUTPUT_FOLDER + "/analysis" + try: + os.mkdir(analysis_output_dir) + except: + print('Analysis directory {} already exists.'.format(analysis_output_dir)) + + analysis_result = analyse_transcripts(merged_transcript, analysis_output_dir) + print('{0:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now()) + ":" + "Analysis of GTF complete.") + + analysis_result_path = os.path.join(analysis_output_dir, analysis_result) + # Rename the file to make it easier + analysis_result_key = analysis_result.rsplit(".",1)[0] + ".txt" + if parser_result.input_dir.startswith("s3://"): # From S3 + s3_client.upload_file(analysis_result_path, output_bucket, key_prefix + "/" + analysis_result_key) + else: + subprocess.call(["hdfs", "dfs", "-put", analysis_result_path, parser_result.output_dir.rstrip("/") + "/" + + analysis_result_key]) + + shutil.rmtree(analysis_output_dir, ignore_errors=True) + + os.remove(assembled_transcript) + os.remove(merged_transcript) diff --git a/source/spark_runner/run_pipeline_multiple_files.py b/source/spark_runner/run_pipeline_multiple_files.py index 27324e0..b5b68a2 100644 --- a/source/spark_runner/run_pipeline_multiple_files.py +++ b/source/spark_runner/run_pipeline_multiple_files.py @@ -2,8 +2,10 @@ import os import shlex import shutil +import sys from subprocess import Popen, PIPE from pyspark import SparkContext, SparkConf +import pyspark.serializers import pandas as pd import subprocess import boto3 
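
For context, the assembly runner introduced above (run_pipeline_assembly3.py) is driven entirely by the argparse flags defined in its __main__ block; submit_assembly_job.py, added later in this patch, builds the corresponding spark-submit step from assembly_job.config. A rough, illustrative sketch of that argument list is shown below. The bucket names, key prefixes and annotation file name are placeholders (not values taken from this patch), and the Spark --conf tuning flags injected by the submit script are omitted for brevity.

    # Hypothetical sketch only - paths and bucket names below are placeholders.
    assembly_step_args = [
        "spark-submit", "--deploy-mode", "cluster",
        "s3://[YOUR-BUCKET]/scripts/run_pipeline_assembly3.py",
        "-i", "s3://[YOUR-BUCKET]/falco_output",      # pre-split interleaved FASTQ chunks (*_partN.*)
        "-o", "s3://[YOUR-BUCKET]/assembly_output",
        "-a=annotation.gtf",                          # annotation file under the genome reference folder
        "-at=STAR",                                   # aligner: STAR | HISAT2 | Subread
        "-as=StringTie",                              # assembler: StringTie | Scallop
        "-et",                                        # enable tiling of genome bins
        "-ea",                                        # run the GFFCompare analysis step
        "-r", "us-west-2",
    ]
    # submit_assembly_job.py places a list like this into the EMR step's HadoopJarStep Args.
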
@@ -11,7 +13,10 @@ global parser_result -APPLICATION_FOLDER = "/mnt/app" +if sys.version > "3.4": + pyspark.serializers.protocol = 4 + +APPLICATION_FOLDER = "/app" GENOME_REFERENCES_FOLDER = "/mnt/ref" TEMP_OUTPUT_FOLDER = "/mnt/output" @@ -106,13 +111,13 @@ def align_reads_star(sample_name, file_names, alignment_output_dir): if aligner_process.returncode != 0: raise ValueError("STAR failed to complete (Non-zero return code)!\n" - "STAR stdout: {std_out} \nSTAR stderr: {std_err}".format(std_out=aligner_out, - std_err=aligner_error)) + "STAR stdout: {std_out} \nSTAR stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) - if aligner_error.strip() != "" or not os.path.isfile(alignment_output_dir + "/Log.final.out"): + if aligner_error.decode("utf8").strip() != "" or not os.path.isfile(alignment_output_dir + "/Log.final.out"): raise ValueError("STAR failed to complete (No output file is found)!\n" - "STAR stdout: {std_out} \nSTAR stderr: {std_err}".format(std_out=aligner_out, - std_err=aligner_error)) + "STAR stdout: {std_out} \nSTAR stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) print('Completed reads alignment') @@ -140,7 +145,7 @@ def align_reads_hisat(sample_name, file_names, alignment_output_dir): if paired_read: fastq_file_args = "-1 {} -2 {}".format(*file_names) else: - fastq_file_args = "-U {}".format() + fastq_file_args = "-U {}".format(*file_names) aligner_args = "{app_folder}/hisat/hisat2 -p 4 --tmo {aligner_extra_args} -x {index_folder}/hisat2.index " \ "{fastq_file_names} -S {output_folder}/output.sam".\ @@ -155,12 +160,12 @@ def align_reads_hisat(sample_name, file_names, alignment_output_dir): if aligner_process.returncode != 0: raise ValueError("HISAT2 failed to complete (Non-zero return code)!\n" - "HISAT2 stdout: {std_out} \nHISAT2 stderr: {std_err}".format(std_out=aligner_out, - std_err=aligner_error)) + "HISAT2 stdout: {std_out} \nHISAT2 stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) print('Completed reads alignment') aligner_qc_output = [] - for line in aligner_error.split("\n"): + for line in aligner_error.decode("utf8").split("\n"): line = line.strip() # Check if line only give percentage info (we will ignore this line) @@ -179,6 +184,39 @@ def align_reads_hisat(sample_name, file_names, alignment_output_dir): return sam_file_name_output, aligner_qc_output + +def align_reads_subread(sample_name, file_names, alignment_output_dir): + # If paired read flag is required + paired_read = True if len(file_names) == 2 else False + + print("Aligning reads...") + if paired_read: + fastq_file_args = "-r {} -R {}".format(*file_names) + else: + fastq_file_args = "-r {}".format(*file_names) + + aligner_args = "{app_folder}/subread/subread-align -T 4 -t 0 --SAMoutput -u {aligner_extra_args} " \ + "-i {index_folder}/genome {fastq_file_names} -o {output_folder}/output.sam".\ + format(app_folder=APPLICATION_FOLDER, + aligner_extra_args="" if parser_result.aligner_extra_args is None else parser_result.aligner_extra_args, + index_folder=GENOME_REFERENCES_FOLDER + "/subread_index", + fastq_file_names=fastq_file_args, + output_folder=alignment_output_dir) + print("Command: " + aligner_args) + aligner_process = Popen(shlex.split(aligner_args), stdout=PIPE, stderr=PIPE) + aligner_out, aligner_error = aligner_process.communicate() + + if aligner_process.returncode != 0: + raise ValueError("Subread failed to complete (Non-zero return 
code)!\n" + "Subread stdout: {std_out} \nSubread stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) + print('Completed reads alignment') + + aligner_qc_output = [] + sam_file_name_output = "output.sam" + + return sam_file_name_output, aligner_qc_output + ################################# # Counter ################################# @@ -203,7 +241,7 @@ def count_reads_featurecount(sample_name, aligned_output_filepath, paired_reads, raise ValueError("featureCount failed to complete! (Non-zero return code)\nCounter stdout: {} \n" "Counter stderr: {}".format(counter_out, counter_error)) - if "[Errno" in counter_error.strip() or "error" in counter_error.strip().lower(): + if "[Errno" in counter_error.decode("utf8").strip() or "error" in counter_error.decode("utf8").strip().lower(): raise ValueError("featureCount failed to complete! (Error)\nCounter stdout: {} \nCounter stderr: {}". format(counter_out, counter_error)) @@ -252,13 +290,13 @@ def count_reads_htseq(sample_name, aligned_output_filepath, paired_reads, counte raise ValueError("HTSeq failed to complete! (Non-zero return code)\nCounter stdout: {} \nCounter stderr: {}". format(counter_out, counter_error)) - if "[Errno" in counter_error.strip(): + if "[Errno" in counter_error.decode("utf8").strip(): raise ValueError("HTSeq failed to complete! (Error)\nCounter stdout: {} \nCounter stderr: {}". format(counter_out, counter_error)) counter_output = [] counter_qc_output = [] - for gene_count in counter_out.strip().split("\n"): + for gene_count in counter_out.decode("utf8").strip().split("\n"): if len(gene_count.strip().split()) == 0: print(gene_count) gene, count = gene_count.strip().split() @@ -270,6 +308,71 @@ def count_reads_htseq(sample_name, aligned_output_filepath, paired_reads, counte return counter_output, counter_qc_output +################################# +# Counting without alignment output +################################# + + +def count_reads_star(sample_name, file_names, alignment_output_dir): + # If paired read flag is required + # paired_read = True if len(file_names) == 2 else False + + print("Counting reads...") + aligner_args = "{app_folder}/STAR/STAR --runThreadN 4 {aligner_extra_args} --genomeDir {index_folder} " \ + "--readFilesIn {fastq_file_names} --outFileNamePrefix {output_folder} --quantMode GeneCounts".\ + format(app_folder=APPLICATION_FOLDER, + aligner_extra_args="" if parser_result.aligner_extra_args is None else parser_result.aligner_extra_args, + index_folder=GENOME_REFERENCES_FOLDER + "/star_index", + fastq_file_names=" ".join(file_names), + output_folder=alignment_output_dir + "/") + print("Command: " + aligner_args) + aligner_process = Popen(shlex.split(aligner_args), stdout=PIPE, stderr=PIPE) + aligner_out, aligner_error = aligner_process.communicate() + + if aligner_process.returncode != 0: + raise ValueError("STAR counting failed to complete (Non-zero return code)!\n" + "STAR stdout: {std_out} \nSTAR stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) + + if aligner_error.decode("utf8").strip() != "" or not os.path.isfile(alignment_output_dir + "/Log.final.out"): + raise ValueError("STAR counting failed to complete (No output file is found)!\n" + "STAR stdout: {std_out} \nSTAR stderr: {std_err}".format(std_out=aligner_out.decode("utf8"), + std_err=aligner_error.decode("utf8"))) + + print('Completed read counts') + + qc_output = [] + with open(alignment_output_dir + "/Log.final.out") as aligner_qc: + for 
line in aligner_qc: + parts = line.strip().split("\t") + if len(parts) < 2: + continue + aligner_metric_name, aligner_metric_value = parts[0].strip("| "), parts[1].strip() + if aligner_metric_name.lower() in star_collected_metrics: + qc_output.append((sample_name + "\t" + "QC_STAR_" + aligner_metric_name.replace(" ", "_"), + int(aligner_metric_value))) + + counter_output = [] + with open(alignment_output_dir + "/ReadsPerGene.out.tab") as f: + # STAR produces 3 counts for unstranded, first strand and second strand + count_index = 1 + if parser_result.strand_specificity == "FIRST_READ_TRANSCRIPTION_STRAND": + count_index = 2 + elif parser_result.strand_specificity == "SECOND_READ_TRANSCRIPTION_STRAND": + count_index = 3 + + for index, line in enumerate(f): + line = line.strip().split() + + if len(line) == 0: + print(line) + + if line[0].startswith("N_"): # QC output + qc_output.append((sample_name + "\t" + "QC_STAR-count_" + line[0].lstrip("N_"), int(line[count_index]))) + else: + counter_output.append((sample_name + "\t" + line[0], int(line[count_index]))) + + return counter_output, qc_output ################################# # Picard tools @@ -288,7 +391,8 @@ def run_picard(sample_name, aligned_output_filepath, picard_output_dir): if not os.path.isfile(picard_output_dir + "/output.RNA_Metrics"): raise ValueError("Picard tools failed to complete (No output file is found)!\n" - "Picard tools stdout: {} \nPicard tools stderr: {}".format(picard_out, picard_error)) + "Picard tools stdout: {} \nPicard tools stderr: {}".format(picard_out.decode("utf8"), + picard_error.decode("utf8"))) picard_qc_output = [] with open(picard_output_dir + "/output.RNA_Metrics") as picard_qc: @@ -352,8 +456,15 @@ def alignment_count_step(keyval): # Output: [sample_name\tgene, count] as [key,val] global parser_result, star_collected_metrics, picard_collected_metrics + prefix_regex = r"(.*_part[0-9]*)\." + file_name, file_content = keyval - prefix = file_name.rstrip("/").split("/")[-1].split(".")[0] + prefix_match = re.findall(prefix_regex, file_name.rstrip("/").split("/")[-1]) + + if len(prefix_match) != 1: + raise ValueError("Filename can not be resolved (invalid, pattern mismatch): {}".format(file_name)) + + prefix = prefix_match[0] sample_name = prefix.rsplit("_part", 1)[0] alignment_dir = TEMP_OUTPUT_FOLDER + "/alignment_" + prefix @@ -374,31 +485,45 @@ def alignment_count_step(keyval): except: print('Alignment output directory {} exist.'.format(alignment_output_dir)) - if parser_result.aligner.lower() == "star": - aligned_sam_output, aligner_qc_output = align_reads_star(sample_name, split_file_names, alignment_output_dir) - elif parser_result.aligner.lower() == "hisat" or parser_result.aligner.lower() == "hisat2": - aligned_sam_output, aligner_qc_output = align_reads_hisat(sample_name, split_file_names, alignment_output_dir) - else: - print("Aligner specified is not yet supported. 
Defaulting to STAR") - aligned_sam_output, aligner_qc_output = align_reads_star(sample_name, split_file_names, alignment_output_dir) + aligner_qc_output = [] + counter_qc_output = [] + aligned_output_filepath = "" - aligned_output_filepath = "{}/{}".format(alignment_output_dir.rstrip("/"), aligned_sam_output) + if parser_result.aligner.lower() == "star" and parser_result.counter.lower() == "star": + counter_output, aligner_qc_output = count_reads_star(sample_name, split_file_names, alignment_output_dir) - if parser_result.counter.lower() == "featurecount" or parser_result.counter.lower() == "featurecounts": - counter_output, counter_qc_output = count_reads_featurecount(sample_name, aligned_output_filepath, paired_reads, - alignment_output_dir) - elif parser_result.counter.lower() == "htseq": - counter_output, counter_qc_output = count_reads_htseq(sample_name, aligned_output_filepath, paired_reads, - alignment_output_dir) else: - print("Counter specified is not yet supported. Defaulting to featureCount") - counter_output, counter_qc_output = count_reads_featurecount(sample_name, aligned_output_filepath, paired_reads, + if parser_result.aligner.lower() == "star": + aligned_sam_output, aligner_qc_output = align_reads_star(sample_name, split_file_names, + alignment_output_dir) + elif parser_result.aligner.lower() == "hisat" or parser_result.aligner.lower() == "hisat2": + aligned_sam_output, aligner_qc_output = align_reads_hisat(sample_name, split_file_names, + alignment_output_dir) + elif parser_result.aligner.lower() == "subread": + aligned_sam_output, aligner_qc_output = align_reads_subread(sample_name, split_file_names, + alignment_output_dir) + else: + print("Aligner specified is not yet supported. Defaulting to STAR") + aligned_sam_output, aligner_qc_output = align_reads_star(sample_name, split_file_names, alignment_output_dir) + aligned_output_filepath = "{}/{}".format(alignment_output_dir.rstrip("/"), aligned_sam_output) + + if parser_result.counter.lower() == "featurecount" or parser_result.counter.lower() == "featurecounts": + counter_output, counter_qc_output = count_reads_featurecount(sample_name, aligned_output_filepath, + paired_reads, alignment_output_dir) + elif parser_result.counter.lower() == "htseq": + counter_output, counter_qc_output = count_reads_htseq(sample_name, aligned_output_filepath, paired_reads, + alignment_output_dir) + else: + print("Counter specified is not yet supported. 
Defaulting to featureCount") + counter_output, counter_qc_output = count_reads_featurecount(sample_name, aligned_output_filepath, + paired_reads, alignment_output_dir) + counter_output.extend(aligner_qc_output) counter_output.extend(counter_qc_output) - if parser_result.run_picard: + if parser_result.run_picard and aligned_output_filepath != "": picard_qc_output = run_picard(sample_name, aligned_output_filepath, alignment_output_dir) counter_output.extend(picard_qc_output) @@ -407,9 +532,10 @@ def alignment_count_step(keyval): if __name__ == "__main__": + print("pickle protocol {}".format(pyspark.serializers.protocol)) global parser_result - parser = argparse.ArgumentParser(description='Spark-based RNA-seq Pipeline') + parser = argparse.ArgumentParser(description='Spark-based RNA-seq Pipeline Quantification') parser.add_argument('--input', '-i', action="store", dest="input_dir", help="Input directory - HDFS or S3") parser.add_argument('--output', '-o', action="store", dest="output_dir", help="Output directory - HDFS or S3") parser.add_argument('--annotation', '-a', action="store", dest="annotation_file", @@ -419,11 +545,11 @@ def alignment_count_step(keyval): , default="NONE") parser.add_argument('--run_picard', '-rp', action="store_true", dest="run_picard", help="Run picard") parser.add_argument('--aligner_tools', '-at', action="store", dest="aligner", nargs='?', - help="Aligner to be used (STAR|HISAT2)", default="STAR") + help="Aligner to be used (STAR|HISAT2|Subread)", default="STAR") parser.add_argument('--aligner_extra_args', '-s', action="store", dest="aligner_extra_args", nargs='?', help="Extra argument to be passed to alignment tool", default="") parser.add_argument('--counter_tools', '-ct', action="store", dest="counter", nargs='?', - help="Counter to be used (featureCount|StringTie)", default="featureCount") + help="Counter to be used (featureCount|StringTie|STAR)", default="featureCount") parser.add_argument('--counter_extra_args', '-c', action="store", dest="counter_extra_args", nargs='?', help="Extra argument to be passed to quantification tool", default="") parser.add_argument('--picard_extra_args', '-p', action="store", dest="picard_extra_args", nargs='?', @@ -434,11 +560,10 @@ def alignment_count_step(keyval): split_num = 0 - conf = SparkConf().setAppName("Spark-based RNA-seq Pipeline Multifile") + conf = SparkConf().setAppName("Spark-based RNA-seq Pipeline Quantification") sc = SparkContext(conf=conf) if parser_result.input_dir.startswith("s3://"): # From S3 - s3_client = boto3.client('s3', region_name=parser_result.aws_region) # Get number of input files s3_paginator = s3_client.get_paginator('list_objects') diff --git a/submit_alignment_job.py b/submit_alignment_job.py new file mode 100644 index 0000000..d8ad256 --- /dev/null +++ b/submit_alignment_job.py @@ -0,0 +1,173 @@ +import configparser +import argparse +import boto3 +import utility +import sys +from collections import OrderedDict + +global job_configuration, cluster_id, spark_extra_config +job_configuration = "alignment_job.config" +cluster_id = "" +spark_extra_config = [("spark.memory.fraction", "0.5"), + ("spark.memory.storageFraction", "0.3"), + ("spark.python.profile", "true"), + ("spark.python.worker.reuse", "false"), + ("spark.yarn.executor.memoryOverhead", "12288"), + ("spark.driver.maxResultSize", "0"), + ("spark.executor.extraJavaOptions", + "-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties " + "-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=30 " + "-XX:MaxHeapFreeRatio=50 
-XX:+CMSClassUnloadingEnabled " + "-XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %%p'" + "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/app/oom_dump_`date`.hprof")] + + +def check_configuration(config): + if not utility.check_config(config, "job_config", ["name", "action_on_failure", "alignment_script", + "alignment_script_s3_location", "upload_alignment_script"]): + return False + + if not utility.check_upload_config(config["job_config"], "upload_alignment_script", "alignment_script", + "alignment_script_local_location", "alignment_script_s3_location"): + return False + + if not utility.check_config(config, "spark_config", ["driver_memory", "executor_memory"]): + return False + + if not utility.check_config(config, "script_arguments", ["input_location", "output_location", + "region", "aligner_tool"]): + return False + + if not utility.check_s3_region(config["script_arguments"]["region"]): + return False + + return True + + +def calculate_num_executor(cluster_id, executor_memory): + global spark_extra_config + + memory_overhead = 512 + for conf in spark_extra_config: + if conf[0] == "spark.yarn.executor.memoryOverhead": + memory_overhead = int(conf[1]) + + memory_per_executor = int(executor_memory.strip("g")) + memory_overhead/1024 + + total_mem, total_cpu = utility.get_cluster_mem_cpu(cluster_id) + + if total_mem < 0 or total_cpu < 0: + num_executors = -1 # dry run + else: + num_executors = int(total_mem/memory_per_executor) + + return num_executors + + +def build_command(config): + global cluster_id + + job_arguments = OrderedDict() + job_arguments["JobFlowId"] = cluster_id + + step_arguments = OrderedDict() + step_arguments['Name'] = config["job_config"]["name"] + step_arguments["ActionOnFailure"] = config["job_config"]["action_on_failure"] + + hadoop_arguments = OrderedDict() + hadoop_arguments["Jar"] = "command-runner.jar" + + command_args = ["spark-submit", + "--deploy-mode", "cluster"] + + for config_name, config_value in spark_extra_config: + command_args.append("--conf") + command_args.append("{}={}".format(config_name, config_value)) + + for spark_conf in config["spark_config"]: + command_args.append("--" + spark_conf.replace("_", "-")) + command_args.append(config["spark_config"][spark_conf]) + + command_args.append(config["job_config"]["alignment_script_s3_location"].rstrip("/") + "/" + + config["job_config"]["alignment_script"]) + + command_args.append("-i") + command_args.append(config["script_arguments"]["input_location"]) + command_args.append("-o") + command_args.append(config["script_arguments"]["output_location"]) + + command_args.append("-at={}".format(config["script_arguments"]["aligner_tool"])) + + if "aligner_extra_args" in config["script_arguments"] and config["script_arguments"]["aligner_extra_args"].strip() != "": + command_args.append('-s={}'.format(config["script_arguments"]["aligner_extra_args"])) + + command_args.append("-r") + command_args.append(config["script_arguments"]["region"]) + + hadoop_arguments['Args'] = command_args + step_arguments["HadoopJarStep"] = hadoop_arguments + job_arguments["Steps"] = [step_arguments] + return job_arguments + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Job submission script for spark-based RNA-seq Alignment') + parser.add_argument('--config', '-c', action="store", dest="job_config", help="Job configuration file") + parser.add_argument('--cluster-id', '-id', action="store", dest="cluster_id", help="Cluster ID for submission") + parser.add_argument('--dry-run', '-d', 
action="store_true", dest="dry_run", + help="Produce the configurations for the job flow to be submitted") + parser_result = parser.parse_args() + + if parser_result.job_config is not None and parser_result.job_config.strip() != "": + job_configuration = parser_result.job_config.strip() + + config = configparser.ConfigParser() + config.optionxform = str + config.read(job_configuration) + + if parser_result.cluster_id is None or parser_result.cluster_id.strip() == "": + cluster_id = utility.get_cluster_id(parser_result.dry_run) + else: + cluster_id = parser_result.cluster_id.strip() + + if cluster_id != "" and check_configuration(config): + if config["job_config"].get("upload_alignment_script", "False") == "True": + utility.upload_files_to_s3([(config["job_config"]["alignment_script"], + config["job_config"]["alignment_script_local_location"], + config["job_config"]["alignment_script_s3_location"])], parser_result.dry_run) + + num_executors = calculate_num_executor(cluster_id, config["spark_config"]["executor_memory"]) + if num_executors < 0: + config["spark_config"]["num_executors"] = "None" + else: + config["spark_config"]["num_executors"] = str(num_executors) + + config["spark_config"]["executor_cores"] = "1" + + job_argument = build_command(config) + + if not parser_result.dry_run: + emr_client = boto3.client("emr") + # warn user before removing any output + out = config["script_arguments"]["output_location"] + # find out which output dirs, if any, exist + dirs_to_remove = utility.check_s3_path_exists([out]) + # create a list of the names of the directories to remove + if dirs_to_remove: + response = input("About to remove any existing output directories." + + "\n\n\t{}\n\nProceed? [y/n]: ".format( + '\n\n\t'.join(dirs_to_remove))) + while response not in ['y', 'n']: + response = input('Proceed? [y/n]: ') + if response == 'n': + print("Program Terminated. Modify config file to change " + + "output directories.") + sys.exit(0) + # remove the output directories + if not utility.remove_s3_files(dirs_to_remove): + print("Program terminated") + sys.exit(1) + job_submission = emr_client.add_job_flow_steps(**job_argument) + print("Submitted job to cluster {}. 
Job id is {}".format(cluster_id, job_submission["StepIds"][0])) + else: + print(job_argument) diff --git a/submit_analysis_job.py b/submit_analysis_job.py index 0a71dee..0e4e4f8 100644 --- a/submit_analysis_job.py +++ b/submit_analysis_job.py @@ -12,13 +12,13 @@ ("spark.python.profile", "true"), ("spark.python.worker.reuse", "false"), ("spark.yarn.executor.memoryOverhead", "4096"), - ("spark.driver.maxResultSize", "3g"), + ("spark.driver.maxResultSize", "0"), ("spark.executor.extraJavaOptions", "-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties " "-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=30 " "-XX:MaxHeapFreeRatio=50 -XX:+CMSClassUnloadingEnabled " "-XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %%p'" - " -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/app/oom_dump_`date`.hprof")] + " -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/app/oom_dump_`date`.hprof")] def check_configuration(config): @@ -107,7 +107,8 @@ def build_command(config): if "run_picard" in config["script_arguments"] and config["script_arguments"]["run_picard"].lower() == "true": command_args.append("--run_picard") - if "aligner_extra_args" in config["script_arguments"] and config["script_arguments"]["aligner_extra_args"].strip() != "": + if "aligner_extra_args" in config["script_arguments"] and \ + config["script_arguments"]["aligner_extra_args"].strip() != "": command_args.append('-s={}'.format(config["script_arguments"]["aligner_extra_args"])) if "counter_extra_args" in config["script_arguments"] and \ @@ -127,8 +128,9 @@ def build_command(config): return job_arguments + if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Job submission script for spark-based RNA-seq Pipeline') + parser = argparse.ArgumentParser(description='Job submission script for spark-based RNA-seq Quantification') parser.add_argument('--config', '-c', action="store", dest="job_config", help="Job configuration file") parser.add_argument('--cluster-id', '-id', action="store", dest="cluster_id", help="Cluster ID for submission") parser.add_argument('--dry-run', '-d', action="store_true", dest="dry_run", diff --git a/submit_assembly_job.py b/submit_assembly_job.py new file mode 100644 index 0000000..00db2ca --- /dev/null +++ b/submit_assembly_job.py @@ -0,0 +1,202 @@ +import configparser +import argparse +import boto3 +import utility +import sys +from collections import OrderedDict + +global job_configuration, cluster_id, spark_extra_config +job_configuration = "assembly_job.config" +cluster_id = "" +spark_extra_config = [("spark.driver.maxResultSize", "0"), + ("spark.memory.fraction", "0.95"), + ("spark.memory.storageFraction", "0.05"), + ("spark.python.worker.reuse", "False"), + ("spark.python.worker.memory", "1024m"), + ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"), + ("spark.yarn.executor.memoryOverhead", "4096"), + ("spark.executor.extraJavaOptions", + "-Dlog4j.debug=true " + "-Dlog4j.configuration=file:///etc/spark/conf/log4j.properties " + "-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=30 " + "-XX:MaxHeapFreeRatio=50 -XX:+CMSClassUnloadingEnabled " + "-XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %%p'" + " -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/app/oom_dump_`date`.hprof" + ) + ] + + +def check_configuration(config): + if not utility.check_config(config, "job_config", ["name", "action_on_failure", "assembly_script", + "assembly_script_s3_location", "upload_assembly_script"]): + return False + + if not 
utility.check_upload_config(config["job_config"], "upload_assembly_script", "assembly_script", + "assembly_script_local_location", "assembly_script_s3_location"): + return False + + if not utility.check_config(config, "spark_config", ["driver_memory", "executor_memory"]): + return False + + if not utility.check_config(config, "script_arguments", ["input_location", "output_location", "annotation_file", + "enable_tiling", "enable_analysis", "region", + "aligner_tool", "assembler_tool", + "assembler_extra_args", "assembler_merge_extra_args"]): + return False + + if not utility.check_s3_region(config["script_arguments"]["region"]): + return False + + return True + + +def calculate_num_executor(cluster_id, executor_memory): + global spark_extra_config + + memory_overhead = 512 + for conf in spark_extra_config: + if conf[0] == "spark.yarn.executor.memoryOverhead": + memory_overhead = int(conf[1]) + + memory_per_executor = int(executor_memory.strip("g")) + memory_overhead/1024 + + total_mem, total_cpu = utility.get_cluster_mem_cpu(cluster_id) + + if total_mem < 0 or total_cpu < 0: + num_executors = -1 # dry run + else: + num_executors = int(total_mem/memory_per_executor) + + return num_executors + + +def build_command(config): + global cluster_id + + job_arguments = OrderedDict() + job_arguments["JobFlowId"] = cluster_id + + step_arguments = OrderedDict() + step_arguments['Name'] = config["job_config"]["name"] + step_arguments["ActionOnFailure"] = config["job_config"]["action_on_failure"] + + hadoop_arguments = OrderedDict() + hadoop_arguments["Jar"] = "command-runner.jar" + + command_args = ["spark-submit", + "--deploy-mode", "cluster"] + + for config_name, config_value in spark_extra_config: + command_args.append("--conf") + command_args.append("{}={}".format(config_name, config_value)) + + for spark_conf in config["spark_config"]: + command_args.append("--" + spark_conf.replace("_", "-")) + command_args.append(config["spark_config"][spark_conf]) + + command_args.append(config["job_config"]["assembly_script_s3_location"].rstrip("/") + "/" + + config["job_config"]["assembly_script"]) + + command_args.append("-i") + command_args.append(config["script_arguments"]["input_location"]) + command_args.append("-o") + command_args.append(config["script_arguments"]["output_location"]) + command_args.append("-a={}".format(config["script_arguments"]["annotation_file"])) + + command_args.append("-at={}".format(config["script_arguments"]["aligner_tool"])) + command_args.append("-as={}".format(config["script_arguments"]["assembler_tool"])) + + if "aligner_extra_args" in config["script_arguments"] and \ + config["script_arguments"]["aligner_extra_args"].strip() != "": + command_args.append('-s={}'.format(config["script_arguments"]["aligner_extra_args"])) + + if "assembler_extra_args" in config["script_arguments"] and \ + config["script_arguments"]["assembler_extra_args"].strip() != "": + command_args.append("-ag={}".format(config["script_arguments"]["assembler_extra_args"])) + + if "assembler_merge_extra_args" in config["script_arguments"] and \ + config["script_arguments"]["assembler_merge_extra_args"].strip() != "": + command_args.append("-am={}".format(config["script_arguments"]["assembler_merge_extra_args"])) + + if "assembler_use_reference" in config["script_arguments"] and \ + config["script_arguments"]["assembler_use_reference"].lower() == "true": + command_args.append("-aur") + + if "enable_tiling" in config["script_arguments"] and config["script_arguments"]["enable_tiling"].lower() == "true": + 
command_args.append("-et") + + if "enable_analysis" in config["script_arguments"] and \ + config["script_arguments"]["enable_analysis"].lower() == "true": + command_args.append("-ea") + + command_args.append("-r") + command_args.append(config["script_arguments"]["region"]) + + hadoop_arguments['Args'] = command_args + step_arguments["HadoopJarStep"] = hadoop_arguments + job_arguments["Steps"] = [step_arguments] + + return job_arguments + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Job submission script for spark-based RNA-seq Transcript Assembly') + parser.add_argument('--config', '-c', action="store", dest="job_config", help="Job configuration file") + parser.add_argument('--cluster-id', '-id', action="store", dest="cluster_id", help="Cluster ID for submission") + parser.add_argument('--dry-run', '-d', action="store_true", dest="dry_run", + help="Produce the configurations for the job flow to be submitted") + parser_result = parser.parse_args() + + if parser_result.job_config is not None and parser_result.job_config.strip() != "": + job_configuration = parser_result.job_config.strip() + + config = configparser.ConfigParser() + config.optionxform = str + config.read(job_configuration) + + if parser_result.cluster_id is None or parser_result.cluster_id.strip() == "": + cluster_id = utility.get_cluster_id(parser_result.dry_run) + else: + cluster_id = parser_result.cluster_id.strip() + + if cluster_id != "" and check_configuration(config): + if config["job_config"].get("upload_assembly_script", "False") == "True": + utility.upload_files_to_s3([(config["job_config"]["assembly_script"], + config["job_config"]["assembly_script_local_location"], + config["job_config"]["assembly_script_s3_location"])], parser_result.dry_run) + + num_executors = calculate_num_executor(cluster_id, config["spark_config"]["executor_memory"]) + if num_executors < 0: + config["spark_config"]["num_executors"] = "None" + else: + config["spark_config"]["num_executors"] = str(num_executors) + + config["spark_config"]["executor_cores"] = "1" + + job_argument = build_command(config) + + if not parser_result.dry_run: + emr_client = boto3.client("emr") + # warn user before removing any output + out = config["script_arguments"]["output_location"] + # find out which output dirs, if any, exist + dirs_to_remove = utility.check_s3_path_exists([out]) + # create a list of the names of the directories to remove + if dirs_to_remove: + response = input("About to remove any existing output directories." + + "\n\n\t{}\n\nProceed? [y/n]: ".format( + '\n\n\t'.join(dirs_to_remove))) + while response not in ['y', 'n']: + response = input('Proceed? [y/n]: ') + if response == 'n': + print("Program Terminated. Modify config file to change " + + "output directories.") + sys.exit(0) + # remove the output directories + if not utility.remove_s3_files(dirs_to_remove): + print("Program terminated") + sys.exit(1) + job_submission = emr_client.add_job_flow_steps(**job_argument) + print("Submitted job to cluster {}. Job id is {}".format(cluster_id, job_submission["StepIds"][0])) + else: + print(job_argument)
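
For reference, submit_alignment_job.py and submit_assembly_job.py size the executor pool with the same calculate_num_executor() arithmetic: per-executor memory is the configured executor memory plus spark.yarn.executor.memoryOverhead (converted from MB to GB), and the executor count is the cluster's total memory divided by that figure. A small worked sketch follows, using an assumed 30g executor memory and an assumed 480 GB of total cluster memory; neither value is prescribed by this patch, only the overheads (4096 MB for assembly, 12288 MB for alignment) come from the spark_extra_config lists above.

    # Worked sketch of the calculate_num_executor() arithmetic; 30g and 480 GB are assumed example values.
    def sketch_num_executors(executor_memory="30g", memory_overhead_mb=4096, total_cluster_mem_gb=480):
        memory_per_executor = int(executor_memory.strip("g")) + memory_overhead_mb / 1024  # e.g. 30 + 4 = 34 GB
        return int(total_cluster_mem_gb / memory_per_executor)

    print(sketch_num_executors())                           # assembly overhead (4096 MB)  -> int(480 / 34) = 14
    print(sketch_num_executors(memory_overhead_mb=12288))   # alignment overhead (12288 MB) -> int(480 / 42) = 11

Both submit scripts also pin executor_cores to 1, so the executor count computed this way is effectively the task parallelism of the step.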