From 9f865814bdab1b5568827005ae1b563056c70bca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=98yvind=20Eide?= Date: Thu, 21 Mar 2024 12:42:31 +0100 Subject: [PATCH] Remove fields from AHM_ANALYSIS Fields were only half implemented in the AHM_ANALYSIS workflow, and it was not possible to use the workflow with FIELDS in any real cases as it would run out of memory. We remove the option here, meaning that the workflow will only work for scalar parameters. --- .../workflows/ahm_analysis/ahmanalysis.py | 104 +----------------- .../ahm_analysis/test_ahm_analysis.py | 55 --------- .../ahm_analysis/test_integration.py | 32 ------ 3 files changed, 5 insertions(+), 186 deletions(-) diff --git a/src/semeio/workflows/ahm_analysis/ahmanalysis.py b/src/semeio/workflows/ahm_analysis/ahmanalysis.py index 1728942a..690d52d3 100644 --- a/src/semeio/workflows/ahm_analysis/ahmanalysis.py +++ b/src/semeio/workflows/ahm_analysis/ahmanalysis.py @@ -4,17 +4,13 @@ import tempfile import warnings from copy import deepcopy -from pathlib import Path import numpy as np import pandas as pd -import xtgeo from ert.analysis import ErtAnalysisError, SmootherSnapshot from ert.shared.plugins.plugin_manager import hook_implementation from ert.storage import open_storage from scipy.stats import ks_2samp -from sklearn.decomposition import PCA -from sklearn.preprocessing import StandardScaler from semeio._exceptions.exceptions import ValidationError from semeio.communication import SemeioScript @@ -157,6 +153,11 @@ def run( combinations = make_obs_groups(key_map) field_parameters = sorted(self.facade.get_field_parameters()) + if field_parameters: + logger.warning( + f"AHM_ANALYSIS will only evaluate scalar parameters, skipping: {field_parameters}" + ) + scalar_parameters = sorted(self.facade.get_gen_kw()) # identify the set of actual parameters that was updated for now just go # through scalar parameters but in future if easier access to field parameter @@ -170,7 +171,6 @@ def run( ) # loop over keys and calculate the KS matrix, # conditioning one parameter at the time. - field_output = {} updated_combinations = deepcopy(combinations) for group_name, obs_group in combinations.items(): print("Processing:", group_name) @@ -235,64 +235,8 @@ def run( target_ensemble.load_all_gen_kw_data(), ) ) - field_output[group_name] = _get_field_params( - self.facade, field_parameters, target_ensemble - ) kolmogorov_smirnov_data.set_index("Parameters", inplace=True) - # Calculate Ks matrix for Fields parameters - if field_parameters: - # Get grid characteristics to be able to plot field avg maps - grid_xyzcenter = load_grid_to_dataframe(self.facade.grid_file) - all_input_prior = _get_field_params( - self.facade, field_parameters, prior_ensemble - ) - - for fieldparam in field_parameters: - scaler = StandardScaler() - scaler.fit(all_input_prior[fieldparam]) - pca = PCA(0.98).fit( - pd.DataFrame(scaler.transform(all_input_prior[fieldparam])) - ) - pc_fieldprior_df = pd.DataFrame( - data=pca.transform(scaler.transform(all_input_prior[fieldparam])) - ) - all_kolmogorov_smirnov = pd.DataFrame( - pc_fieldprior_df.columns.tolist(), columns=["PCFieldParameters"] - ) - # Get the posterior Field parameters - map_calc_properties = ( - grid_xyzcenter[grid_xyzcenter["KZ"] == 1].copy().reset_index() - ) - for group_name in updated_combinations.keys(): - map_calc_properties["Mean_D_" + group_name] = calc_mean_delta_grid( - field_output[group_name][fieldparam], - all_input_prior[fieldparam], - grid_xyzcenter, - ) - - pc_fieldpost_df = pd.DataFrame( - data=pca.transform( - scaler.transform(field_output[group_name][fieldparam]) - ) - ) - all_kolmogorov_smirnov[group_name] = all_kolmogorov_smirnov[ - "PCFieldParameters" - ].map( - calc_kolmogorov_smirnov( - pc_fieldpost_df, - pc_fieldprior_df, - pc_fieldpost_df, - ) - ) - all_kolmogorov_smirnov.set_index("PCFieldParameters", inplace=True) - # add the field max Ks to the scalar Ks matrix - kolmogorov_smirnov_data.loc["FIELD_" + fieldparam] = ( - all_kolmogorov_smirnov.max() - ) - self.reporter.publish_csv( - "delta_field" + fieldparam, map_calc_properties - ) # save/export the Ks matrix, active_obs, misfitval and prior data self.reporter.publish_csv("ks", kolmogorov_smirnov_data) self.reporter.publish_csv("active_obs_info", active_obs) @@ -313,23 +257,6 @@ def _run_ministep(facade, prior_storage, target_storage, obs_group, data_paramet ) -def _get_field_params(facade, field_parameters, target_ensemble): - """ - Because the FIELD parameters are not exposed in the Python API we have to - export them to file and read them back again. When they are exposed in the API - this function should be updated. - """ - field_data = {} - for field_param in field_parameters: - dataset = target_ensemble.load_parameters( - field_param, list(range(facade.get_ensemble_size())) - ) - field_data[field_param] = dataset["values"].values.reshape( - facade.get_ensemble_size(), len(dataset.x) * len(dataset.y) * len(dataset.z) - ) - return field_data - - def make_update_log_df(update_log: SmootherSnapshot) -> pd.DataFrame: """Read update_log file to get active and inactive observations""" obs_key = [] @@ -418,27 +345,6 @@ def calc_observationsgroup_misfit(obs_keys, df_update_log, misfit_df): return mean.mean() -def load_grid_to_dataframe(grid_path): - """Get field grid characteristics/coordinates""" - grid_path = Path(grid_path).with_suffix("") - try: - grid = xtgeo.grid_from_file(grid_path, fformat="eclipserun") - return grid.get_dataframe(activeonly=False) - except OSError as err: - raise OSError("A grid with .EGRID format is expected.") from err - - -def calc_mean_delta_grid(all_input_post, all_input_prior, grid): - """calculate mean delta of field grid data""" - delta_post_prior = np.subtract(all_input_prior, all_input_post) - delta_post_prior = np.absolute(delta_post_prior) - # Can we avoid changing in place here? - grid["Mean_delta"] = np.mean(delta_post_prior, axis=0) - df_mean_delta = grid.groupby(["IX", "JY"])[["Mean_delta"]].mean().reset_index() - - return df_mean_delta["Mean_delta"] - - def _filter_on_prefix(list_of_strings, prefixes): """returns the set of strings that has a match for any of the given prefixes""" return { diff --git a/tests/workflows/ahm_analysis/test_ahm_analysis.py b/tests/workflows/ahm_analysis/test_ahm_analysis.py index 876f274c..819862db 100644 --- a/tests/workflows/ahm_analysis/test_ahm_analysis.py +++ b/tests/workflows/ahm_analysis/test_ahm_analysis.py @@ -1,5 +1,3 @@ -from itertools import product -from pathlib import Path from unittest.mock import MagicMock import numpy as np @@ -140,43 +138,6 @@ def test_warning_calc_observationsgroup_misfit(input_obs, expected_msg, update_l ahmanalysis.calc_observationsgroup_misfit(input_obs, update_log_df, misfit_df) -def test_calc_mean_delta_grid(): - # pylint: disable=invalid-name - """test function creates a dataframe reporting mean - delta grids for field parameters""" - all_input_post = [ - [3 + i for i in range(8)] - + [10 + i for i in range(8)] - + [20 + i for i in range(8)], - [5 + i for i in range(8)] - + [30 + i for i in range(8)] - + [2 + i for i in range(8)], - ] - all_input_prior = [ - [(2 + i) * 2 for i in range(8)] - + [(3 + i) * 2 for i in range(8)] - + [(5 + i) * 2 for i in range(8)], - [(6 + i) * 2 for i in range(8)] - + [(7 + i) * 2 for i in range(8)] - + [(8 + i) * 2 for i in range(8)], - ] - ix, jy, kz = [], [], [] - for x, y, z in product(range(3), range(4), range(2)): - ix.append(x) - jy.append(y) - kz.append(z) - mygrid_ok = pd.DataFrame({"IX": ix, "JY": jy, "KZ": kz}) - caseobs = "PORO" - mygrid_ok_short = pd.DataFrame({"IX": ix[::2], "JY": jy[::2], "KZ": kz[::2]}) - mygrid_ok_short["Mean_D_" + caseobs] = ahmanalysis.calc_mean_delta_grid( - all_input_post, all_input_prior, mygrid_ok - ) - assert mygrid_ok_short["Mean_D_PORO"].max() == 12 - assert mygrid_ok_short["Mean_D_PORO"].min() == 4.5 - assert mygrid_ok_short[mygrid_ok_short["IX"] == 1]["Mean_D_PORO"].mean() == 7.25 - assert mygrid_ok_short[mygrid_ok_short["JY"] == 1]["Mean_D_PORO"].mean() == 26 / 3 - - def test_calc_kolmogorov_smirnov(): """test function creates a dataframe reporting ks value between 0 and 1 for a prior and posterior distribution""" @@ -285,22 +246,6 @@ def test_make_obs_groups(input_map, expected_keys): assert result == expected_keys -@pytest.mark.usefixtures("setup_tmpdir") -@pytest.mark.parametrize( - "file_name", - ["SNAKE_OIL_FIELD.EGRID", "SNAKE_OIL_FIELD", "SNAKE_OIL_FIELD.EXT_NOT_IMPORTANT"], -) -def test_load_grid_to_dataframe(test_data_root, file_name): - """test function creates a dataframe - with the grid characteristics and check if fails if wrong format""" - grid_path = Path(test_data_root) / "snake_oil" / "grid" / file_name - grid = ahmanalysis.load_grid_to_dataframe(grid_path) - - assert grid["IX"].nunique() == 10 - assert grid["JY"].nunique() == 12 - assert grid["KZ"].nunique() == 5 - - @pytest.mark.parametrize( "prior_data, expected_result", [ diff --git a/tests/workflows/ahm_analysis/test_integration.py b/tests/workflows/ahm_analysis/test_integration.py index c20b5126..6cc140ec 100644 --- a/tests/workflows/ahm_analysis/test_integration.py +++ b/tests/workflows/ahm_analysis/test_integration.py @@ -86,38 +86,6 @@ def test_ahmanalysis_run_deactivated_obs(snake_oil_facade, snapshot, caplog): snapshot.assert_match(ks_df.iloc[:10].to_csv(), "ks_df") -def test_ahmanalysis_run_field(snake_oil_facade): - """test data_set with scalar and Field parameters""" - with open_storage(snake_oil_facade.enspath, "w") as storage: - snake_oil_facade.run_ertscript( - ahmanalysis.AhmAnalysisJob, - storage, - storage.get_ensemble_by_name("default"), - prior_name="default", - ) - # assert that this returns/generates the delta field parameter - obs_keys = ["WPR_DIFF_1", "FOPR", "WOPR:OP1"] - output_deltafield = os.path.join( - "storage", - "snake_oil", - "reports", - "snake_oil", - "default", - "AhmAnalysisJob", - "delta_fieldPERMX.csv", - ) - assert os.path.isfile(output_deltafield) - delta_df = pd.read_csv(output_deltafield, index_col=0) - assert len(delta_df.columns) == 8 + (len(obs_keys) * 2) + 1 - # check field parameter is present and not empty in the final KS matrix - output_ks = output_deltafield.replace("delta_fieldPERMX.csv", "ks.csv") - ks_df = pd.read_csv(output_ks, index_col=0) - assert not ks_df.empty - assert "FIELD_PERMX" in ks_df.index.tolist() - check_empty = ks_df.loc[["FIELD_PERMX"], :].isnull().all(axis=1) - assert not check_empty["FIELD_PERMX"] - - @pytest.mark.usefixtures("setup_tmpdir") def test_that_dataset_with_no_prior_will_fail(test_data_root): test_data_dir = os.path.join(test_data_root, "snake_oil")