diff --git a/.github/workflows/check_black.yml b/.github/workflows/check_black.yml index 51ad6f5de..ebfe07385 100644 --- a/.github/workflows/check_black.yml +++ b/.github/workflows/check_black.yml @@ -21,9 +21,9 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python 3.9 - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: 3.9 - name: Install dependencies diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml index 6a24d8bd3..f1f681fb4 100644 --- a/.github/workflows/python-publish.yml +++ b/.github/workflows/python-publish.yml @@ -15,9 +15,9 @@ jobs: id-token: write steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: '3.x' - name: Install dependencies diff --git a/.github/workflows/test_pysteps.yml b/.github/workflows/test_pysteps.yml index 866fa8cf2..749dbc6ee 100644 --- a/.github/workflows/test_pysteps.yml +++ b/.github/workflows/test_pysteps.yml @@ -28,12 +28,25 @@ jobs: shell: bash -l {0} steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v4 + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - - name: Install mamba and create environment + # need headless opencv on Linux, see https://github.com/conda-forge/opencv-feedstock/issues/401 + - name: Install mamba and create environment for Linux + if: matrix.os == 'ubuntu-latest' + uses: mamba-org/setup-micromamba@v1 + with: + environment-file: ci/ci_test_env.yml + environment-name: test_environment + generate-run-shell: false + create-args: >- + python=${{ matrix.python-version }} + libopencv=*=headless* + + - name: Install mamba and create environment (not Linux) + if: matrix.os != 'ubuntu-latest' uses: mamba-org/setup-micromamba@v1 with: environment-file: ci/ci_test_env.yml @@ -49,14 +62,14 @@ jobs: if: matrix.os == 'macos-latest' working-directory: ${{github.workspace}} env: - CC: gcc-9 - CXX: g++-9 - CXX1X: g++-9 + CC: gcc-13 + CXX: g++-13 + CXX1X: g++-13 HOMEBREW_NO_INSTALL_CLEANUP: 1 run: | brew update-reset brew update - gcc-9 --version || brew install gcc@9 + gcc-13 --version || brew install gcc@13 pip install . - name: Install pysteps @@ -87,7 +100,7 @@ jobs: - name: Upload coverage to Codecov (Linux only) if: matrix.os == 'ubuntu-latest' - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 env: OS: ${{ matrix.os }} PYTHON: ${{ matrix.python-version }} diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f2daa085d..1e915e89e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/psf/black - rev: 22.6.0 + rev: 24.3.0 hooks: - id: black language_version: python3 diff --git a/PKG-INFO b/PKG-INFO index d3e7f6150..2277570d0 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 1.2 Name: pysteps -Version: 1.8.1 +Version: 1.10.0 Summary: Python framework for short-term ensemble prediction systems Home-page: http://pypi.python.org/pypi/pysteps/ License: LICENSE diff --git a/README.rst b/README.rst index 8023d34a9..40a9033e3 100644 --- a/README.rst +++ b/README.rst @@ -14,7 +14,7 @@ pysteps - Python framework for short-term ensemble prediction systems * - package - |github| |conda| |pypi| |zenodo| * - community - - |slack| |contributors| |downloads| |license| + - |contributors| |downloads| |license| .. 
|docs| image:: https://readthedocs.org/projects/pysteps/badge/?version=latest
@@ -49,10 +49,6 @@ pysteps - Python framework for short-term ensemble prediction systems
     :alt: License
     :target: https://opensource.org/licenses/BSD-3-Clause
 
-.. |slack| image:: https://pysteps-slackin.herokuapp.com/badge.svg
-    :alt: Slack invitation page
-    :target: https://pysteps-slackin.herokuapp.com/
-
 .. |contributors| image:: https://img.shields.io/github/contributors/pySTEPS/pysteps
     :alt: GitHub contributors
     :target: https://github.com/pySTEPS/pysteps/graphs/contributors
@@ -129,11 +125,6 @@ For feedback, suggestions for developments, and bug reports please use the dedic
 For more information, please read our `contributors guidelines `_.
 
-Get in touch
-============
-
-You can get in touch with the pysteps community on our `pysteps slack `_.
-To get access to it, you need to ask for an invitation or you can use this `automatic invitation page `_.
 
 Reference publications
 ======================
diff --git a/doc/source/conf.py b/doc/source/conf.py
index 8e60e99b7..90360feaf 100644
--- a/doc/source/conf.py
+++ b/doc/source/conf.py
@@ -85,7 +85,7 @@ def get_version():
 #
 # This is also used if you do content translation via gettext catalogs.
 # Usually you set "language" from the command line for these cases.
-language = None
+language = "en"
 
 # List of patterns, relative to source directory, that match files and
 # directories to ignore when looking for source files.
diff --git a/doc/source/references.bib b/doc/source/references.bib
index 66b4b9720..b78c58009 100644
--- a/doc/source/references.bib
+++ b/doc/source/references.bib
@@ -51,6 +51,17 @@ @ARTICLE{CRS2004
   DOI = "10.1017/S1350482704001239"
 }
 
+@ARTICLE{DOnofrio2014,
+  TITLE = "Stochastic rainfall downscaling of climate models",
+  AUTHOR = "D'Onofrio, D and Palazzi, E and von Hardenberg, J and Provenzale, A and Calmanti, S",
+  JOURNAL = "J. Hydrometeorol.",
+  PUBLISHER = "American Meteorological Society",
+  VOLUME = 15,
+  NUMBER = 2,
+  PAGES = "830--843",
+  YEAR = 2014,
+}
+
 @ARTICLE{EWWM2013,
   AUTHOR = "E. Ebert and L. Wilson and A. Weigel and M. Mittermaier and P. Nurmi and P. Gill and M. Göber and S. Joslyn and B. Brown and T. Fowler and A. Watkins",
   TITLE = "Progress and challenges in forecast verification",
@@ -306,6 +317,17 @@ @ARTICLE{SPN2013
   DOI = "10.1002/wrcr.20536"
 }
 
+@ARTICLE{Terzago2018,
+  AUTHOR = "Terzago, S. and Palazzi, E. and von Hardenberg, J.",
+  TITLE = "Stochastic downscaling of precipitation in complex orography: a simple method to reproduce a realistic fine-scale climatology",
+  JOURNAL = "Natural Hazards and Earth System Sciences",
+  VOLUME = 18,
+  YEAR = 2018,
+  NUMBER = 11,
+  PAGES = "2825--2840",
+  DOI = "10.5194/nhess-18-2825-2018"
+}
+
 @ARTICLE{TRT2004,
   AUTHOR = "A. M. Hering and C. Morel and G. Galli and P. Ambrosetti and M. 
Boscacci", TITLE = "Nowcasting thunderstorms in the Alpine Region using a radar based adaptive thresholding scheme", diff --git a/doc/source/user_guide/install_pysteps.rst b/doc/source/user_guide/install_pysteps.rst index 79365a3f9..8e8e43de6 100644 --- a/doc/source/user_guide/install_pysteps.rst +++ b/doc/source/user_guide/install_pysteps.rst @@ -148,12 +148,12 @@ To make sure that the installer uses the homebrew's gcc, export the following environmental variables in the terminal (supposing that gcc version 8 was installed):: - export CC=gcc-8 - export CXX=g++-8 + export CC=gcc-13 + export CXX=g++-13 First, check that the homebrew's gcc is detected:: - which gcc-8 + which gcc-13 This should point to the homebrew's gcc installation. @@ -162,8 +162,8 @@ gcc executables under /usr/local/bin. If that is the case, specify the CC and CCX variables using the full path to the homebrew installation. For example:: - export CC=/usr/local/Cellar/gcc/8.3.0/bin/gcc-8 - export CXX=/usr/local/Cellar/gcc/8.3.0/bin/g++-8 + export CC=/usr/local/Cellar/gcc/13.2.0/bin/gcc-13 + export CXX=/usr/local/Cellar/gcc/13.2.0/bin/g++-13 Then, you can continue with the normal installation procedure described next. diff --git a/examples/optical_flow_methods_convergence.py b/examples/optical_flow_methods_convergence.py index ae80d7df7..569c6709f 100644 --- a/examples/optical_flow_methods_convergence.py +++ b/examples/optical_flow_methods_convergence.py @@ -22,7 +22,7 @@ import matplotlib.pyplot as plt import numpy as np -from matplotlib.cm import get_cmap +from matplotlib.pyplot import get_cmap from scipy.ndimage import uniform_filter import pysteps as stp diff --git a/examples/rainfarm_downscale.py b/examples/rainfarm_downscale.py index d4095201d..a40a318e2 100644 --- a/examples/rainfarm_downscale.py +++ b/examples/rainfarm_downscale.py @@ -10,18 +10,40 @@ al. (2006). The method can represent the realistic small-scale variability of the downscaled precipitation field by means of Gaussian random fields. +Steps: + 1. Read the input precipitation data. + 2. Upscale the precipitation field. + 3. Downscale the field to its original resolution using RainFARM with defaults. + 4. Downscale with smoothing. + 5. Downscale with spectral fusion. + 6. Downscale with smoothing and spectral fusion. + +References: + + Rebora, N., L. Ferraris, J. von Hardenberg, and A. Provenzale, 2006: RainFARM: + Rainfall downscaling by a filtered autoregressive model. J. Hydrometeor., 7, + 724–738. + + D D'Onofrio, E Palazzi, J von Hardenberg, A Provenzale, and S Calmanti, 2014: + Stochastic rainfall downscaling of climate models. J. Hydrometeorol., 15(2):830–843. """ import matplotlib.pyplot as plt import numpy as np import os from pprint import pprint +import logging from pysteps import io, rcparams from pysteps.utils import aggregate_fields_space, square_domain, to_rainrate from pysteps.downscaling import rainfarm from pysteps.visualization import plot_precip_field +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) + ############################################################################### # Read the input data # ------------------- @@ -29,24 +51,30 @@ # As first step, we need to import the precipitation field that we are going # to use in this example. 
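This example assumes the pysteps example data are already installed and registered in the pystepsrc configuration, so that rcparams.data_sources["mch"] resolves. If that is not the case, the sample archive can be fetched with the helpers in pysteps.datasets; a minimal sketch (the target directory "pysteps_data" is an arbitrary placeholder):

    import pysteps
    from pysteps.datasets import create_default_pystepsrc, download_pysteps_data

    # Fetch the example data archive and write a pystepsrc file pointing to it.
    download_pysteps_data("pysteps_data")
    config_file = create_default_pystepsrc("pysteps_data")
    pysteps.load_config_file(config_file, verbose=True)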
+ +def read_precipitation_data(file_path): + """Read and process precipitation data from a file.""" + precip, _, metadata = io.import_mch_gif( + file_path, product="AQC", unit="mm", accutime=5.0 + ) + precip, metadata = to_rainrate(precip, metadata) + precip, metadata = square_domain(precip, metadata, "crop") + return precip, metadata + + # Import the example radar composite root_path = rcparams.data_sources["mch"]["root_path"] filename = os.path.join(root_path, "20160711", "AQC161932100V_00005.801.gif") -precip, _, metadata = io.import_mch_gif( - filename, product="AQC", unit="mm", accutime=5.0 -) - -# Convert to mm/h -precip, metadata = to_rainrate(precip, metadata) -# Reduce to a square domain -precip, metadata = square_domain(precip, metadata, "crop") +# Read and process data +precip, metadata = read_precipitation_data(filename) # Nicely print the metadata pprint(metadata) # Plot the original rainfall field plot_precip_field(precip, geodata=metadata) +plt.title("Original Rainfall Field") plt.show() # Assign the fill value to all the Nans @@ -61,60 +89,87 @@ # create a lower resolution field to apply our downscaling method. # We are going to use a factor of 16 x. + +def upscale_field(precip, metadata, scale_factor): + """Upscale the precipitation field by a given scale factor.""" + upscaled_resolution = metadata["xpixelsize"] * scale_factor + precip_lr, metadata_lr = aggregate_fields_space( + precip, metadata, upscaled_resolution + ) + return precip_lr, metadata_lr + + scale_factor = 16 -upscaled_resolution = ( - metadata["xpixelsize"] * scale_factor -) # upscaled resolution : 16 km -precip_lr, metadata_lr = aggregate_fields_space(precip, metadata, upscaled_resolution) +precip_lr, metadata_lr = upscale_field(precip, metadata, scale_factor) # Plot the upscaled rainfall field plt.figure() plot_precip_field(precip_lr, geodata=metadata_lr) +plt.title("Upscaled Rainfall Field") +plt.show() ############################################################################### # Downscale the field # ------------------- # -# We can now use RainFARM to generate stochastic realizations of the downscaled -# precipitation field. - -fig = plt.figure(figsize=(5, 8)) -# Set the number of stochastic realizations -num_realizations = 5 - -# Per realization, generate a stochastically downscaled precipitation field -# and plot it. -# The first time, the spectral slope alpha needs to be estimated. To illustrate -# the sensitivity of this parameter, we are going to plot some realizations with -# half or double the estimated slope. 
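The deleted comment above captures the point of the old figure: the spectral slope alpha is estimated once from the coarse field and can then be halved or doubled to probe its effect on the small-scale variability. A condensed sketch of that experiment, reusing precip_lr and scale_factor from this example and the return_alpha keyword shown in the removed code:

    from pysteps.downscaling import rainfarm

    # First call: estimate the spectral slope alpha from the upscaled field.
    precip_hr, alpha = rainfarm.downscale(
        precip_lr, ds_factor=scale_factor, alpha=None, return_alpha=True
    )
    # A smaller alpha (flatter spectrum) yields rougher small-scale detail,
    # a larger alpha a smoother field.
    precip_hr_half = rainfarm.downscale(
        precip_lr, ds_factor=scale_factor, alpha=alpha * 0.5
    )
    precip_hr_double = rainfarm.downscale(
        precip_lr, ds_factor=scale_factor, alpha=alpha * 2
    )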
-alpha = None -for n in range(num_realizations): - # Spectral slope estimated from the upscaled field - precip_hr, alpha = rainfarm.downscale( - precip_lr, ds_factor=scale_factor, alpha=alpha, return_alpha=True - ) - plt.subplot(num_realizations, 3, n * 3 + 2) - plot_precip_field(precip_hr, geodata=metadata, axis="off", colorbar=False) - if n == 0: - plt.title(f"alpha={alpha:.1f}") - - # Half the estimated slope - precip_hr = rainfarm.downscale(precip_lr, ds_factor=scale_factor, alpha=alpha * 0.5) - plt.subplot(num_realizations, 3, n * 3 + 1) - plot_precip_field(precip_hr, geodata=metadata, axis="off", colorbar=False) - if n == 0: - plt.title(f"alpha={alpha * 0.5:.1f}") - - # Double the estimated slope - precip_hr = rainfarm.downscale(precip_lr, ds_factor=scale_factor, alpha=alpha * 2) - plt.subplot(num_realizations, 3, n * 3 + 3) - plot_precip_field(precip_hr, geodata=metadata, axis="off", colorbar=False) - if n == 0: - plt.title(f"alpha={alpha * 2:.1f}") - - plt.subplots_adjust(wspace=0, hspace=0) - -plt.tight_layout() +# We can now use RainFARM to downscale the precipitation field. + +# Basic downscaling +precip_hr = rainfarm.downscale(precip_lr, ds_factor=scale_factor) + +# Plot the downscaled rainfall field +plt.figure() +plot_precip_field(precip_hr, geodata=metadata) +plt.title("Downscaled Rainfall Field") +plt.show() + +############################################################################### +# Downscale with smoothing +# ------------------------ +# +# Add smoothing with a Gaussian kernel during the downscaling process. + +precip_hr_smooth = rainfarm.downscale( + precip_lr, ds_factor=scale_factor, kernel_type="gaussian" +) + +# Plot the downscaled rainfall field with smoothing +plt.figure() +plot_precip_field(precip_hr_smooth, geodata=metadata) +plt.title("Downscaled Rainfall Field with Gaussian Smoothing") +plt.show() + +############################################################################### +# Downscale with spectral fusion +# ------------------------------ +# +# Apply spectral merging as described in D'Onofrio et al. (2014). + +precip_hr_fusion = rainfarm.downscale( + precip_lr, ds_factor=scale_factor, spectral_fusion=True +) + +# Plot the downscaled rainfall field with spectral fusion +plt.figure() +plot_precip_field(precip_hr_fusion, geodata=metadata) +plt.title("Downscaled Rainfall Field with Spectral Fusion") +plt.show() + +############################################################################### +# Combined Downscale with smoothing and spectral fusion +# ----------------------------------------------------- +# +# Apply both smoothing with a Gaussian kernel and spectral fusion during the +# downscaling process to observe the combined effect. + +precip_hr_combined = rainfarm.downscale( + precip_lr, ds_factor=scale_factor, kernel_type="gaussian", spectral_fusion=True +) + +# Plot the downscaled rainfall field with smoothing and spectral fusion +plt.figure() +plot_precip_field(precip_hr_combined, geodata=metadata) +plt.title("Downscaled Rainfall Field with Gaussian Smoothing and Spectral Fusion") plt.show() ############################################################################### @@ -126,13 +181,4 @@ # the original algorithm from Rebora et al. (2006), it cannot downscale the temporal # dimension. - -############################################################################### -# References -# ---------- -# -# Rebora, N., L. Ferraris, J. von Hardenberg, and A. Provenzale, 2006: RainFARM: -# Rainfall downscaling by a filtered autoregressive model. J. 
Hydrometeor., 7, -# 724–738. - # sphinx_gallery_thumbnail_number = 2 diff --git a/pysteps/blending/steps.py b/pysteps/blending/steps.py index ca51c1017..21a0ffc05 100644 --- a/pysteps/blending/steps.py +++ b/pysteps/blending/steps.py @@ -8,13 +8,13 @@ consists of the following main steps: #. Set the radar rainfall fields in a Lagrangian space. - #. Initialize the noise method. #. Perform the cascade decomposition for the input radar rainfall fields. The method assumes that the cascade decomposition of the NWP model fields is already done prior to calling the function, as the NWP model fields are generally not updated with the same frequency (which is more efficient). A method to decompose and store the NWP model fields whenever a new NWP model field is present, is present in pysteps.blending.utils.decompose_NWP. + #. Initialize the noise method. #. Estimate AR parameters for the extrapolation nowcast and noise cascade. #. Initialize all the random generators. #. Calculate the initial skill of the NWP model forecasts at t=0. @@ -77,6 +77,7 @@ def forecast( n_cascade_levels=8, blend_nwp_members=False, precip_thr=None, + norain_thr=0.0, kmperpixel=None, extrap_method="semilagrangian", decomp_method="fft", @@ -161,6 +162,11 @@ def forecast( precip_thr: float, optional Specifies the threshold value for minimum observable precipitation intensity. Required if mask_method is not None or conditional is True. + norain_thr: float, optional + Specifies the threshold value for the fraction of rainy (see above) pixels + in the radar rainfall field below which we consider there to be no rain. + Depends on the amount of clutter typically present. + Standard set to 0.0 kmperpixel: float, optional Spatial resolution of the input data (kilometers/pixel). Required if vel_pert_method is not None or mask_method is 'incremental'. @@ -466,6 +472,7 @@ def forecast( if conditional or mask_method is not None: print(f"precip. intensity threshold: {precip_thr}") + print(f"no-rain fraction threshold for radar: {norain_thr}") print("") # 0.3 Get the methods that will be used @@ -507,33 +514,18 @@ def forecast( else: MASK_thr = None + # we need to know the zerovalue of precip to replace the mask when decomposing after extrapolation + zerovalue = np.nanmin(precip) + # 1. Start with the radar rainfall fields. We want the fields in a # Lagrangian space precip = _transform_to_lagrangian( precip, velocity, ar_order, xy_coords, extrapolator, extrap_kwargs, num_workers ) - # 2. Initialize the noise method - np.random.seed(seed) - pp, generate_noise, noise_std_coeffs = _init_noise( - precip, - precip_thr, - n_cascade_levels, - bp_filter, - decompositor, - fft, - noise_method, - noise_kwargs, - noise_stddev_adj, - measure_time, - num_workers, - seed, - ) - - # 3. Perform the cascade decomposition for the input precip fields and + # 2. Perform the cascade decomposition for the input precip fields and # and, if necessary, for the (NWP) model fields - - # 3.1 Compute the cascade decompositions of the input precipitation fields + # 2.1 Compute the cascade decompositions of the input precipitation fields ( precip_cascade, mu_extrapolation, @@ -550,7 +542,7 @@ def forecast( fft, ) - # 3.2 If necessary, decompose (NWP) model forecasts and stack cascades + # 2.2 If necessary, decompose (NWP) model forecasts and stack cascades ( precip_models_cascade, mu_models, @@ -560,336 +552,661 @@ def forecast( precip_models, bp_filter, decompositor, recompositor, fft, domain ) - # 4. 
Estimate AR parameters for the radar rainfall field - PHI = _estimate_ar_parameters_radar( - precip_cascade, ar_order, n_cascade_levels, MASK_thr + # 2.3 Check for zero input fields in the radar and NWP data. + zero_precip_radar = blending.utils.check_norain(precip, precip_thr, norain_thr) + # The norain fraction threshold used for nwp is the default value of 0.0, + # since nwp does not suffer from clutter. + zero_model_fields = blending.utils.check_norain( + precip_models_pm, precip_thr, norain_thr ) - # 5. Repeat precip_cascade for n ensemble members - # First, discard all except the p-1 last cascades because they are not needed - # for the AR(p) model - precip_cascade = [precip_cascade[i][-ar_order:] for i in range(n_cascade_levels)] - - precip_cascade = [ - [precip_cascade[j].copy() for j in range(n_cascade_levels)] - for i in range(n_ens_members) - ] - precip_cascade = np.stack(precip_cascade) - - # Also initialize the cascade of temporally correlated noise, which has the - # same shape as precip_cascade, but starts with value zero. - noise_cascade = np.zeros(precip_cascade.shape) - - # 6. Initialize all the random generators and prepare for the forecast loop - randgen_prec, vps, generate_vel_noise = _init_random_generators( - velocity, - noise_method, - vel_pert_method, - vp_par, - vp_perp, - seed, - n_ens_members, - kmperpixel, - timestep, - ) - D, D_Yn, D_pb, R_f, R_m, mask_rim, struct, fft_objs = _prepare_forecast_loop( - precip_cascade, - noise_method, - fft_method, - n_cascade_levels, - n_ens_members, - mask_method, - mask_kwargs, - timestep, - kmperpixel, - ) - - precip = precip[-1, :, :] + # 2.3.1 If precip is below the norain threshold and precip_models_pm is zero, + # we consider it as no rain in the domain. + # The forecast will directly return an array filled with the minimum + # value present in precip (which equals zero rainfall in the used + # transformation) + if zero_precip_radar and zero_model_fields: + print( + "No precipitation above the threshold found in both the radar and NWP fields" + ) + print("The resulting forecast will contain only zeros") + # Create the output list + R_f = [[] for j in range(n_ens_members)] - # 7. initizalize the current and previous extrapolation forecast scale - # for the nowcasting component - rho_extr_prev = np.repeat(1.0, PHI.shape[0]) - rho_extr = PHI[:, 0] / (1.0 - PHI[:, 1]) # phi1 / (1 - phi2), see BPS2004 + if isinstance(timesteps, int): + timesteps = range(timesteps + 1) + timestep_type = "int" + else: + original_timesteps = [0] + list(timesteps) + timesteps = nowcast_utils.binned_timesteps(original_timesteps) + timestep_type = "list" + + # Save per time step to ensure the array does not become too large if + # no return_output is requested and callback is not None. + for t, subtimestep_idx in enumerate(timesteps): + # If the timestep is not the first one, we need to provide the zero forecast + if t > 0: + # Create an empty np array with shape [n_ens_members, rows, cols] + # and fill it with the minimum value from precip (corresponding to + # zero precipitation) + R_f_ = np.full( + (n_ens_members, precip_shape[0], precip_shape[1]), np.nanmin(precip) + ) + if callback is not None: + if R_f_.shape[1] > 0: + callback(R_f_.squeeze()) + if return_output: + for j in range(n_ens_members): + R_f[j].append(R_f_[j]) - if measure_time: - init_time = time.time() - starttime_init + R_f_ = None - ### - # 8. 
Start the forecasting loop - ### - print("Starting blended nowcast computation.") + if measure_time: + zero_precip_time = time.time() - starttime_init - if measure_time: - starttime_mainloop = time.time() + if return_output: + outarr = np.stack([np.stack(R_f[j]) for j in range(n_ens_members)]) + if measure_time: + return outarr, zero_precip_time, zero_precip_time + else: + return outarr + else: + return None - if isinstance(timesteps, int): - timesteps = range(timesteps + 1) - timestep_type = "int" else: - original_timesteps = [0] + list(timesteps) - timesteps = nowcast_utils.binned_timesteps(original_timesteps) - timestep_type = "list" - - extrap_kwargs["return_displacement"] = True - forecast_prev = precip_cascade - noise_prev = noise_cascade - t_prev = [0.0 for j in range(n_ens_members)] - t_total = [0.0 for j in range(n_ens_members)] - - # iterate each time step - for t, subtimestep_idx in enumerate(timesteps): - if timestep_type == "list": - subtimesteps = [original_timesteps[t_] for t_ in subtimestep_idx] - else: - subtimesteps = [t] + # 2.3.3 If zero_precip_radar, make sure that precip_cascade does not contain + # only nans or infs. If so, fill it with the zero value. + if zero_precip_radar: + precip_cascade = np.nan_to_num( + precip_cascade, + copy=True, + nan=np.nanmin(precip_models_cascade), + posinf=np.nanmin(precip_models_cascade), + neginf=np.nanmin(precip_models_cascade), + ) - if (timestep_type == "list" and subtimesteps) or ( - timestep_type == "int" and t > 0 - ): - is_nowcast_time_step = True - else: - is_nowcast_time_step = False + # 2.3.4 Check if the NWP fields contain nans or infinite numbers. If so, + # fill these with the minimum value present in precip (corresponding to + # zero rainfall in the radar observations) + ( + precip_models_cascade, + precip_models_pm, + mu_models, + sigma_models, + ) = _fill_nans_infs_nwp_cascade( + precip_models_cascade, + precip_models_pm, + precip_cascade, + precip, + mu_models, + sigma_models, + ) - if is_nowcast_time_step: - print( - f"Computing nowcast for time step {t}... ", - end="", - flush=True, + # 2.3.5 If zero_precip_radar is True, only use the velocity field of the NWP + # forecast. I.e., velocity (radar) equals velocity_model at the first time + # step. + if zero_precip_radar: + # Use the velocity from velocity_models at time step 0 + velocity = velocity_models[:, 0, :, :, :] + # Take the average over the first axis, which corresponds to n_models + # (hence, the model average) + velocity = np.mean(velocity, axis=0) + + # 3. Initialize the noise method. + # If zero_precip_radar is True, initialize noise based on the NWP field time + # step where the fraction of rainy cells is highest (because other lead times + # might be zero as well). 
Else, initialize the noise with the radar
+        # rainfall data
+        if zero_precip_radar:
+            precip_noise_input = _determine_max_nr_rainy_cells_nwp(
+                precip_models_pm, precip_thr, precip_models_pm.shape[0], timesteps
+            )
+            # Make sure precip_noise_input is three dimensional
+            precip_noise_input = precip_noise_input[np.newaxis, :, :]
+        else:
+            precip_noise_input = precip.copy()
+
+        pp, generate_noise, noise_std_coeffs = _init_noise(
+            precip_noise_input,
+            precip_thr,
+            n_cascade_levels,
+            bp_filter,
+            decompositor,
+            fft,
+            noise_method,
+            noise_kwargs,
+            noise_stddev_adj,
+            measure_time,
+            num_workers,
+            seed,
+        )
+        precip_noise_input = None
+
+        # 4. Estimate AR parameters for the radar rainfall field
+        PHI = _estimate_ar_parameters_radar(
+            precip_cascade,
+            ar_order,
+            n_cascade_levels,
+            MASK_thr,
+            zero_precip_radar,
+        )
+
+        # 5. Repeat precip_cascade for n ensemble members
+        # First, discard all except the p-1 last cascades because they are not needed
+        # for the AR(p) model
+        precip_cascade = [
+            precip_cascade[i][-ar_order:] for i in range(n_cascade_levels)
+        ]
+
+        precip_cascade = [
+            [precip_cascade[j].copy() for j in range(n_cascade_levels)]
+            for i in range(n_ens_members)
+        ]
+        precip_cascade = np.stack(precip_cascade)
+
+        # 6. Initialize all the random generators and prepare for the forecast loop
+        randgen_prec, vps, generate_vel_noise = _init_random_generators(
+            velocity,
+            noise_method,
+            vel_pert_method,
+            vp_par,
+            vp_perp,
+            seed,
+            n_ens_members,
+            kmperpixel,
+            timestep,
+        )
+        D, D_Yn, D_pb, R_f, R_m, mask_rim, struct, fft_objs = _prepare_forecast_loop(
+            precip_cascade,
+            noise_method,
+            fft_method,
+            n_cascade_levels,
+            n_ens_members,
+            mask_method,
+            mask_kwargs,
+            timestep,
+            kmperpixel,
+        )
+
+        # Also initialize the cascade of temporally correlated noise, which has the
+        # same shape as precip_cascade, but starts with random noise.
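Earlier in this block, _determine_max_nr_rainy_cells_nwp selects the NWP field that seeds the noise when the radar field is empty. A stand-in sketch of that selection (the actual helper's signature and dimension handling may differ):

    import numpy as np

    def determine_max_rainy_cells_sketch(precip_models_pm, precip_thr):
        """Return the (model, timestep) NWP field with the most cells above
        precip_thr, assuming shape (n_models, n_timesteps, m, n)."""
        rainy_counts = np.sum(precip_models_pm > precip_thr, axis=(2, 3))
        i_model, i_t = np.unravel_index(np.argmax(rainy_counts), rainy_counts.shape)
        return precip_models_pm[i_model, i_t]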
+
+        noise_cascade, mu_noise, sigma_noise = _init_noise_cascade(
+            shape=precip_cascade.shape,
+            n_ens_members=n_ens_members,
+            n_cascade_levels=n_cascade_levels,
+            generate_noise=generate_noise,
+            decompositor=decompositor,
+            pp=pp,
+            randgen_prec=randgen_prec,
+            fft_objs=fft_objs,
+            bp_filter=bp_filter,
+            domain=domain,
+            noise_method=noise_method,
+            noise_std_coeffs=noise_std_coeffs,
+            ar_order=ar_order,
+        )
 
-        # the nowcast iteration for each ensemble member
-        def worker(j):
-            # 8.1.2 Determine the skill of the nwp components for lead time (t0 + t)
-            # Then for the model components
-            if blend_nwp_members:
-                rho_nwp_fc = [
-                    blending.skill_scores.lt_dependent_cor_nwp(
-                        lt=(t * int(timestep)),
-                        correlations=rho_nwp_models[n_model],
-                        outdir_path=outdir_path_skill,
-                        n_model=n_model,
-                        skill_kwargs=clim_kwargs,
-                    )
-                    for n_model in range(rho_nwp_models.shape[0])
-                ]
-                rho_nwp_fc = np.stack(rho_nwp_fc)
-                # Concatenate rho_extr and rho_nwp
-                rho_fc = np.concatenate((rho_extr[None, :], rho_nwp_fc), axis=0)
+        precip = precip[-1, :, :]
+
+        # 7. initialize the current and previous extrapolation forecast scale
+        # for the nowcasting component
+        rho_extr_prev = np.repeat(1.0, PHI.shape[0])
+        rho_extr = PHI[:, 0] / (1.0 - PHI[:, 1])  # phi1 / (1 - phi2), see BPS2004
+
+        if measure_time:
+            init_time = time.time() - starttime_init
+
+        ###
+        # 8. Start the forecasting loop
+        ###
+        print("Starting blended nowcast computation.")
+
+        if measure_time:
+            starttime_mainloop = time.time()
+
+        if isinstance(timesteps, int):
+            timesteps = range(timesteps + 1)
+            timestep_type = "int"
+        else:
+            original_timesteps = [0] + list(timesteps)
+            timesteps = nowcast_utils.binned_timesteps(original_timesteps)
+            timestep_type = "list"
+
+        extrap_kwargs["return_displacement"] = True
+        forecast_prev = precip_cascade
+        noise_prev = noise_cascade
+        t_prev = [0.0 for j in range(n_ens_members)]
+        t_total = [0.0 for j in range(n_ens_members)]
+
+        # iterate each time step
+        for t, subtimestep_idx in enumerate(timesteps):
+            if timestep_type == "list":
+                subtimesteps = [original_timesteps[t_] for t_ in subtimestep_idx]
             else:
-                rho_nwp_fc = blending.skill_scores.lt_dependent_cor_nwp(
-                    lt=(t * int(timestep)),
-                    correlations=rho_nwp_models[j],
-                    outdir_path=outdir_path_skill,
-                    n_model=n_model_indices[j],
-                    skill_kwargs=clim_kwargs,
-                )
-                # Concatenate rho_extr and rho_nwp
-                rho_fc = np.concatenate(
-                    (rho_extr[None, :], rho_nwp_fc[None, :]), axis=0
-                )
+                subtimesteps = [t]
 
-            # 8.2 Determine the weights per component
-
-            # Weights following the bps method. These are needed for the velocity
-            # weights prior to the advection step. If weights method spn is
-            # selected, weights will be overwritten with those weights prior to
-            # blending step.
-            # weight = [(extr_field, n_model_fields, noise), n_cascade_levels, ...]
-            weights = calculate_weights_bps(rho_fc)
-
-            # The model only weights
-            if weights_method == "bps":
-                # Determine the weights of the components without the extrapolation
-                # cascade, in case this is no data or outside the mask.
-                weights_model_only = calculate_weights_bps(rho_fc[1:, :])
-            elif weights_method == "spn":
-                # Only the weights of the components without the extrapolation
-                # cascade will be determined here. The full set of weights are
-                # determined after the extrapolation step in this method. 
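The comments removed above describe the BPS weighting: each component is weighted according to its correlation with the observations, and the noise takes up the remainder (the note "noise is 1 - correlation" further down refers to the single-model case). A sketch of that reduced case only; calculate_weights_bps implements the full multi-component scheme of BPS2006:

    import numpy as np

    def weights_bps_single_model_sketch(rho):
        """Reduced BPS-style weights for one model component: per cascade
        level, the model gets its correlation rho and the noise 1 - rho."""
        rho = np.asarray(rho, dtype=float)
        return np.stack((rho, 1.0 - rho))

    # Example: three cascade levels with decreasing skill.
    print(weights_bps_single_model_sketch([0.9, 0.6, 0.2]))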
- if blend_nwp_members and precip_models_cascade_temp.shape[0] > 1: - weights_model_only = np.zeros( - (precip_models_cascade_temp.shape[0] + 1, n_cascade_levels) - ) - for i in range(n_cascade_levels): - # Determine the normalized covariance matrix (containing) - # the cross-correlations between the models - cov = np.corrcoef( - np.stack( - [ - precip_models_cascade_temp[ - n_model, i, :, : - ].flatten() - for n_model in range( - precip_models_cascade_temp.shape[0] - ) - ] - ) - ) - # Determine the weights for this cascade level - weights_model_only[:, i] = calculate_weights_spn( - correlations=rho_fc[1:, i], cov=cov - ) - else: - # Same as correlation and noise is 1 - correlation - weights_model_only = calculate_weights_bps(rho_fc[1:, :]) + if (timestep_type == "list" and subtimesteps) or ( + timestep_type == "int" and t > 0 + ): + is_nowcast_time_step = True else: - raise ValueError( - "Unknown weights method %s: must be 'bps' or 'spn'" % weights_method + is_nowcast_time_step = False + + if is_nowcast_time_step: + print( + f"Computing nowcast for time step {t}... ", + end="", + flush=True, ) - # 8.3 Determine the noise cascade and regress this to the subsequent - # time step + regress the extrapolation component to the subsequent - # time step + if measure_time: + starttime = time.time() - # 8.3.1 Determine the epsilon, a cascade of temporally independent - # but spatially correlated noise - if noise_method is not None: - # generate noise field - EPS = generate_noise( - pp, randstate=randgen_prec[j], fft_method=fft_objs[j], domain=domain + # 8.1.1 Before calling the worker for the forecast loop, determine which (NWP) + # models will be combined with which nowcast ensemble members. With the + # way it is implemented at this moment: n_ens_members of the output equals + # the maximum number of (ensemble) members in the input (either the nowcasts or NWP). + ( + precip_models_cascade_temp, + precip_models_pm_temp, + velocity_models_temp, + mu_models_temp, + sigma_models_temp, + n_model_indices, + ) = _find_nwp_combination( + precip_models_cascade[:, t, :, :, :], + precip_models_pm[:, t, :, :], + velocity_models[:, t, :, :, :], + mu_models[:, t, :], + sigma_models[:, t, :], + n_ens_members, + ar_order, + n_cascade_levels, + blend_nwp_members, + ) + + # If zero_precip_radar is True, set the velocity field equal to the NWP + # velocity field for the current time step (velocity_models_temp). + if zero_precip_radar: + # Use the velocity from velocity_models and take the average over + # n_models (axis=0) + velocity = np.mean(velocity_models_temp, axis=0) + + if t == 0: + # 8.1.2 Calculate the initial skill of the (NWP) model forecasts at t=0 + rho_nwp_models = _compute_initial_nwp_skill( + precip_cascade, + precip_models_cascade_temp, + domain_mask, + issuetime, + outdir_path_skill, + clim_kwargs, ) - # decompose the noise field into a cascade - EPS = decompositor( - EPS, - bp_filter, - fft_method=fft_objs[j], - input_domain=domain, - output_domain=domain, - compute_stats=True, - normalize=True, - compact_output=True, + if t > 0: + # 8.1.3 Determine the skill of the components for lead time (t0 + t) + # First for the extrapolation component. Only calculate it when t > 0. 
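The update that follows tracks how the extrapolation skill decays with lead time. For an AR(2) process with parameters phi1 and phi2 (the first two columns of PHI), the Yule-Walker equations give the lag-1 autocorrelation rho_1 = phi1 / (1 - phi2), which is exactly the rho_extr initialization above, and the recursion rho_k = phi1 * rho_(k-1) + phi2 * rho_(k-2) for longer lags. A worked sketch, independent of the actual lt_dependent_cor_extrapolation implementation:

    def ar2_lag1_correlation(phi1, phi2):
        # Yule-Walker: rho_1 = phi1 / (1 - phi2)
        return phi1 / (1.0 - phi2)

    def ar2_correlation_at_lag(phi1, phi2, lag):
        # Yule-Walker recursion: rho_k = phi1 * rho_(k-1) + phi2 * rho_(k-2)
        rho_prev, rho = 1.0, ar2_lag1_correlation(phi1, phi2)
        for _ in range(lag - 1):
            rho_prev, rho = rho, phi1 * rho + phi2 * rho_prev
        return rho

    # Example: a stationary AR(2) cascade level decorrelating with lead time.
    print([round(ar2_correlation_at_lag(1.2, -0.3, k), 3) for k in range(1, 6)])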
+ ( + rho_extr, + rho_extr_prev, + ) = blending.skill_scores.lt_dependent_cor_extrapolation( + PHI=PHI, correlations=rho_extr, correlations_prev=rho_extr_prev ) - else: - EPS = None - # 8.3.2 regress the extrapolation component to the subsequent time - # step - # iterate the AR(p) model for each cascade level - for i in range(n_cascade_levels): - # apply AR(p) process to extrapolation cascade level - if EPS is not None or vel_pert_method is not None: - precip_cascade[j][i] = autoregression.iterate_ar_model( - precip_cascade[j][i], PHI[i, :] - ) + # the nowcast iteration for each ensemble member + R_f_ = [None for _ in range(n_ens_members)] + def worker(j): + # 8.1.2 Determine the skill of the nwp components for lead time (t0 + t) + # Then for the model components + if blend_nwp_members: + rho_nwp_fc = [ + blending.skill_scores.lt_dependent_cor_nwp( + lt=(t * int(timestep)), + correlations=rho_nwp_models[n_model], + outdir_path=outdir_path_skill, + n_model=n_model, + skill_kwargs=clim_kwargs, + ) + for n_model in range(rho_nwp_models.shape[0]) + ] + rho_nwp_fc = np.stack(rho_nwp_fc) + # Concatenate rho_extr and rho_nwp + rho_fc = np.concatenate((rho_extr[None, :], rho_nwp_fc), axis=0) else: - # use the deterministic AR(p) model computed above if - # perturbations are disabled - precip_cascade[j][i] = R_m[i] + rho_nwp_fc = blending.skill_scores.lt_dependent_cor_nwp( + lt=(t * int(timestep)), + correlations=rho_nwp_models[j], + outdir_path=outdir_path_skill, + n_model=n_model_indices[j], + skill_kwargs=clim_kwargs, + ) + # Concatenate rho_extr and rho_nwp + rho_fc = np.concatenate( + (rho_extr[None, :], rho_nwp_fc[None, :]), axis=0 + ) - # 8.3.3 regress the noise component to the subsequent time step - # iterate the AR(p) model for each cascade level - for i in range(n_cascade_levels): - # normalize the noise cascade - if EPS is not None: - EPS_ = EPS["cascade_levels"][i] - EPS_ *= noise_std_coeffs[i] + # 8.2 Determine the weights per component + + # Weights following the bps method. These are needed for the velocity + # weights prior to the advection step. If weights method spn is + # selected, weights will be overwritten with those weights prior to + # blending step. + # weight = [(extr_field, n_model_fields, noise), n_cascade_levels, ...] + weights = calculate_weights_bps(rho_fc) + + # The model only weights + if weights_method == "bps": + # Determine the weights of the components without the extrapolation + # cascade, in case this is no data or outside the mask. + weights_model_only = calculate_weights_bps(rho_fc[1:, :]) + elif weights_method == "spn": + # Only the weights of the components without the extrapolation + # cascade will be determined here. The full set of weights are + # determined after the extrapolation step in this method. 
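For the "spn" method, the weights per cascade level are derived from the correlation vector rho_fc and the normalized covariance between the components. One standard way to turn those ingredients into minimum-error-variance weights is sketched below; the actual calculate_weights_spn may differ in detail:

    import numpy as np

    def spn_style_weights_sketch(rho, cov):
        """Solve cov @ w = rho for the component weights and assign the
        unexplained variance to the noise, keeping unit total variance."""
        w = np.linalg.solve(cov, rho)
        residual = max(1.0 - float(rho @ w), 0.0)
        return np.append(w, np.sqrt(residual))

    # Example: two components that are moderately correlated with each other.
    rho = np.array([0.8, 0.6])
    cov = np.array([[1.0, 0.5], [0.5, 1.0]])
    print(spn_style_weights_sketch(rho, cov))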
+ if blend_nwp_members and precip_models_cascade_temp.shape[0] > 1: + weights_model_only = np.zeros( + (precip_models_cascade_temp.shape[0] + 1, n_cascade_levels) + ) + for i in range(n_cascade_levels): + # Determine the normalized covariance matrix (containing) + # the cross-correlations between the models + cov = np.corrcoef( + np.stack( + [ + precip_models_cascade_temp[ + n_model, i, :, : + ].flatten() + for n_model in range( + precip_models_cascade_temp.shape[0] + ) + ] + ) + ) + # Determine the weights for this cascade level + weights_model_only[:, i] = calculate_weights_spn( + correlations=rho_fc[1:, i], cov=cov + ) + else: + # Same as correlation and noise is 1 - correlation + weights_model_only = calculate_weights_bps(rho_fc[1:, :]) else: - EPS_ = None - # apply AR(p) process to noise cascade level - # (Returns zero noise if EPS is None) - noise_cascade[j][i] = autoregression.iterate_ar_model( - noise_cascade[j][i], PHI[i, :], eps=EPS_ - ) + raise ValueError( + "Unknown weights method %s: must be 'bps' or 'spn'" + % weights_method + ) - EPS = None - EPS_ = None + # 8.3 Determine the noise cascade and regress this to the subsequent + # time step + regress the extrapolation component to the subsequent + # time step + + # 8.3.1 Determine the epsilon, a cascade of temporally independent + # but spatially correlated noise + if noise_method is not None: + # generate noise field + EPS = generate_noise( + pp, + randstate=randgen_prec[j], + fft_method=fft_objs[j], + domain=domain, + ) + + # decompose the noise field into a cascade + EPS = decompositor( + EPS, + bp_filter, + fft_method=fft_objs[j], + input_domain=domain, + output_domain=domain, + compute_stats=True, + normalize=True, + compact_output=True, + ) + else: + EPS = None - # 8.4 Perturb and blend the advection fields + advect the - # extrapolation and noise cascade to the current time step - # (or subtimesteps if non-integer time steps are given) - - # Settings and initialize the output - extrap_kwargs_ = extrap_kwargs.copy() - extrap_kwargs_noise = extrap_kwargs.copy() - extrap_kwargs_pb = extrap_kwargs.copy() - velocity_pert = velocity - R_f_ep_out = [] - Yn_ep_out = [] - R_pm_ep = [] - - # Extrapolate per sub time step - for t_sub in subtimesteps: - if t_sub > 0: - t_diff_prev_int = t_sub - int(t_sub) - if t_diff_prev_int > 0.0: - R_f_ip = [ - (1.0 - t_diff_prev_int) * forecast_prev[j][i][-1, :] - + t_diff_prev_int * precip_cascade[j][i][-1, :] - for i in range(n_cascade_levels) - ] - Yn_ip = [ - (1.0 - t_diff_prev_int) * noise_prev[j][i][-1, :] - + t_diff_prev_int * noise_cascade[j][i][-1, :] - for i in range(n_cascade_levels) - ] + # 8.3.2 regress the extrapolation component to the subsequent time + # step + # iterate the AR(p) model for each cascade level + for i in range(n_cascade_levels): + # apply AR(p) process to extrapolation cascade level + if EPS is not None or vel_pert_method is not None: + precip_cascade[j][i] = autoregression.iterate_ar_model( + precip_cascade[j][i], PHI[i, :] + ) + # Renormalize the cascade + precip_cascade[j][i][1] /= np.std(precip_cascade[j][i][1]) + else: + # use the deterministic AR(p) model computed above if + # perturbations are disabled + precip_cascade[j][i] = R_m[i] + # 8.3.3 regress the noise component to the subsequent time step + # iterate the AR(p) model for each cascade level + for i in range(n_cascade_levels): + # normalize the noise cascade + if EPS is not None: + EPS_ = EPS["cascade_levels"][i] + EPS_ *= noise_std_coeffs[i] else: - R_f_ip = [ - forecast_prev[j][i][-1, :] for i in 
range(n_cascade_levels) - ] - Yn_ip = [ - noise_prev[j][i][-1, :] for i in range(n_cascade_levels) - ] + EPS_ = None + # apply AR(p) process to noise cascade level + # (Returns zero noise if EPS is None) + noise_cascade[j][i] = autoregression.iterate_ar_model( + noise_cascade[j][i], PHI[i, :], eps=EPS_ + ) + + EPS = None + EPS_ = None + + # 8.4 Perturb and blend the advection fields + advect the + # extrapolation and noise cascade to the current time step + # (or subtimesteps if non-integer time steps are given) + + # Settings and initialize the output + extrap_kwargs_ = extrap_kwargs.copy() + extrap_kwargs_noise = extrap_kwargs.copy() + extrap_kwargs_pb = extrap_kwargs.copy() + velocity_pert = velocity + R_f_ep_out = [] + Yn_ep_out = [] + R_pm_ep = [] + + # Extrapolate per sub time step + for t_sub in subtimesteps: + if t_sub > 0: + t_diff_prev_int = t_sub - int(t_sub) + if t_diff_prev_int > 0.0: + R_f_ip = [ + (1.0 - t_diff_prev_int) * forecast_prev[j][i][-1, :] + + t_diff_prev_int * precip_cascade[j][i][-1, :] + for i in range(n_cascade_levels) + ] + Yn_ip = [ + (1.0 - t_diff_prev_int) * noise_prev[j][i][-1, :] + + t_diff_prev_int * noise_cascade[j][i][-1, :] + for i in range(n_cascade_levels) + ] + + else: + R_f_ip = [ + forecast_prev[j][i][-1, :] + for i in range(n_cascade_levels) + ] + Yn_ip = [ + noise_prev[j][i][-1, :] for i in range(n_cascade_levels) + ] + + R_f_ip = np.stack(R_f_ip) + Yn_ip = np.stack(Yn_ip) + + t_diff_prev = t_sub - t_prev[j] + t_total[j] += t_diff_prev + + # compute the perturbed motion field - include the NWP + # velocities and the weights. Note that we only perturb + # the extrapolation velocity field, as the NWP velocity + # field is present per time step + if vel_pert_method is not None: + velocity_pert = velocity + generate_vel_noise( + vps[j], t_total[j] * timestep + ) - R_f_ip = np.stack(R_f_ip) - Yn_ip = np.stack(Yn_ip) + # Stack the perturbed extrapolation and the NWP velocities + if blend_nwp_members: + V_stack = np.concatenate( + ( + velocity_pert[None, :, :, :], + velocity_models_temp, + ), + axis=0, + ) + else: + V_model_ = velocity_models_temp[j] + V_stack = np.concatenate( + (velocity_pert[None, :, :, :], V_model_[None, :, :, :]), + axis=0, + ) + V_model_ = None + + # Obtain a blended optical flow, using the weights of the + # second cascade following eq. 24 in BPS2006 + velocity_blended = blending.utils.blend_optical_flows( + flows=V_stack, + weights=weights[ + :-1, 1 + ], # [(extr_field, n_model_fields), cascade_level=2] + ) - t_diff_prev = t_sub - t_prev[j] + # Extrapolate both cascades to the next time step + # First recompose the cascade, advect it and decompose it again + # This is needed to remove the interpolation artifacts. + # In addition, the number of extrapolations is greatly reduced + # A. 
Radar Rain + R_f_ip_recomp = blending.utils.recompose_cascade( + combined_cascade=R_f_ip, + combined_mean=mu_extrapolation, + combined_sigma=sigma_extrapolation, + ) + # Make sure we have values outside the mask + if zero_precip_radar: + R_f_ip_recomp = np.nan_to_num( + R_f_ip_recomp, + copy=True, + nan=zerovalue, + posinf=zerovalue, + neginf=zerovalue, + ) + # Put back the mask + R_f_ip_recomp[domain_mask] = np.nan + extrap_kwargs["displacement_prev"] = D[j] + R_f_ep_recomp_, D[j] = extrapolator( + R_f_ip_recomp, + velocity_blended, + [t_diff_prev], + allow_nonfinite_values=True, + **extrap_kwargs, + ) + R_f_ep_recomp = R_f_ep_recomp_[0].copy() + temp_mask = ~np.isfinite(R_f_ep_recomp) + # TODO WHERE DO CAN I FIND THIS -15.0 + R_f_ep_recomp[~np.isfinite(R_f_ep_recomp)] = zerovalue + R_f_ep = decompositor( + R_f_ep_recomp, + bp_filter, + mask=MASK_thr, + fft_method=fft, + output_domain=domain, + normalize=True, + compute_stats=True, + compact_output=True, + )["cascade_levels"] + # Make sure we have values outside the mask + if zero_precip_radar: + R_f_ep = np.nan_to_num( + R_f_ep, + copy=True, + nan=np.nanmin(R_f_ip), + posinf=np.nanmin(R_f_ip), + neginf=np.nanmin(R_f_ip), + ) + for i in range(n_cascade_levels): + R_f_ep[i][temp_mask] = np.nan + # B. Noise + Yn_ip_recomp = blending.utils.recompose_cascade( + combined_cascade=Yn_ip, + combined_mean=mu_noise[j], + combined_sigma=sigma_noise[j], + ) + extrap_kwargs_noise["displacement_prev"] = D_Yn[j] + extrap_kwargs_noise["map_coordinates_mode"] = "wrap" + Yn_ep_recomp_, D_Yn[j] = extrapolator( + Yn_ip_recomp, + velocity_blended, + [t_diff_prev], + allow_nonfinite_values=True, + **extrap_kwargs_noise, + ) + Yn_ep_recomp = Yn_ep_recomp_[0].copy() + Yn_ep = decompositor( + Yn_ep_recomp, + bp_filter, + mask=MASK_thr, + fft_method=fft, + output_domain=domain, + normalize=True, + compute_stats=True, + compact_output=True, + )["cascade_levels"] + for i in range(n_cascade_levels): + Yn_ep[i] *= noise_std_coeffs[i] + + # Append the results to the output lists + R_f_ep_out.append(R_f_ep.copy()) + Yn_ep_out.append(Yn_ep.copy()) + R_f_ip = None + R_f_ip_recomp = None + R_f_ep_recomp_ = None + R_f_ep_recomp = None + R_f_ep = None + Yn_ip = None + Yn_ip_recomp = None + Yn_ep_recomp_ = None + Yn_ep_recomp = None + Yn_ep = None + + # Finally, also extrapolate the initial radar rainfall + # field. This will be blended with the rainfall field(s) + # of the (NWP) model(s) for Lagrangian blended prob. matching + # min_R = np.min(precip) + extrap_kwargs_pb["displacement_prev"] = D_pb[j] + # Apply the domain mask to the extrapolation component + R_ = precip.copy() + R_[domain_mask] = np.nan + R_pm_ep_, D_pb[j] = extrapolator( + R_, + velocity_blended, + [t_diff_prev], + allow_nonfinite_values=True, + **extrap_kwargs_pb, + ) + R_pm_ep.append(R_pm_ep_[0]) + + t_prev[j] = t_sub + + if len(R_f_ep_out) > 0: + R_f_ep_out = np.stack(R_f_ep_out) + Yn_ep_out = np.stack(Yn_ep_out) + R_pm_ep = np.stack(R_pm_ep) + + # advect the forecast field by one time step if no subtimesteps in the + # current interval were found + if not subtimesteps: + t_diff_prev = t + 1 - t_prev[j] t_total[j] += t_diff_prev # compute the perturbed motion field - include the NWP - # velocities and the weights. 
Note that we only perturb - # the extrapolation velocity field, as the NWP velocity - # field is present per time step + # velocities and the weights if vel_pert_method is not None: velocity_pert = velocity + generate_vel_noise( vps[j], t_total[j] * timestep @@ -898,10 +1215,7 @@ def worker(j): # Stack the perturbed extrapolation and the NWP velocities if blend_nwp_members: V_stack = np.concatenate( - ( - velocity_pert[None, :, :, :], - velocity_models_temp, - ), + (velocity_pert[None, :, :, :], velocity_models_temp), axis=0, ) else: @@ -921,410 +1235,320 @@ def worker(j): ], # [(extr_field, n_model_fields), cascade_level=2] ) - # Extrapolate both cascades to the next time step - R_f_ep = np.zeros(R_f_ip.shape) - Yn_ep = np.zeros(Yn_ip.shape) + # Extrapolate the extrapolation and noise cascade - for i in range(n_cascade_levels): - extrap_kwargs_["displacement_prev"] = D[j][i] - extrap_kwargs_noise["displacement_prev"] = D_Yn[j][i] - extrap_kwargs_noise["map_coordinates_mode"] = "wrap" - - # First, extrapolate the extrapolation component - # Apply the domain mask to the extrapolation component - R_f_ip[i][domain_mask] = np.NaN - R_f_ep_, D[j][i] = extrapolator( - R_f_ip[i], - velocity_blended, - [t_diff_prev], - allow_nonfinite_values=True, - **extrap_kwargs_, - ) - R_f_ep[i] = R_f_ep_[0] + extrap_kwargs_["displacement_prev"] = D[j] + extrap_kwargs_noise["displacement_prev"] = D_Yn[j] + extrap_kwargs_noise["map_coordinates_mode"] = "wrap" - # Then, extrapolate the noise component - Yn_ep_, D_Yn[j][i] = extrapolator( - Yn_ip[i], - velocity_blended, - [t_diff_prev], - allow_nonfinite_values=True, - **extrap_kwargs_noise, - ) - Yn_ep[i] = Yn_ep_[0] - - # Append the results to the output lists - R_f_ep_out.append(R_f_ep) - Yn_ep_out.append(Yn_ep) - R_f_ep_ = None - Yn_ep_ = None - - # Finally, also extrapolate the initial radar rainfall - # field. This will be blended with the rainfall field(s) - # of the (NWP) model(s) for Lagrangian blended prob. matching - # min_R = np.min(precip) - extrap_kwargs_pb["displacement_prev"] = D_pb[j] - # Apply the domain mask to the extrapolation component - R_ = precip.copy() - R_[domain_mask] = np.NaN - R_pm_ep_, D_pb[j] = extrapolator( - R_, + _, D[j] = extrapolator( + None, velocity_blended, [t_diff_prev], allow_nonfinite_values=True, - **extrap_kwargs_pb, - ) - R_pm_ep.append(R_pm_ep_[0]) - - t_prev[j] = t_sub - - if len(R_f_ep_out) > 0: - R_f_ep_out = np.stack(R_f_ep_out) - Yn_ep_out = np.stack(Yn_ep_out) - R_pm_ep = np.stack(R_pm_ep) - - # advect the forecast field by one time step if no subtimesteps in the - # current interval were found - if not subtimesteps: - t_diff_prev = t + 1 - t_prev[j] - t_total[j] += t_diff_prev - - # compute the perturbed motion field - include the NWP - # velocities and the weights - if vel_pert_method is not None: - velocity_pert = velocity + generate_vel_noise( - vps[j], t_total[j] * timestep - ) - - # Stack the perturbed extrapolation and the NWP velocities - if blend_nwp_members: - V_stack = np.concatenate( - (velocity_pert[None, :, :, :], velocity_models_temp), - axis=0, - ) - else: - V_model_ = velocity_models_temp[j] - V_stack = np.concatenate( - (velocity_pert[None, :, :, :], V_model_[None, :, :, :]), axis=0 + **extrap_kwargs_, ) - V_model_ = None - - # Obtain a blended optical flow, using the weights of the - # second cascade following eq. 
24 in BPS2006 - velocity_blended = blending.utils.blend_optical_flows( - flows=V_stack, - weights=weights[ - :-1, 1 - ], # [(extr_field, n_model_fields), cascade_level=2] - ) - # Extrapolate the extrapolation and noise cascade - for i in range(n_cascade_levels): - extrap_kwargs_["displacement_prev"] = D[j][i] - extrap_kwargs_noise["displacement_prev"] = D_Yn[j][i] - extrap_kwargs_noise["map_coordinates_mode"] = "wrap" - - _, D[j][i] = extrapolator( + _, D_Yn[j] = extrapolator( None, velocity_blended, [t_diff_prev], allow_nonfinite_values=True, - **extrap_kwargs_, + **extrap_kwargs_noise, ) - _, D_Yn[j][i] = extrapolator( + # Also extrapolate the radar observation, used for the probability + # matching and post-processing steps + extrap_kwargs_pb["displacement_prev"] = D_pb[j] + _, D_pb[j] = extrapolator( None, velocity_blended, [t_diff_prev], allow_nonfinite_values=True, - **extrap_kwargs_noise, + **extrap_kwargs_pb, ) - # Also extrapolate the radar observation, used for the probability - # matching and post-processing steps - extrap_kwargs_pb["displacement_prev"] = D_pb[j] - _, D_pb[j] = extrapolator( - None, - velocity_blended, - [t_diff_prev], - allow_nonfinite_values=True, - **extrap_kwargs_pb, - ) - - t_prev[j] = t + 1 + t_prev[j] = t + 1 + + forecast_prev[j] = precip_cascade[j] + noise_prev[j] = noise_cascade[j] + + # 8.5 Blend the cascades + R_f_out = [] + + for t_sub in subtimesteps: + # TODO: does it make sense to use sub time steps - check if it works? + if t_sub > 0: + t_index = np.where(np.array(subtimesteps) == t_sub)[0][0] + # First concatenate the cascades and the means and sigmas + # precip_models = [n_models,timesteps,n_cascade_levels,m,n] + if blend_nwp_members: + cascades_stacked = np.concatenate( + ( + R_f_ep_out[None, t_index], + precip_models_cascade_temp, + Yn_ep_out[None, t_index], + ), + axis=0, + ) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...] + means_stacked = np.concatenate( + (mu_extrapolation[None, :], mu_models_temp), axis=0 + ) + sigmas_stacked = np.concatenate( + (sigma_extrapolation[None, :], sigma_models_temp), + axis=0, + ) + else: + cascades_stacked = np.concatenate( + ( + R_f_ep_out[None, t_index], + precip_models_cascade_temp[None, j], + Yn_ep_out[None, t_index], + ), + axis=0, + ) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...] + means_stacked = np.concatenate( + (mu_extrapolation[None, :], mu_models_temp[None, j]), + axis=0, + ) + sigmas_stacked = np.concatenate( + ( + sigma_extrapolation[None, :], + sigma_models_temp[None, j], + ), + axis=0, + ) - forecast_prev[j] = precip_cascade[j] + # First determine the blending weights if method is spn. The + # weights for method bps have already been determined. 
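Once the weights are fixed, the blend performed by blending.utils.blend_cascades below is, at its core, a weighted sum over the components at each cascade level; recompose_cascade afterwards restores the blended means and standard deviations. A compact sketch of that core operation only:

    import numpy as np

    def blend_cascades_sketch(cascades_norm, weights):
        """Weighted component blend per cascade level.

        cascades_norm: (n_components, n_cascade_levels, m, n)
        weights:       (n_components, n_cascade_levels)
        """
        return np.einsum("ck,ckmn->kmn", weights, cascades_norm)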
+ if weights_method == "spn": + weights = np.zeros( + (cascades_stacked.shape[0], n_cascade_levels) + ) + for i in range(n_cascade_levels): + # Determine the normalized covariance matrix (containing) + # the cross-correlations between the models + cascades_stacked_ = np.stack( + [ + cascades_stacked[n_model, i, :, :].flatten() + for n_model in range( + cascades_stacked.shape[0] - 1 + ) + ] + ) # -1 to exclude the noise component + cov = np.ma.corrcoef( + np.ma.masked_invalid(cascades_stacked_) + ) + # Determine the weights for this cascade level + weights[:, i] = calculate_weights_spn( + correlations=rho_fc[:, i], cov=cov + ) + + # Blend the extrapolation, (NWP) model(s) and noise cascades + R_f_blended = blending.utils.blend_cascades( + cascades_norm=cascades_stacked, weights=weights + ) - # 8.5 Blend the cascades - R_f_out = [] + # Also blend the cascade without the extrapolation component + R_f_blended_mod_only = blending.utils.blend_cascades( + cascades_norm=cascades_stacked[1:, :], + weights=weights_model_only, + ) - for t_sub in subtimesteps: - # TODO: does it make sense to use sub time steps - check if it works? - if t_sub > 0: - t_index = np.where(np.array(subtimesteps) == t_sub)[0][0] - # First concatenate the cascades and the means and sigmas - # precip_models = [n_models,timesteps,n_cascade_levels,m,n] - if blend_nwp_members: - cascades_stacked = np.concatenate( - ( - R_f_ep_out[None, t_index], - precip_models_cascade_temp, - Yn_ep_out[None, t_index], - ), - axis=0, - ) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...] - means_stacked = np.concatenate( - (mu_extrapolation[None, :], mu_models_temp), axis=0 + # Blend the means and standard deviations + # Input is array of shape [number_components, scale_level, ...] + means_blended, sigmas_blended = blend_means_sigmas( + means=means_stacked, sigmas=sigmas_stacked, weights=weights ) - sigmas_stacked = np.concatenate( - (sigma_extrapolation[None, :], sigma_models_temp), - axis=0, + # Also blend the means and sigmas for the cascade without extrapolation + ( + means_blended_mod_only, + sigmas_blended_mod_only, + ) = blend_means_sigmas( + means=means_stacked[1:, :], + sigmas=sigmas_stacked[1:, :], + weights=weights_model_only, ) - else: - cascades_stacked = np.concatenate( - ( - R_f_ep_out[None, t_index], - precip_models_cascade_temp[None, j], - Yn_ep_out[None, t_index], - ), - axis=0, - ) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...] - means_stacked = np.concatenate( - (mu_extrapolation[None, :], mu_models_temp[None, j]), axis=0 + + # 8.6 Recompose the cascade to a precipitation field + # (The function first normalizes the blended cascade, R_f_blended + # again) + R_f_new = blending.utils.recompose_cascade( + combined_cascade=R_f_blended, + combined_mean=means_blended, + combined_sigma=sigmas_blended, ) - sigmas_stacked = np.concatenate( - (sigma_extrapolation[None, :], sigma_models_temp[None, j]), - axis=0, + # The recomposed cascade without the extrapolation (for NaN filling + # outside the radar domain) + R_f_new_mod_only = blending.utils.recompose_cascade( + combined_cascade=R_f_blended_mod_only, + combined_mean=means_blended_mod_only, + combined_sigma=sigmas_blended_mod_only, ) - - # First determine the blending weights if method is spn. The - # weights for method bps have already been determined. - if weights_method == "spn": - weights = np.zeros( - (cascades_stacked.shape[0], n_cascade_levels) + if domain == "spectral": + # TODO: Check this! 
(Only tested with domain == 'spatial') + R_f_new = fft_objs[j].irfft2(R_f_new) + R_f_new_mod_only = fft_objs[j].irfft2(R_f_new_mod_only) + + # 8.7 Post-processing steps - use the mask and fill no data with + # the blended NWP forecast. Probability matching following + # Lagrangian blended probability matching which uses the + # latest extrapolated radar rainfall field blended with the + # nwp model(s) rainfall forecast fields as 'benchmark'. + + # TODO: Check probability matching method + # 8.7.1 first blend the extrapolated rainfall field (the field + # that is only used for post-processing steps) with the NWP + # rainfall forecast for this time step using the weights + # at scale level 2. + weights_pm = weights[:-1, 1] # Weights without noise, level 2 + weights_pm_normalized = weights_pm / np.sum(weights_pm) + # And the weights for outside the radar domain + weights_pm_mod_only = weights_model_only[ + :-1, 1 + ] # Weights without noise, level 2 + weights_pm_normalized_mod_only = weights_pm_mod_only / np.sum( + weights_pm_mod_only ) - for i in range(n_cascade_levels): - # Determine the normalized covariance matrix (containing) - # the cross-correlations between the models - cascades_stacked_ = np.stack( - [ - cascades_stacked[n_model, i, :, :].flatten() - for n_model in range(cascades_stacked.shape[0] - 1) - ] - ) # -1 to exclude the noise component - cov = np.ma.corrcoef( - np.ma.masked_invalid(cascades_stacked_) + # Stack the fields + if blend_nwp_members: + R_pm_stacked = np.concatenate( + ( + R_pm_ep[None, t_index], + precip_models_pm_temp, + ), + axis=0, ) - # Determine the weights for this cascade level - weights[:, i] = calculate_weights_spn( - correlations=rho_fc[:, i], cov=cov + else: + R_pm_stacked = np.concatenate( + ( + R_pm_ep[None, t_index], + precip_models_pm_temp[None, j], + ), + axis=0, ) - - # Blend the extrapolation, (NWP) model(s) and noise cascades - R_f_blended = blending.utils.blend_cascades( - cascades_norm=cascades_stacked, weights=weights - ) - - # Also blend the cascade without the extrapolation component - R_f_blended_mod_only = blending.utils.blend_cascades( - cascades_norm=cascades_stacked[1:, :], - weights=weights_model_only, - ) - - # Blend the means and standard deviations - # Input is array of shape [number_components, scale_level, ...] - means_blended, sigmas_blended = blend_means_sigmas( - means=means_stacked, sigmas=sigmas_stacked, weights=weights - ) - # Also blend the means and sigmas for the cascade without extrapolation - ( - means_blended_mod_only, - sigmas_blended_mod_only, - ) = blend_means_sigmas( - means=means_stacked[1:, :], - sigmas=sigmas_stacked[1:, :], - weights=weights_model_only, - ) - - # 8.6 Recompose the cascade to a precipitation field - # (The function first normalizes the blended cascade, R_f_blended - # again) - R_f_new = blending.utils.recompose_cascade( - combined_cascade=R_f_blended, - combined_mean=means_blended, - combined_sigma=sigmas_blended, - ) - # The recomposed cascade without the extrapolation (for NaN filling - # outside the radar domain) - R_f_new_mod_only = blending.utils.recompose_cascade( - combined_cascade=R_f_blended_mod_only, - combined_mean=means_blended_mod_only, - combined_sigma=sigmas_blended_mod_only, - ) - if domain == "spectral": - # TODO: Check this! (Only tested with domain == 'spatial') - R_f_new = fft_objs[j].irfft2(R_f_new) - R_f_new_mod_only = fft_objs[j].irfft2(R_f_new_mod_only) - - # 8.7 Post-processing steps - use the mask and fill no data with - # the blended NWP forecast. 
Probability matching following - # Lagrangian blended probability matching which uses the - # latest extrapolated radar rainfall field blended with the - # nwp model(s) rainfall forecast fields as 'benchmark'. - - # TODO: Check probability matching method - # 8.7.1 first blend the extrapolated rainfall field (the field - # that is only used for post-processing steps) with the NWP - # rainfall forecast for this time step using the weights - # at scale level 2. - weights_pm = weights[:-1, 1] # Weights without noise, level 2 - weights_pm_normalized = weights_pm / np.sum(weights_pm) - # And the weights for outside the radar domain - weights_pm_mod_only = weights_model_only[ - :-1, 1 - ] # Weights without noise, level 2 - weights_pm_normalized_mod_only = weights_pm_mod_only / np.sum( - weights_pm_mod_only - ) - # Stack the fields - if blend_nwp_members: - R_pm_stacked = np.concatenate( - ( - R_pm_ep[None, t_index], - precip_models_pm_temp, - ), - axis=0, - ) - else: - R_pm_stacked = np.concatenate( - ( - R_pm_ep[None, t_index], - precip_models_pm_temp[None, j], - ), - axis=0, - ) - # Blend it - R_pm_blended = np.sum( - weights_pm_normalized.reshape( - weights_pm_normalized.shape[0], 1, 1 - ) - * R_pm_stacked, - axis=0, - ) - if blend_nwp_members: - R_pm_blended_mod_only = np.sum( - weights_pm_normalized_mod_only.reshape( - weights_pm_normalized_mod_only.shape[0], 1, 1 + # Blend it + R_pm_blended = np.sum( + weights_pm_normalized.reshape( + weights_pm_normalized.shape[0], 1, 1 ) - * precip_models_pm_temp, + * R_pm_stacked, axis=0, ) - else: - R_pm_blended_mod_only = precip_models_pm_temp[j] - - # The extrapolation components are NaN outside the advected - # radar domain. This results in NaN values in the blended - # forecast outside the radar domain. Therefore, fill these - # areas with the "..._mod_only" blended forecasts, consisting - # of the NWP and noise components. - nan_indices = np.isnan(R_f_new) - R_f_new[nan_indices] = R_f_new_mod_only[nan_indices] - nan_indices = np.isnan(R_pm_blended) - R_pm_blended[nan_indices] = R_pm_blended_mod_only[nan_indices] - # Finally, fill the remaining nan values, if present, with - # the minimum value in the forecast - nan_indices = np.isnan(R_f_new) - R_f_new[nan_indices] = np.nanmin(R_f_new) - nan_indices = np.isnan(R_pm_blended) - R_pm_blended[nan_indices] = np.nanmin(R_pm_blended) - - # 8.7.2. Apply the masking and prob. matching - if mask_method is not None: - # apply the precipitation mask to prevent generation of new - # precipitation into areas where it was not originally - # observed - R_cmin = R_f_new.min() - if mask_method == "incremental": - # The incremental mask is slightly different from - # the implementation in the non-blended steps.py, as - # it is not based on the last forecast, but instead - # on R_pm_blended. Therefore, the buffer does not - # increase over time. 
- # Get the mask for this forecast - MASK_prec = R_pm_blended >= precip_thr - # Buffer the mask - MASK_prec = _compute_incremental_mask( - MASK_prec, struct, mask_rim + if blend_nwp_members: + R_pm_blended_mod_only = np.sum( + weights_pm_normalized_mod_only.reshape( + weights_pm_normalized_mod_only.shape[0], 1, 1 + ) + * precip_models_pm_temp, + axis=0, ) - # Get the final mask - R_f_new = R_cmin + (R_f_new - R_cmin) * MASK_prec - MASK_prec_ = R_f_new > R_cmin - elif mask_method == "obs": - # The mask equals the most recent benchmark - # rainfall field - MASK_prec_ = R_pm_blended >= precip_thr - - # Set to min value outside of mask - R_f_new[~MASK_prec_] = R_cmin - - if probmatching_method == "cdf": - # adjust the CDF of the forecast to match the most recent - # benchmark rainfall field (R_pm_blended) - R_f_new = probmatching.nonparam_match_empirical_cdf( - R_f_new, R_pm_blended - ) - elif probmatching_method == "mean": - # Use R_pm_blended as benchmark field and - mu_0 = np.mean(R_pm_blended[R_pm_blended >= precip_thr]) - MASK = R_f_new >= precip_thr - mu_fct = np.mean(R_f_new[MASK]) - R_f_new[MASK] = R_f_new[MASK] - mu_fct + mu_0 - - R_f_out.append(R_f_new) - - return R_f_out - - res = [] - for j in range(n_ens_members): - if not DASK_IMPORTED or n_ens_members == 1: - res.append(worker(j)) + else: + R_pm_blended_mod_only = precip_models_pm_temp[j] + + # The extrapolation components are NaN outside the advected + # radar domain. This results in NaN values in the blended + # forecast outside the radar domain. Therefore, fill these + # areas with the "..._mod_only" blended forecasts, consisting + # of the NWP and noise components. + nan_indices = np.isnan(R_f_new) + R_f_new[nan_indices] = R_f_new_mod_only[nan_indices] + nan_indices = np.isnan(R_pm_blended) + R_pm_blended[nan_indices] = R_pm_blended_mod_only[nan_indices] + # Finally, fill the remaining nan values, if present, with + # the minimum value in the forecast + nan_indices = np.isnan(R_f_new) + R_f_new[nan_indices] = np.nanmin(R_f_new) + nan_indices = np.isnan(R_pm_blended) + R_pm_blended[nan_indices] = np.nanmin(R_pm_blended) + + # 8.7.2. Apply the masking and prob. matching + if mask_method is not None: + # apply the precipitation mask to prevent generation of new + # precipitation into areas where it was not originally + # observed + R_cmin = R_f_new.min() + if mask_method == "incremental": + # The incremental mask is slightly different from + # the implementation in the non-blended steps.py, as + # it is not based on the last forecast, but instead + # on R_pm_blended. Therefore, the buffer does not + # increase over time. + # Get the mask for this forecast + MASK_prec = R_pm_blended >= precip_thr + # Buffer the mask + MASK_prec = _compute_incremental_mask( + MASK_prec, struct, mask_rim + ) + # Get the final mask + R_f_new = R_cmin + (R_f_new - R_cmin) * MASK_prec + MASK_prec_ = R_f_new > R_cmin + elif mask_method == "obs": + # The mask equals the most recent benchmark + # rainfall field + MASK_prec_ = R_pm_blended >= precip_thr + + # Set to min value outside of mask + R_f_new[~MASK_prec_] = R_cmin + + if probmatching_method == "cdf": + # Adjust the CDF of the forecast to match the most recent + # benchmark rainfall field (R_pm_blended). 
+                            # If the forecast contains no finite values, the
+                            # probability matching is skipped.
+                            if np.any(np.isfinite(R_f_new)):
+                                R_f_new = probmatching.nonparam_match_empirical_cdf(
+                                    R_f_new, R_pm_blended
+                                )
+                        elif probmatching_method == "mean":
+                            # Use R_pm_blended as benchmark field and shift the
+                            # conditional mean (above precip_thr) of R_f_new to it
+                            mu_0 = np.mean(R_pm_blended[R_pm_blended >= precip_thr])
+                            MASK = R_f_new >= precip_thr
+                            mu_fct = np.mean(R_f_new[MASK])
+                            R_f_new[MASK] = R_f_new[MASK] - mu_fct + mu_0
+
+                        R_f_out.append(R_f_new)
+
+                R_f_[j] = R_f_out
+
+            res = []
+
+            if DASK_IMPORTED and n_ens_members > 1:
+                for j in range(n_ens_members):
+                    res.append(dask.delayed(worker)(j))
+                dask.compute(*res, num_workers=num_ensemble_workers)
             else:
-                res.append(dask.delayed(worker)(j))
+                for j in range(n_ens_members):
+                    worker(j)

-        R_f_ = (
-            dask.compute(*res, num_workers=num_ensemble_workers)
-            if DASK_IMPORTED and n_ens_members > 1
-            else res
-        )
-        res = None
+            res = None

-        if is_nowcast_time_step:
-            if measure_time:
-                print(f"{time.time() - starttime:.2f} seconds.")
-            else:
-                print("done.")
+            if is_nowcast_time_step:
+                if measure_time:
+                    print(f"{time.time() - starttime:.2f} seconds.")
+                else:
+                    print("done.")

-        if callback is not None:
-            R_f_stacked = np.stack(R_f_)
-            if R_f_stacked.shape[1] > 0:
-                callback(R_f_stacked.squeeze())
+            if callback is not None:
+                R_f_stacked = np.stack(R_f_)
+                if R_f_stacked.shape[1] > 0:
+                    callback(R_f_stacked.squeeze())

-        if return_output:
-            for j in range(n_ens_members):
-                R_f[j].extend(R_f_[j])
+            if return_output:
+                for j in range(n_ens_members):
+                    R_f[j].extend(R_f_[j])

-        R_f_ = None
+            R_f_ = None

-    if measure_time:
-        mainloop_time = time.time() - starttime_mainloop
-
-    if return_output:
-        outarr = np.stack([np.stack(R_f[j]) for j in range(n_ens_members)])
         if measure_time:
-            return outarr, init_time, mainloop_time
+            mainloop_time = time.time() - starttime_mainloop
+
+        if return_output:
+            outarr = np.stack([np.stack(R_f[j]) for j in range(n_ens_members)])
+            if measure_time:
+                return outarr, init_time, mainloop_time
+            else:
+                return outarr
         else:
-            return outarr
-        else:
-            return None
+            return None


 def calculate_ratios(correlations):
@@ -1823,13 +2047,54 @@ def _compute_cascade_decomposition_nwp(
     return precip_models, mu_models, sigma_models, precip_models_pm


-def _estimate_ar_parameters_radar(R_c, ar_order, n_cascade_levels, MASK_thr):
+def _estimate_ar_parameters_radar(
+    R_c, ar_order, n_cascade_levels, MASK_thr, zero_precip_radar
+):
     """Estimate AR parameters for the radar rainfall field."""
-    # compute lag-l temporal autocorrelation coefficients for each cascade level
+    # If there are values in the radar fields, compute the autocorrelations
     GAMMA = np.empty((n_cascade_levels, ar_order))
-    for i in range(n_cascade_levels):
-        GAMMA[i, :] = correlation.temporal_autocorrelation(R_c[i], mask=MASK_thr)
+    if not zero_precip_radar:
+        # compute lag-l temporal autocorrelation coefficients for each cascade level
+        for i in range(n_cascade_levels):
+            GAMMA[i, :] = correlation.temporal_autocorrelation(R_c[i], mask=MASK_thr)
+
+    # Else, use standard values for the autocorrelations
+    else:
+        # Get the climatological lag-1 and lag-2 autocorrelation values from Table 2
+        # in `BPS2004`.
+        # These values are hard-coded; replace them with your own (climatological)
+        # values when they are available.
+        GAMMA = np.array(
+            [
+                [0.99805, 0.9925, 0.9776, 0.9297, 0.796, 0.482, 0.079, 0.0006],
+                [0.9933, 0.9752, 0.923, 0.750, 0.367, 0.069, 0.0018, 0.0014],
+            ]
+        )
+
+        # Check whether the number of cascade_levels is correct
+        if GAMMA.shape[1] > n_cascade_levels:
+            GAMMA = GAMMA[:, 0:n_cascade_levels]
+        elif GAMMA.shape[1] < n_cascade_levels:
+            # Get the number of cascade levels that is missing
+            n_extra_lev = n_cascade_levels - GAMMA.shape[1]
+            # Extend GAMMA with the correlation values of the highest cascade
+            # levels (0.0006 for lag 1 and 0.0014 for lag 2)
+            GAMMA = np.append(
+                GAMMA,
+                [np.repeat(0.0006, n_extra_lev), np.repeat(0.0014, n_extra_lev)],
+                axis=1,
+            )
+
+        # Finally, adjust the first dimension of GAMMA to the AR order
+        if ar_order == 1:
+            GAMMA = GAMMA[0:1, :]
+        if ar_order > 2:
+            for repeat_index in range(ar_order - 2):
+                GAMMA = np.vstack((GAMMA, GAMMA[1, :]))
+        # Transpose GAMMA so that its shape equals (n_cascade_levels, ar_order)
+        GAMMA = GAMMA.transpose()
+        assert GAMMA.shape == (n_cascade_levels, ar_order)
+
+    # Print the GAMMA values
     nowcast_utils.print_corrcoefs(GAMMA)

     if ar_order == 2:
@@ -1988,8 +2253,8 @@ def _prepare_forecast_loop(
 ):
     """Prepare for the forecast loop."""
     # Empty arrays for the previous displacements and the forecast cascade
-    D = np.stack([np.full(n_cascade_levels, None) for j in range(n_ens_members)])
-    D_Yn = np.stack([np.full(n_cascade_levels, None) for j in range(n_ens_members)])
+    D = np.stack([None for j in range(n_ens_members)])
+    D_Yn = np.stack([None for j in range(n_ens_members)])
     D_pb = np.stack([None for j in range(n_ens_members)])

     R_f = [[] for j in range(n_ens_members)]
@@ -2023,8 +2288,8 @@ def _compute_initial_nwp_skill(
     """Calculate the initial skill of the (NWP) model forecasts at t=0."""
     rho_nwp_models = [
         blending.skill_scores.spatial_correlation(
-            obs=R_c[0, :, -1, :, :],
-            mod=precip_models[n_model, :, :, :],
+            obs=R_c[0, :, -1, :, :].copy(),
+            mod=precip_models[n_model, :, :, :].copy(),
             domain_mask=domain_mask,
         )
         for n_model in range(precip_models.shape[0])
@@ -2046,3 +2311,121 @@ def _compute_initial_nwp_skill(
         **clim_kwargs,
     )
     return rho_nwp_models
+
+
+def _init_noise_cascade(
+    shape,
+    n_ens_members,
+    n_cascade_levels,
+    generate_noise,
+    decompositor,
+    pp,
+    randgen_prec,
+    fft_objs,
+    bp_filter,
+    domain,
+    noise_method,
+    noise_std_coeffs,
+    ar_order,
+):
+    """Initialize the noise cascade with identical noise for all AR(n) steps.
+    The mean and standard deviation of the noise are also returned, as they
+    are needed to recombine the noise before it is advected.
+    """
+    noise_cascade = np.zeros(shape)
+    mu_noise = np.zeros((n_ens_members, n_cascade_levels))
+    sigma_noise = np.zeros((n_ens_members, n_cascade_levels))
+    if noise_method:
+        for j in range(n_ens_members):
+            EPS = generate_noise(
+                pp, randstate=randgen_prec[j], fft_method=fft_objs[j], domain=domain
+            )
+            EPS = decompositor(
+                EPS,
+                bp_filter,
+                fft_method=fft_objs[j],
+                input_domain=domain,
+                output_domain=domain,
+                compute_stats=True,
+                normalize=True,
+                compact_output=True,
+            )
+            mu_noise[j] = EPS["means"]
+            sigma_noise[j] = EPS["stds"]
+            for i in range(n_cascade_levels):
+                EPS_ = EPS["cascade_levels"][i]
+                EPS_ *= noise_std_coeffs[i]
+                for n in range(ar_order):
+                    noise_cascade[j][i][n] = EPS_
+            EPS = None
+            EPS_ = None
+    return noise_cascade, mu_noise, sigma_noise
+
+
+def _fill_nans_infs_nwp_cascade(
+    precip_models_cascade,
+    precip_models_pm,
+    precip_cascade,
+    precip,
+    mu_models,
+    sigma_models,
+):
+    """Ensure that the NWP cascade and fields do not contain any NaNs or infinite numbers"""
+    # Fill NaNs and infinite numbers with the minimum value present in precip
+    # (corresponding to zero rainfall in the radar observations)
+    precip_models_cascade = np.nan_to_num(
+        precip_models_cascade,
+        copy=True,
+        nan=np.nanmin(precip_cascade),
+        posinf=np.nanmin(precip_cascade),
+        neginf=np.nanmin(precip_cascade),
+    )
+    precip_models_pm = np.nan_to_num(
+        precip_models_pm,
+        copy=True,
+        nan=np.nanmin(precip),
+        posinf=np.nanmin(precip),
+        neginf=np.nanmin(precip),
+    )
+    # Also set any NaNs or infs in the means and sigmas of the cascade to 0.0
+    mu_models = np.nan_to_num(
+        mu_models,
+        copy=True,
+        nan=0.0,
+        posinf=0.0,
+        neginf=0.0,
+    )
+    sigma_models = np.nan_to_num(
+        sigma_models,
+        copy=True,
+        nan=0.0,
+        posinf=0.0,
+        neginf=0.0,
+    )
+
+    return precip_models_cascade, precip_models_pm, mu_models, sigma_models
+
+
+def _determine_max_nr_rainy_cells_nwp(
+    precip_models_pm, precip_thr, n_models, timesteps
+):
+    """Find the NWP member and time step with the highest number of rainy cells,
+    which is used as the input field for the noise initialization"""
+    if precip_thr is None:
+        precip_thr = np.nanmin(precip_models_pm)
+
+    max_rain_pixels = -1
+    max_rain_pixels_j = -1
+    max_rain_pixels_t = -1
+    for j in range(n_models):
+        for t in range(timesteps):
+            rain_pixels = precip_models_pm[j][t][
+                precip_models_pm[j][t] > precip_thr
+            ].size
+            if rain_pixels > max_rain_pixels:
+                max_rain_pixels = rain_pixels
+                max_rain_pixels_j = j
+                max_rain_pixels_t = t
+    precip_noise_input = precip_models_pm[max_rain_pixels_j][max_rain_pixels_t]
+
+    return precip_noise_input
diff --git a/pysteps/blending/utils.py b/pysteps/blending/utils.py
index b1bebc579..6349f193f 100644
--- a/pysteps/blending/utils.py
+++ b/pysteps/blending/utils.py
@@ -15,6 +15,7 @@
     decompose_NWP
     compute_store_nwp_motion
     load_NWP
+    check_norain
 """

 import datetime
@@ -490,6 +491,13 @@ def load_NWP(input_nc_path_decomp, input_path_velocities, start_time, n_timestep
         assert analysis_time + start_i * timestep == start_time
     end_i = start_i + n_timesteps + 1

+    # Check if the requested end time (the forecast horizon) is in the stored data.
+    # If not, raise an error.
+    if end_i > ncf_decomp.variables["pr_decomposed"].shape[0]:
+        raise IndexError(
+            "The requested forecast horizon is outside the stored NWP forecast horizon. Either request a shorter forecast horizon or store a longer NWP forecast horizon."
+        )
+
     # Add the valid times to the output
     decomp_dict["valid_times"] = valid_times[start_i:end_i]

@@ -502,23 +510,50 @@
     for i in range(start_i, end_i):
         decomp_dict_ = decomp_dict.copy()

+        # Obtain the decomposed cascades for time step i
         cascade_levels = ncf_decomp.variables["pr_decomposed"][i, :, :, :]
-
-        # In the netcdf file this is saved as a masked array, so we're checking if there is no mask
-        assert not cascade_levels.mask
-
+        # Obtain the mean values
         means = ncf_decomp.variables["means"][i, :]
-        assert not means.mask
-
+        # Obtain the standard deviations
         stds = ncf_decomp.variables["stds"][i, :]
-        assert not stds.mask

         # Save the values in the dictionary as normal arrays with the filled method
-        decomp_dict_["cascade_levels"] = np.ma.filled(cascade_levels)
-        decomp_dict_["means"] = np.ma.filled(means)
-        decomp_dict_["stds"] = np.ma.filled(stds)
+        decomp_dict_["cascade_levels"] = np.ma.filled(cascade_levels, fill_value=np.nan)
+        decomp_dict_["means"] = np.ma.filled(means, fill_value=np.nan)
+        decomp_dict_["stds"] = np.ma.filled(stds, fill_value=np.nan)

         # Append the output list
         R_d.append(decomp_dict_)

+    ncf_decomp.close()
     return R_d, uv
+
+
+def check_norain(precip_arr, precip_thr=None, norain_thr=0.0):
+    """
+    Check whether the input precipitation field contains (almost) no rain.
+
+    Parameters
+    ----------
+    precip_arr: array-like
+        Array containing the input precipitation field.
+    precip_thr: float, optional
+        Specifies the threshold value for minimum observable precipitation intensity.
+        If None, the minimum value over the domain is taken.
+    norain_thr: float, optional
+        Specifies the threshold value for the fraction of rainy pixels in precip_arr
+        below which we consider there to be no rain. Defaults to 0.0.
+
+    Returns
+    -------
+    norain: bool
+        True if the fraction of rainy pixels is at or below norain_thr, False otherwise.
+
+    """
+
+    if precip_thr is None:
+        precip_thr = np.nanmin(precip_arr)
+    rain_pixels = precip_arr[precip_arr > precip_thr]
+    norain = rain_pixels.size / precip_arr.size <= norain_thr
+    print(
+        f"Rain fraction is: {rain_pixels.size / precip_arr.size}, while minimum fraction is {norain_thr}"
+    )
+    return norain
diff --git a/pysteps/downscaling/rainfarm.py b/pysteps/downscaling/rainfarm.py
index 4b9bb0bf7..b40e359df 100644
--- a/pysteps/downscaling/rainfarm.py
+++ b/pysteps/downscaling/rainfarm.py
@@ -4,7 +4,7 @@
 ============================

 Implementation of the RainFARM stochastic downscaling method as described in
-:cite:`Rebora2006`.
+:cite:`Rebora2006` and :cite:`DOnofrio2014`.

 RainFARM is a downscaling algorithm for rainfall fields developed by Rebora et
 al. (2006). The method can represent the realistic small-scale variability of the
@@ -20,38 +20,211 @@
 import warnings

 import numpy as np
-from scipy.ndimage import convolve
+from scipy.signal import convolve
+from pysteps.utils.spectral import rapsd
+from pysteps.utils.dimension import aggregate_fields
+
+
+def _gaussianize(precip):
+    """
+    Gaussianize the field using rank ordering as in :cite:`DOnofrio2014`.
+    """
+    m, n = np.shape(precip)
+    nn = m * n
+    ii = np.argsort(precip.reshape(nn))
+    precip_gaussianize = np.zeros(nn)
+    precip_gaussianize[ii] = sorted(np.random.normal(0, 1, nn))
+    precip_gaussianize = precip_gaussianize.reshape(m, n)
+    sd = np.std(precip_gaussianize)
+    if sd == 0:
+        sd = 1
+    return precip_gaussianize / sd
+
+
+def _compute_freq_array(array, ds_factor=1):
+    """
+    Compute the frequency array for a given downscaling factor.
+    """
+    freq_i = np.fft.fftfreq(array.shape[0] * ds_factor, d=1 / ds_factor)
+    freq_j = np.fft.fftfreq(array.shape[1] * ds_factor, d=1 / ds_factor)
+    freq_sqr = freq_i[:, None] ** 2 + freq_j[None, :] ** 2
+    return np.sqrt(freq_sqr)


 def _log_slope(log_k, log_power_spectrum):
+    """
+    Calculate the log-slope of the power spectrum given an array of logarithmic
+    wavenumbers and an array of logarithmic power spectrum values.
+    """
     lk_min = log_k.min()
     lk_max = log_k.max()
     lk_range = lk_max - lk_min
     lk_min += (1 / 6) * lk_range
     lk_max -= (1 / 6) * lk_range
-
     selected = (lk_min <= log_k) & (log_k <= lk_max)
     lk_sel = log_k[selected]
     ps_sel = log_power_spectrum[selected]
     alpha = np.polyfit(lk_sel, ps_sel, 1)[0]
     alpha = -alpha
+
     return alpha

+
+def _estimate_alpha(array, k):
+    """
+    Estimate the alpha parameter using the power spectrum of the input array.
+    """
+    fp = np.fft.fft2(array)
+    fp_abs = abs(fp)
+    log_power_spectrum = np.log(fp_abs**2)
+    valid = (k != 0) & np.isfinite(log_power_spectrum)
+    alpha = _log_slope(np.log(k[valid]), log_power_spectrum[valid])
     return alpha


-def _balanced_spatial_average(x, k):
-    ones = np.ones_like(x)
-    return convolve(x, k) / convolve(ones, k)
+def _compute_noise_field(freq_array_highres, alpha):
+    """
+    Compute a field of correlated noise using the given frequency array and
+    alpha value.
+    """
+    white_noise_field = np.random.rand(*freq_array_highres.shape)
+    white_noise_field_complex = np.exp(complex(0, 1) * 2 * np.pi * white_noise_field)
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        noise_field_complex = white_noise_field_complex * np.sqrt(
+            freq_array_highres**-alpha
+        )
+    noise_field_complex[0, 0] = 0
+    return np.fft.ifft2(noise_field_complex).real


-def downscale(precip, ds_factor, alpha=None, threshold=None, return_alpha=False):
+def _apply_spectral_fusion(
+    array_low, array_high, freq_array_low, freq_array_high, ds_factor
+):
     """
-    Downscale a rainfall field by increasing its spatial resolution by
-    a positive integer factor.
+    Apply spectral fusion to merge two arrays in the frequency domain.
+ """ + + # Validate inputs + if array_low.shape != freq_array_low.shape: + raise ValueError("Shape of array_low must match shape of freq_array_low.") + if array_high.shape != freq_array_high.shape: + raise ValueError("Shape of array_high must match shape of freq_array_high.") + + nax, _ = np.shape(array_low) + nx, _ = np.shape(array_high) + k0 = nax // 2 + + # Calculate power spectral density at specific frequency + def compute_psd(array, fft_size): + return rapsd(array, fft_method=np.fft)[k0 - 1] * fft_size**2 + + psd_low = compute_psd(array_low, nax) + psd_high = compute_psd(array_high, nx) + + # Normalize high-resolution array + normalization_factor = np.sqrt(psd_low / psd_high) + array_high *= normalization_factor + + # Perform FFT on both arrays + fft_low = np.fft.fft2(array_low) + fft_high = np.fft.fft2(array_high) + + # Initialize the merged FFT array with low-resolution data + fft_merged = np.zeros_like(fft_high, dtype=np.complex128) + fft_merged[0:k0, 0:k0] = fft_low[0:k0, 0:k0] + fft_merged[nx - k0 : nx, 0:k0] = fft_low[k0 : 2 * k0, 0:k0] + fft_merged[0:k0, nx - k0 : nx] = fft_low[0:k0, k0 : 2 * k0] + fft_merged[nx - k0 : nx, nx - k0 : nx] = fft_low[k0 : 2 * k0, k0 : 2 * k0] + + fft_merged[k0, 0] = np.conj(fft_merged[nx - k0, 0]) + fft_merged[0, k0] = np.conj(fft_merged[0, nx - k0]) + + # Compute frequency arrays + freq_i = np.fft.fftfreq(nx, d=1 / ds_factor) + freq_i = np.tile(freq_i, (nx, 1)) + freq_j = freq_i.T + + # Compute frequency domain adjustment + ddx = np.pi * (1 / nax - 1 / nx) / np.abs(freq_i[0, 1] - freq_i[0, 0]) + freq_squared_high = freq_array_high**2 + freq_squared_low_center = freq_array_low[k0, k0] ** 2 + + # Fuse in the frequency domain + mask_high = freq_squared_high > freq_squared_low_center + mask_low = ~mask_high + fft_merged = fft_high * mask_high + fft_merged * mask_low * np.exp( + -1j * ddx * freq_i - 1j * ddx * freq_j + ) + + # Inverse FFT to obtain the merged array in the spatial domain + merged = np.real(np.fft.ifftn(fft_merged)) / fft_merged.size + + return merged + + +def _compute_kernel_radius(ds_factor): + return int(round(ds_factor / np.sqrt(np.pi))) + + +def _make_tophat_kernel(ds_factor): + """Compute 2d uniform (tophat) kernel""" + radius = _compute_kernel_radius(ds_factor) + (mx, my) = np.mgrid[-radius : radius + 0.01, -radius : radius + 0.01] + tophat = ((mx**2 + my**2) <= radius**2).astype(float) + return tophat / tophat.sum() + + +def _make_gaussian_kernel(ds_factor): + """ + Compute 2d gaussian kernel + ref: https://github.com/scipy/scipy/blob/de80faf9d3480b9dbb9b888568b64499e0e70c19/scipy/ndimage/_filters.py#L179 + the smoothing sigma has width half a large pixel + """ + radius = _compute_kernel_radius(ds_factor) + sigma = ds_factor / 2 + sigma2 = sigma * sigma + x = np.arange(-radius, radius + 1) + kern1d = np.exp(-0.5 / sigma2 * x**2) + kern2d = np.outer(kern1d, kern1d) + return kern2d / kern2d.sum() + + +def _balanced_spatial_average(array, kernel): + """ + Compute the balanced spatial average of an array using a given kernel while handling + missing or invalid values. 
+ """ + array = array.copy() + mask_valid = np.isfinite(array) + array[~mask_valid] = 0.0 + array_conv = convolve(array, kernel, mode="same") + array_conv /= convolve(mask_valid, kernel, mode="same") + array_conv[~mask_valid] = np.nan + return array_conv + + +_make_kernel = dict() +_make_kernel["gaussian"] = _make_gaussian_kernel +_make_kernel["tophat"] = _make_tophat_kernel +_make_kernel["uniform"] = _make_tophat_kernel + + +def downscale( + precip, + ds_factor, + alpha=None, + threshold=None, + return_alpha=False, + kernel_type=None, + spectral_fusion=False, +): + """ + Downscale a rainfall field by increasing its spatial resolution by a positive + integer factor. Parameters ---------- - precip: array-like + precip: array_like Array of shape (m, n) containing the input field. The input is expected to contain rain rate values. All values are required to be finite. @@ -65,10 +238,14 @@ def downscale(precip, ds_factor, alpha=None, threshold=None, return_alpha=False) Set all values lower than the threshold to zero. return_alpha: bool, optional Whether to return the estimated spectral slope ``alpha``. + kernel_type: {None, "gaussian", "uniform", "tophat"} + The name of the smoothing operator. If None no smoothing is applied. + spectral_fusion: bool, optional + Whether to apply spectral merging as in :cite:`DOnofrio2014`. Returns ------- - r: array-like + precip_highres: ndarray Array of shape (m * ds_factor, n * ds_factor) containing the downscaled field. alpha: float @@ -79,54 +256,74 @@ def downscale(precip, ds_factor, alpha=None, threshold=None, return_alpha=False) Currently, the pysteps implementation of RainFARM only covers spatial downscaling. That is, it can improve the spatial resolution of a rainfall field. However, unlike the original algorithm from Rebora et al. (2006), it cannot downscale the temporal - dimension. + dimension. It implements spectral merging from D'Onofrio et al. (2014). 
References ---------- :cite:`Rebora2006` + :cite:`DOnofrio2014` """ - ki = np.fft.fftfreq(precip.shape[0]) - kj = np.fft.fftfreq(precip.shape[1]) - k_sqr = ki[:, None] ** 2 + kj[None, :] ** 2 - k = np.sqrt(k_sqr) + # Validate inputs + if not np.isfinite(precip).all(): + raise ValueError("All values in 'precip' must be finite.") + if not isinstance(ds_factor, int) or ds_factor <= 0: + raise ValueError("'ds_factor' must be a positive integer.") + + # Preprocess the input field if spectral fusion is enabled + precip_transformed = _gaussianize(precip) if spectral_fusion else precip - ki_ds = np.fft.fftfreq(precip.shape[0] * ds_factor, d=1 / ds_factor) - kj_ds = np.fft.fftfreq(precip.shape[1] * ds_factor, d=1 / ds_factor) - k_ds_sqr = ki_ds[:, None] ** 2 + kj_ds[None, :] ** 2 - k_ds = np.sqrt(k_ds_sqr) + # Compute frequency arrays for the original and high-resolution fields + freq_array = _compute_freq_array(precip_transformed) + freq_array_highres = _compute_freq_array(precip_transformed, ds_factor) + # Estimate spectral slope alpha if not provided if alpha is None: - fp = np.fft.fft2(precip) - fp_abs = abs(fp) - log_power_spectrum = np.log(fp_abs**2) - valid = (k != 0) & np.isfinite(log_power_spectrum) - alpha = _log_slope(np.log(k[valid]), log_power_spectrum[valid]) + alpha = _estimate_alpha(precip_transformed, freq_array) - fg = np.exp(complex(0, 1) * 2 * np.pi * np.random.rand(*k_ds.shape)) - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - fg *= np.sqrt(k_ds_sqr ** (-alpha / 2)) - fg[0, 0] = 0 - g = np.fft.ifft2(fg).real - g /= g.std() - r = np.exp(g) - - P_u = np.repeat(np.repeat(precip, ds_factor, axis=0), ds_factor, axis=1) - rad = int(round(ds_factor / np.sqrt(np.pi))) - (mx, my) = np.mgrid[-rad : rad + 0.01, -rad : rad + 0.01] - tophat = ((mx**2 + my**2) <= rad**2).astype(float) - tophat /= tophat.sum() - - P_agg = _balanced_spatial_average(P_u, tophat) - r_agg = _balanced_spatial_average(r, tophat) - r *= P_agg / r_agg + # Generate noise field + noise_field = _compute_noise_field(freq_array_highres, alpha) + + # Apply spectral fusion if enabled + if spectral_fusion: + noise_field /= noise_field.shape[0] ** 2 + noise_field = np.exp(noise_field) + noise_field = _apply_spectral_fusion( + precip_transformed, noise_field, freq_array, freq_array_highres, ds_factor + ) + + # Normalize and exponentiate the noise field + noise_field /= noise_field.std() + noise_field = np.exp(noise_field) + + # Aggregate the noise field to low resolution + noise_lowres = aggregate_fields(noise_field, ds_factor, axis=(0, 1)) + + # Expand input and noise fields to high resolution + precip_expanded = np.kron(precip, np.ones((ds_factor, ds_factor))) + noise_lowres_expanded = np.kron(noise_lowres, np.ones((ds_factor, ds_factor))) + + # Apply smoothing if a kernel type is provided + if kernel_type: + if kernel_type not in _make_kernel: + raise ValueError( + f"kernel type '{kernel_type}' is invalid, available kernels: {list(_make_kernel)}" + ) + kernel = _make_kernel[kernel_type](ds_factor) + precip_expanded = _balanced_spatial_average(precip_expanded, kernel) + noise_lowres_expanded = _balanced_spatial_average(noise_lowres_expanded, kernel) + + # Normalize the high-res precipitation field by the low-res noise field + norm_k0 = precip_expanded / noise_lowres_expanded + precip_highres = noise_field * norm_k0 + # Apply thresholding if specified if threshold is not None: - r[r < threshold] = 0 + precip_highres[precip_highres < threshold] = 0 + # Return the downscaled field and optionally the 
+    # spectral slope alpha
     if return_alpha:
-        return r, alpha
+        return precip_highres, alpha

-    return r
+    return precip_highres
diff --git a/pysteps/feature/shitomasi.py b/pysteps/feature/shitomasi.py
index 7de1199f0..b4fc68d96 100644
--- a/pysteps/feature/shitomasi.py
+++ b/pysteps/feature/shitomasi.py
@@ -149,7 +149,7 @@ def detection(

     # convert to 8-bit
     input_image = np.ndarray.astype(input_image, "uint8")
-    mask = (-1 * mask + 1).astype("uint8")
+    mask = ~mask & 1

     params = dict(
         maxCorners=max_num_features if max_num_features is not None else max_corners,
diff --git a/pysteps/io/exporters.py b/pysteps/io/exporters.py
index 06e1a1712..38b2159a7 100644
--- a/pysteps/io/exporters.py
+++ b/pysteps/io/exporters.py
@@ -371,7 +371,11 @@ def initialize_forecast_exporter_netcdf(
     shape,
     metadata,
     n_ens_members=1,
+    datatype=np.float32,
     incremental=None,
+    fill_value=None,
+    scale_factor=None,
+    offset=None,
     **kwargs,
 ):
     """
@@ -388,9 +392,10 @@ def initialize_forecast_exporter_netcdf(
         Start date of the forecast.
     timestep: int
         Time step of the forecast (minutes).
-    n_timesteps: int
-        Number of time steps in the forecast this argument is ignored if
-        incremental is set to 'timestep'.
+    n_timesteps: int or list of integers
+        Number of time steps to forecast or a list of time steps for which the
+        forecasts are computed (relative to the input time step). The elements of
+        the list are required to be in ascending order.
     shape: tuple of int
         Two-element tuple defining the shape (height,width) of the forecast
         grids.
     n_ens_members: int
         Number of ensemble members in the forecast. This argument is ignored if
         incremental is set to 'member'.
+    datatype: np.dtype, optional
+        The datatype of the output values. Defaults to np.float32.
     incremental: {None,'timestep','member'}, optional
         Allow incremental writing of datasets into the netCDF files.\n
         The available options are: 'timestep' = write a forecast or a
         forecast ensemble for a given time step; 'member' = write a forecast
         sequence for a given ensemble member. If set to None, incremental
         writing is disabled.
+    fill_value: int, optional
+        Fill value for missing data. Defaults to None, which means that the
+        standard netCDF4 fill value is used.
+    scale_factor: float, optional
+        The scale factor used to store the data as: store_value = scale_factor *
+        precipitation_value + offset. Defaults to None. The scale factor can be
+        used to reduce data storage.
+    offset: float, optional
+        The offset used to store the data as: store_value = scale_factor *
+        precipitation_value + offset. Defaults to None.
+
+    Other Parameters
+    ----------------
+    institution: str
+        The institute, company or community that has created the nowcast.
+        Default: the pySTEPS community (https://pysteps.github.io)
+    references: str
+        Any references to be included in the netCDF file. Defaults to an
+        empty string.
+    comment: str
+        Any comments about the data or storage protocol that should be
+        included in the netCDF file. Defaults to an empty string.
Returns ------- @@ -433,8 +461,14 @@ def initialize_forecast_exporter_netcdf( + "'timestep' or 'member'" ) + n_timesteps_is_list = isinstance(n_timesteps, list) + if n_timesteps_is_list: + num_timesteps = len(n_timesteps) + else: + num_timesteps = n_timesteps + if incremental == "timestep": - n_timesteps = None + num_timesteps = None elif incremental == "member": n_ens_members = None elif incremental is not None: @@ -448,6 +482,13 @@ def initialize_forecast_exporter_netcdf( if n_ens_members > 1: n_ens_gt_one = True + # Kwargs to be used as description strings in the netCDF + institution = kwargs.get( + "institution", "the pySTEPS community (https://pysteps.github.io)" + ) + references = kwargs.get("references", "") + comment = kwargs.get("comment", "") + exporter = {} outfn = os.path.join(outpath, outfnprefix + ".nc") @@ -455,16 +496,16 @@ def initialize_forecast_exporter_netcdf( ncf.Conventions = "CF-1.7" ncf.title = "pysteps-generated nowcast" - ncf.institution = "the pySTEPS community (https://pysteps.github.io)" + ncf.institution = institution ncf.source = "pysteps" # TODO(exporters): Add pySTEPS version here ncf.history = "" - ncf.references = "" - ncf.comment = "" + ncf.references = references + ncf.comment = comment h, w = shape ncf.createDimension("ens_number", size=n_ens_members) - ncf.createDimension("time", size=n_timesteps) + ncf.createDimension("time", size=num_timesteps) ncf.createDimension("y", size=h) ncf.createDimension("x", size=w) @@ -551,7 +592,10 @@ def initialize_forecast_exporter_netcdf( var_time = ncf.createVariable("time", int, dimensions=("time",)) if incremental != "timestep": - var_time[:] = [i * timestep * 60 for i in range(1, n_timesteps + 1)] + if n_timesteps_is_list: + var_time[:] = np.array(n_timesteps) * timestep * 60 + else: + var_time[:] = [i * timestep * 60 for i in range(1, n_timesteps + 1)] var_time.long_name = "forecast time" startdate_str = datetime.strftime(startdate, "%Y-%m-%d %H:%M:%S") var_time.units = "seconds since %s" % startdate_str @@ -559,14 +603,22 @@ def initialize_forecast_exporter_netcdf( if incremental == "member" or n_ens_gt_one: var_f = ncf.createVariable( var_name, - np.float32, + datatype=datatype, dimensions=("ens_number", "time", "y", "x"), + compression="zlib", zlib=True, complevel=9, + fill_value=fill_value, ) else: var_f = ncf.createVariable( - var_name, np.float32, dimensions=("time", "y", "x"), zlib=True, complevel=9 + var_name, + datatype=datatype, + dimensions=("time", "y", "x"), + compression="zlib", + zlib=True, + complevel=9, + fill_value=fill_value, ) if var_standard_name is not None: @@ -576,6 +628,11 @@ def initialize_forecast_exporter_netcdf( var_f.units = var_unit if grid_mapping_var_name is not None: var_f.grid_mapping = grid_mapping_var_name + # Add gain and offset + if scale_factor is not None: + var_f.scale_factor = scale_factor + if offset is not None: + var_f.add_offset = offset exporter["method"] = "netcdf" exporter["ncfile"] = ncf @@ -588,7 +645,8 @@ def initialize_forecast_exporter_netcdf( exporter["timestep"] = timestep exporter["metadata"] = metadata exporter["incremental"] = incremental - exporter["num_timesteps"] = n_timesteps + exporter["num_timesteps"] = num_timesteps + exporter["timesteps"] = n_timesteps exporter["num_ens_members"] = n_ens_members exporter["shape"] = shape @@ -806,7 +864,12 @@ def _export_netcdf(field, exporter): else: var_f[var_f.shape[0], :, :] = field var_time = exporter["var_time"] - var_time[len(var_time) - 1] = len(var_time) * exporter["timestep"] * 60 + if 
isinstance(exporter["timesteps"], list): + var_time[len(var_time) - 1] = ( + exporter["timesteps"][len(var_time) - 1] * exporter["timestep"] * 60 + ) + else: + var_time[len(var_time) - 1] = len(var_time) * exporter["timestep"] * 60 else: var_f[var_f.shape[0], :, :, :] = field var_ens_num = exporter["var_ens_num"] diff --git a/pysteps/io/importers.py b/pysteps/io/importers.py index d7a9a850e..1493b6d4b 100644 --- a/pysteps/io/importers.py +++ b/pysteps/io/importers.py @@ -810,7 +810,7 @@ def import_knmi_hdf5(filename, qty="ACRR", accutime=5.0, pixelsize=1000.0, **kwa # the no data value. The precision of the data is two decimals (0.01 mm). if qty == "ACRR": precip = np.where( - precip_intermediate == 65535, np.NaN, precip_intermediate / 100.0 + precip_intermediate == 65535, np.nan, precip_intermediate / 100.0 ) # In case reflectivities are imported, the no data value is 255. Values are @@ -818,7 +818,7 @@ def import_knmi_hdf5(filename, qty="ACRR", accutime=5.0, pixelsize=1000.0, **kwa # as: dBZ = 0.5 * pixel_value - 32.0 (this used to be 31.5). if qty == "DBZH": precip = np.where( - precip_intermediate == 255, np.NaN, precip_intermediate * 0.5 - 32.0 + precip_intermediate == 255, np.nan, precip_intermediate * 0.5 - 32.0 ) if precip is None: @@ -1374,7 +1374,10 @@ def import_odim_hdf5(filename, qty="RATE", **kwargs): raise IOError("requested quantity %s not found" % qty) where = f["where"] - proj4str = where.attrs["projdef"].decode() + if isinstance(where.attrs["projdef"], str): + proj4str = where.attrs["projdef"] + else: + proj4str = where.attrs["projdef"].decode() pr = pyproj.Proj(proj4str) ll_lat = where.attrs["LL_lat"] diff --git a/pysteps/nowcasts/utils.py b/pysteps/nowcasts/utils.py index 09d7be3b9..7d7b8dc0d 100644 --- a/pysteps/nowcasts/utils.py +++ b/pysteps/nowcasts/utils.py @@ -103,6 +103,9 @@ def compute_percentile_mask(precip, pct): """Compute a precipitation mask, where True/False values are assigned for pixels above/below the given percentile. + .. _ndarray:\ + https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.html + Parameters ---------- precip: array_like diff --git a/pysteps/tests/test_blending_steps.py b/pysteps/tests/test_blending_steps.py index d3246657f..3f4a0580b 100644 --- a/pysteps/tests/test_blending_steps.py +++ b/pysteps/tests/test_blending_steps.py @@ -7,6 +7,32 @@ from pysteps import cascade, blending +steps_arg_values = [ + (1, 3, 4, 8, None, None, False, "spn", True, 4, False, False), + (1, 3, 4, 8, "obs", None, False, "spn", True, 4, False, False), + (1, 3, 4, 8, "incremental", None, False, "spn", True, 4, False, False), + (1, 3, 4, 8, None, "mean", False, "spn", True, 4, False, False), + (1, 3, 4, 8, None, "cdf", False, "spn", True, 4, False, False), + (1, 3, 4, 8, "incremental", "cdf", False, "spn", True, 4, False, False), + (1, 3, 4, 6, "incremental", "cdf", False, "bps", True, 4, False, False), + (1, 3, 4, 6, "incremental", "cdf", False, "bps", False, 4, False, False), + (1, 3, 4, 9, "incremental", "cdf", False, "spn", True, 4, False, False), + (2, 3, 10, 8, "incremental", "cdf", False, "spn", True, 10, False, False), + (5, 3, 5, 8, "incremental", "cdf", False, "spn", True, 5, False, False), + (1, 10, 1, 8, "incremental", "cdf", False, "spn", True, 1, False, False), + (2, 3, 2, 8, "incremental", "cdf", True, "spn", True, 2, False, False), + (1, 3, 6, 8, None, None, False, "spn", True, 6, False, False), + # Test the case where the radar image contains no rain. 
+ (1, 3, 6, 8, None, None, False, "spn", True, 6, True, False), + (5, 3, 5, 6, "incremental", "cdf", False, "spn", False, 5, True, False), + # Test the case where the NWP fields contain no rain. + (1, 3, 6, 8, None, None, False, "spn", True, 6, False, True), + (5, 3, 5, 6, "incremental", "cdf", False, "spn", False, 5, False, True), + # Test the case where both the radar image and the NWP fields contain no rain. + (1, 3, 6, 8, None, None, False, "spn", True, 6, True, True), + (5, 3, 5, 6, "incremental", "cdf", False, "spn", False, 5, True, True), + (5, 3, 5, 6, "obs", "mean", True, "spn", True, 5, True, True), +] steps_arg_names = ( "n_models", "n_timesteps", @@ -18,24 +44,10 @@ "weights_method", "decomposed_nwp", "expected_n_ens_members", + "zero_radar", + "zero_nwp", ) -steps_arg_values = [ - (1, 3, 4, 8, None, None, False, "spn", True, 4), - (1, 3, 4, 8, "obs", None, False, "spn", True, 4), - (1, 3, 4, 8, "incremental", None, False, "spn", True, 4), - (1, 3, 4, 8, None, "mean", False, "spn", True, 4), - (1, 3, 4, 8, None, "cdf", False, "spn", True, 4), - (1, 3, 4, 8, "incremental", "cdf", False, "spn", True, 4), - (1, 3, 4, 6, "incremental", "cdf", False, "bps", True, 4), - (1, 3, 4, 6, "incremental", "cdf", False, "bps", False, 4), - (1, 3, 4, 9, "incremental", "cdf", False, "spn", True, 4), - (2, 3, 10, 8, "incremental", "cdf", False, "spn", True, 10), - (5, 3, 5, 8, "incremental", "cdf", False, "spn", True, 5), - (1, 10, 1, 8, "incremental", "cdf", False, "spn", True, 1), - (2, 3, 2, 8, "incremental", "cdf", True, "spn", True, 2), -] - @pytest.mark.parametrize(steps_arg_names, steps_arg_values) def test_steps_blending( @@ -49,6 +61,8 @@ def test_steps_blending( weights_method, decomposed_nwp, expected_n_ens_members, + zero_radar, + zero_nwp, ): pytest.importorskip("cv2") @@ -58,49 +72,51 @@ def test_steps_blending( # Initialise dummy NWP data nwp_precip = np.zeros((n_models, n_timesteps + 1, 200, 200)) - for n_model in range(n_models): - for i in range(nwp_precip.shape[1]): - nwp_precip[n_model, i, 30:185, 30 + 1 * (i + 1) * n_model] = 0.1 - nwp_precip[n_model, i, 30:185, 31 + 1 * (i + 1) * n_model] = 0.1 - nwp_precip[n_model, i, 30:185, 32 + 1 * (i + 1) * n_model] = 1.0 - nwp_precip[n_model, i, 30:185, 33 + 1 * (i + 1) * n_model] = 5.0 - nwp_precip[n_model, i, 30:185, 34 + 1 * (i + 1) * n_model] = 5.0 - nwp_precip[n_model, i, 30:185, 35 + 1 * (i + 1) * n_model] = 4.5 - nwp_precip[n_model, i, 30:185, 36 + 1 * (i + 1) * n_model] = 4.5 - nwp_precip[n_model, i, 30:185, 37 + 1 * (i + 1) * n_model] = 4.0 - nwp_precip[n_model, i, 30:185, 38 + 1 * (i + 1) * n_model] = 2.0 - nwp_precip[n_model, i, 30:185, 39 + 1 * (i + 1) * n_model] = 1.0 - nwp_precip[n_model, i, 30:185, 40 + 1 * (i + 1) * n_model] = 0.5 - nwp_precip[n_model, i, 30:185, 41 + 1 * (i + 1) * n_model] = 0.1 + if not zero_nwp: + for n_model in range(n_models): + for i in range(nwp_precip.shape[1]): + nwp_precip[n_model, i, 30:185, 30 + 1 * (i + 1) * n_model] = 0.1 + nwp_precip[n_model, i, 30:185, 31 + 1 * (i + 1) * n_model] = 0.1 + nwp_precip[n_model, i, 30:185, 32 + 1 * (i + 1) * n_model] = 1.0 + nwp_precip[n_model, i, 30:185, 33 + 1 * (i + 1) * n_model] = 5.0 + nwp_precip[n_model, i, 30:185, 34 + 1 * (i + 1) * n_model] = 5.0 + nwp_precip[n_model, i, 30:185, 35 + 1 * (i + 1) * n_model] = 4.5 + nwp_precip[n_model, i, 30:185, 36 + 1 * (i + 1) * n_model] = 4.5 + nwp_precip[n_model, i, 30:185, 37 + 1 * (i + 1) * n_model] = 4.0 + nwp_precip[n_model, i, 30:185, 38 + 1 * (i + 1) * n_model] = 2.0 + nwp_precip[n_model, i, 30:185, 39 + 1 * 
(i + 1) * n_model] = 1.0 + nwp_precip[n_model, i, 30:185, 40 + 1 * (i + 1) * n_model] = 0.5 + nwp_precip[n_model, i, 30:185, 41 + 1 * (i + 1) * n_model] = 0.1 # Define dummy nowcast input data radar_precip = np.zeros((3, 200, 200)) - for i in range(2): - radar_precip[i, 5:150, 30 + 1 * i] = 0.1 - radar_precip[i, 5:150, 31 + 1 * i] = 0.5 - radar_precip[i, 5:150, 32 + 1 * i] = 0.5 - radar_precip[i, 5:150, 33 + 1 * i] = 5.0 - radar_precip[i, 5:150, 34 + 1 * i] = 5.0 - radar_precip[i, 5:150, 35 + 1 * i] = 4.5 - radar_precip[i, 5:150, 36 + 1 * i] = 4.5 - radar_precip[i, 5:150, 37 + 1 * i] = 4.0 - radar_precip[i, 5:150, 38 + 1 * i] = 1.0 - radar_precip[i, 5:150, 39 + 1 * i] = 0.5 - radar_precip[i, 5:150, 40 + 1 * i] = 0.5 - radar_precip[i, 5:150, 41 + 1 * i] = 0.1 - radar_precip[2, 30:155, 30 + 1 * 2] = 0.1 - radar_precip[2, 30:155, 31 + 1 * 2] = 0.1 - radar_precip[2, 30:155, 32 + 1 * 2] = 1.0 - radar_precip[2, 30:155, 33 + 1 * 2] = 5.0 - radar_precip[2, 30:155, 34 + 1 * 2] = 5.0 - radar_precip[2, 30:155, 35 + 1 * 2] = 4.5 - radar_precip[2, 30:155, 36 + 1 * 2] = 4.5 - radar_precip[2, 30:155, 37 + 1 * 2] = 4.0 - radar_precip[2, 30:155, 38 + 1 * 2] = 2.0 - radar_precip[2, 30:155, 39 + 1 * 2] = 1.0 - radar_precip[2, 30:155, 40 + 1 * 3] = 0.5 - radar_precip[2, 30:155, 41 + 1 * 3] = 0.1 + if not zero_radar: + for i in range(2): + radar_precip[i, 5:150, 30 + 1 * i] = 0.1 + radar_precip[i, 5:150, 31 + 1 * i] = 0.5 + radar_precip[i, 5:150, 32 + 1 * i] = 0.5 + radar_precip[i, 5:150, 33 + 1 * i] = 5.0 + radar_precip[i, 5:150, 34 + 1 * i] = 5.0 + radar_precip[i, 5:150, 35 + 1 * i] = 4.5 + radar_precip[i, 5:150, 36 + 1 * i] = 4.5 + radar_precip[i, 5:150, 37 + 1 * i] = 4.0 + radar_precip[i, 5:150, 38 + 1 * i] = 1.0 + radar_precip[i, 5:150, 39 + 1 * i] = 0.5 + radar_precip[i, 5:150, 40 + 1 * i] = 0.5 + radar_precip[i, 5:150, 41 + 1 * i] = 0.1 + radar_precip[2, 30:155, 30 + 1 * 2] = 0.1 + radar_precip[2, 30:155, 31 + 1 * 2] = 0.1 + radar_precip[2, 30:155, 32 + 1 * 2] = 1.0 + radar_precip[2, 30:155, 33 + 1 * 2] = 5.0 + radar_precip[2, 30:155, 34 + 1 * 2] = 5.0 + radar_precip[2, 30:155, 35 + 1 * 2] = 4.5 + radar_precip[2, 30:155, 36 + 1 * 2] = 4.5 + radar_precip[2, 30:155, 37 + 1 * 2] = 4.0 + radar_precip[2, 30:155, 38 + 1 * 2] = 2.0 + radar_precip[2, 30:155, 39 + 1 * 2] = 1.0 + radar_precip[2, 30:155, 40 + 1 * 3] = 0.5 + radar_precip[2, 30:155, 41 + 1 * 3] = 0.1 metadata = dict() metadata["unit"] = "mm" diff --git a/pysteps/tests/test_blending_utils.py b/pysteps/tests/test_blending_utils.py index 40f138d9c..63fc1ffa5 100644 --- a/pysteps/tests/test_blending_utils.py +++ b/pysteps/tests/test_blending_utils.py @@ -15,6 +15,7 @@ decompose_NWP, compute_store_nwp_motion, load_NWP, + check_norain, ) pytest.importorskip("netCDF4") @@ -96,8 +97,8 @@ precip_nwp, nwp_metadata = converter(precip_nwp, nwp_metadata) # Threshold the data -precip_nwp[precip_nwp < 0.1] = 0.0 nwp_metadata["threshold"] = 0.1 +precip_nwp[precip_nwp < nwp_metadata["threshold"]] = 0.0 # Transform the data transformer = pysteps.utils.get_method("dB") @@ -378,3 +379,28 @@ def test_blending_utils( decimal=3, err_msg="Recomposed field of second forecast does not equal original field", ) + + precip_arr = precip_nwp + # rainy fraction is 0.005847 + assert not check_norain(precip_arr) + assert not check_norain(precip_arr, precip_thr=nwp_metadata["threshold"]) + assert not check_norain( + precip_arr, precip_thr=nwp_metadata["threshold"], norain_thr=0.005 + ) + assert not check_norain(precip_arr, norain_thr=0.005) + # so with norain_thr beyond this number 
it should report that there's no rain + assert check_norain(precip_arr, norain_thr=0.006) + assert check_norain( + precip_arr, precip_thr=nwp_metadata["threshold"], norain_thr=0.006 + ) + + # also if we set the precipitation threshold sufficiently high, it should report there's no rain + # rainy fraction > 4mm/h is 0.004385 + assert not check_norain(precip_arr, precip_thr=4.0, norain_thr=0.004) + assert check_norain(precip_arr, precip_thr=4.0, norain_thr=0.005) + + # no rain above 100mm/h so it should give norain + assert check_norain(precip_arr, precip_thr=100) + + # should always give norain if the threshold is set to 100% + assert check_norain(precip_arr, norain_thr=1.0) diff --git a/pysteps/tests/test_downscaling_rainfarm.py b/pysteps/tests/test_downscaling_rainfarm.py index ec2a3fbb8..884270d09 100644 --- a/pysteps/tests/test_downscaling_rainfarm.py +++ b/pysteps/tests/test_downscaling_rainfarm.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- import pytest - +import numpy as np from pysteps import downscaling from pysteps.tests.helpers import get_precipitation_fields -from pysteps.utils import aggregate_fields_space, square_domain +from pysteps.utils import aggregate_fields_space, square_domain, aggregate_fields @pytest.fixture(scope="module") @@ -17,12 +17,32 @@ def data(): return precip, metadata -rainfarm_arg_names = ("alpha", "ds_factor", "threshold", "return_alpha") -rainfarm_arg_values = [(1.0, 1, 0, False), (1, 2, 0, False), (1, 4, 0, False)] +rainfarm_arg_names = ( + "alpha", + "ds_factor", + "threshold", + "return_alpha", + "spectral_fusion", + "kernel_type", +) +rainfarm_arg_values = [ + (1.0, 1, 0, False, False, None), + (1, 2, 0, False, False, "gaussian"), + (1, 4, 0, False, False, "tophat"), + (1, 4, 0, False, True, "uniform"), +] @pytest.mark.parametrize(rainfarm_arg_names, rainfarm_arg_values) -def test_rainfarm_shape(data, alpha, ds_factor, threshold, return_alpha): +def test_rainfarm_shape( + data, + alpha, + ds_factor, + threshold, + return_alpha, + spectral_fusion, + kernel_type, +): """Test that the output of rainfarm is consistent with the downscaling factor.""" precip, metadata = data window = metadata["xpixelsize"] * ds_factor @@ -35,6 +55,8 @@ def test_rainfarm_shape(data, alpha, ds_factor, threshold, return_alpha): ds_factor=ds_factor, threshold=threshold, return_alpha=return_alpha, + spectral_fusion=spectral_fusion, + kernel_type=kernel_type, ) assert precip_hr.ndim == precip.ndim @@ -42,11 +64,58 @@ def test_rainfarm_shape(data, alpha, ds_factor, threshold, return_alpha): assert precip_hr.shape[1] == precip.shape[1] -rainfarm_arg_values = [(1.0, 2, 0, True), (None, 2, 0, True)] +rainfarm_arg_values = [ + (1.0, 1, 0, False, False, None), + (1, 2, 0, False, False, None), + (1, 4, 0, False, False, None), + (1, 4, 0, False, True, None), +] + + +@pytest.mark.parametrize(rainfarm_arg_names, rainfarm_arg_values) +def test_rainfarm_aggregate( + data, + alpha, + ds_factor, + threshold, + return_alpha, + spectral_fusion, + kernel_type, +): + """Test that the output of rainfarm is equal to original when aggregated.""" + precip, metadata = data + window = metadata["xpixelsize"] * ds_factor + precip_lr, __ = aggregate_fields_space(precip, metadata, window) + + rainfarm = downscaling.get_method("rainfarm") + precip_hr = rainfarm( + precip_lr, + alpha=alpha, + ds_factor=ds_factor, + threshold=threshold, + return_alpha=return_alpha, + spectral_fusion=spectral_fusion, + kernel_type=kernel_type, + ) + precip_low = aggregate_fields(precip_hr, ds_factor, axis=(0, 1)) + 
precip_lr[precip_lr < threshold] = 0.0 + + np.testing.assert_array_almost_equal(precip_lr, precip_low) + + +rainfarm_arg_values = [(1.0, 2, 0, True, False, None), (None, 2, 0, True, True, None)] @pytest.mark.parametrize(rainfarm_arg_names, rainfarm_arg_values) -def test_rainfarm_alpha(data, alpha, ds_factor, threshold, return_alpha): +def test_rainfarm_alpha( + data, + alpha, + ds_factor, + threshold, + return_alpha, + spectral_fusion, + kernel_type, +): """Test that rainfarm computes and returns alpha.""" precip, metadata = data window = metadata["xpixelsize"] * ds_factor @@ -59,6 +128,8 @@ def test_rainfarm_alpha(data, alpha, ds_factor, threshold, return_alpha): ds_factor=ds_factor, threshold=threshold, return_alpha=return_alpha, + spectral_fusion=spectral_fusion, + kernel_type=kernel_type, ) assert len(precip_hr) == 2 diff --git a/pysteps/tests/test_exporters.py b/pysteps/tests/test_exporters.py index fe0ac9462..f72c248bb 100644 --- a/pysteps/tests/test_exporters.py +++ b/pysteps/tests/test_exporters.py @@ -16,14 +16,24 @@ from pysteps.tests.helpers import get_precipitation_fields, get_invalid_mask # Test arguments -exporter_arg_names = ("n_ens_members", "incremental") +exporter_arg_names = ( + "n_ens_members", + "incremental", + "datatype", + "fill_value", + "scale_factor", + "offset", + "n_timesteps", +) exporter_arg_values = [ - (1, None), - (1, "timestep"), - (2, None), - (2, "timestep"), - (2, "member"), + (1, None, np.float32, None, None, None, 3), + (1, "timestep", np.float32, 65535, None, None, 3), + (2, None, np.float32, 65535, None, None, 3), + (2, None, np.float32, 65535, None, None, [1, 2, 4]), + (2, "timestep", np.float32, None, None, None, 3), + (2, "timestep", np.float32, None, None, None, [1, 2, 4]), + (2, "member", np.float64, None, 0.01, 1.0, 3), ] @@ -46,7 +56,9 @@ def test_get_geotiff_filename(): @pytest.mark.parametrize(exporter_arg_names, exporter_arg_values) -def test_io_export_netcdf_one_member_one_time_step(n_ens_members, incremental): +def test_io_export_netcdf_one_member_one_time_step( + n_ens_members, incremental, datatype, fill_value, scale_factor, offset, n_timesteps +): """ Test the export netcdf. Also, test that the exported file can be read by the importer. 
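The tests above exercise the new exporter arguments. For reference, the following is a minimal, illustrative sketch of how they combine; it is not part of this patch, the output path and filename prefix are arbitrary, and it assumes the pysteps example data is installed so that the sample-data helper can load a field sequence:

    import numpy as np

    from pysteps.io.exporters import (
        close_forecast_files,
        export_forecast_dataset,
        initialize_forecast_exporter_netcdf,
    )
    from pysteps.tests.helpers import get_precipitation_fields

    # Load three consecutive sample fields plus their metadata
    # (requires the pysteps example data to be installed).
    precip, metadata = get_precipitation_fields(
        num_prev_files=2, return_raw=True, metadata=True
    )

    # n_timesteps may now be a list of ascending lead times instead of an int;
    # datatype, fill_value, scale_factor and offset are the other new options.
    exporter = initialize_forecast_exporter_netcdf(
        "/tmp",
        "demo_nowcast",
        metadata["timestamps"][0],
        metadata["accutime"],
        [1, 2, 4],
        precip.shape[1:],
        metadata,
        n_ens_members=1,
        datatype=np.float32,
        fill_value=65535,
        scale_factor=0.01,  # store_value = scale_factor * precip_value + offset
        offset=0.0,
    )
    export_forecast_dataset(precip, exporter)  # (3, m, n) matches len([1, 2, 4])
    close_forecast_files(exporter)

With a list, the time variable holds the irregular lead times (here 1, 2 and 4 input time steps), while scale_factor and offset only change how the values are packed on disk.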
@@ -66,7 +78,6 @@ def test_io_export_netcdf_one_member_one_time_step(n_ens_members, incremental): file_path = os.path.join(outpath, outfnprefix + ".nc") startdate = metadata["timestamps"][0] timestep = metadata["accutime"] - n_timesteps = 3 shape = precip.shape[1:] exporter = initialize_forecast_exporter_netcdf( @@ -78,7 +89,11 @@ def test_io_export_netcdf_one_member_one_time_step(n_ens_members, incremental): shape, metadata, n_ens_members=n_ens_members, + datatype=datatype, incremental=incremental, + fill_value=fill_value, + scale_factor=scale_factor, + offset=offset, ) if n_ens_members > 1: @@ -87,7 +102,11 @@ def test_io_export_netcdf_one_member_one_time_step(n_ens_members, incremental): if incremental == None: export_forecast_dataset(precip, exporter) if incremental == "timestep": - for t in range(n_timesteps): + if isinstance(n_timesteps, list): + timesteps = len(n_timesteps) + else: + timesteps = n_timesteps + for t in range(timesteps): if n_ens_members > 1: export_forecast_dataset(precip[:, t, :, :], exporter) else: diff --git a/pysteps/tests/test_extrapolation_semilagrangian.py b/pysteps/tests/test_extrapolation_semilagrangian.py index feb4029b7..ef364fce1 100644 --- a/pysteps/tests/test_extrapolation_semilagrangian.py +++ b/pysteps/tests/test_extrapolation_semilagrangian.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- import numpy as np +import pytest from numpy.testing import assert_array_almost_equal from pysteps.extrapolation.semilagrangian import extrapolate @@ -23,6 +24,36 @@ def test_semilagrangian(): assert_array_almost_equal(result, expected) +def test_wrong_input_dimensions(): + p_1d = np.ones(8) + p_2d = np.ones((8, 8)) + p_3d = np.ones((8, 8, 2)) + v_2d = np.ones((8, 8)) + v_3d = np.stack([v_2d, v_2d]) + + num_timesteps = 1 + + invalid_inputs = [ + (p_1d, v_3d), + (p_2d, v_2d), + (p_3d, v_2d), + (p_3d, v_3d), + ] + for precip, velocity in invalid_inputs: + with pytest.raises(ValueError): + extrapolate(precip, velocity, num_timesteps) + + +def test_ascending_time_step(): + precip = np.ones((8, 8)) + v = np.ones((8, 8)) + velocity = np.stack([v, v]) + + not_ascending_timesteps = [1, 2, 3, 5, 4, 6, 7] + with pytest.raises(ValueError): + extrapolate(precip, velocity, not_ascending_timesteps) + + def test_semilagrangian_timesteps(): """Test semilagrangian extrapolation with list of timesteps.""" # inputs diff --git a/pysteps/tests/test_io_nowcast_importers.py b/pysteps/tests/test_io_nowcast_importers.py index eafe972ef..d99414f14 100644 --- a/pysteps/tests/test_io_nowcast_importers.py +++ b/pysteps/tests/test_io_nowcast_importers.py @@ -18,6 +18,9 @@ [(precip, metadata), (np.zeros_like(precip), metadata)], ) def test_import_netcdf(precip, metadata, tmp_path): + + pytest.importorskip("pyproj") + field_shape = (precip.shape[1], precip.shape[2]) startdate = metadata["timestamps"][-1] timestep = metadata["accutime"] diff --git a/pysteps/tests/test_nowcasts_linda.py b/pysteps/tests/test_nowcasts_linda.py index d631bac87..2d5f03b71 100644 --- a/pysteps/tests/test_nowcasts_linda.py +++ b/pysteps/tests/test_nowcasts_linda.py @@ -144,6 +144,9 @@ def test_linda_wrong_inputs(): def test_linda_callback(tmp_path): """Test LINDA callback functionality to export the output as a netcdf.""" + + pytest.importorskip("skimage") + n_ens_members = 2 n_timesteps = 3 diff --git a/pysteps/visualization/precipfields.py b/pysteps/visualization/precipfields.py index 1fff2c304..da231ffb4 100644 --- a/pysteps/visualization/precipfields.py +++ b/pysteps/visualization/precipfields.py @@ -16,7 +16,7 @@ import 
matplotlib.pylab as plt import numpy as np -from matplotlib import cm, colors +from matplotlib import pyplot, colors from pysteps.visualization.utils import get_geogrid, get_basemap_axis @@ -294,7 +294,7 @@ def get_colormap(ptype, units="mm/h", colorscale="pysteps"): clevs_str = [f"{clev:.1f}" for clev in clevs] return cmap, norm, clevs, clevs_str - return cm.get_cmap("jet"), colors.Normalize(), None, None + return pyplot.get_cmap("jet"), colors.Normalize(), None, None def _get_colorlist(units="mm/h", colorscale="pysteps"): diff --git a/setup.py b/setup.py index 489d4401f..58b23be59 100644 --- a/setup.py +++ b/setup.py @@ -70,7 +70,7 @@ setup( name="pysteps", - version="1.8.1", + version="1.10.0", author="PySteps developers", packages=find_packages(), license="LICENSE",
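To close, a short sketch that ties together two features added in this changeset: the RainFARM kernel_type/spectral_fusion options and the blending check_norain helper. This is illustrative only; the synthetic gamma-distributed field and all parameter values are assumptions, not part of the patch:

    import numpy as np

    from pysteps import downscaling
    from pysteps.blending.utils import check_norain

    # Synthetic low-resolution rain-rate field (illustrative assumption; real
    # use would read radar fields through the pysteps.io importers).
    rng = np.random.default_rng(42)
    precip_lr = rng.gamma(shape=0.2, scale=5.0, size=(64, 64))

    # check_norain flags fields whose rainy-pixel fraction is at or below
    # norain_thr; with precip_thr=None the domain minimum is used as threshold.
    if not check_norain(precip_lr, precip_thr=0.1, norain_thr=0.0):
        rainfarm = downscaling.get_method("rainfarm")
        # kernel_type and spectral_fusion are the options added above; alpha
        # is estimated from the power spectrum of the input when left as None.
        precip_hr, alpha = rainfarm(
            precip_lr,
            ds_factor=4,
            threshold=0.1,
            return_alpha=True,
            kernel_type="gaussian",
            spectral_fusion=True,
        )
        assert precip_hr.shape == (64 * 4, 64 * 4)

With spectral_fusion=True, the input is first Gaussianized and its large-scale Fourier modes are merged with the synthetic small-scale noise, following D'Onofrio et al. (2014); the Gaussian kernel then smooths the pixel-replicated fields before the final normalization.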