# find_storms.py
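"""Scan gridded KHGX radar files for convective cells and build a storm inventory.

Each Py-ART grid file is reduced to a vertical reflectivity projection,
thresholded, and labeled; scans with surviving echoes are then chained into
storms wherever consecutive detections fall within a maximum interval.
"""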
import gc
import multiprocessing as mp
from datetime import datetime, timedelta
from glob import glob

import numpy as np
import pandas as pd
import pyart
from joblib import Parallel, delayed
from scipy import ndimage

def parse_grid_datetime(grid_obj):
    """Return the scan datetime parsed from a Py-ART grid object.

    The grid's time units are expected to follow the CF convention,
    e.g. 'seconds since 2016-06-01 19:05:08Z'.
    """
    dt_string = grid_obj.time['units'].split(' ')[-1]
    date = dt_string[:10]
    time = dt_string[11:19]
    return datetime.strptime(date + ' ' + time, '%Y-%m-%d %H:%M:%S')

def clear_small_echoes(label_image, min_size):
    """Zero out labeled objects smaller than min_size pixels, then relabel."""
    flat_image = pd.Series(label_image.flatten())
    flat_image = flat_image[flat_image > 0]
    size_table = flat_image.value_counts(sort=False)
    small_objects = size_table.keys()[size_table < min_size]
    for obj in small_objects:
        label_image[label_image == obj] = 0
    label_image = ndimage.label(label_image)
    return label_image[0]

def get_vert_projection(grid, thresh=40):
    """Return a binary vertical projection from grid."""
    return np.any(grid > thresh, axis=0)

def get_filtered_frame(grid, min_size, thresh):
    """Return a labeled frame from gridded radar data. Objects smaller than
    min_size are removed and the rest are relabeled."""
    echo_height = get_vert_projection(grid, thresh)
    labeled_echo = ndimage.label(echo_height)[0]
    frame = clear_small_echoes(labeled_echo, min_size)
    return frame

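# A minimal sketch of the labeling pipeline on a toy volume (hypothetical
# values, not from the dataset):
#     vol = np.zeros((3, 10, 10))
#     vol[1, 2:6, 2:6] = 45.0
#     get_filtered_frame(vol, min_size=4, thresh=40)
# The 4x4 echo exceeds the size threshold and comes back labeled 1; all
# other pixels are 0.
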
def detect_cells(file_name, min_size, thresh):
    """Detect cells in one grid file; return (file_name, scan_time, cells_found)."""
    grid = pyart.io.read_grid(file_name)
    # Use a distinct name so the imported datetime class is not shadowed.
    scan_time = parse_grid_datetime(grid)
    try:
        data = grid.fields['reflectivity']['data']
    except KeyError:
        # Grid file has no reflectivity field; report no cells.
        return file_name, scan_time, False
    frame = get_filtered_frame(data, min_size, thresh)
    gc.collect()
    print(file_name)
    if np.max(frame) > 0:
        return file_name, scan_time, True
    return file_name, scan_time, False

#%%
if __name__ == '__main__':
    file_dir = '/lcrc/group/earthscience/radar/houston/data/'
    out_dir = '/home/picel/khgx/testrun_2015/'
    pattern = 'KHGX_grid_2016*'
    files = glob(file_dir + '**/' + pattern, recursive=True)
    files.sort()
    print('number of files:', len(files))

    min_size = 24
    thresh = 32
    start_time = datetime.now()

    scans_unfiltered = Parallel(n_jobs=mp.cpu_count())(
        delayed(detect_cells)(file, min_size, thresh) for file in files
    )
    print('Cell Detection Complete')
    print('Parsing Times')
    times = [scan[1] for scan in scans_unfiltered if scan[2]]
    inventory = pd.DataFrame({'begin': times[:-1], 'end': times[1:]})
    inventory['interval'] = inventory['end'] - inventory['begin']
    max_interval = timedelta(minutes=30)

    print('Building Storms Dataframe')
    # Keep only consecutive detections closer than max_interval, then flag a
    # new storm wherever an interval's begin does not match the previous
    # kept interval's end.
    storms = inventory[inventory.interval < max_interval].copy()
    storms.end = storms.end.shift(1)
    storms.drop(storms.index[0], inplace=True)
    storms['new'] = storms.begin != storms.end
    storms['storm_id'] = storms.new.astype('i').cumsum()
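
    # Worked example (hypothetical times, not from the dataset): cell scans at
    # 19:00, 19:10, 19:20, 20:00, 20:10, and 20:20 give intervals of 10, 10,
    # 40, 10, and 10 minutes. The 40-minute interval is filtered out, breaking
    # the chain, so the cumulative sum of 'new' assigns the two runs of scans
    # different storm_id values.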
    storms.to_csv(out_dir + 'storms.csv', index=False)

    coverage = pd.DataFrame(data=scans_unfiltered,
                            columns=['file', 'time', 'cells'])
    coverage.sort_values(by='file', inplace=True)
    coverage.to_csv(out_dir + 'coverage.csv', index=False)

    with open(out_dir + 'meta.txt', 'w') as metadata:
        metadata.write('number of files: ' + str(len(files)) + '\n')
        metadata.write('total time: ' + str(datetime.now() - start_time) + '\n')
    print('total time:', datetime.now() - start_time)