-
Notifications
You must be signed in to change notification settings - Fork 0
/
muprax.py
executable file
·108 lines (84 loc) · 2.79 KB
/
muprax.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#!/usr/bin/env python3
import astropy.convolution
import numpy as np
import pandas as pd
import pyproj
import rasterio
import rasterio.warp
import yaml
def transform(s_crs=4326, t_crs=4326):
return pyproj.Transformer.from_crs(s_crs, t_crs, always_xy=True).transform
def estimate_utm_epsg(lon, lat):
return pyproj.database.query_utm_crs_info(
datum_name="WGS84",
area_of_interest=pyproj.aoi.AreaOfInterest(
np.min(lon), np.min(lat), np.max(lon), np.max(lat)
),
)[0].code
def warp(src, epsg, resampling):
crs = rasterio.CRS.from_epsg(epsg)
T, w, h = rasterio.warp.calculate_default_transform(
src.crs, crs, src.width, src.height, *src.bounds
)
data = src.read(1)
data[data <= -9999] = np.nan
dest = np.zeros((h, w))
return rasterio.warp.reproject(
data,
dest,
src_transform=src.transform,
src_crs=src.crs,
dst_transform=T,
dst_crs=crs,
dst_nodata=np.nan,
resampling=resampling,
)
def arr_equal(a, b):
return len(a) == len(b) and np.all(a == b)
def main(points, raster, outfile, params):
db = pd.read_csv(points)
if not arr_equal(db.columns, ["ID", "Latitude", "Longitude"]):
raise ValueError("Wrong point list format")
lat = pd.to_numeric(db["Latitude"], errors="coerce")
lon = pd.to_numeric(db["Longitude"], errors="coerce")
mask = ~(np.isnan(lon) | np.isnan(lat))
lat = lat[mask]
lon = lon[mask]
rst = rasterio.open(raster)
epsg = rst.crs.to_epsg()
x, y = transform(4326, epsg)(lon, lat)
bounds = (
(x > rst.bounds.left)
& (x < rst.bounds.right)
& (y > rst.bounds.bottom)
& (y < rst.bounds.top)
)
lat = lat[bounds]
lon = lon[bounds]
epsg = estimate_utm_epsg(lon.mean(), lat.mean())
x, y = transform(4326, epsg)(lon, lat)
arr, T_warp = warp(rst, epsg, resampling=rasterio.enums.Resampling.average)
n = int(np.ceil(params["dist"] / abs(T_warp.a)))
Y, X = np.mgrid[-n : n + 1, -n : n + 1]
r = np.sqrt(X * X + Y * Y)
r[n, n] = params["min_dist"]
kernel = np.power(r, -params["exp"])
kernel[r * abs(T_warp.a) > params["dist"]] = 0
arr_conv = astropy.convolution.convolve(arr, kernel, fill_value=np.nan)
trans_arr = arr_conv[rasterio.transform.rowcol(T_warp, x, y)]
out = pd.DataFrame()
out["ID"] = db["ID"][mask][bounds]
out["Latitude"] = lat
out["Longitude"] = lon
out["Interp"] = trans_arr
out.to_csv(outfile, index=False)
if __name__ == "__main__":
input_file = "muprax.in"
with open(input_file.strip().strip("'").strip('"')) as f:
p = yaml.safe_load(f)
main(
points=p["points_file"],
raster=p["raster_file"],
outfile=p["out_file"],
params=p["params"],
)