Skip to content

Commit

Permalink
Fixes to BLAST functionality and refactoring (#39)
Browse files Browse the repository at this point in the history
CHANGELOG

Changes around BLAST functionality:

    Off-target amplicons are now reported as intended in the logs
    Off-target amplicons are now always considered last for the final
    scheme (no penalty is used, but the fact that they had BLAST matches
    gets recorded)
    The BLAST_PENALTY config option is no longer needed and has been removed
    Added a new off_target_amplicons column to the qPCR design, the qPCR primer and the (single/tiled) primer tsv outputs to indicate which final amplicons had BLAST hits

General reporting changes:

    Amplicon numbering now proceeds from 5' to 3' even across pools for the tiled mode and from lowest penalty to highest for the other modes (previously additional BLAST penalties weren't considered during penalty-sorting.
    In the primer bed file output in tiled mode, primers are now ordered according to the amplicon number without taking the pool into account
    In the primer bed file output in qPCR mode, oligos from the same set are now ordered LEFT, PROBE, RIGHT, i.e. by position on the reference
    The per-base mismatch plot now uses final primer names as panel titles
    The amplicon bed file, in all modes, is now formatted as proper six-column bed

Algorithmic fixes and enhancements:

    The internal representation of amplicon schemes has been unified/simplified across the different modes and steps of the analysis
    Search for final non-overlapping amplicons in single mode and for non-overlapping amplicons passing deltaG in qpcr mode has been optimized and is now significantly faster
    Some off-by-one errors in internal primer and amplicon interval calculations have been fixed
  • Loading branch information
wm75 authored May 7, 2024
1 parent 0bb373f commit 2a525d6
Show file tree
Hide file tree
Showing 9 changed files with 387 additions and 359 deletions.
2 changes: 1 addition & 1 deletion varvamp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Tool to design amplicons for highly variable virusgenomes"""
_program = "varvamp"
__version__ = "1.1.3"
__version__ = "1.2.0"
58 changes: 38 additions & 20 deletions varvamp/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,9 @@ def single_and_tiled_shared_workflow(args, left_primer_candidates, right_primer_

if args.database is not None:
# create blast query
query_path = blast.create_BLAST_query(all_primers, amplicons, data_dir)
query_path = blast.create_BLAST_query(amplicons, data_dir)
# perform primer blast
amplicons, off_target_amplicons = blast.primer_blast(
amplicons = blast.primer_blast(
data_dir,
args.database,
query_path,
Expand All @@ -326,23 +326,21 @@ def single_and_tiled_shared_workflow(args, left_primer_candidates, right_primer_
log_file,
mode="single_tiled"
)
else:
off_target_amplicons = []

return all_primers, amplicons, off_target_amplicons
return all_primers, amplicons


def single_workflow(args, amplicons, all_primers, log_file):
"""
workflow part specific for single mode
"""

amplicon_scheme = scheme.find_single_amplicons(amplicons, all_primers, args.report_n)
amplicon_scheme = scheme.find_single_amplicons(amplicons, args.report_n)
logging.varvamp_progress(
log_file,
progress=0.9,
job="Finding amplicons with low penalties.",
progress_text=f"{len(amplicon_scheme[0])} amplicons."
progress_text=f"{len(amplicon_scheme)} amplicons."
)

return amplicon_scheme
Expand All @@ -359,8 +357,7 @@ def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candida
# search for amplicon scheme
coverage, amplicon_scheme = scheme.find_best_covering_scheme(
amplicons,
amplicon_graph,
all_primers
amplicon_graph
)

# check for dimers
Expand All @@ -377,12 +374,13 @@ def tiled_workflow(args, amplicons, left_primer_candidates, right_primer_candida
reporting.write_dimers(results_dir, dimers_not_solved)

# evaluate coverage
# ATTENTION: Genome coverage of the scheme might still change slightly through resolution of primer dimers, but this potential, minor inaccuracy is currently accepted.
percent_coverage = round(coverage/len(ambiguous_consensus)*100, 2)
logging.varvamp_progress(
log_file,
progress=0.9,
job="Creating amplicon scheme.",
progress_text=f"{percent_coverage} % total coverage with {len(amplicon_scheme[0]) + len(amplicon_scheme[1])} amplicons"
progress_text=f"{percent_coverage} % total coverage with {len(amplicon_scheme)} amplicons"
)
if percent_coverage < 70:
logging.raise_error(
Expand Down Expand Up @@ -450,9 +448,9 @@ def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majori
# run blast if db is given
if args.database is not None:
# create blast query
query_path = blast.create_BLAST_query_qpcr(qpcr_scheme_candidates, data_dir)
query_path = blast.create_BLAST_query(qpcr_scheme_candidates, data_dir, mode="qpcr")
# perform primer blast
amplicons, off_target_amplicons = blast.primer_blast(
qpcr_scheme_candidates = blast.primer_blast(
data_dir,
args.database,
query_path,
Expand All @@ -470,9 +468,6 @@ def qpcr_workflow(args, data_dir, alignment_cleaned, ambiguous_consensus, majori
log_file,
exit=True
)
# report potential blast warnings
if args.database is not None:
blast.write_BLAST_warning(off_target_amplicons, final_schemes, log_file)
logging.varvamp_progress(
log_file,
progress=0.9,
Expand Down Expand Up @@ -506,9 +501,21 @@ def main(sysargs=sys.argv[1:]):
reporting.write_fasta(data_dir, "majority_consensus", majority_consensus)
reporting.write_fasta(results_dir, "ambiguous_consensus", ambiguous_consensus)

# Functions called from here on return lists of amplicons that are refined step-wise into final schemes.
# These lists that are passed between functions and later used for reporting consist of dictionary elemnts,
# which represent individual amplicons. A minimal amplicon dict could take the form:
# {
# "id": amplicon_name,
# "penalty": amplicon_cost,
# "length": amplicon_length,
# "LEFT": [left primer data],
# "RIGHT": [right primer data]
# }
# to which different functions may add additional information.

# SINGLE/TILED mode
if args.mode == "tiled" or args.mode == "single":
all_primers, amplicons, off_target_amplicons = single_and_tiled_shared_workflow(
all_primers, amplicons = single_and_tiled_shared_workflow(
args,
left_primer_candidates,
right_primer_candidates,
Expand All @@ -533,15 +540,22 @@ def main(sysargs=sys.argv[1:]):
log_file,
results_dir
)
if args.database is not None:
blast.write_BLAST_warning(off_target_amplicons, amplicon_scheme, log_file)

# write files

if args.mode == "tiled":
# assign amplicon numbers from 5' to 3' along the genome
amplicon_scheme.sort(key=lambda x: x["LEFT"][1])
else:
# make sure amplicons with no off-target products and with low penalties get the lowest numbers
amplicon_scheme.sort(key=lambda x: (x.get("off_targets", False), x["penalty"]))
reporting.write_all_primers(data_dir, all_primers)
reporting.write_scheme_to_files(
results_dir,
amplicon_scheme,
ambiguous_consensus,
args.mode
args.mode,
log_file
)
reporting.varvamp_plot(
results_dir,
Expand All @@ -564,9 +578,13 @@ def main(sysargs=sys.argv[1:]):
right_primer_candidates,
log_file
)

# write files

# make sure amplicons with no off-target products and with low penalties get the lowest numbers
final_schemes.sort(key=lambda x: (x.get("off_targets", False), x["penalty"]))
reporting.write_regions_to_bed(probe_regions, data_dir, "probe")
reporting.write_qpcr_to_files(results_dir, final_schemes, ambiguous_consensus)
reporting.write_qpcr_to_files(results_dir, final_schemes, ambiguous_consensus, log_file)
reporting.varvamp_plot(
results_dir,
alignment_cleaned,
Expand Down
102 changes: 36 additions & 66 deletions varvamp/scripts/blast.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,41 +29,24 @@ def check_BLAST_installation(log_file):
logging.raise_error("BLASTN is not installed", log_file, exit=True)


def create_BLAST_query(all_primers, amplicons, data_dir):
def create_BLAST_query(amplicons, data_dir, mode="single_tiled"):
"""
create a query for the BLAST search (tiled, single mode)
create a query for the BLAST search
"""
already_written = []

query_path = os.path.join(data_dir, "BLAST_query.fasta")
with open(query_path, "w") as query:
for amp in amplicons:
fw_primer, rv_primer = amplicons[amp][2], amplicons[amp][3]
if fw_primer not in already_written:
print(f">{fw_primer}\n{all_primers['+'][fw_primer][0]}", file=query)
already_written.append(fw_primer)
if rv_primer not in already_written:
print(f">{rv_primer}\n{all_primers['-'][rv_primer][0]}", file=query)
already_written.append(rv_primer)

return query_path


def create_BLAST_query_qpcr(qpcr_scheme_candidates, data_dir):
"""
create a query for the BLAST search (qpcr mode)
"""
already_written = []
if mode == "single_tiled":
primer_types = ["LEFT", "RIGHT"]
elif mode == "qpcr":
primer_types = ["PROBE", "LEFT", "RIGHT"]
already_written = set()

query_path = os.path.join(data_dir, "BLAST_query.fasta")
with open(query_path, "w") as query:
for amp in qpcr_scheme_candidates:
for primer_type in ["PROBE", "LEFT", "RIGHT"]:
name = f"{primer_type}_{qpcr_scheme_candidates[amp][primer_type][1]}_{qpcr_scheme_candidates[amp][primer_type][2]}"
if name in already_written:
continue
print(f">{name}\n{qpcr_scheme_candidates[amp][primer_type][0]}", file=query)
already_written.append(name)
for amp in amplicons:
for primer_type in primer_types:
name = f"{primer_type}_{amp[primer_type][1]}_{amp[primer_type][2]}"
if name not in already_written:
print(f">{name}\n{amp[primer_type][0]}", file=query)
already_written.add(name)
return query_path


Expand Down Expand Up @@ -168,44 +151,41 @@ def predict_non_specific_amplicons_worker(amp, blast_df, max_length, mode):
"""
Worker function to predict unspecific targets for a single amplicon.
"""
name, data = amp
# get correct primers
if mode == "single_tiled":
primers = [data[2], data[3]]
primer_types = ["LEFT", "RIGHT"]
elif mode == "qpcr":
primers = []
for primer_type in ["PROBE", "LEFT", "RIGHT"]:
primers.append(f"{primer_type}_{data[primer_type][1]}_{data[primer_type][2]}")
primer_types = ["PROBE", "LEFT", "RIGHT"]
primers = []
for primer_type in primer_types:
primers.append(f"{primer_type}_{amp[primer_type][1]}_{amp[primer_type][2]}")
# subset df for primers
df_amp_primers = blast_df[blast_df["query"].isin(primers)]
# sort by reference and ref start
df_amp_primers_sorted = df_amp_primers.sort_values(["ref", "ref_start"])
# check for off-targets for specific primers
if check_off_targets(df_amp_primers_sorted, max_length, primers):
return name
amp["off_targets"] = True
else:
amp["off_targets"] = False
return amp


def predict_non_specific_amplicons(amplicons, blast_df, max_length, mode, n_threads):
"""
Main function to predict unspecific targets within a size range and give
these primers a high penalty. Uses multiprocessing for parallelization.
"""
off_targets = []
# process amplicons concurrently
with multiprocessing.Pool(processes=n_threads) as pool:
amp_items = amplicons.items()
results = pool.starmap(predict_non_specific_amplicons_worker, [(amp, blast_df, max_length, mode) for amp in amp_items])
# check results
for off_target in results:
if off_target is None:
continue
off_targets.append(off_target)
if mode == "single_tiled":
amplicons[off_target][5] = amplicons[off_target][5] + config.BLAST_PENALTY
elif mode == "qpcr":
amplicons[off_target]["penalty"] = amplicons[off_target]["penalty"] + config.BLAST_PENALTY

return off_targets, amplicons
annotated_amps = [
result for result in pool.starmap(
predict_non_specific_amplicons_worker,
[(amp, blast_df, max_length, mode) for amp in amplicons]
) if result is not None
]
n_off_targets = sum(amp["off_targets"] for amp in annotated_amps)
return n_off_targets, annotated_amps


def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log_file, mode):
Expand Down Expand Up @@ -237,14 +217,17 @@ def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log

blast_df = parse_and_filter_BLAST_output(blast_out)
print("Predicting non-specific amplicons...")
off_target_amplicons, amplicons = predict_non_specific_amplicons(
n_off_targets, amplicons = predict_non_specific_amplicons(
amplicons,
blast_df,
max_length,
mode,
n_threads
)
success_text = f"varVAMP successfully predicted non-specific amplicons:\n\t> {len(off_target_amplicons)}/{len(amplicons)} amplicons could produce amplicons with the blast db.\n\t> raised their amplicon penalty by {config.BLAST_PENALTY}"
if n_off_targets > 0:
success_text = f"varVAMP predicted non-specific amplicons:\n\t> {n_off_targets}/{len(amplicons)} amplicons could produce amplicons with the blast db.\n\t> will attempt to avoid them in the final list of amplicons"
else:
success_text = f"NO off-target amplicons found with the blast db and a total of {len(amplicons)} amplicons"
print(success_text)
with open(log_file, 'a') as f:
print(
Expand All @@ -253,18 +236,5 @@ def primer_blast(data_dir, db, query_path, amplicons, max_length, n_threads, log
)
print("\n#### off-target search finished ####\n")

return amplicons, off_target_amplicons

return amplicons

def write_BLAST_warning(off_target_amplicons, amplicon_scheme, log_file):
"""
for each primer pair that has potential unspecific amplicons
write warnings to file.
"""
for amp in off_target_amplicons:
if amp in amplicon_scheme:
logging.raise_error(
f"{amp} could produce off-targets. No better amplicon in this area was found.",
log_file,
exit=False,
)
3 changes: 1 addition & 2 deletions varvamp/scripts/default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

# List of all known parameters. DO NOT CHANGE!
__all__ = [
'BLAST_MAX_DIFF', 'BLAST_PENALTY', 'BLAST_SETTINGS', 'BLAST_SIZE_MULTI',
'BLAST_MAX_DIFF', 'BLAST_SETTINGS', 'BLAST_SIZE_MULTI',
'END_OVERLAP',
'PCR_DNA_CONC', 'PCR_DNTP_CONC', 'PCR_DV_CONC', 'PCR_MV_CONC',
'PRIMER_3_PENALTY', 'PRIMER_GC_END', 'PRIMER_GC_PENALTY',
Expand Down Expand Up @@ -74,7 +74,6 @@
}
BLAST_MAX_DIFF = 0.5 # min percent match between primer and BLAST hit (coverage and/or mismatches)
BLAST_SIZE_MULTI = 2 # multiplier for the max_amp size of off targets (in relation to max amp size)
BLAST_PENALTY = 50 # amplicon penalty increase -> considered only if no other possibilities

# nucleotide definitions, do NOT change
NUCS = set("atcg")
Expand Down
7 changes: 0 additions & 7 deletions varvamp/scripts/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ def confirm_config(args, log_file):
(
"BLAST_MAX_DIFF",
"BLAST_SIZE_MULTI",
"BLAST_PENALTY"
)
]

Expand Down Expand Up @@ -384,7 +383,6 @@ def confirm_config(args, log_file):
("qpcr deletion size still considered for deltaG calculation", config.QAMPLICON_DEL_CUTOFF),
("maximum difference between primer and blast db", config.BLAST_MAX_DIFF),
("multiplier of the maximum length for non-specific amplicons", config.BLAST_SIZE_MULTI),
("blast penalty for off targets", config.BLAST_PENALTY)
]
for var_type, var in non_negative_var:
if var < 0:
Expand Down Expand Up @@ -468,11 +466,6 @@ def confirm_config(args, log_file):
log_file,
exit=True
)
if config.BLAST_PENALTY < 10:
raise_error(
"giving a too small penalty could result in the selection of off-target producing amplicons in the final scheme.",
log_file,
)
# confirm proper BLAST settings in dictionary
if not isinstance(config.BLAST_SETTINGS, dict):
raise_error(
Expand Down
4 changes: 2 additions & 2 deletions varvamp/scripts/primers.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,13 @@ def find_best_primers(left_primer_candidates, right_primer_candidates):
primer_candidates.sort(key=lambda x: (x[3], x[1]))
# ini everything with the primer with the lowest penalty
to_retain = [primer_candidates[0]]
primer_ranges = list(range(primer_candidates[0][1], primer_candidates[0][2]+1))
primer_ranges = list(range(primer_candidates[0][1], primer_candidates[0][2]))
primer_set = set(primer_ranges)

for primer in primer_candidates:
# get the thirds of the primer, only consider the middle
thirds_len = int((primer[2] - primer[1])/3)
primer_positions = list(range(primer[1] + thirds_len, primer[2] - thirds_len + 1))
primer_positions = list(range(primer[1] + thirds_len, primer[2] - thirds_len))
# check if none of the nucleotides of the next primer
# are already covered by a better primer
if not any(x in primer_positions for x in primer_set):
Expand Down
Loading

0 comments on commit 2a525d6

Please sign in to comment.