diff --git a/metamorph b/metamorph
index 2795f03..17a1f55 100755
--- a/metamorph
+++ b/metamorph
@@ -47,9 +47,10 @@ EXAMPLES:
 # Python standard library
 from __future__ import print_function
 import sys, os, subprocess, re, json, textwrap
+from datetime import timezone, datetime
 
 # 3rd party imports from pypi
-import argparse # potential python3 3rd party package, added in python/3.5
+import argparse
 
 # Local imports
 from src import version
@@ -70,6 +71,7 @@
 __version__ = version
 __authors__ = 'Neelam Redekar, Skyler Kuhn'
 __email__ = 'neelam.redekar@nih.gov, skyler.kuhn@nih.gov'
 __home__ = os.path.dirname(os.path.abspath(__file__))
+_datetime = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
 _name = os.path.basename(sys.argv[0])
 _description = 'An awesome metagenomics and metatranscriptomics pipeline'
@@ -87,7 +89,10 @@ def unlock(sub_args):
 
     try:
         unlock_output = subprocess.check_output([
-            'snakemake', '--unlock',
+            'snakemake',
+            '--unlock',
+            '--force',
+            '-s', os.path.abspath(f'{outdir}/workflow/Snakefile'),
             '--cores', '1',
             '--configfile=config.json'
         ], cwd = outdir,
@@ -146,7 +151,8 @@ def run(sub_args):
     )
     config['bindpaths'] = bindpaths
 
-    config['coassembly'] = sub_args.coa
+    # config['coassembly'] = sub_args.coa
+    config['coassembly'] = False
 
     # Step 4. Save config to output directory
     with open(os.path.join(sub_args.output, 'config.json'), 'w') as fh:
@@ -185,7 +191,7 @@ def run(sub_args):
         submission_script=os.path.join(__home__, 'src', 'run.sh'),
         logger = logfh,
         additional_bind_paths = ",".join(bindpaths),
-        tmp_dir = sub_args.tmp_dir, 
+        tmp_dir = sub_args.tmp_dir,
     )
 
     # Step 6. Wait for subprocess to complete,
@@ -292,9 +298,7 @@ def parsed_arguments(name, description):
                        provided working directory has not been initialized,
                        it will be created automatically.
                        Example: --output /data/$USER/output
-
-        {3}{4}Analysis options:{5}
-          ...coming soon!
+        {3}{4}Orchestration options:{5}
 
           --mode {{slurm,local}}
 
@@ -372,35 +376,50 @@ def parsed_arguments(name, description):
 
     # Display example usage in epilog
     run_epilog = textwrap.dedent("""\
-        {2}{3}Example:{4}
-          # Step 1.) Grab an interactive node,
-          # do not run on head node!
-          srun -N 1 -n 1 --time=1:00:00 --mem=8gb --cpus-per-task=2 --pty bash
-          module purge
-          module load singularity snakemake
-
-          # Step 2A.) Dry-run the pipeline
-          ./{0} run --input .tests/*.R?.fastq.gz \\
-                     --output /data/$USER/output \\
-                     --mode slurm \\
-                     --dry-run
-
-          # Step 3A.) Run the {0} pipeline in per-sample fashion
-          #           The slurm mode will submit jobs to
-          #           the cluster. It is recommended running
-          #           the pipeline in this mode.
-          ./{0} run --input .tests/*.R?.fastq.gz \\
-                     --output /data/$USER/output \\
-                     --mode slurm
-
-          # Step 3B.) Run the {0} pipeline in co-assembly fashion
-          #           with slurm
-          ./{0} run --coa --input .tests/*.R?.fastq.gz \\
-                     --output /data/$USER/output \\
-                     --mode slurm
+        {2}{3}INPUT MODES:{4}
+          # Step 1.) Grab an interactive node,
+          # do not run on head node!
+          srun -N 1 -n 1 --time=1:00:00 --mem=8gb --cpus-per-task=2 --pty bash
+          module purge
+          module load singularity snakemake
+
+          # Step 2A.) Dry-run the pipeline
+          ./{0} run --input .tests/*.R?.fastq.gz \\
+                     --output /data/$USER/output \\
+                     --mode slurm \\
+                     --dry-run
+
+          # Step 3A.) Run the {0} pipeline in per-sample fashion
+          #           The slurm mode will submit jobs to
+          #           the cluster. It is recommended running
+          #           the pipeline in this mode.
+          ./{0} run --input .tests/*.R?.fastq.gz \\
+                     --output /data/$USER/output \\
+                     --mode slurm
+
+          # Step 3B.) Run the {0} pipeline in co-assembly fashion
+          #           with slurm
+          ./{0} run --coa --input .tests/*.R?.fastq.gz \\
+                     --output /data/$USER/output \\
+                     --mode slurm
+
+        {2}{3}EXAMPLES:{4}
+          co-assembly dna-only:
+            $ metamorph run --coa --input *.R?.fastq.gz --output output
+            $ metamorph run -C --input *.R?.fastq.gz --output output
+
+          per-sample assembly dna-only:
+            $ metamorph run --input *.R?.fastq.gz --output output
+
+          co-assembly rna & dna:
+            $ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
+            $ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
+
+          per-sample assembly rna & dna:
+            $ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
 
-        {2}{3}Version:{4}
+        {2}{3}VERSION:{4}
           {1}
         """.format(name, __version__, c.bold, c.url, c.end))
 
 
@@ -469,12 +488,19 @@ def parsed_arguments(name, description):
     )
 
     # a supported job scheduler, etc.
-    subparser_run.add_argument(
-        '-C', '--coa',
-        action="store_true",
-        required = False,
-        help = argparse.SUPPRESS
-    )
+    # subparser_run.add_argument(
+    #     '-C', '--coa',
+    #     action="store_true",
+    #     required = False,
+    #     help = argparse.SUPPRESS
+    # )
+
+    # subparser_run.add_argument(
+    #     '-R', '--rnacoa',
+    #     action="store_true",
+    #     required = False,
+    #     help = argparse.SUPPRESS
+    # )
 
     # Name of master job
     subparser_run.add_argument(
@@ -529,7 +555,7 @@ def parsed_arguments(name, description):
         '--tmp-dir',
         type = str,
         required = False,
-        default = '/lscratch/$SLURM_JOBID/',
+        default = '/lscratch/$SLURM_JOB_ID/',
         help = argparse.SUPPRESS
     )
 
diff --git a/src/run.py b/src/run.py
index c6df6c2..96b7391 100644
--- a/src/run.py
+++ b/src/run.py
@@ -18,6 +18,10 @@
 
 from . import version as __version__
 
+FASTQ_INPUT_EXT = ".fastq.gz"
+FASTQ_R1_POSTFIX = f"_R1{FASTQ_INPUT_EXT}"
+FASTQ_R2_POSTFIX = f"_R2{FASTQ_INPUT_EXT}"
+
 
 def init(repo_path, output_path, links=[], required=['workflow', 'resources', 'config']):
     """Initialize the output directory. If user provides a output
@@ -125,19 +129,19 @@ def rename(filename):
 
     # key = regex to match string and value = how it will be renamed
     extensions = {
         # Matches: _R[12]_fastq.gz, _R[12].fastq.gz, _R[12]_fq.gz, etc.
-        ".R1.f(ast)?q.gz$": ".R1.fastq.gz",
-        ".R2.f(ast)?q.gz$": ".R2.fastq.gz",
+        ".R1.f(ast)?q.gz$": FASTQ_R1_POSTFIX,
+        ".R2.f(ast)?q.gz$": FASTQ_R2_POSTFIX,
         # Matches: _R[12]_001_fastq_gz, _R[12].001.fastq.gz, _R[12]_001.fq.gz, etc.
         # Capture lane information as named group
-        ".R1.(?P<lane>...).f(ast)?q.gz$": ".R1.fastq.gz",
-        ".R2.(?P<lane>...).f(ast)?q.gz$": ".R2.fastq.gz",
+        ".R1.(?P<lane>...).f(ast)?q.gz$": FASTQ_R1_POSTFIX,
+        ".R2.(?P<lane>...).f(ast)?q.gz$": FASTQ_R2_POSTFIX,
         # Matches: _[12].fastq.gz, _[12].fq.gz, _[12]_fastq_gz, etc.
- "_1.f(ast)?q.gz$": ".R1.fastq.gz", - "_2.f(ast)?q.gz$": ".R2.fastq.gz" + "_1.f(ast)?q.gz$": FASTQ_R1_POSTFIX, + "_2.f(ast)?q.gz$": FASTQ_R2_POSTFIX } - if (filename.endswith('.R1.fastq.gz') or - filename.endswith('.R2.fastq.gz')): + if (filename.endswith(FASTQ_R1_POSTFIX) or + filename.endswith(FASTQ_R2_POSTFIX)): # Filename is already in the correct format return filename @@ -349,7 +353,7 @@ def mixed_inputs(ifiles): fastqs = False bams = False for file in ifiles: - if file.endswith('.R1.fastq.gz') or file.endswith('.R2.fastq.gz'): + if file.endswith(FASTQ_R1_POSTFIX) or file.endswith(FASTQ_R2_POSTFIX): fastqs = True fq_files.append(file) elif file.endswith('.bam'): @@ -395,13 +399,17 @@ def add_user_information(config): config['project']['userhome'] = home config['project']['username'] = username - dt = datetime.now().strftime("%m_%d_%Y") - config['project']['id'] = f"{dt}_metagenome_results" + # dt = datetime.now().strftime("%m_%d_%Y") + # config['project']['id'] = f"{dt}_metagenome_results" + + # TODO: figure up way to uniquely ID results, engineering out + # the problem of misidentifing results files + config['project']['id'] = "metagenome_results" return config -def add_sample_metadata(input_files, config, rna_files=None, group=None): +def add_sample_metadata(input_files, config, group_key='samples'): """Adds sample metadata such as sample basename, label, and group information. If sample sheet is provided, it will default to using information in that file. If no sample sheet is provided, it will only add sample basenames and labels. @@ -415,14 +423,14 @@ def add_sample_metadata(input_files, config, rna_files=None, group=None): Updated config with basenames, labels, and groups (if provided) """ added = [] - config['samples'] = [] + config[group_key] = [] for file in input_files: # Split sample name on file extension - sample = re.split('\.R[12]\.fastq\.gz', os.path.basename(file))[0] + sample = re.split('[\S]R[12]', os.path.basename(file))[0] if sample not in added: # Only add PE sample information once added.append(sample) - config['samples'].append(sample) + config[group_key].append(sample) return config @@ -453,11 +461,15 @@ def add_rawdata_information(sub_args, config, ifiles): config['project']['filetype'] = convert[nends] # Finds the set of rawdata directories to bind - rawdata_paths = get_rawdata_bind_paths(input_files = sub_args.input) - config['project']['datapath'] = ','.join(rawdata_paths) + config['project']['datapath'] = ','.join(get_rawdata_bind_paths(input_files = sub_args.input)) + if sub_args.rna: + config["project"]["rna_datapath"] = ','.join(get_rawdata_bind_paths(input_files = sub_args.rna)) # Add each sample's basename config = add_sample_metadata(ifiles['dna'], config) + + if 'rna' in ifiles: + config = add_sample_metadata(ifiles['rna'], config, group_key='rna') return config @@ -517,7 +529,7 @@ def get_nends(ifiles): bam_files = True nends_status = -1 break - elif file.endswith('.R2.fastq.gz'): + elif file.endswith(FASTQ_R2_POSTFIX): paired_end = True nends_status = 2 break # dataset is paired-end @@ -528,7 +540,7 @@ def get_nends(ifiles): nends = {} # keep count of R1 and R2 for each sample for file in ifiles: # Split sample name on file extension - sample = re.split('\.R[12]\.fastq\.gz', os.path.basename(file))[0] + sample = re.split('\_R[12]\.fastq\.gz', os.path.basename(file))[0] if sample not in nends: nends[sample] = 0 @@ -542,8 +554,8 @@ def get_nends(ifiles): both mates (R1 and R2) for the following samples:\n\t\t{}\n Please check that the basename 
for each sample is consistent across mates. Here is an example of a consistent basename across mates: - consistent_basename.R1.fastq.gz - consistent_basename.R2.fastq.gz + consistent_basename_R1.fastq.gz + consistent_basename_R2.fastq.gz Please do not run the pipeline with a mixture of single-end and paired-end samples. This feature is currently not supported within {}, and it is @@ -635,7 +647,7 @@ def runner( threads=2, jobname=__job_name__, submission_script='run.sh', - tmp_dir = '/lscratch/$SLURM_JOBID/' + tmp_dir = '/lscratch/$SLURM_JOB_ID/' ): """Runs the pipeline via selected executor: local or slurm. If 'local' is selected, the pipeline is executed locally on a compute node/instance. @@ -716,10 +728,11 @@ def runner( # snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html masterjob = subprocess.Popen([ 'snakemake', '-pr', - #'--rerun-incomplete', + '--rerun-incomplete', + '--rerun-triggers input', '--verbose', '--use-singularity', - '--singularity-args', "\\-C \\-B '{}'".format(bindpaths), + '--singularity-args', "\\-c \\-B '{}'".format(bindpaths), '--cores', str(threads), '--configfile=config.json' ], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env) diff --git a/src/run.sh b/src/run.sh index b01df3a..6006904 100755 --- a/src/run.sh +++ b/src/run.sh @@ -209,11 +209,10 @@ function submit(){ if [[ ${6#\'} != /lscratch* ]]; then CLUSTER_OPTS="sbatch --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURM_DIR/slurm-%j_{params.rname}.out -o $SLURM_DIR/slurm-%j_{params.rname}.out" fi - # Create sbacth script to build index cat << EOF > kickoff.sh #!/usr/bin/env bash -#SBATCH --cpus-per-task=16 -#SBATCH --mem=96g +#SBATCH --cpus-per-task=16 +#SBATCH --mem=32g #SBATCH --time=5-00:00:00 #SBATCH --parsable #SBATCH -J "$2" @@ -222,16 +221,29 @@ function submit(){ #SBATCH --error "$3/logfiles/snakemake.log" set -euo pipefail # Main process of pipeline -snakemake --latency-wait 120 -s "$3/workflow/Snakefile" -d "$3" \\ - --use-singularity --singularity-args "\\-C \\-B '$4'" \\ - --use-envmodules --verbose --configfile="$3/config.json" \\ - --printshellcmds --cluster-config "$3/config/cluster.json" \\ - --cluster "${CLUSTER_OPTS}" --keep-going -j 500 \\ - --rerun-incomplete --stats "$3/logfiles/runtime_statistics.json" \\ - --keep-incomplete --restart-times 0 \\ - --keep-remote --local-cores 14 2>&1 +snakemake \\ + -p \\ + --latency-wait 120 \\ + -s "$3/workflow/Snakefile" \\ + -d "$3" \\ + --use-singularity \\ + --singularity-args "\\-c \\-B '$4'" \\ + --use-envmodules \\ + --verbose \\ + --configfile "$3/config.json" \\ + --printshellcmds \\ + --cluster-config $3/config/cluster.json \\ + --cluster "${CLUSTER_OPTS}" \\ + --keep-going \\ + --rerun-incomplete \\ + --jobs 500 \\ + --keep-remote \\ + --stats "$3/logfiles/runtime_statistics.json" \\ + --restart-times 0 \\ + --keep-incomplete \\ + --local-cores "14" 2>&1 # Create summary report -snakemake -d "$3" --report "Snakemake_Report.html" +snakemake -s "$3/workflow/Snakefile" -d "$3" --configfile="$3/config.json" --report "Snakemake_Report.html" EOF chmod +x kickoff.sh job_id=$(sbatch kickoff.sh | tee -a "$3"/logfiles/master.log)
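
A minimal, self-contained sketch (not part of the patch above) of how the new FASTQ_R1_POSTFIX/FASTQ_R2_POSTFIX constants are expected to interact with the regex map used by rename() in src/run.py. The sample filenames and the simplified rename_sketch() helper are hypothetical illustrations; the real rename() also handles inputs that match no known extension.

import re

# Mirrors the constants introduced at the top of src/run.py
FASTQ_INPUT_EXT = ".fastq.gz"
FASTQ_R1_POSTFIX = f"_R1{FASTQ_INPUT_EXT}"   # canonical R1 name, e.g. sample_R1.fastq.gz
FASTQ_R2_POSTFIX = f"_R2{FASTQ_INPUT_EXT}"   # canonical R2 name, e.g. sample_R2.fastq.gz

# Same idea as the extensions dict in rename():
# regex matching a raw postfix -> canonical postfix it gets renamed to
extensions = {
    ".R1.f(ast)?q.gz$": FASTQ_R1_POSTFIX,
    ".R2.f(ast)?q.gz$": FASTQ_R2_POSTFIX,
    ".R1.(?P<lane>...).f(ast)?q.gz$": FASTQ_R1_POSTFIX,
    ".R2.(?P<lane>...).f(ast)?q.gz$": FASTQ_R2_POSTFIX,
    "_1.f(ast)?q.gz$": FASTQ_R1_POSTFIX,
    "_2.f(ast)?q.gz$": FASTQ_R2_POSTFIX,
}

def rename_sketch(filename):
    # Already canonical: return unchanged, mirroring the early return in rename()
    if filename.endswith(FASTQ_R1_POSTFIX) or filename.endswith(FASTQ_R2_POSTFIX):
        return filename
    # Otherwise rewrite the first matching raw postfix to its canonical form
    for regex, postfix in extensions.items():
        if re.search(regex, filename):
            return re.sub(regex, postfix, filename)
    return filename

if __name__ == '__main__':
    # Hypothetical inputs -> expected canonical basenames
    print(rename_sketch("sampleA_1.fastq.gz"))    # sampleA_R1.fastq.gz
    print(rename_sketch("sampleB.R2.fq.gz"))      # sampleB_R2.fastq.gz
    print(rename_sketch("sampleC_R1.fastq.gz"))   # already canonical, unchanged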