Speedup packing #45

Merged · 7 commits · Aug 5, 2024 · Changes from all commits
16 changes: 9 additions & 7 deletions Docker/Dockerfile_vis
@@ -16,19 +16,21 @@ ENV conda_env="mags_vis"
USER root

RUN \
micromamba create -n $conda_env -c conda-forge -c bioconda -c dnachun -c defaults -y python=3.7 pip microbeannotator=2.0.5 pandas scipy matplotlib seaborn krona && \
eval "$(micromamba shell hook --shell bash)" && \
micromamba activate $conda_env && \
micromamba create -n $conda_env -c conda-forge -c bioconda -c dnachun -c defaults -y python=3.7 \
pip microbeannotator=2.0.5 pandas scipy matplotlib seaborn krona && \
chmod 775 /opt/conda/envs/$conda_env/lib/python3.7/site-packages/microbeannotator/pipeline/*.py && \
micromamba clean -y -a

ENV PATH="${PATH}:/opt/conda/envs/$conda_env/bin"

RUN \
pip3 install hmmer==0.1.0 pymupdf==1.22.5 && \
chmod 775 /opt/conda/envs/$conda_env/lib/python3.7/site-packages/microbeannotator/pipeline/ko_mapper.py && \
ln -s /opt/conda/envs/$conda_env/lib/python3.7/site-packages/microbeannotator/pipeline/ko_mapper.py /opt/conda/envs/$conda_env/bin/ && \
ln -s /opt/conda/envs/mags_vis/lib/python3.7/site-packages/microbeannotator/data /opt/conda/envs/mags_vis/data && \
micromamba clean -y -a && pip cache purge
pip3 cache purge

ADD *.py /opt/conda/envs/$conda_env/bin/

ENV PATH="${PATH}:/opt/conda/envs/$conda_env/bin"

WORKDIR /data

CMD ["/bin/bash"]
258 changes: 172 additions & 86 deletions Docker/create_tarfiles.py
@@ -10,10 +10,13 @@
import pandas as pd
from pathlib import Path
import fitz
from zipfile import ZipFile
from gff2txt import parse_cog_tigr_cathfunfam_smart_supfam_input_gff_files
from multiprocessing import Pool
from time import time


__version__ = "0.6.0"

__version__="0.5.0"

# File extension mapping
EXTENSION_MAPPING = {
@@ -24,104 +27,157 @@
"ko.tsv": ".ko.txt",
"gene_phylogeny.tsv": ".phylodist.txt",
"ec.tsv": ".ec.txt",
"supfam.gff": ".supfam.txt",
"pfam.gff": ".pfam.txt",
"tigrfam.gff": ".tigr.txt",
"cath_funfam.gff": ".cathfunfam.txt",
"smart.gff": ".smart.txt",
"supfam.gff": ".supfam.txt",
"functional_annotation.gff": ".gff"
}

# Filter functions
def filter_gff(input_file, output_file, contig_ids):
with open(output_file, "w") as out_file:
with open(input_file) as in_file:
for line in in_file:
contig_id = line.rstrip().split()[0]
if contig_id in contig_ids:
out_file.write(line)

def filter_faa(input_file, output_file, contig_ids):
with open(output_file, "w") as out_file:
with open(input_file) as in_file:
for line in in_file:
if line.startswith(">"):
file_id = line[1:].rstrip().split()[0]
contig_id = "_".join(file_id.split("_")[0:-2])
contig_prefix = "-".join(file_id.split("-")[0:2])
if contig_prefix.startswith("nmdc:") and len(contig_ids) > 0 and contig_prefix not in contig_ids[0]:
print(f"{contig_prefix} not part of {contig_ids[0]}, Please check the mapping file.", file=sys.stderr)
sys.exit(1)
if contig_id in contig_ids:
out_file.write(line)

def filter_inp(input_file, output_file, contig_ids):
with open(output_file, "w") as out_file:
with open(input_file) as in_file:
for line in in_file:
file_id = line.rstrip().split()[0]
contig_id = "_".join(file_id.split("_")[0:-2])
if contig_id in contig_ids:
out_file.write(line)
def get_contig_faa(line, contig_id):
if line.startswith(">"):
file_id = line[1:].rstrip().split()[0]
contig_id = "_".join(file_id.split("_")[0:-2])
return contig_id


def get_contig_gff(line, contig_id):
return line.rstrip().split()[0]


def get_contig_tsv(line, contig_id):
file_id = line.rstrip().split()[0]
return "_".join(file_id.split("_")[0:-2])


def filter_one_pass(input_file, prefix, mags_data, ext, filter_func,
post=None):
"""
This function reads the input file in a single pass and splits it into
one output file per MAG.

Inputs:
input_file: input filename
prefix: prefix to use for each output file
mags_data: list of dictionary objects with data for each MAG
ext: extension to use
filter_func: Function that will be called on each line to extract
the contig_id
post: optional function to call at the end given a list of output_files
"""
contig_fd_map = dict()
fd_list = []
output_files = []
# Open an output file for each bin and build a map from each
# contig to the open file handle(s) it should be written to.
for mag in mags_data:
bin_id = mag['bin_name']
contig_ids = mag['members_id']
output_dir = mag['output_dir']
output_filename = f"{prefix}_{bin_id}{ext}"
output_file = f"{output_dir}/{output_filename}"
output_files.append(output_file)
fd = open(output_file, 'w')
fd_list.append(fd)
for contig_id in contig_ids:
if contig_id not in contig_fd_map:
contig_fd_map[contig_id] = []
contig_fd_map[contig_id].append(fd)
contig_id = None
# Now we go through the input file and demultiplex
# the output.
with open(input_file) as in_file:
for line in in_file:
# Extract the contig_id using the provided
# function.
contig_id = filter_func(line, contig_id)
if contig_id in contig_fd_map:
for fd in contig_fd_map[contig_id]:
fd.write(line)
# Cleanup
for fd in fd_list:
fd.close()

# Call optional post function
if post:
post(output_files)
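# Hypothetical usage sketch (file and prefix names are invented): for a
# protein FASTA the one-pass demultiplexer would be driven roughly as
#
#   filter_one_pass("nmdc_xxx_proteins.faa", "nmdc_xxx", data['mags_list'],
#                   ".faa", get_contig_faa)
#
# The input is read exactly once and each line is routed to the already-open
# handle(s) of every MAG whose members_id contains its contig, which is what
# replaces the per-bin re-reads done by the old filter_* functions.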

def filter_txt(input_file, output_file, contig_ids):
temp_out = "tmp.gff"
filter_inp(input_file, temp_out, contig_ids)
parse_cog_tigr_cathfunfam_smart_supfam_input_gff_files(temp_out, output_file, "bogus", "bogus")
os.unlink(temp_out)

def find_extension(input_file):
for pattern, extension in EXTENSION_MAPPING.items():
if pattern in input_file:
return extension
return None
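# For example (illustrative filename): find_extension("nmdc_xxx_ko.tsv")
# matches the "ko.tsv" pattern above and returns ".ko.txt"; a file matching
# no pattern returns None and is reported as "Unknown type" by
# rewrite_files() below.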

def get_ko_list(input_file,output_file,contig_ids):

def parse_gffs(output_files):
"""
post function to run on gff->txt outputs
"""
for temp_out in output_files:
output_file = temp_out.replace(".tmp", "")
parse_cog_tigr_cathfunfam_smart_supfam_input_gff_files(temp_out,
output_file,
"bogus",
"bogus")
os.unlink(temp_out)
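# Illustration (names assumed): rewrite_files() appends ".tmp" to the target
# extension for these GFF inputs, so a demultiplexed file such as
# nmdc_xxx_bins.1.supfam.txt.tmp is parsed into nmdc_xxx_bins.1.supfam.txt
# here and the temporary file is removed.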


def write_kos(output_files):
"""
Post function to extract the list of KOs
"""
for in_file in output_files:
bin_id = in_file.split(".")[-3]
output_file = f"bins.{bin_id}.ko"
write_ko_list(in_file, output_file)
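# Sketch of the naming this relies on (bin names are assumed): an input like
# <output_dir>/nmdc_xxx_bins.1.ko.txt yields bin_id "1" via split(".")[-3]
# and is written to bins.1.ko, which ko_analysis() later collects with
# glob("*.ko").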


def write_ko_list(input_file, output_file):
with open(output_file, "w") as out_file:
with open(input_file) as in_file:
for line in in_file:
ko_id = line.rstrip().split()[2]
file_id = line.rstrip().split()[0]
contig_id = "_".join(file_id.split("_")[0:-2])
if contig_id in contig_ids:
out_file.write(ko_id.replace("KO:","") + "\n")

def get_bin_annotations(prefix, bin_id, bin_file, inputs, contig_ids, output_dir):

if not os.path.exists(output_dir):
os.mkdir(output_dir)
# Create the contigs file
output_filename = f"{prefix}_{bin_id}.fna"
shutil.copy(bin_file, os.path.join(output_dir, output_filename))
output_files = [output_filename]
out_file.write(ko_id.replace("KO:", "") + "\n")


def rewrite_files(prefix, inputs, mags):
for input_file in inputs:
# Find the input type
extension = find_extension(input_file)
if not extension:
sys.stderr.write(f"Unknown type: {input_file}\n")
continue

output_filename = f"{prefix}_{bin_id}{extension}"
output_file = f"{output_dir}/{output_filename}"
output_files.append(output_filename)
if extension.endswith(".faa"):
filter_faa(input_file, output_file, contig_ids)
elif extension.endswith(".gff"):
filter_gff(input_file, output_file, contig_ids)

start = time()
post = None
if input_file.endswith(".faa"):
filter_func = get_contig_faa
elif input_file.endswith(".gff") and extension.endswith(".txt"):
filter_txt(input_file, output_file, contig_ids)
filter_func = get_contig_tsv
extension += ".tmp"
post = parse_gffs
elif extension.endswith(".gff"):
filter_func = get_contig_gff
elif input_file.endswith("ko.tsv"):
get_ko_list(input_file, f"{bin_id}.ko", contig_ids)
filter_inp(input_file, output_file, contig_ids)
filter_func = get_contig_tsv
post = write_kos
else:
filter_inp(input_file, output_file, contig_ids)
filter_func = get_contig_tsv
filter_one_pass(input_file, prefix, mags, extension, filter_func,
post=post)
print(f" - {input_file.split('/')[-1]}: {time()-start:.3f}s")


def ko_analysis(prefix):
ko_list = glob.glob("*.ko")
if ko_list:
cmd = ["ko_mapper.py" , "-i"] + sorted(ko_list) + [ "-p" , prefix]
proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
cmd = ["ko_mapper.py", "-i"] + sorted(ko_list) + ["-p", prefix]
proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
outs, errs = proc.communicate()
if proc.returncode == 0:
if os.path.exists(f"{prefix}_barplot.pdf"):
@@ -130,9 +186,10 @@ def ko_analysis(prefix):
pdf_to_png(f"{prefix}_heatmap.pdf")
return f"{prefix}_module_completeness.tab"
else:
print(errs.decode().rstrip(),file=sys.stderr)
print(errs.decode().rstrip(), file=sys.stderr)
sys.exit()
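# The subprocess call above amounts to running (KO file names assumed):
#
#   ko_mapper.py -i bins.1.ko bins.2.ko ... -p <prefix>
#
# ko_mapper.py ships with microbeannotator (symlinked into the env bin/ in
# Dockerfile_vis) and is expected to produce <prefix>_barplot.pdf,
# <prefix>_heatmap.pdf and <prefix>_module_completeness.tab, which the code
# above converts to PNG / returns.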


def pdf_to_png(pdf):
prefix = Path(pdf).stem
doc = fitz.open(pdf) # open document
@@ -142,35 +199,57 @@
pix.save(f"{prefix}.png") # store image as a PNG
return True

def krona_plot(ko_result,prefix):

def krona_plot(ko_result, prefix):
ko_list = glob.glob("*.ko")
if ko_list:
df = pd.read_csv(ko_result,sep="\t")
krona_list=[]
df = pd.read_csv(ko_result, sep="\t")
krona_list = []
for ko in ko_list:
krona_list.append(f"{ko}.krona")
df[[ko,'pathway group','name']].to_csv(f"{ko}.krona",sep="\t",index=False,header=False)
cmd=["ktImportText"] + krona_list + [ "-o" , f"{prefix}_ko_krona.html"]
proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
df[[ko, 'pathway group', 'name']].to_csv(f"{ko}.krona", sep="\t",
index=False,
header=False)
cmd = ["ktImportText"] + krona_list + ["-o", f"{prefix}_ko_krona.html"]
proc = subprocess.Popen(shlex.split(" ".join(cmd)), shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
outs, errs = proc.communicate()
if proc.returncode == 0:
return f"{prefix}_ko_krona.html"
else:
print(errs.decode().rstrip(), file=sys.stderr)
return
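# Roughly the equivalent command line (file names assumed):
#
#   ktImportText bins.1.ko.krona bins.2.ko.krona ... -o <prefix>_ko_krona.html
#
# Each *.ko.krona file holds the per-bin value, 'pathway group' and 'name'
# columns taken from the module completeness table just above.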

def create_new_zip(bin_dirs):
for dir in bin_dirs:
tar_file_name = f"{dir}.tar.gz"
with tarfile.open(tar_file_name, "w:gz") as tar:
for output_file_name in glob.glob(f"{dir}/*",recursive=True):
tar.add(f"{output_file_name}", arcname=output_file_name)



def create_tar_file(bin_dir):
tar_file_name = f"{bin_dir}.tar.gz"
with tarfile.open(tar_file_name, "w:gz") as tar:
for output_file_name in glob.glob(f"{bin_dir}/*", recursive=True):
tar.add(f"{output_file_name}", arcname=output_file_name)


def create_tarfiles(bin_dirs, threads):
"""
This parallelizes the creation of the tar files.
"""
def error_cb(e):
raise e

p = Pool(threads)
for bin_dir in bin_dirs:
p.apply_async(create_tar_file, args=(bin_dir, ),
error_callback=error_cb)
p.close()
p.join()
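# Hypothetical call: create_tarfiles(["nmdc_xxx_bins.1_HQ", "nmdc_xxx_bins.2_MQ"], 8)
# packs each bin directory into <dir>.tar.gz in parallel; error_cb re-raises
# worker exceptions so a failed tar job is not silently swallowed by
# apply_async.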


if __name__ == "__main__":
data = None
input_files = []
bin_files_dict = {}
bin_dirs = []
threads = int(os.environ.get("THREADS", "32"))
prefix = sys.argv[1]
for file in sys.argv[2:]:
file_name = os.path.basename(file)
@@ -183,14 +262,21 @@ def create_new_zip(bin_dirs):
input_files.append(file)
for bin_data in data['mags_list']:
if bin_data['bin_quality'] in ['MQ', 'HQ', 'LQ']:
print(f"Processing {bin_data['bin_name']}")
bin_id = bin_data['bin_name']
contig_ids = bin_data['members_id']
bin_file = bin_files_dict[bin_id]
output_dir = f"{prefix}_{bin_id}_{bin_data['bin_quality']}"
if not os.path.exists(output_dir):
os.mkdir(output_dir)
bin_data['output_dir'] = output_dir
bin_dirs.append(output_dir)
get_bin_annotations(prefix, bin_id, bin_file, input_files, contig_ids, output_dir)

output_filename = f"{prefix}_{bin_id}.fna"
shutil.copy(bin_file, os.path.join(output_dir, output_filename))
print(f"Processing {len(bin_dirs)} mags")
rewrite_files(prefix, input_files, data['mags_list'])
print("Generating KOs")
ko_result = ko_analysis(prefix)
krona_plot(ko_result,prefix)
create_new_zip(bin_dirs)
print("Generating Krona Plot")
krona_plot(ko_result, prefix)
print("Generating zip")
create_tarfiles(bin_dirs, threads)
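# End-to-end sketch (all file names hypothetical; the lines collapsed in the
# hunk above decide which arguments are the MAGs JSON, bin FASTAs and
# annotation products):
#
#   THREADS=8 create_tarfiles.py nmdc_xxx mags.json bins.1.fa bins.2.fa \
#       nmdc_xxx_proteins.faa nmdc_xxx_functional_annotation.gff nmdc_xxx_ko.tsv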