diff --git a/Docker/Dockerfile_vis b/Docker/Dockerfile_vis
index dfa7f7d..3f81e5b 100644
--- a/Docker/Dockerfile_vis
+++ b/Docker/Dockerfile_vis
@@ -16,19 +16,21 @@ ENV conda_env="mags_vis"
 USER root
 
 RUN \
-    micromamba create -n $conda_env -c conda-forge -c bioconda -c dnachun -c defaults -y python=3.7 pip microbeannotator=2.0.5 pandas scipy matplotlib seaborn krona && \
-    eval "$(micromamba shell hook --shell bash)" && \
-    micromamba activate $conda_env && \
+    micromamba create -n $conda_env -c conda-forge -c bioconda -c dnachun -c defaults -y python=3.7 \
+        pip microbeannotator=2.0.5 pandas scipy matplotlib seaborn krona && \
+    chmod 775 /opt/conda/envs/$conda_env/lib/python3.7/site-packages/microbeannotator/pipeline/*.py && \
+    micromamba clean -y -a
+
+ENV PATH="${PATH}:/opt/conda/envs/$conda_env/bin"
+
+RUN \
     pip3 install hmmer==0.1.0 pymupdf==1.22.5 && \
-    chmod 775 /opt/conda/envs/$conda_env/lib/python3.7/site-packages/microbeannotator/pipeline/ko_mapper.py && \
     ln -s /opt/conda/envs/$conda_env/lib/python3.7/site-packages/microbeannotator/pipeline/ko_mapper.py /opt/conda/envs/$conda_env/bin/ && \
     ln -s /opt/conda/envs/mags_vis/lib/python3.7/site-packages/microbeannotator/data /opt/conda/envs/mags_vis/data && \
-    micromamba clean -y -a && pip cache purge
+    pip3 cache purge
 
 ADD *.py /opt/conda/envs/$conda_env/bin/
 
-ENV PATH="${PATH}:/opt/conda/envs/$conda_env/bin"
-
 WORKDIR /data
 
 CMD ["/bin/bash"]
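Note on the Dockerfile change: replacing the eval/micromamba-activate dance with an explicit ENV PATH entry makes the env's tools resolve in every later layer, so the pip3 install can live in its own RUN layer and the large micromamba layer is cached independently of pip changes. A quick way to confirm the symlinked entry points still resolve inside the image is a smoke test along these lines (hypothetical script, not part of this change):

    # smoke_test.py (hypothetical): verify the tools this image wires up
    # are reachable on PATH inside the container.
    import shutil

    for tool in ("ko_mapper.py", "ktImportText"):
        path = shutil.which(tool)
        assert path is not None, f"{tool} not found on PATH"
        print(f"{tool} -> {path}")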
"_".join(file_id.split("_")[0:-2]) + return contig_id + + +def get_contig_gff(line, contig_id): + return line.rstrip().split()[0] + + +def get_contig_tsv(line, contig_id): + file_id = line.rstrip().split()[0] + return "_".join(file_id.split("_")[0:-2]) + + +def filter_one_pass(input_file, prefix, mags_data, ext, filter_func, + post=None): + """ + This function reads an input and splits it out into files for each + MAG. + + Inputs: + input_file: input filename + prefix: prefix to use for each output file + mags_data: list of dictionary objects with data for each MAG + ext: extension to use + filter_func: Function that will be called on each line to extract + the contig_id + post: optional function to call at the end given a list of output_files + """ + contig_fd_map = dict() + fd_list = [] + output_files = [] + # This will open an output file for each bin and then + # create a map for the contigs to these open files + for mag in mags_data: + bin_id = mag['bin_name'] + contig_ids = mag['members_id'] + output_dir = mag['output_dir'] + output_filename = f"{prefix}_{bin_id}{ext}" + output_file = f"{output_dir}/{output_filename}" + output_files.append(output_file) + fd = open(output_file, 'w') + fd_list.append(fd) + for contig_id in contig_ids: + if contig_id not in contig_fd_map: + contig_fd_map[contig_id] = [] + contig_fd_map[contig_id].append(fd) + contig_id = None + # Now we go through the input file and demultiplex + # the output. + with open(input_file) as in_file: + for line in in_file: + # Extract the contig_id using the provided + # function. + contig_id = filter_func(line, contig_id) + if contig_id in contig_fd_map: + for fd in contig_fd_map[contig_id]: + fd.write(line) + # Cleanup + for fd in fd_list: + fd.close() + + # Call optional post function + if post: + post(output_files) -def filter_txt(input_file, output_file, contig_ids): - temp_out = "tmp.gff" - filter_inp(input_file, temp_out, contig_ids) - parse_cog_tigr_cathfunfam_smart_supfam_input_gff_files(temp_out, output_file, "bogus", "bogus") - os.unlink(temp_out) def find_extension(input_file): for pattern, extension in EXTENSION_MAPPING.items(): @@ -76,52 +111,73 @@ def find_extension(input_file): return extension return None -def get_ko_list(input_file,output_file,contig_ids): + +def parse_gffs(output_files): + """ + post function to run on gff->txt outputs + """ + for temp_out in output_files: + output_file = temp_out.replace(".tmp", "") + parse_cog_tigr_cathfunfam_smart_supfam_input_gff_files(temp_out, + output_file, + "bogus", + "bogus") + os.unlink(temp_out) + + +def write_kos(output_files): + """ + Post function to extract the list of KOs + """ + for in_file in output_files: + bin_id = in_file.split(".")[-3] + output_file = f"bins.{bin_id}.ko" + write_ko_list(in_file, output_file) + + +def write_ko_list(input_file, output_file): with open(output_file, "w") as out_file: with open(input_file) as in_file: for line in in_file: ko_id = line.rstrip().split()[2] - file_id = line.rstrip().split()[0] - contig_id = "_".join(file_id.split("_")[0:-2]) - if contig_id in contig_ids: - out_file.write(ko_id.replace("KO:","") + "\n") - -def get_bin_annotations(prefix, bin_id, bin_file, inputs, contig_ids, output_dir): - - if not os.path.exists(output_dir): - os.mkdir(output_dir) - # Create the contigs file - output_filename = f"{prefix}_{bin_id}.fna" - shutil.copy(bin_file, os.path.join(output_dir, output_filename)) - output_files = [output_filename] + out_file.write(ko_id.replace("KO:", "") + "\n") + +def rewrite_files(prefix, inputs, 
@@ -76,52 +111,73 @@
             return extension
     return None
 
-def get_ko_list(input_file,output_file,contig_ids):
+
+def parse_gffs(output_files):
+    """
+    post function to run on gff->txt outputs
+    """
+    for temp_out in output_files:
+        output_file = temp_out.replace(".tmp", "")
+        parse_cog_tigr_cathfunfam_smart_supfam_input_gff_files(temp_out,
+                                                               output_file,
+                                                               "bogus",
+                                                               "bogus")
+        os.unlink(temp_out)
+
+
+def write_kos(output_files):
+    """
+    Post function to extract the list of KOs
+    """
+    for in_file in output_files:
+        bin_id = in_file.split(".")[-3]
+        output_file = f"bins.{bin_id}.ko"
+        write_ko_list(in_file, output_file)
+
+
+def write_ko_list(input_file, output_file):
     with open(output_file, "w") as out_file:
         with open(input_file) as in_file:
             for line in in_file:
                 ko_id = line.rstrip().split()[2]
-                file_id = line.rstrip().split()[0]
-                contig_id = "_".join(file_id.split("_")[0:-2])
-                if contig_id in contig_ids:
-                    out_file.write(ko_id.replace("KO:","") + "\n")
-
-def get_bin_annotations(prefix, bin_id, bin_file, inputs, contig_ids, output_dir):
-
-    if not os.path.exists(output_dir):
-        os.mkdir(output_dir)
-    # Create the contigs file
-    output_filename = f"{prefix}_{bin_id}.fna"
-    shutil.copy(bin_file, os.path.join(output_dir, output_filename))
-    output_files = [output_filename]
+                out_file.write(ko_id.replace("KO:", "") + "\n")
+
+
+def rewrite_files(prefix, inputs, mags):
     for input_file in inputs:
         # Find the input type
        extension = find_extension(input_file)
        if not extension:
            sys.stderr.write(f"Unknown type: {input_file}\n")
            continue
-
-        output_filename = f"{prefix}_{bin_id}{extension}"
-        output_file = f"{output_dir}/{output_filename}"
-        output_files.append(output_filename)
-        if extension.endswith(".faa"):
-            filter_faa(input_file, output_file, contig_ids)
-        elif extension.endswith(".gff"):
-            filter_gff(input_file, output_file, contig_ids)
+
+        start = time()
+        post = None
+        if input_file.endswith(".faa"):
+            filter_func = get_contig_faa
         elif input_file.endswith(".gff") and extension.endswith(".txt"):
-            filter_txt(input_file, output_file, contig_ids)
+            filter_func = get_contig_tsv
+            extension += ".tmp"
+            post = parse_gffs
+        elif extension.endswith(".gff"):
+            filter_func = get_contig_gff
         elif input_file.endswith("ko.tsv"):
-            get_ko_list(input_file, f"{bin_id}.ko", contig_ids)
-            filter_inp(input_file, output_file, contig_ids)
+            filter_func = get_contig_tsv
+            post = write_kos
         else:
-            filter_inp(input_file, output_file, contig_ids)
+            filter_func = get_contig_tsv
+        filter_one_pass(input_file, prefix, mags, extension, filter_func,
+                        post=post)
+        print(f" - {input_file.split('/')[-1]}: {time()-start:.3f}s")
+
 
 def ko_analysis(prefix):
     ko_list = glob.glob("*.ko")
     if ko_list:
-        cmd = ["ko_mapper.py" , "-i"] + sorted(ko_list) + [ "-p" , prefix]
-        proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        cmd = ["ko_mapper.py", "-i"] + sorted(ko_list) + ["-p", prefix]
+        proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False,
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE)
         outs, errs = proc.communicate()
         if proc.returncode == 0:
             if os.path.exists(f"{prefix}_barplot.pdf"):
@@ -130,9 +186,10 @@ def ko_analysis(prefix):
             pdf_to_png(f"{prefix}_heatmap.pdf")
         return f"{prefix}_module_completeness.tab"
     else:
-        print(errs.decode().rstrip(),file=sys.stderr)
+        print(errs.decode().rstrip(), file=sys.stderr)
         sys.exit()
 
+
 def pdf_to_png(pdf):
     prefix = Path(pdf).stem
     doc = fitz.open(pdf)  # open document
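Note on the post hooks: gff inputs that map to .txt outputs are first written with an extra .tmp suffix, then parse_gffs converts each one and unlinks the temporary; ko.tsv inputs additionally get write_kos, which derives the per-bin KO lists that ko_analysis later globs as *.ko. A worked example of the filename arithmetic in write_kos, assuming bin names of the form bins.N (the path below is made up):

    in_file = "nmdc_xyz_bins.5_HQ/nmdc_xyz_bins.5.ko.txt"
    bin_id = in_file.split(".")[-3]       # -> "5"
    output_file = f"bins.{bin_id}.ko"     # -> "bins.5.ko"
    # ko_analysis() then finds it via glob.glob("*.ko")

This slicing assumes exactly one dot in the bin name; a bin named, say, bins.5.1 would yield bin_id "1" and land in the wrong output.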
f"{bin_dir}.tar.gz" + with tarfile.open(tar_file_name, "w:gz") as tar: + for output_file_name in glob.glob(f"{bin_dir}/*", recursive=True): + tar.add(f"{output_file_name}", arcname=output_file_name) + + +def create_tarfiles(bin_dirs, threads): + """ + This parallelize the creation of the tar files + """ + def error_cb(e): + raise e + + p = Pool(threads) + for bin_dir in bin_dirs: + p.apply_async(create_tar_file, args=(bin_dir, ), + error_callback=error_cb) + p.close() + p.join() + + if __name__ == "__main__": data = None input_files = [] bin_files_dict = {} bin_dirs = [] + threads = int(os.environ.get("THREADS", "32")) prefix = sys.argv[1] for file in sys.argv[2:]: file_name = os.path.basename(file) @@ -183,14 +262,21 @@ def create_new_zip(bin_dirs): input_files.append(file) for bin_data in data['mags_list']: if bin_data['bin_quality'] in ['MQ', 'HQ', 'LQ']: - print(f"Processing {bin_data['bin_name']}") bin_id = bin_data['bin_name'] contig_ids = bin_data['members_id'] bin_file = bin_files_dict[bin_id] output_dir = f"{prefix}_{bin_id}_{bin_data['bin_quality']}" + if not os.path.exists(output_dir): + os.mkdir(output_dir) + bin_data['output_dir'] = output_dir bin_dirs.append(output_dir) - get_bin_annotations(prefix, bin_id, bin_file, input_files, contig_ids, output_dir) - + output_filename = f"{prefix}_{bin_id}.fna" + shutil.copy(bin_file, os.path.join(output_dir, output_filename)) + print(f"Processing {len(bin_dirs)} mags") + rewrite_files(prefix, input_files, data['mags_list']) + print("Generating KOs") ko_result = ko_analysis(prefix) - krona_plot(ko_result,prefix) - create_new_zip(bin_dirs) + print("Generating Krona Plot") + krona_plot(ko_result, prefix) + print("Generating zip") + create_tarfiles(bin_dirs, threads)