diff --git a/src/semeio/workflows/ahm_analysis/ahmanalysis.py b/src/semeio/workflows/ahm_analysis/ahmanalysis.py
index 1728942a..690d52d3 100644
--- a/src/semeio/workflows/ahm_analysis/ahmanalysis.py
+++ b/src/semeio/workflows/ahm_analysis/ahmanalysis.py
@@ -4,17 +4,13 @@
 import tempfile
 import warnings
 from copy import deepcopy
-from pathlib import Path
 
 import numpy as np
 import pandas as pd
-import xtgeo
 from ert.analysis import ErtAnalysisError, SmootherSnapshot
 from ert.shared.plugins.plugin_manager import hook_implementation
 from ert.storage import open_storage
 from scipy.stats import ks_2samp
-from sklearn.decomposition import PCA
-from sklearn.preprocessing import StandardScaler
 
 from semeio._exceptions.exceptions import ValidationError
 from semeio.communication import SemeioScript
@@ -157,6 +153,11 @@ def run(
         combinations = make_obs_groups(key_map)
 
         field_parameters = sorted(self.facade.get_field_parameters())
+        if field_parameters:
+            logger.warning(
+                f"AHM_ANALYSIS will only evaluate scalar parameters, skipping: {field_parameters}"
+            )
+
         scalar_parameters = sorted(self.facade.get_gen_kw())
         # identify the set of actual parameters that was updated for now just go
         # through scalar parameters but in future if easier access to field parameter
@@ -170,7 +171,6 @@ def run(
         )
         # loop over keys and calculate the KS matrix,
         # conditioning one parameter at the time.
-        field_output = {}
         updated_combinations = deepcopy(combinations)
         for group_name, obs_group in combinations.items():
             print("Processing:", group_name)
@@ -235,64 +235,8 @@ def run(
                     target_ensemble.load_all_gen_kw_data(),
                 )
             )
-            field_output[group_name] = _get_field_params(
-                self.facade, field_parameters, target_ensemble
-            )
         kolmogorov_smirnov_data.set_index("Parameters", inplace=True)
-        # Calculate Ks matrix for Fields parameters
-        if field_parameters:
-            # Get grid characteristics to be able to plot field avg maps
-            grid_xyzcenter = load_grid_to_dataframe(self.facade.grid_file)
-            all_input_prior = _get_field_params(
-                self.facade, field_parameters, prior_ensemble
-            )
-
-            for fieldparam in field_parameters:
-                scaler = StandardScaler()
-                scaler.fit(all_input_prior[fieldparam])
-                pca = PCA(0.98).fit(
-                    pd.DataFrame(scaler.transform(all_input_prior[fieldparam]))
-                )
-                pc_fieldprior_df = pd.DataFrame(
-                    data=pca.transform(scaler.transform(all_input_prior[fieldparam]))
-                )
-                all_kolmogorov_smirnov = pd.DataFrame(
-                    pc_fieldprior_df.columns.tolist(), columns=["PCFieldParameters"]
-                )
-                # Get the posterior Field parameters
-                map_calc_properties = (
-                    grid_xyzcenter[grid_xyzcenter["KZ"] == 1].copy().reset_index()
-                )
-                for group_name in updated_combinations.keys():
-                    map_calc_properties["Mean_D_" + group_name] = calc_mean_delta_grid(
-                        field_output[group_name][fieldparam],
-                        all_input_prior[fieldparam],
-                        grid_xyzcenter,
-                    )
-
-                    pc_fieldpost_df = pd.DataFrame(
-                        data=pca.transform(
-                            scaler.transform(field_output[group_name][fieldparam])
-                        )
-                    )
-                    all_kolmogorov_smirnov[group_name] = all_kolmogorov_smirnov[
-                        "PCFieldParameters"
-                    ].map(
-                        calc_kolmogorov_smirnov(
-                            pc_fieldpost_df,
-                            pc_fieldprior_df,
-                            pc_fieldpost_df,
-                        )
-                    )
-                all_kolmogorov_smirnov.set_index("PCFieldParameters", inplace=True)
-                # add the field max Ks to the scalar Ks matrix
-                kolmogorov_smirnov_data.loc["FIELD_" + fieldparam] = (
-                    all_kolmogorov_smirnov.max()
-                )
-                self.reporter.publish_csv(
-                    "delta_field" + fieldparam, map_calc_properties
-                )
 
         # save/export the Ks matrix, active_obs, misfitval and prior data
         self.reporter.publish_csv("ks", kolmogorov_smirnov_data)
         self.reporter.publish_csv("active_obs_info", active_obs)
@@ -313,23 +257,6 @@ def _run_ministep(facade, prior_storage, target_storage, obs_group, data_paramet
     )
 
 
-def _get_field_params(facade, field_parameters, target_ensemble):
-    """
-    Because the FIELD parameters are not exposed in the Python API we have to
-    export them to file and read them back again. When they are exposed in the API
-    this function should be updated.
-    """
-    field_data = {}
-    for field_param in field_parameters:
-        dataset = target_ensemble.load_parameters(
-            field_param, list(range(facade.get_ensemble_size()))
-        )
-        field_data[field_param] = dataset["values"].values.reshape(
-            facade.get_ensemble_size(), len(dataset.x) * len(dataset.y) * len(dataset.z)
-        )
-    return field_data
-
-
 def make_update_log_df(update_log: SmootherSnapshot) -> pd.DataFrame:
     """Read update_log file to get active and inactive observations"""
     obs_key = []
@@ -418,27 +345,6 @@ def calc_observationsgroup_misfit(obs_keys, df_update_log, misfit_df):
     return mean.mean()
 
 
-def load_grid_to_dataframe(grid_path):
-    """Get field grid characteristics/coordinates"""
-    grid_path = Path(grid_path).with_suffix("")
-    try:
-        grid = xtgeo.grid_from_file(grid_path, fformat="eclipserun")
-        return grid.get_dataframe(activeonly=False)
-    except OSError as err:
-        raise OSError("A grid with .EGRID format is expected.") from err
-
-
-def calc_mean_delta_grid(all_input_post, all_input_prior, grid):
-    """calculate mean delta of field grid data"""
-    delta_post_prior = np.subtract(all_input_prior, all_input_post)
-    delta_post_prior = np.absolute(delta_post_prior)
-    # Can we avoid changing in place here?
-    grid["Mean_delta"] = np.mean(delta_post_prior, axis=0)
-    df_mean_delta = grid.groupby(["IX", "JY"])[["Mean_delta"]].mean().reset_index()
-
-    return df_mean_delta["Mean_delta"]
-
-
 def _filter_on_prefix(list_of_strings, prefixes):
     """returns the set of strings that has a match for any of the given prefixes"""
     return {
diff --git a/tests/workflows/ahm_analysis/test_ahm_analysis.py b/tests/workflows/ahm_analysis/test_ahm_analysis.py
index 876f274c..819862db 100644
--- a/tests/workflows/ahm_analysis/test_ahm_analysis.py
+++ b/tests/workflows/ahm_analysis/test_ahm_analysis.py
@@ -1,5 +1,3 @@
-from itertools import product
-from pathlib import Path
 from unittest.mock import MagicMock
 
 import numpy as np
@@ -140,43 +138,6 @@ def test_warning_calc_observationsgroup_misfit(input_obs, expected_msg, update_l
     ahmanalysis.calc_observationsgroup_misfit(input_obs, update_log_df, misfit_df)
 
 
-def test_calc_mean_delta_grid():
-    # pylint: disable=invalid-name
-    """test function creates a dataframe reporting mean
-    delta grids for field parameters"""
-    all_input_post = [
-        [3 + i for i in range(8)]
-        + [10 + i for i in range(8)]
-        + [20 + i for i in range(8)],
-        [5 + i for i in range(8)]
-        + [30 + i for i in range(8)]
-        + [2 + i for i in range(8)],
-    ]
-    all_input_prior = [
-        [(2 + i) * 2 for i in range(8)]
-        + [(3 + i) * 2 for i in range(8)]
-        + [(5 + i) * 2 for i in range(8)],
-        [(6 + i) * 2 for i in range(8)]
-        + [(7 + i) * 2 for i in range(8)]
-        + [(8 + i) * 2 for i in range(8)],
-    ]
-    ix, jy, kz = [], [], []
-    for x, y, z in product(range(3), range(4), range(2)):
-        ix.append(x)
-        jy.append(y)
-        kz.append(z)
-    mygrid_ok = pd.DataFrame({"IX": ix, "JY": jy, "KZ": kz})
-    caseobs = "PORO"
-    mygrid_ok_short = pd.DataFrame({"IX": ix[::2], "JY": jy[::2], "KZ": kz[::2]})
-    mygrid_ok_short["Mean_D_" + caseobs] = ahmanalysis.calc_mean_delta_grid(
-        all_input_post, all_input_prior, mygrid_ok
-    )
-    assert mygrid_ok_short["Mean_D_PORO"].max() == 12
-    assert mygrid_ok_short["Mean_D_PORO"].min() == 4.5
-    assert mygrid_ok_short[mygrid_ok_short["IX"] == 1]["Mean_D_PORO"].mean() == 7.25
-    assert mygrid_ok_short[mygrid_ok_short["JY"] == 1]["Mean_D_PORO"].mean() == 26 / 3
-
-
 def test_calc_kolmogorov_smirnov():
     """test function creates a dataframe reporting ks value between 0 and 1
     for a prior and posterior distribution"""
@@ -285,22 +246,6 @@ def test_make_obs_groups(input_map, expected_keys):
     assert result == expected_keys
 
 
-@pytest.mark.usefixtures("setup_tmpdir")
-@pytest.mark.parametrize(
-    "file_name",
-    ["SNAKE_OIL_FIELD.EGRID", "SNAKE_OIL_FIELD", "SNAKE_OIL_FIELD.EXT_NOT_IMPORTANT"],
-)
-def test_load_grid_to_dataframe(test_data_root, file_name):
-    """test function creates a dataframe
-    with the grid characteristics and check if fails if wrong format"""
-    grid_path = Path(test_data_root) / "snake_oil" / "grid" / file_name
-    grid = ahmanalysis.load_grid_to_dataframe(grid_path)
-
-    assert grid["IX"].nunique() == 10
-    assert grid["JY"].nunique() == 12
-    assert grid["KZ"].nunique() == 5
-
-
 @pytest.mark.parametrize(
     "prior_data, expected_result",
     [
diff --git a/tests/workflows/ahm_analysis/test_integration.py b/tests/workflows/ahm_analysis/test_integration.py
index c20b5126..6cc140ec 100644
--- a/tests/workflows/ahm_analysis/test_integration.py
+++ b/tests/workflows/ahm_analysis/test_integration.py
@@ -86,38 +86,6 @@ def test_ahmanalysis_run_deactivated_obs(snake_oil_facade, snapshot, caplog):
     snapshot.assert_match(ks_df.iloc[:10].to_csv(), "ks_df")
 
 
-def test_ahmanalysis_run_field(snake_oil_facade):
-    """test data_set with scalar and Field parameters"""
-    with open_storage(snake_oil_facade.enspath, "w") as storage:
-        snake_oil_facade.run_ertscript(
-            ahmanalysis.AhmAnalysisJob,
-            storage,
-            storage.get_ensemble_by_name("default"),
-            prior_name="default",
-        )
-    # assert that this returns/generates the delta field parameter
-    obs_keys = ["WPR_DIFF_1", "FOPR", "WOPR:OP1"]
-    output_deltafield = os.path.join(
-        "storage",
-        "snake_oil",
-        "reports",
-        "snake_oil",
-        "default",
-        "AhmAnalysisJob",
-        "delta_fieldPERMX.csv",
-    )
-    assert os.path.isfile(output_deltafield)
-    delta_df = pd.read_csv(output_deltafield, index_col=0)
-    assert len(delta_df.columns) == 8 + (len(obs_keys) * 2) + 1
-    # check field parameter is present and not empty in the final KS matrix
-    output_ks = output_deltafield.replace("delta_fieldPERMX.csv", "ks.csv")
-    ks_df = pd.read_csv(output_ks, index_col=0)
-    assert not ks_df.empty
-    assert "FIELD_PERMX" in ks_df.index.tolist()
-    check_empty = ks_df.loc[["FIELD_PERMX"], :].isnull().all(axis=1)
-    assert not check_empty["FIELD_PERMX"]
-
-
 @pytest.mark.usefixtures("setup_tmpdir")
 def test_that_dataset_with_no_prior_will_fail(test_data_root):
     test_data_dir = os.path.join(test_data_root, "snake_oil")