Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix reproducibility & ensemble member order with dask #347

Merged
merged 4 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ def set_root():
latex_elements = {
"papersize": "a4paper",
"pointsize": "10pt",
"preamble": latex_preamble
"preamble": latex_preamble,
# Latex figure (float) alignment
#
# 'figure_align': 'htbp',
Expand Down
5 changes: 4 additions & 1 deletion pysteps/blending/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ def forecast(
)

# 2. Initialize the noise method
np.random.seed(seed)
pp, generate_noise, noise_std_coeffs = _init_noise(
precip,
precip_thr,
Expand All @@ -526,6 +527,7 @@ def forecast(
noise_stddev_adj,
measure_time,
num_workers,
seed,
)

# 3. Perform the cascade decomposition for the input precip fields and
Expand Down Expand Up @@ -1662,6 +1664,7 @@ def _init_noise(
noise_stddev_adj,
measure_time,
num_workers,
seed,
):
"""Initialize the noise method."""
if noise_method is None:
Expand Down Expand Up @@ -1690,6 +1693,7 @@ def _init_noise(
20,
conditional=True,
num_workers=num_workers,
seed=seed,
)

if measure_time:
Expand Down Expand Up @@ -1944,7 +1948,6 @@ def _init_random_generators(
if noise_method is not None:
randgen_prec = []
randgen_motion = []
np.random.seed(seed)
for j in range(n_ens_members):
rs = np.random.RandomState(seed)
randgen_prec.append(rs)
Expand Down
6 changes: 3 additions & 3 deletions pysteps/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,9 @@ def _interpolator_with_preamble(xy_coord, values, xgrid, ygrid, **kwargs):
indy = 0
for subygrid in subygrids:
deltay = subygrid.size
interpolated[
:, indy : (indy + deltay), indx : (indx + deltax)
] = interpolator(xy_coord, values, subxgrid, subygrid, **kwargs)
interpolated[:, indy : (indy + deltay), indx : (indx + deltax)] = (
interpolator(xy_coord, values, subxgrid, subygrid, **kwargs)
)
indy += deltay
indx += deltax

Expand Down
6 changes: 3 additions & 3 deletions pysteps/io/importers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,9 +1366,9 @@
mask = np.logical_and(~mask_u, ~mask_n)
quality = np.empty(arr.shape) # , dtype=float)
quality[mask] = arr[mask] * gain + offset
quality[
~mask
] = np.nan # a qui -----------------------------
quality[~mask] = (

Check warning on line 1369 in pysteps/io/importers.py

View check run for this annotation

Codecov / codecov/patch

pysteps/io/importers.py#L1369

Added line #L1369 was not covered by tests
np.nan
) # a qui -----------------------------

if precip is None:
raise IOError("requested quantity %s not found" % qty)
Expand Down
4 changes: 1 addition & 3 deletions pysteps/noise/fftgenerators.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,9 +708,7 @@ def initialize_nonparam_2d_nested_filter(field, gridres=1.0, **kwargs):
# update indices
level += 1
Idxi, Idxj = _split_field((0, dim[0]), (0, dim[1]), 2**level)
Idxipsd, Idxjpsd = _split_field(
(0, 2**max_level), (0, 2**max_level), 2**level
)
Idxipsd, Idxjpsd = _split_field((0, 2**max_level), (0, 2**max_level), 2**level)

return {"field": F, "input_shape": field.shape[1:], "use_full_fft": True}

Expand Down
44 changes: 21 additions & 23 deletions pysteps/noise/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,39 +96,37 @@

if dask_imported and num_workers > 1:
res = []
else:
N_stds = []

N_stds = [None] * num_iter
randstates = []
seed = None

for k in range(num_iter):
randstates.append(np.random.RandomState(seed=seed))
seed = np.random.randint(0, high=1e9)

for k in range(num_iter):

def worker():
# generate Gaussian white noise field, filter it using the chosen
# method, multiply it with the standard deviation of the observed
# field and apply the precipitation mask
N = noise_generator(noise_filter, randstate=randstates[k], seed=seed)
N = N / np.std(N) * sigma + mu
N[~MASK] = R_thr_2
def worker(k):
# generate Gaussian white noise field, filter it using the chosen
# method, multiply it with the standard deviation of the observed
# field and apply the precipitation mask
N = noise_generator(noise_filter, randstate=randstates[k])
N = N / np.std(N) * sigma + mu
N[~MASK] = R_thr_2

# subtract the mean and decompose the masked noise field into a
# cascade
N -= mu
decomp_N = decomp_method(N, F, mask=MASK_)
# subtract the mean and decompose the masked noise field into a
# cascade
N -= mu
decomp_N = decomp_method(N, F, mask=MASK_)

return decomp_N["stds"]

if dask_imported and num_workers > 1:
res.append(dask.delayed(worker)())
else:
N_stds.append(worker())
N_stds[k] = decomp_N["stds"]

if dask_imported and num_workers > 1:
N_stds = dask.compute(*res, num_workers=num_workers)
for k in range(num_iter):
res.append(dask.delayed(worker)(k))
dask.compute(*res, num_workers=num_workers)

Check warning on line 125 in pysteps/noise/utils.py

View check run for this annotation

Codecov / codecov/patch

pysteps/noise/utils.py#L123-L125

Added lines #L123 - L125 were not covered by tests

else:
for k in range(num_iter):
worker(k)

# for each cascade level, compare the standard deviations between the
# observed field and the masked noise field, which gives the correction
Expand Down
3 changes: 1 addition & 2 deletions pysteps/nowcasts/linda.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,7 @@ def _compute_window_weights(coords, grid_height, grid_width, window_radius):
dx = c[1] - grid_x

w[i, :] = np.exp(
-dy * dy / (2 * window_radius_1**2)
- dx * dx / (2 * window_radius_2**2)
-dy * dy / (2 * window_radius_1**2) - dx * dx / (2 * window_radius_2**2)
)
else:
w[0, :] = np.ones((grid_height, grid_width))
Expand Down
12 changes: 6 additions & 6 deletions pysteps/nowcasts/sseps.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,12 +729,12 @@ def worker(j):
else:
EPS_ = None
# apply AR(p) process to cascade level
precip_cascades[
i, :, :, :
] = autoregression.iterate_ar_model(
precip_cascades[i, :, :, :],
phi[m, n, i, :],
eps=EPS_,
precip_cascades[i, :, :, :] = (
autoregression.iterate_ar_model(
precip_cascades[i, :, :, :],
phi[m, n, i, :],
eps=EPS_,
)
)
EPS_ = None
rc[m][n][j] = precip_cascades.copy()
Expand Down
7 changes: 4 additions & 3 deletions pysteps/nowcasts/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ def f(precip, i):
precip[i, ~np.isfinite(precip[i, :])] = np.nanmin(precip[i, :])

if noise_method is not None:
np.random.seed(seed)
# get methods for perturbations
init_noise, generate_noise = noise.get_method(noise_method)

Expand All @@ -466,6 +467,7 @@ def f(precip, i):
20,
conditional=True,
num_workers=num_workers,
seed=seed,
)

if measure_time:
Expand Down Expand Up @@ -543,7 +545,6 @@ def f(precip, i):
if noise_method is not None:
randgen_prec = []
randgen_motion = []
np.random.seed(seed)
for _ in range(n_ens_members):
rs = np.random.RandomState(seed)
randgen_prec.append(rs)
Expand Down Expand Up @@ -706,7 +707,7 @@ def _check_inputs(precip, velocity, timesteps, ar_order):


def _update(state, params):
precip_forecast_out = []
precip_forecast_out = [None] * params["n_ens_members"]

if params["noise_method"] is None or params["mask_method"] == "sprog":
for i in range(params["n_cascade_levels"]):
Expand Down Expand Up @@ -828,7 +829,7 @@ def worker(j):

precip_forecast[params["domain_mask"]] = np.nan

precip_forecast_out.append(precip_forecast)
precip_forecast_out[j] = precip_forecast

if (
DASK_IMPORTED
Expand Down
4 changes: 1 addition & 3 deletions pysteps/scripts/fit_vel_pert_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@
mu = dp_perp_sum / dp_perp_n

std_perp.append(
np.sqrt(
(dp_perp_sq_sum - 2 * mu * dp_perp_sum + dp_perp_n * mu**2) / dp_perp_n
)
np.sqrt((dp_perp_sq_sum - 2 * mu * dp_perp_sum + dp_perp_n * mu**2) / dp_perp_n)
)

try:
Expand Down
1 change: 1 addition & 0 deletions pysteps/tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

Collection of helper functions for the testing suite.
"""

from datetime import datetime

import numpy as np
Expand Down
2 changes: 1 addition & 1 deletion pysteps/tests/test_motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def test_vet_padding():
verbose=False,
sectors=((16, 4, 2), (16, 4, 2)),
options=dict(maxiter=5),
padding=padding
padding=padding,
# We use only a few iterations since
# we don't care about convergence in this test
)
Expand Down
6 changes: 3 additions & 3 deletions pysteps/utils/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,9 @@ def donothing(R, metadata=None, *args, **kwargs):
methods_objects["rm_rdisc"] = spectral.remove_rain_norain_discontinuity

# tapering methods
methods_objects[
"compute_mask_window_function"
] = tapering.compute_mask_window_function
methods_objects["compute_mask_window_function"] = (
tapering.compute_mask_window_function
)
methods_objects["compute_window_function"] = tapering.compute_window_function

# transformation methods
Expand Down
1 change: 1 addition & 0 deletions pysteps/visualization/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
get_geogrid
get_basemap_axis
"""

import warnings

import matplotlib.pylab as plt
Expand Down
Loading