-
Notifications
You must be signed in to change notification settings - Fork 0
/
find_distances_to_same_uklin_predating.py
293 lines (242 loc) · 10.7 KB
/
find_distances_to_same_uklin_predating.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
#!/usr/bin/env python3
"""
"""
import sys
import argparse
import logging
import csv
__version__ = '0.1'
__date__ = '2020-09-10'
__author__ = ''
# --------------------------------------------------------------------------------------------------
def get_args():
"""
Parge arguments
Parameters
----------
no inputs
Returns
-------
oArgs: obj
arguments object
"""
sDescription = 'version %s, date %s, author %s' %(__version__, __date__, __author__)
oParser = argparse.ArgumentParser(description=sDescription)
oParser.add_argument('--cogmetadata',
'-m',
required=True,
help='COG cog_<DATE>_metadata.csv file')
oParser.add_argument('--phemetadata',
'-p',
required=True,
help='civet input cvs with additional columns')
oParser.add_argument('--cogalign',
'-a',
required=True,
help='cog_<DATE>_alignment.fasta file. GOES REALLY BAD IF THIS IS NOT ONE LINE FASTA!')
oArgs = oParser.parse_args()
return oArgs
# --------------------------------------------------------------------------------------------------
def main():
"""
This is the main function, innit?
"""
logging.basicConfig(format="[%(asctime)s] %(levelname)s: %(message)s",
level=logging.DEBUG)
logging.debug("----------- STARTING ----------")
args = vars(get_args())
logging.debug("Args: %s", args)
# read in the phe metadata fromt he civet input file, like region etc.
phe_metadata = {}
with open(args['phemetadata'], 'r') as phemd:
reader = csv.DictReader(phemd)
for row in reader:
phe_metadata[row['name']] = {'country': row['region'], 'casecontact': row['casecontact']}
# read in cogmetadata and store uklin and epiweek per samples
qry_samples = {}
with open(args['cogmetadata'], 'r') as cogmetadata:
reader = csv.DictReader(cogmetadata)
for row in reader:
for query_id in phe_metadata:
if query_id in row['sequence_name']:
uklin = row['uk_lineage']
epiweek = int(row['epi_week'])
qry_samples[query_id] = {'uklin': uklin, 'epiweek': epiweek}
logging.debug("Read %i query samples", len(qry_samples))
# output a table
sys.stdout.write("#query_id\tregion\tcasecontact\tuklin\ttotal\twk_b4\t2wk_b4\t2prec_wks\t4prec_wks\tmin_d\t2follow\t4follow\tPreceding Dist\n")
# open cog metadata again
with open(args['cogmetadata'], 'r') as mdfile:
reader = csv.DictReader(mdfile)
# iterate through samples
for qryid in qry_samples:
# reset values that are outputted for each query sequence.
cnt = 0
wkb4_1 = 0
wkb4_2 = 0
prec_wks2 = 0
prec_wks4 = 0
foll_wks2 = 0
foll_wks4 = 0
# get the seq of the query samples from the alignment
qry_seq = get_seq_from_aln([qryid], args['cogalign'])
assert len(qry_seq) == 1 # better check ...
this_sample_epi_week = qry_samples[qryid]['epiweek']
# iterate through the COG metadata
beforeseqnames = []
foll2wks_seqnames = []
foll4wks_seqnames = []
for row in reader:
if row['uk_lineage'] == qry_samples[qryid]['uklin']:
# count samples w/ same lineage
cnt += 1
row_epi_week = int(row['epi_week'])
# count all older samples by epiweek
if row_epi_week <= this_sample_epi_week - 1:
wkb4_1 += 1
# count all samples that are two weeks older
if row_epi_week <= this_sample_epi_week - 2:
wkb4_2 += 1
# count all samples in the preceeding 2 epi weeks
if row_epi_week < this_sample_epi_week and row_epi_week >= this_sample_epi_week - 2:
prec_wks2 += 1
# count all samples in the preceeding 4 epi weeks
if row_epi_week < this_sample_epi_week and row_epi_week >= this_sample_epi_week - 4:
prec_wks4 += 1
# store other seqname
beforeseqnames.append(row['sequence_name'])
# count all samples in the following 2 epi weeks
if row_epi_week > this_sample_epi_week and row_epi_week <= this_sample_epi_week + 2:
foll2wks_seqnames.append(row['sequence_name'])
# count all samples in the following 4 epi weeks
if row_epi_week > this_sample_epi_week and row_epi_week <= this_sample_epi_week + 4:
foll4wks_seqnames.append(row['sequence_name'])
# calculate minimum distance to preceding samples
mindis = -1
minoth = 1000
prec4 = ""
if len(beforeseqnames) > 0:
# get stored min dist from for this query sample
otherseqs = get_seq_from_aln(beforeseqnames, args['cogalign'])
y = list(qry_seq.values())[0]
mindis = min([calc_pw_dist(y, x) for x in otherseqs.values()])
#look at all seq in the preceeding 4 weeks
if len(otherseqs) <= 1000:
for ky1, vl1 in otherseqs.items():
minoth = 10000
for ky2, vl2 in otherseqs.items():
if ky2 != ky1:
odist = calc_pw_dist(vl1, vl2)
if odist < minoth:
minoth = odist
if minoth < 10000:
prec4 += "{},".format(minoth)
else:
prec4 = "-1"
# calculate how many samples are within 2 SNPs in the follwing 2 and 4 weeks
foll_wks2 = 0
foll_wks4 = 0
if len(foll4wks_seqnames) > 0:
# logging.debug("%i in the following 4 weeks", len(foll4wks_seqnames))
# logging.debug("%i in the following 2 weeks", len(foll2wks_seqnames))
# get all seqs in the 4 weeks after from alignment - 2 weeks are a strict subset
otherseqs = get_seq_from_aln(foll4wks_seqnames, args['cogalign'])
# y is now the query sequence
y = list(qry_seq.values())[0]
all_dists = {}
wks2_dists = []
for (nme, sq1) in otherseqs.items():
nme = nme.replace(">", "").strip()
dst = calc_pw_dist(y, sq1)
# all_dists now contains all seqs from the 4 weeks after
all_dists[nme] = dst
if nme in foll2wks_seqnames:
# wks2_dists contains a subset of those
wks2_dists.append(dst)
# logging.debug("4 weeks all dists: %s", all_dists.values())
# logging.debug("2 weeks all dists: %s", wks2_dists)
foll_wks4 = sum([1 if d <= 2 else 0 for d in all_dists.values()])
foll_wks2 = sum([1 if d <= 2 else 0 for d in wks2_dists])
# output table row for this query sample
# logging.debug("%i samples with UK lineage %s, %i earlier ones, same as %s", cnt, qry_samples[qryid]['uklin'], earlier, qryid)
sys.stdout.write("%s\t%s\t%s\t%s\t%i\t%i\t%i\t%i\t%i\t%i\t%i\t%i\t%s\n" % (qryid,
phe_metadata[qryid]['country'],
phe_metadata[qryid]['casecontact'],
qry_samples[qryid]['uklin'],
cnt,
wkb4_1,
wkb4_2,
prec_wks2,
prec_wks4,
mindis,
foll_wks2,
foll_wks4,
prec4))
# reset the iterator of the COG metadatafile for next query sample
mdfile.seek(0)
next(reader)
logging.debug("----------- FINISHED ----------")
return 0
# end of main --------------------------------------------------------------------------------------
def calc_pw_dist(s1, s2):
"""
Calculate the pw distance between 2 seqs
Parameters
------
s1: str
sequence 1
s2: str
seq2
Returns
-------
d: int
pw SNP dist bnetween them
"""
assert len(s1) == len(s2)
s1 = s1.upper()
s2 = s2.upper()
d = 0
for a, b in zip(s1, s2):
if not a in ['A', 'C', 'G', 'T']:
continue
if not b in ['A', 'C', 'G', 'T']:
continue
if a != b:
d += 1
return d
# ---------------------------------------------------------------------------------------------------
def get_seq_from_aln(qryids, cogalign):
"""
Get the seqquence for this ID from the alignment.
GOES TERRIBLY WRONG IF THIS ISN'T ONE LINE FASTA as in
>head1
AACGTAGCTAC
>head2
ACTGTACGTAC
...
Parameters
------
qryids: list
list of COGID to be mached to seq header in alignment
cogalign: str
alignemnt fasta file name
Returns
-------
answer: dict
{header: seq, head2: seq, ...}
empty if none found
"""
answer = {}
with open(cogalign, 'r') as f:
while True:
header = f.readline()
seq = f.readline()
if not seq:
break # EOF
for qid in qryids:
if qid in header:
answer[header.strip()] = seq.strip()
return answer
# --------------------------------------------------------------------------------------------------
if __name__ == '__main__':
sys.exit(main())