Skip to content

Commit

Permalink
feat: python execution infrastructure for snakemake execution with or…
Browse files Browse the repository at this point in the history
… without rna
  • Loading branch information
rroutsong committed Feb 27, 2024
1 parent 07ea12b commit e49c5ab
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 77 deletions.
108 changes: 67 additions & 41 deletions metamorph
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ EXAMPLES:
# Python standard library
from __future__ import print_function
import sys, os, subprocess, re, json, textwrap
from datetime import timezone, datetime

# 3rd party imports from pypi
import argparse # potential python3 3rd party package, added in python/3.5
import argparse

# Local imports
from src import version
Expand All @@ -70,6 +71,7 @@ __version__ = version
__authors__ = 'Neelam Redekar, Skyler Kuhn'
__email__ = 'neelam.redekar <AT> nih.gov, skyler.kuhn <AT> nih.gov'
__home__ = os.path.dirname(os.path.abspath(__file__))
_datetime = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
_name = os.path.basename(sys.argv[0])
_description = 'An awesome metagenomics and metatranscriptomics pipeline'

Expand All @@ -87,7 +89,10 @@ def unlock(sub_args):

try:
unlock_output = subprocess.check_output([
'snakemake', '--unlock',
'snakemake',
'--unlock',
'--force',
'-s', os.path.abspath(f'{outdir}/workflow/Snakefile'),
'--cores', '1',
'--configfile=config.json'
], cwd = outdir,
Expand Down Expand Up @@ -146,7 +151,8 @@ def run(sub_args):
)

config['bindpaths'] = bindpaths
config['coassembly'] = sub_args.coa
# config['coassembly'] = sub_args.coa
config['coassembly'] = False

# Step 4. Save config to output directory
with open(os.path.join(sub_args.output, 'config.json'), 'w') as fh:
Expand Down Expand Up @@ -185,7 +191,7 @@ def run(sub_args):
submission_script=os.path.join(__home__, 'src', 'run.sh'),
logger = logfh,
additional_bind_paths = ",".join(bindpaths),
tmp_dir = sub_args.tmp_dir,
tmp_dir = sub_args.tmp_dir,
)

# Step 6. Wait for subprocess to complete,
Expand Down Expand Up @@ -292,9 +298,7 @@ def parsed_arguments(name, description):
provided working directory has not been initialized,
it will be created automatically.
Example: --output /data/$USER/output
{3}{4}Analysis options:{5}
...coming soon!
{3}{4}Orchestration options:{5}
--mode {{slurm,local}}
Expand Down Expand Up @@ -372,35 +376,50 @@ def parsed_arguments(name, description):

# Display example usage in epilog
run_epilog = textwrap.dedent("""\
{2}{3}Example:{4}
# Step 1.) Grab an interactive node,
# do not run on head node!
srun -N 1 -n 1 --time=1:00:00 --mem=8gb --cpus-per-task=2 --pty bash
module purge
module load singularity snakemake
# Step 2A.) Dry-run the pipeline
./{0} run --input .tests/*.R?.fastq.gz \\
--output /data/$USER/output \\
--mode slurm \\
--dry-run
# Step 3A.) Run the {0} pipeline in per-sample fashion
# The slurm mode will submit jobs to
# the cluster. It is recommended running
# the pipeline in this mode.
./{0} run --input .tests/*.R?.fastq.gz \\
--output /data/$USER/output \\
--mode slurm
# Step 3B.) Run the {0} pipeline in co-assembly fashion
# with slurm
./{0} run --coa --input .tests/*.R?.fastq.gz \\
--output /data/$USER/output \\
--mode slurm
{2}{3}INPUT MODES:{4}
# Step 1.) Grab an interactive node,
# do not run on head node!
srun -N 1 -n 1 --time=1:00:00 --mem=8gb --cpus-per-task=2 --pty bash
module purge
module load singularity snakemake
# Step 2A.) Dry-run the pipeline
./{0} run --input .tests/*.R?.fastq.gz \\
--output /data/$USER/output \\
--mode slurm \\
--dry-run
# Step 3A.) Run the {0} pipeline in per-sample fashion
# The slurm mode will submit jobs to
# the cluster. It is recommended running
# the pipeline in this mode.
./{0} run --input .tests/*.R?.fastq.gz \\
--output /data/$USER/output \\
--mode slurm
# Step 3B.) Run the {0} pipeline in co-assembly fashion
# with slurm
./{0} run --coa --input .tests/*.R?.fastq.gz \\
--output /data/$USER/output \\
--mode slurm
{2}{3}EXAMPLES:{4}
co-assembly dna-only:
$ metamorph run --coa --input *.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --output output
per-sample assembly dna-only:
$ metamorph run --input *.R?.fastq.gz --output output
co-assembly rna & dna:
$ metamorph run --coa --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
$ metamorph run -C --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
per-sample assembly rna & dna:
$ metamorph run --input *.R?.fastq.gz --rna rna/*.R?.fastq.gz --output output
{2}{3}Version:{4}
{2}{3}VERSION:{4}
{1}
""".format(name, __version__, c.bold, c.url, c.end))

Expand Down Expand Up @@ -469,12 +488,19 @@ def parsed_arguments(name, description):
)

# a supported job scheduler, etc.
subparser_run.add_argument(
'-C', '--coa',
action="store_true",
required = False,
help = argparse.SUPPRESS
)
# subparser_run.add_argument(
# '-C', '--coa',
# action="store_true",
# required = False,
# help = argparse.SUPPRESS
# )

# subparser_run.add_argument(
# '-R', '--rnacoa',
# action="store_true",
# required = False,
# help = argparse.SUPPRESS
# )

# Name of master job
subparser_run.add_argument(
Expand Down Expand Up @@ -529,7 +555,7 @@ def parsed_arguments(name, description):
'--tmp-dir',
type = str,
required = False,
default = '/lscratch/$SLURM_JOBID/',
default = '/lscratch/$SLURM_JOB_ID/',
help = argparse.SUPPRESS
)

Expand Down
61 changes: 37 additions & 24 deletions src/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

from . import version as __version__

# Canonical FASTQ naming constants. The rename/matching logic in this module
# normalizes every paired-end input file to these suffixes
# (e.g. "<sample>_R1.fastq.gz") and uses them in endswith() checks to
# classify inputs and detect mates.
FASTQ_INPUT_EXT = ".fastq.gz"
# Mate-specific suffixes for read 1 / read 2 of a paired-end sample.
FASTQ_R1_POSTFIX = f"_R1{FASTQ_INPUT_EXT}"
FASTQ_R2_POSTFIX = f"_R2{FASTQ_INPUT_EXT}"


def init(repo_path, output_path, links=[], required=['workflow', 'resources', 'config']):
"""Initialize the output directory. If user provides a output
Expand Down Expand Up @@ -125,19 +129,19 @@ def rename(filename):
# key = regex to match string and value = how it will be renamed
extensions = {
# Matches: _R[12]_fastq.gz, _R[12].fastq.gz, _R[12]_fq.gz, etc.
".R1.f(ast)?q.gz$": ".R1.fastq.gz",
".R2.f(ast)?q.gz$": ".R2.fastq.gz",
".R1.f(ast)?q.gz$": FASTQ_R1_POSTFIX,
".R2.f(ast)?q.gz$": FASTQ_R2_POSTFIX,
# Matches: _R[12]_001_fastq_gz, _R[12].001.fastq.gz, _R[12]_001.fq.gz, etc.
# Capture lane information as named group
".R1.(?P<lane>...).f(ast)?q.gz$": ".R1.fastq.gz",
".R2.(?P<lane>...).f(ast)?q.gz$": ".R2.fastq.gz",
".R1.(?P<lane>...).f(ast)?q.gz$": FASTQ_R1_POSTFIX,
".R2.(?P<lane>...).f(ast)?q.gz$": FASTQ_R2_POSTFIX,
# Matches: _[12].fastq.gz, _[12].fq.gz, _[12]_fastq_gz, etc.
"_1.f(ast)?q.gz$": ".R1.fastq.gz",
"_2.f(ast)?q.gz$": ".R2.fastq.gz"
"_1.f(ast)?q.gz$": FASTQ_R1_POSTFIX,
"_2.f(ast)?q.gz$": FASTQ_R2_POSTFIX
}

if (filename.endswith('.R1.fastq.gz') or
filename.endswith('.R2.fastq.gz')):
if (filename.endswith(FASTQ_R1_POSTFIX) or
filename.endswith(FASTQ_R2_POSTFIX)):
# Filename is already in the correct format
return filename

Expand Down Expand Up @@ -349,7 +353,7 @@ def mixed_inputs(ifiles):
fastqs = False
bams = False
for file in ifiles:
if file.endswith('.R1.fastq.gz') or file.endswith('.R2.fastq.gz'):
if file.endswith(FASTQ_R1_POSTFIX) or file.endswith(FASTQ_R2_POSTFIX):
fastqs = True
fq_files.append(file)
elif file.endswith('.bam'):
Expand Down Expand Up @@ -395,13 +399,17 @@ def add_user_information(config):
config['project']['userhome'] = home
config['project']['username'] = username

dt = datetime.now().strftime("%m_%d_%Y")
config['project']['id'] = f"{dt}_metagenome_results"
# dt = datetime.now().strftime("%m_%d_%Y")
# config['project']['id'] = f"{dt}_metagenome_results"

# TODO: figure out a way to uniquely ID results, engineering out
# the problem of misidentifying results files
config['project']['id'] = "metagenome_results"

return config


def add_sample_metadata(input_files, config, rna_files=None, group=None):
def add_sample_metadata(input_files, config, group_key='samples'):
"""Adds sample metadata such as sample basename, label, and group information.
If sample sheet is provided, it will default to using information in that file.
If no sample sheet is provided, it will only add sample basenames and labels.
Expand All @@ -415,14 +423,14 @@ def add_sample_metadata(input_files, config, rna_files=None, group=None):
Updated config with basenames, labels, and groups (if provided)
"""
added = []
config['samples'] = []
config[group_key] = []
for file in input_files:
# Split sample name on file extension
sample = re.split('\.R[12]\.fastq\.gz', os.path.basename(file))[0]
sample = re.split('[\S]R[12]', os.path.basename(file))[0]
if sample not in added:
# Only add PE sample information once
added.append(sample)
config['samples'].append(sample)
config[group_key].append(sample)
return config


Expand Down Expand Up @@ -453,11 +461,15 @@ def add_rawdata_information(sub_args, config, ifiles):
config['project']['filetype'] = convert[nends]

# Finds the set of rawdata directories to bind
rawdata_paths = get_rawdata_bind_paths(input_files = sub_args.input)
config['project']['datapath'] = ','.join(rawdata_paths)
config['project']['datapath'] = ','.join(get_rawdata_bind_paths(input_files = sub_args.input))
if sub_args.rna:
config["project"]["rna_datapath"] = ','.join(get_rawdata_bind_paths(input_files = sub_args.rna))

# Add each sample's basename
config = add_sample_metadata(ifiles['dna'], config)

if 'rna' in ifiles:
config = add_sample_metadata(ifiles['rna'], config, group_key='rna')

return config

Expand Down Expand Up @@ -517,7 +529,7 @@ def get_nends(ifiles):
bam_files = True
nends_status = -1
break
elif file.endswith('.R2.fastq.gz'):
elif file.endswith(FASTQ_R2_POSTFIX):
paired_end = True
nends_status = 2
break # dataset is paired-end
Expand All @@ -528,7 +540,7 @@ def get_nends(ifiles):
nends = {} # keep count of R1 and R2 for each sample
for file in ifiles:
# Split sample name on file extension
sample = re.split('\.R[12]\.fastq\.gz', os.path.basename(file))[0]
sample = re.split('\_R[12]\.fastq\.gz', os.path.basename(file))[0]
if sample not in nends:
nends[sample] = 0

Expand All @@ -542,8 +554,8 @@ def get_nends(ifiles):
both mates (R1 and R2) for the following samples:\n\t\t{}\n
Please check that the basename for each sample is consistent across mates.
Here is an example of a consistent basename across mates:
consistent_basename.R1.fastq.gz
consistent_basename.R2.fastq.gz
consistent_basename_R1.fastq.gz
consistent_basename_R2.fastq.gz
Please do not run the pipeline with a mixture of single-end and paired-end
samples. This feature is currently not supported within {}, and it is
Expand Down Expand Up @@ -635,7 +647,7 @@ def runner(
threads=2,
jobname=__job_name__,
submission_script='run.sh',
tmp_dir = '/lscratch/$SLURM_JOBID/'
tmp_dir = '/lscratch/$SLURM_JOB_ID/'
):
"""Runs the pipeline via selected executor: local or slurm.
If 'local' is selected, the pipeline is executed locally on a compute node/instance.
Expand Down Expand Up @@ -716,10 +728,11 @@ def runner(
# snakemake API call: https://snakemake.readthedocs.io/en/stable/api_reference/snakemake.html
masterjob = subprocess.Popen([
'snakemake', '-pr',
#'--rerun-incomplete',
'--rerun-incomplete',
'--rerun-triggers input',
'--verbose',
'--use-singularity',
'--singularity-args', "\\-C \\-B '{}'".format(bindpaths),
'--singularity-args', "\\-c \\-B '{}'".format(bindpaths),
'--cores', str(threads),
'--configfile=config.json'
], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env)
Expand Down
36 changes: 24 additions & 12 deletions src/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,10 @@ function submit(){
if [[ ${6#\'} != /lscratch* ]]; then
CLUSTER_OPTS="sbatch --cpus-per-task {cluster.threads} -p {cluster.partition} -t {cluster.time} --mem {cluster.mem} --job-name={params.rname} -e $SLURM_DIR/slurm-%j_{params.rname}.out -o $SLURM_DIR/slurm-%j_{params.rname}.out"
fi
# Create sbatch script to kick off the main pipeline process
cat << EOF > kickoff.sh
#!/usr/bin/env bash
#SBATCH --cpus-per-task=16
#SBATCH --mem=96g
#SBATCH --cpus-per-task=16
#SBATCH --mem=32g
#SBATCH --time=5-00:00:00
#SBATCH --parsable
#SBATCH -J "$2"
Expand All @@ -222,16 +221,29 @@ function submit(){
#SBATCH --error "$3/logfiles/snakemake.log"
set -euo pipefail
# Main process of pipeline
snakemake --latency-wait 120 -s "$3/workflow/Snakefile" -d "$3" \\
--use-singularity --singularity-args "\\-C \\-B '$4'" \\
--use-envmodules --verbose --configfile="$3/config.json" \\
--printshellcmds --cluster-config "$3/config/cluster.json" \\
--cluster "${CLUSTER_OPTS}" --keep-going -j 500 \\
--rerun-incomplete --stats "$3/logfiles/runtime_statistics.json" \\
--keep-incomplete --restart-times 0 \\
--keep-remote --local-cores 14 2>&1
snakemake \\
-p \\
--latency-wait 120 \\
-s "$3/workflow/Snakefile" \\
-d "$3" \\
--use-singularity \\
--singularity-args "\\-c \\-B '$4'" \\
--use-envmodules \\
--verbose \\
--configfile "$3/config.json" \\
--printshellcmds \\
--cluster-config $3/config/cluster.json \\
--cluster "${CLUSTER_OPTS}" \\
--keep-going \\
--rerun-incomplete \\
--jobs 500 \\
--keep-remote \\
--stats "$3/logfiles/runtime_statistics.json" \\
--restart-times 0 \\
--keep-incomplete \\
--local-cores "14" 2>&1
# Create summary report
snakemake -d "$3" --report "Snakemake_Report.html"
snakemake -s "$3/workflow/Snakefile" -d "$3" --configfile="$3/config.json" --report "Snakemake_Report.html"
EOF
chmod +x kickoff.sh
job_id=$(sbatch kickoff.sh | tee -a "$3"/logfiles/master.log)
Expand Down

0 comments on commit e49c5ab

Please sign in to comment.