-
Notifications
You must be signed in to change notification settings - Fork 0
/
reconstruct.py
92 lines (73 loc) · 2.97 KB
/
reconstruct.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# author: Andrej Jezik
import sys
from Bio import Phylo
import csv
import os
DEBUG = False
DEBUG_LONG = False
ANCESTRAL_FILE = "given_inputs/ancestrals.csv"
PHYLO_TREE_FILE = "given_inputs/tree.tre"
OUTPUT_DIR = "output_sequences"
# Read a FASTA file as list of pairs [(sequence id, sequence), ..]
def read_fasta(file_name: str) -> list:
list_of_pairs = []
key = ''
value = ''
with open(file_name, 'r') as file:
for line in file:
if line[0] == '>':
if key != '':
list_of_pairs.append((key, value))
key = line[1:].rstrip()
value = ''
else:
value += line.rstrip()
list_of_pairs.append((key, value))
return list_of_pairs
def read_csv(filename: str) -> list:
with open(filename, newline='') as csvfile:
reader = csv.DictReader(csvfile)
read_list = list(reader)
return read_list
def take(dct, low=None):
return dict(list(dct.items())[low:])
def insert_newlines(string, every):
return '\n'.join(string[i:i+every] for i in range(0, len(string), every))
# --------------- Main -------------------
if __name__ == '__main__':
if (len(sys.argv) != 2): # checking arguments
print("ERROR: program expects one argument with path of a input file\n", file=sys.stderr)
list_of_fasta_seq = read_fasta(file_name=sys.argv[1])
tree = Phylo.read(PHYLO_TREE_FILE, "newick")
ancestrals_list = read_csv(ANCESTRAL_FILE)
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
for clade in tree.get_nonterminals():
rced_seq = [""] * len(list_of_fasta_seq[0][1])
# couner for each position of sequence which determines if on the position will be gap
gap = [0] * len(list_of_fasta_seq[0][1])
for t in tree.get_terminals():
# iterates throught each position of sequence we are constructing
for i in range(len(list_of_fasta_seq[0][1])):
if(clade.is_parent_of(t)):
for sublist in list_of_fasta_seq:
if sublist[0] == str(t):
if(sublist[1][i] == "-"):
gap[i] += clade.distance(str(t))
else:
gap[i] -= clade.distance(str(t))
break
nodes = [x for x in ancestrals_list if x['node']
== str(clade.confidence)]
# creates sequence
for i in range(len(list_of_fasta_seq[0][1])):
if(gap[i] > 0):
rced_seq[i] = '-'
else:
rced_seq[i] = max(take(nodes[i], 2), key=nodes[i].get)
rced_seq_str = "".join(map(str, rced_seq))
print(str(clade.confidence) + " --> " + rced_seq_str)
f = open(OUTPUT_DIR + "/node_" +
str(clade.confidence) + ".fasta", "w+")
f.write(">" + str(clade.confidence) + "\n" +
insert_newlines(rced_seq_str, 60))