Commit

Merge pull request #45 from microbiomedata/speedup_packing
Speedup packing
chienchi authored Aug 5, 2024
2 parents 1880586 + c6bef26 commit b6d75e8
Showing 2 changed files with 181 additions and 93 deletions.
16 changes: 9 additions & 7 deletions Docker/Dockerfile_vis
@@ -16,19 +16,21 @@ ENV conda_env="mags_vis"
USER root

RUN \
micromamba create -n $conda_env -c conda-forge -c bioconda -c dnachun -c defaults -y python=3.7 pip microbeannotator=2.0.5 pandas scipy matplotlib seaborn krona && \
eval "$(micromamba shell hook --shell bash)" && \
micromamba activate $conda_env && \
micromamba create -n $conda_env -c conda-forge -c bioconda -c dnachun -c defaults -y python=3.7 \
pip microbeannotator=2.0.5 pandas scipy matplotlib seaborn krona && \
chmod 775 /opt/conda/envs/$conda_env/lib/python3.7/site-packages/microbeannotator/pipeline/*.py && \
micromamba clean -y -a

ENV PATH="${PATH}:/opt/conda/envs/$conda_env/bin"

RUN \
pip3 install hmmer==0.1.0 pymupdf==1.22.5 && \
chmod 775 /opt/conda/envs/$conda_env/lib/python3.7/site-packages/microbeannotator/pipeline/ko_mapper.py && \
ln -s /opt/conda/envs/$conda_env/lib/python3.7/site-packages/microbeannotator/pipeline/ko_mapper.py /opt/conda/envs/$conda_env/bin/ && \
ln -s /opt/conda/envs/mags_vis/lib/python3.7/site-packages/microbeannotator/data /opt/conda/envs/mags_vis/data && \
micromamba clean -y -a && pip cache purge
pip3 cache purge

ADD *.py /opt/conda/envs/$conda_env/bin/

ENV PATH="${PATH}:/opt/conda/envs/$conda_env/bin"

WORKDIR /data

CMD ["/bin/bash"]
258 changes: 172 additions & 86 deletions Docker/create_tarfiles.py
@@ -10,10 +10,13 @@
import pandas as pd
from pathlib import Path
import fitz
from zipfile import ZipFile
from gff2txt import parse_cog_tigr_cathfunfam_smart_supfam_input_gff_files
from multiprocessing import Pool
from time import time


__version__ = "0.6.0"

__version__="0.5.0"

# File extension mapping
EXTENSION_MAPPING = {
@@ -24,104 +27,157 @@
"ko.tsv": ".ko.txt",
"gene_phylogeny.tsv": ".phylodist.txt",
"ec.tsv": ".ec.txt",
"supfam.gff": ".supfam.txt",
"pfam.gff": ".pfam.txt",
"tigrfam.gff": ".tigr.txt",
"cath_funfam.gff": ".cathfunfam.txt",
"smart.gff": ".smart.txt",
"supfam.gff": ".supfam.txt",
"functional_annotation.gff": ".gff"
}

# Filter functions
def filter_gff(input_file, output_file, contig_ids):
with open(output_file, "w") as out_file:
with open(input_file) as in_file:
for line in in_file:
contig_id = line.rstrip().split()[0]
if contig_id in contig_ids:
out_file.write(line)

def filter_faa(input_file, output_file, contig_ids):
with open(output_file, "w") as out_file:
with open(input_file) as in_file:
for line in in_file:
if line.startswith(">"):
file_id = line[1:].rstrip().split()[0]
contig_id = "_".join(file_id.split("_")[0:-2])
contig_prefix = "-".join(file_id.split("-")[0:2])
if contig_prefix.startswith("nmdc:") and len(contig_ids) > 0 and contig_prefix not in contig_ids[0]:
print(f"{contig_prefix} not part of {contig_ids[0]}, Please check the mapping file.", file=sys.stderr)
sys.exit(1)
if contig_id in contig_ids:
out_file.write(line)

def filter_inp(input_file, output_file, contig_ids):
with open(output_file, "w") as out_file:
with open(input_file) as in_file:
for line in in_file:
file_id = line.rstrip().split()[0]
contig_id = "_".join(file_id.split("_")[0:-2])
if contig_id in contig_ids:
out_file.write(line)
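# The get_contig_* helpers below recover the contig id from a single input
# line: protein FASTA headers and TSV rows carry a feature id with two
# trailing "_"-separated fields (a hypothetical "contigA_12_1" maps back to
# contig "contigA"), while functional-annotation GFF lines already start
# with the contig id.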
def get_contig_faa(line, contig_id):
if line.startswith(">"):
file_id = line[1:].rstrip().split()[0]
contig_id = "_".join(file_id.split("_")[0:-2])
return contig_id


def get_contig_gff(line, contig_id):
return line.rstrip().split()[0]


def get_contig_tsv(line, contig_id):
file_id = line.rstrip().split()[0]
return "_".join(file_id.split("_")[0:-2])


def filter_one_pass(input_file, prefix, mags_data, ext, filter_func,
post=None):
"""
This function reads an input and splits it out into files for each
MAG.
Inputs:
input_file: input filename
prefix: prefix to use for each output file
mags_data: list of dictionary objects with data for each MAG
ext: extension to use
filter_func: Function that will be called on each line to extract
the contig_id
post: optional function to call at the end given a list of output_files
"""
contig_fd_map = dict()
fd_list = []
output_files = []
# This will open an output file for each bin and then
# create a map for the contigs to these open files
for mag in mags_data:
bin_id = mag['bin_name']
contig_ids = mag['members_id']
output_dir = mag['output_dir']
output_filename = f"{prefix}_{bin_id}{ext}"
output_file = f"{output_dir}/{output_filename}"
output_files.append(output_file)
fd = open(output_file, 'w')
fd_list.append(fd)
for contig_id in contig_ids:
if contig_id not in contig_fd_map:
contig_fd_map[contig_id] = []
contig_fd_map[contig_id].append(fd)
contig_id = None
# Now we go through the input file and demultiplex
# the output.
with open(input_file) as in_file:
for line in in_file:
# Extract the contig_id using the provided
# function.
contig_id = filter_func(line, contig_id)
if contig_id in contig_fd_map:
for fd in contig_fd_map[contig_id]:
fd.write(line)
# Cleanup
for fd in fd_list:
fd.close()

# Call optional post function
if post:
post(output_files)
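A usage sketch of the one-pass demultiplexer (illustrative only: the file, prefix and bin names are hypothetical, and it assumes create_tarfiles.py and its dependencies are importable):

import os
from create_tarfiles import filter_one_pass, get_contig_gff

# A tiny (hypothetical) annotation GFF whose first column is the contig id.
with open("demo_functional_annotation.gff", "w") as fh:
    fh.write("contigA\tGeneMark.hmm\tCDS\t1\t300\t.\t+\t0\tID=contigA_1\n")

mags = [{"bin_name": "bins.1", "members_id": ["contigA", "contigB"],
         "output_dir": "demo_bins.1_HQ"},
        {"bin_name": "bins.2", "members_id": ["contigC"],
         "output_dir": "demo_bins.2_MQ"}]
for mag in mags:
    os.makedirs(mag["output_dir"], exist_ok=True)

# A single read of the input writes demo_bins.1_HQ/demo_bins.1.gff and
# demo_bins.2_MQ/demo_bins.2.gff, instead of re-reading the file once per bin.
filter_one_pass("demo_functional_annotation.gff", "demo", mags, ".gff",
                get_contig_gff)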

def filter_txt(input_file, output_file, contig_ids):
temp_out = "tmp.gff"
filter_inp(input_file, temp_out, contig_ids)
parse_cog_tigr_cathfunfam_smart_supfam_input_gff_files(temp_out, output_file, "bogus", "bogus")
os.unlink(temp_out)

def find_extension(input_file):
for pattern, extension in EXTENSION_MAPPING.items():
if pattern in input_file:
return extension
return None
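# e.g. find_extension("sample_pfam.gff") (hypothetical name) returns ".pfam.txt";
# unmatched inputs return None and are skipped with a warning in rewrite_files.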

def get_ko_list(input_file,output_file,contig_ids):

def parse_gffs(output_files):
"""
post function to run on gff->txt outputs
"""
for temp_out in output_files:
output_file = temp_out.replace(".tmp", "")
parse_cog_tigr_cathfunfam_smart_supfam_input_gff_files(temp_out,
output_file,
"bogus",
"bogus")
os.unlink(temp_out)


def write_kos(output_files):
"""
Post function to extract the list of KOs
"""
for in_file in output_files:
bin_id = in_file.split(".")[-3]
output_file = f"bins.{bin_id}.ko"
write_ko_list(in_file, output_file)


def write_ko_list(input_file, output_file):
with open(output_file, "w") as out_file:
with open(input_file) as in_file:
for line in in_file:
ko_id = line.rstrip().split()[2]
file_id = line.rstrip().split()[0]
contig_id = "_".join(file_id.split("_")[0:-2])
if contig_id in contig_ids:
out_file.write(ko_id.replace("KO:","") + "\n")

def get_bin_annotations(prefix, bin_id, bin_file, inputs, contig_ids, output_dir):

if not os.path.exists(output_dir):
os.mkdir(output_dir)
# Create the contigs file
output_filename = f"{prefix}_{bin_id}.fna"
shutil.copy(bin_file, os.path.join(output_dir, output_filename))
output_files = [output_filename]
out_file.write(ko_id.replace("KO:", "") + "\n")


def rewrite_files(prefix, inputs, mags):
for input_file in inputs:
# Find the input type
extension = find_extension(input_file)
if not extension:
sys.stderr.write(f"Unknown type: {input_file}\n")
continue

output_filename = f"{prefix}_{bin_id}{extension}"
output_file = f"{output_dir}/{output_filename}"
output_files.append(output_filename)
if extension.endswith(".faa"):
filter_faa(input_file, output_file, contig_ids)
elif extension.endswith(".gff"):
filter_gff(input_file, output_file, contig_ids)

start = time()
post = None
if input_file.endswith(".faa"):
filter_func = get_contig_faa
elif input_file.endswith(".gff") and extension.endswith(".txt"):
filter_txt(input_file, output_file, contig_ids)
filter_func = get_contig_tsv
extension += ".tmp"
post = parse_gffs
elif extension.endswith(".gff"):
filter_func = get_contig_gff
elif input_file.endswith("ko.tsv"):
get_ko_list(input_file, f"{bin_id}.ko", contig_ids)
filter_inp(input_file, output_file, contig_ids)
filter_func = get_contig_tsv
post = write_kos
else:
filter_inp(input_file, output_file, contig_ids)
filter_func = get_contig_tsv
filter_one_pass(input_file, prefix, mags, extension, filter_func,
post=post)
print(f" - {input_file.split('/')[-1]}: {time()-start:.3f}s")
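# Net effect: each annotation input is read exactly once regardless of how many
# bins passed the quality filter, whereas the per-bin filter_* helpers shown
# above re-read every input for every bin.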


def ko_analysis(prefix):
ko_list = glob.glob("*.ko")
if ko_list:
cmd = ["ko_mapper.py" , "-i"] + sorted(ko_list) + [ "-p" , prefix]
proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd = ["ko_mapper.py", "-i"] + sorted(ko_list) + ["-p", prefix]
proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
outs, errs = proc.communicate()
if proc.returncode == 0:
if os.path.exists(f"{prefix}_barplot.pdf"):
@@ -130,9 +186,10 @@ def ko_analysis(prefix):
pdf_to_png(f"{prefix}_heatmap.pdf")
return f"{prefix}_module_completeness.tab"
else:
print(errs.decode().rstrip(),file=sys.stderr)
print(errs.decode().rstrip(), file=sys.stderr)
sys.exit()


def pdf_to_png(pdf):
prefix = Path(pdf).stem
doc = fitz.open(pdf) # open document
@@ -142,35 +199,57 @@
pix.save(f"{prefix}.png") # store image as a PNG
return True
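A minimal, self-contained sketch of the same PDF-to-PNG step with PyMuPDF (the file name is hypothetical):

import fitz  # PyMuPDF, pinned to pymupdf==1.22.5 in Dockerfile_vis

doc = fitz.open("demo_heatmap.pdf")                # open the report PDF
pix = doc[0].get_pixmap(matrix=fitz.Matrix(2, 2))  # render page 1 at 2x zoom
pix.save("demo_heatmap.png")                       # write the PNG next to it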

def krona_plot(ko_result,prefix):

def krona_plot(ko_result, prefix):
ko_list = glob.glob("*.ko")
if ko_list:
df = pd.read_csv(ko_result,sep="\t")
krona_list=[]
df = pd.read_csv(ko_result, sep="\t")
krona_list = []
for ko in ko_list:
krona_list.append(f"{ko}.krona")
df[[ko,'pathway group','name']].to_csv(f"{ko}.krona",sep="\t",index=False,header=False)
cmd=["ktImportText"] + krona_list + [ "-o" , f"{prefix}_ko_krona.html"]
proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
df[[ko, 'pathway group', 'name']].to_csv(f"{ko}.krona", sep="\t",
index=False,
header=False)
cmd = ["ktImportText"] + krona_list + ["-o", f"{prefix}_ko_krona.html"]
proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
outs, errs = proc.communicate()
if proc.returncode == 0:
return f"{prefix}_ko_krona.html"
else:
print(errs.decode().rstrip(), file=sys.stderr)
return
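# Each *.ko.krona intermediate is a headerless, tab-separated table of the
# bin's value from the module-completeness table, its pathway group and the
# module name, one row per module, e.g. (hypothetical row):
# "85.7\tAmino acid metabolism\tLysine biosynthesis".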

def create_new_zip(bin_dirs):
for dir in bin_dirs:
tar_file_name = f"{dir}.tar.gz"
with tarfile.open(tar_file_name, "w:gz") as tar:
for output_file_name in glob.glob(f"{dir}/*",recursive=True):
tar.add(f"{output_file_name}", arcname=output_file_name)



def create_tar_file(bin_dir):
tar_file_name = f"{bin_dir}.tar.gz"
with tarfile.open(tar_file_name, "w:gz") as tar:
for output_file_name in glob.glob(f"{bin_dir}/*", recursive=True):
tar.add(f"{output_file_name}", arcname=output_file_name)


def create_tarfiles(bin_dirs, threads):
"""
This parallelizes the creation of the tar files.
"""
def error_cb(e):
raise e

p = Pool(threads)
for bin_dir in bin_dirs:
p.apply_async(create_tar_file, args=(bin_dir, ),
error_callback=error_cb)
p.close()
p.join()
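A usage sketch (directory names hypothetical; in the main block below, THREADS defaults to 32):

from create_tarfiles import create_tarfiles

# Packages each bin directory into <dir>.tar.gz, one worker per directory.
create_tarfiles(["demo_bins.1_HQ", "demo_bins.2_MQ"], threads=4)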


if __name__ == "__main__":
data = None
input_files = []
bin_files_dict = {}
bin_dirs = []
threads = int(os.environ.get("THREADS", "32"))
prefix = sys.argv[1]
for file in sys.argv[2:]:
file_name = os.path.basename(file)
@@ -183,14 +262,21 @@ def create_new_zip(bin_dirs):
input_files.append(file)
for bin_data in data['mags_list']:
if bin_data['bin_quality'] in ['MQ', 'HQ', 'LQ']:
print(f"Processing {bin_data['bin_name']}")
bin_id = bin_data['bin_name']
contig_ids = bin_data['members_id']
bin_file = bin_files_dict[bin_id]
output_dir = f"{prefix}_{bin_id}_{bin_data['bin_quality']}"
if not os.path.exists(output_dir):
os.mkdir(output_dir)
bin_data['output_dir'] = output_dir
bin_dirs.append(output_dir)
get_bin_annotations(prefix, bin_id, bin_file, input_files, contig_ids, output_dir)

output_filename = f"{prefix}_{bin_id}.fna"
shutil.copy(bin_file, os.path.join(output_dir, output_filename))
print(f"Processing {len(bin_dirs)} mags")
rewrite_files(prefix, input_files, data['mags_list'])
print("Generating KOs")
ko_result = ko_analysis(prefix)
krona_plot(ko_result,prefix)
create_new_zip(bin_dirs)
print("Generating Krona Plot")
krona_plot(ko_result, prefix)
print("Generating zip")
create_tarfiles(bin_dirs, threads)
