fft_method pyfftw causes unexpected noise additions when using multi-threading #337

Closed
RubenImhoff opened this issue Aug 17, 2023 · 5 comments
Labels: bug (Something isn't working)

@RubenImhoff (Contributor)

If I run an ensemble using multiple cores (so dask parallelizes the ensemble members over the cores), the ensemble order appears to get lost, resulting in weird transitions (noise from a different member ends up in a given member). See for instance the figures below (three 15-min instances in the forecast):
[Figures: three consecutive 15-min frames of the 2023-08-17 06:00 blended nowcast showing the discontinuities]

Or maybe even clearer:

[Figure: additional screenshot illustrating the problem]

If I run on 1 core, this problem does not occur, which gives the impression that it has to do with the parallelization using dask. After contacting @mpvginde, I could not reproduce the error with the Belgian test case that we have (the figures above are with our Dutch data and setup). The only difference between our setups turned out to be the fft_method setting: pyfftw in our setup instead of numpy as in the Belgian setup. After changing this to numpy, the problems disappeared when running multi-threaded. This gives the impression that pyfftw should not be used for nowcasting and blended forecasting when using more than 1 worker/thread.
Is this a familiar issue to you and is there anything we can do about it?

@RubenImhoff (Contributor, Author)

Update on this possible bug: I see that it still comes back in the STEPS nowcasts (not in the blending anymore), with both pyfftw and numpy as fft_method. It seems that with newer dask versions we have to pin the work to specific cores more explicitly and keep track of it. The main nowcast loop still runs fine multi-threaded, provided numpy is used as the FFT method, but the other multi-threaded steps (cascade decomposition, noise initialization, etc.) seem to go wrong. We should either fix those steps to one worker or find a different solution, I'm afraid.

@dnerini (Member)

dnerini commented Nov 7, 2023

Hi @RubenImhoff, thanks for the update. Would it be possible to get a better idea of the changes you are suggesting? If I understand correctly, one option would be to explicitly set some arguments for dask, right?

@RubenImhoff (Contributor, Author)

Hi @dnerini, of course. The simple solution is to only use one worker (thread) for the parts where it goes wrong. I have tested it by fixing num_workers to 1 in the steps.py code in the nowcasting module, except for the nowcasting main loop:

precip_forecast = nowcast_main_loop(
        precip,
        velocity,
        state,
        timesteps,
        extrap_method,
        _update,
        extrap_kwargs=extrap_kwargs,
        velocity_pert_gen=velocity_perturbators,
        params=params,
        ensemble=True,
        num_ensemble_members=n_ens_members,
        callback=callback,
        return_output=return_output,
        num_workers=num_ensemble_workers,
        measure_time=measure_time,
    )

where num_workers can be > 1.
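For the other dask-parallelized parts (cascade decomposition, noise initialization, etc.), fixing them to one worker essentially means serializing the delayed tasks. A minimal, self-contained sketch of that idea, with a hypothetical worker function (only dask's own delayed/compute API is real here):

import dask

def worker(j):
    # placeholder for the per-member work (e.g. noise initialization)
    return j * j

n_ens_members = 4
tasks = [dask.delayed(worker)(j) for j in range(n_ens_members)]

# num_workers=1 on dask's threaded scheduler runs the tasks one after another,
# which avoids ordering/threading issues at the cost of parallelism for this step.
results = dask.compute(*tasks, num_workers=1)
print(results)  # (0, 1, 4, 9)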

I think ideally we would make full use of dask. In that case, we would have to pin the work to specific cores or something similar. I believe that is possible in dask, but I have no experience with it. Maybe you do, or @pulkkins?

@mpvginde (Contributor)

mpvginde commented Feb 9, 2024

Hi everyone,
I think I found the issue:
In the _update helper function in nowcasts.steps, the numpy array holding the final forecasted precipitation, precip_forecast_out, is created from a list which is initialized at the start of the _update function:

def _update(state, params):
    precip_forecast_out = []

The workers inside the _update function then append their result to this list:

if params["mask_method"] == "incremental":
state["mask_prec"][j] = nowcast_utils.compute_dilated_mask(
precip_forecast >= params["precip_thr"],
params["struct"],
params["mask_rim"],
)
precip_forecast[params["domain_mask"]] = np.nan
precip_forecast_out.append(precip_forecast)

Finally the list is converted to a numpy array:

    if (
        DASK_IMPORTED
        and params["n_ens_members"] > 1
        and params["num_ensemble_workers"] > 1
    ):
        res = []
        for j in range(params["n_ens_members"]):
            res.append(dask.delayed(worker)(j))
        dask.compute(*res, num_workers=params["num_ensemble_workers"])
    else:
        for j in range(params["n_ens_members"]):
            worker(j)
    return np.stack(precip_forecast_out), state

When dask is not used, there is no concern, since the workers are always triggered in the same order. But when dask is used, the order in which the workers run and finish is essentially random, so the ensemble members can get shuffled during _update.
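A small self-contained demonstration of this hazard (with a dummy worker standing in for the per-member update): on dask's threaded scheduler, the order in which workers append to the shared list follows completion order, not the member index j.

import random
import time

import dask

out = []

def worker(j):
    time.sleep(random.random() * 0.01)  # simulate unequal run times
    out.append(j)                       # append order == completion order

tasks = [dask.delayed(worker)(j) for j in range(8)]
dask.compute(*tasks, num_workers=4)
print(out)  # e.g. [2, 0, 5, 1, 3, 7, 4, 6] -- not necessarily [0, 1, ..., 7]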

I have proposed a fix in PR #347.
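The idea behind the fix, sketched here with placeholder data (this is not the actual PR #347 diff): write each member's result into a preallocated slot indexed by j instead of appending, so the position in the stacked output no longer depends on execution order.

import dask
import numpy as np

n_ens_members, shape = 4, (3, 3)
precip_forecast_out = [None] * n_ens_members  # one fixed slot per member

def worker(j):
    member_forecast = np.full(shape, float(j))  # stand-in for the member's forecast
    precip_forecast_out[j] = member_forecast    # index, don't append

tasks = [dask.delayed(worker)(j) for j in range(n_ens_members)]
dask.compute(*tasks, num_workers=4)

# np.stack now yields the members in their correct order,
# regardless of which worker finished first.
stacked = np.stack(precip_forecast_out)
assert np.allclose(stacked[2], 2.0)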

@RubenImhoff could you check if this solves your issues?

dnerini added a commit that referenced this issue Mar 5, 2024:

* Bugfix: fix random placement of ensemble members in numpy array due to dask multi-threading (#337)

* Bugfix: make STEPS (blending) nowcast reproducible when the seed argument is given (#346)

* Bugfix: make STEPS (blending) nowcast reproducible, independent of number of workers (#346)

* Formatting with black

Co-authored-by: ned <[email protected]>
@RubenImhoff (Contributor, Author)

After some testing, I can confirm that #347 fixes the issue. :)
