Remove fields from AHM_ANALYSIS
Field parameters were only half implemented in the AHM_ANALYSIS workflow,
and the workflow could not be used with FIELDS in any realistic case, as
it would run out of memory. We remove the option here, meaning that the
workflow will only work for scalar parameters.
oyvindeide committed Apr 4, 2024
1 parent 5d67946 commit 9f86581
Showing 3 changed files with 5 additions and 186 deletions.
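As context for the change: after this commit the workflow evaluates only scalar (GEN_KW) parameters, comparing the prior and posterior distribution of each parameter with a two-sample Kolmogorov-Smirnov test (note that the scipy.stats.ks_2samp import is kept in the diff below). A minimal, self-contained sketch of that comparison, using synthetic data rather than the workflow's API:

    # Illustrative sketch only: synthetic stand-ins for the prior and
    # posterior samples of one scalar (GEN_KW) parameter.
    import numpy as np
    from scipy.stats import ks_2samp

    rng = np.random.default_rng(seed=0)
    prior = rng.normal(loc=0.0, scale=1.0, size=100)      # prior ensemble
    posterior = rng.normal(loc=0.3, scale=0.8, size=100)  # posterior ensemble

    result = ks_2samp(prior, posterior)
    print(f"KS statistic: {result.statistic:.3f}")  # 0 = identical, 1 = disjoint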
104 changes: 5 additions & 99 deletions src/semeio/workflows/ahm_analysis/ahmanalysis.py
@@ -4,17 +4,13 @@
import tempfile
import warnings
from copy import deepcopy
from pathlib import Path

import numpy as np
import pandas as pd
import xtgeo
from ert.analysis import ErtAnalysisError, SmootherSnapshot
from ert.shared.plugins.plugin_manager import hook_implementation
from ert.storage import open_storage
from scipy.stats import ks_2samp
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

from semeio._exceptions.exceptions import ValidationError
from semeio.communication import SemeioScript
@@ -157,6 +153,11 @@ def run(
combinations = make_obs_groups(key_map)

field_parameters = sorted(self.facade.get_field_parameters())
if field_parameters:
logger.warning(
f"AHM_ANALYSIS will only evaluate scalar parameters, skipping: {field_parameters}"
)

scalar_parameters = sorted(self.facade.get_gen_kw())
# identify the set of actual parameters that was updated for now just go
# through scalar parameters but in future if easier access to field parameter
@@ -170,7 +171,6 @@
)
# loop over keys and calculate the KS matrix,
# conditioning one parameter at the time.
field_output = {}
updated_combinations = deepcopy(combinations)
for group_name, obs_group in combinations.items():
print("Processing:", group_name)
@@ -235,64 +235,8 @@ def run(
target_ensemble.load_all_gen_kw_data(),
)
)
field_output[group_name] = _get_field_params(
self.facade, field_parameters, target_ensemble
)
kolmogorov_smirnov_data.set_index("Parameters", inplace=True)

# Calculate Ks matrix for Fields parameters
if field_parameters:
# Get grid characteristics to be able to plot field avg maps
grid_xyzcenter = load_grid_to_dataframe(self.facade.grid_file)
all_input_prior = _get_field_params(
self.facade, field_parameters, prior_ensemble
)

for fieldparam in field_parameters:
scaler = StandardScaler()
scaler.fit(all_input_prior[fieldparam])
pca = PCA(0.98).fit(
pd.DataFrame(scaler.transform(all_input_prior[fieldparam]))
)
pc_fieldprior_df = pd.DataFrame(
data=pca.transform(scaler.transform(all_input_prior[fieldparam]))
)
all_kolmogorov_smirnov = pd.DataFrame(
pc_fieldprior_df.columns.tolist(), columns=["PCFieldParameters"]
)
# Get the posterior Field parameters
map_calc_properties = (
grid_xyzcenter[grid_xyzcenter["KZ"] == 1].copy().reset_index()
)
for group_name in updated_combinations.keys():
map_calc_properties["Mean_D_" + group_name] = calc_mean_delta_grid(
field_output[group_name][fieldparam],
all_input_prior[fieldparam],
grid_xyzcenter,
)

pc_fieldpost_df = pd.DataFrame(
data=pca.transform(
scaler.transform(field_output[group_name][fieldparam])
)
)
all_kolmogorov_smirnov[group_name] = all_kolmogorov_smirnov[
"PCFieldParameters"
].map(
calc_kolmogorov_smirnov(
pc_fieldpost_df,
pc_fieldprior_df,
pc_fieldpost_df,
)
)
all_kolmogorov_smirnov.set_index("PCFieldParameters", inplace=True)
# add the field max Ks to the scalar Ks matrix
kolmogorov_smirnov_data.loc["FIELD_" + fieldparam] = (
all_kolmogorov_smirnov.max()
)
self.reporter.publish_csv(
"delta_field" + fieldparam, map_calc_properties
)
# save/export the Ks matrix, active_obs, misfitval and prior data
self.reporter.publish_csv("ks", kolmogorov_smirnov_data)
self.reporter.publish_csv("active_obs_info", active_obs)
@@ -313,23 +257,6 @@ def _run_ministep(facade, prior_storage, target_storage, obs_group, data_paramet
)


def _get_field_params(facade, field_parameters, target_ensemble):
"""
Because the FIELD parameters are not exposed in the Python API we have to
export them to file and read them back again. When they are exposed in the API
this function should be updated.
"""
field_data = {}
for field_param in field_parameters:
dataset = target_ensemble.load_parameters(
field_param, list(range(facade.get_ensemble_size()))
)
field_data[field_param] = dataset["values"].values.reshape(
facade.get_ensemble_size(), len(dataset.x) * len(dataset.y) * len(dataset.z)
)
return field_data


def make_update_log_df(update_log: SmootherSnapshot) -> pd.DataFrame:
"""Read update_log file to get active and inactive observations"""
obs_key = []
@@ -418,27 +345,6 @@ def calc_observationsgroup_misfit(obs_keys, df_update_log, misfit_df):
return mean.mean()


def load_grid_to_dataframe(grid_path):
"""Get field grid characteristics/coordinates"""
grid_path = Path(grid_path).with_suffix("")
try:
grid = xtgeo.grid_from_file(grid_path, fformat="eclipserun")
return grid.get_dataframe(activeonly=False)
except OSError as err:
raise OSError("A grid with .EGRID format is expected.") from err


def calc_mean_delta_grid(all_input_post, all_input_prior, grid):
"""calculate mean delta of field grid data"""
delta_post_prior = np.subtract(all_input_prior, all_input_post)
delta_post_prior = np.absolute(delta_post_prior)
# Can we avoid changing in place here?
grid["Mean_delta"] = np.mean(delta_post_prior, axis=0)
df_mean_delta = grid.groupby(["IX", "JY"])[["Mean_delta"]].mean().reset_index()

return df_mean_delta["Mean_delta"]


def _filter_on_prefix(list_of_strings, prefixes):
"""returns the set of strings that has a match for any of the given prefixes"""
return {
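For readers curious about the field path removed above: it standardized the prior field values over the ensemble, reduced them with a PCA retaining 98% of the variance, computed a KS statistic per principal component, and reported the maximum as FIELD_<parameter> in the KS matrix. A rough sketch of that pattern on synthetic data (illustrative only, not the semeio API):

    # Sketch of the removed approach: standardize the prior field,
    # fit PCA keeping 98% of variance, then KS per principal component.
    import numpy as np
    from scipy.stats import ks_2samp
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler

    rng = np.random.default_rng(seed=1)
    prior = rng.normal(size=(100, 500))  # realizations x grid cells
    posterior = 0.9 * prior + rng.normal(scale=0.1, size=(100, 500))

    scaler = StandardScaler().fit(prior)
    pca = PCA(0.98).fit(scaler.transform(prior))
    pc_prior = pca.transform(scaler.transform(prior))
    pc_post = pca.transform(scaler.transform(posterior))

    ks_per_pc = [
        ks_2samp(pc_prior[:, i], pc_post[:, i]).statistic
        for i in range(pc_prior.shape[1])
    ]
    print(max(ks_per_pc))  # the max over PCs went into the KS matrix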
55 changes: 0 additions & 55 deletions tests/workflows/ahm_analysis/test_ahm_analysis.py
@@ -1,5 +1,3 @@
from itertools import product
from pathlib import Path
from unittest.mock import MagicMock

import numpy as np
@@ -140,43 +138,6 @@ def test_warning_calc_observationsgroup_misfit(input_obs, expected_msg, update_l
ahmanalysis.calc_observationsgroup_misfit(input_obs, update_log_df, misfit_df)


def test_calc_mean_delta_grid():
# pylint: disable=invalid-name
"""test function creates a dataframe reporting mean
delta grids for field parameters"""
all_input_post = [
[3 + i for i in range(8)]
+ [10 + i for i in range(8)]
+ [20 + i for i in range(8)],
[5 + i for i in range(8)]
+ [30 + i for i in range(8)]
+ [2 + i for i in range(8)],
]
all_input_prior = [
[(2 + i) * 2 for i in range(8)]
+ [(3 + i) * 2 for i in range(8)]
+ [(5 + i) * 2 for i in range(8)],
[(6 + i) * 2 for i in range(8)]
+ [(7 + i) * 2 for i in range(8)]
+ [(8 + i) * 2 for i in range(8)],
]
ix, jy, kz = [], [], []
for x, y, z in product(range(3), range(4), range(2)):
ix.append(x)
jy.append(y)
kz.append(z)
mygrid_ok = pd.DataFrame({"IX": ix, "JY": jy, "KZ": kz})
caseobs = "PORO"
mygrid_ok_short = pd.DataFrame({"IX": ix[::2], "JY": jy[::2], "KZ": kz[::2]})
mygrid_ok_short["Mean_D_" + caseobs] = ahmanalysis.calc_mean_delta_grid(
all_input_post, all_input_prior, mygrid_ok
)
assert mygrid_ok_short["Mean_D_PORO"].max() == 12
assert mygrid_ok_short["Mean_D_PORO"].min() == 4.5
assert mygrid_ok_short[mygrid_ok_short["IX"] == 1]["Mean_D_PORO"].mean() == 7.25
assert mygrid_ok_short[mygrid_ok_short["JY"] == 1]["Mean_D_PORO"].mean() == 26 / 3


def test_calc_kolmogorov_smirnov():
"""test function creates a dataframe reporting
ks value between 0 and 1 for a prior and posterior distribution"""
@@ -285,22 +246,6 @@ def test_make_obs_groups(input_map, expected_keys):
assert result == expected_keys


@pytest.mark.usefixtures("setup_tmpdir")
@pytest.mark.parametrize(
"file_name",
["SNAKE_OIL_FIELD.EGRID", "SNAKE_OIL_FIELD", "SNAKE_OIL_FIELD.EXT_NOT_IMPORTANT"],
)
def test_load_grid_to_dataframe(test_data_root, file_name):
"""test function creates a dataframe
with the grid characteristics and check if fails if wrong format"""
grid_path = Path(test_data_root) / "snake_oil" / "grid" / file_name
grid = ahmanalysis.load_grid_to_dataframe(grid_path)

assert grid["IX"].nunique() == 10
assert grid["JY"].nunique() == 12
assert grid["KZ"].nunique() == 5


@pytest.mark.parametrize(
"prior_data, expected_result",
[
32 changes: 0 additions & 32 deletions tests/workflows/ahm_analysis/test_integration.py
@@ -86,38 +86,6 @@ def test_ahmanalysis_run_deactivated_obs(snake_oil_facade, snapshot, caplog):
snapshot.assert_match(ks_df.iloc[:10].to_csv(), "ks_df")


def test_ahmanalysis_run_field(snake_oil_facade):
"""test data_set with scalar and Field parameters"""
with open_storage(snake_oil_facade.enspath, "w") as storage:
snake_oil_facade.run_ertscript(
ahmanalysis.AhmAnalysisJob,
storage,
storage.get_ensemble_by_name("default"),
prior_name="default",
)
# assert that this returns/generates the delta field parameter
obs_keys = ["WPR_DIFF_1", "FOPR", "WOPR:OP1"]
output_deltafield = os.path.join(
"storage",
"snake_oil",
"reports",
"snake_oil",
"default",
"AhmAnalysisJob",
"delta_fieldPERMX.csv",
)
assert os.path.isfile(output_deltafield)
delta_df = pd.read_csv(output_deltafield, index_col=0)
assert len(delta_df.columns) == 8 + (len(obs_keys) * 2) + 1
# check field parameter is present and not empty in the final KS matrix
output_ks = output_deltafield.replace("delta_fieldPERMX.csv", "ks.csv")
ks_df = pd.read_csv(output_ks, index_col=0)
assert not ks_df.empty
assert "FIELD_PERMX" in ks_df.index.tolist()
check_empty = ks_df.loc[["FIELD_PERMX"], :].isnull().all(axis=1)
assert not check_empty["FIELD_PERMX"]


@pytest.mark.usefixtures("setup_tmpdir")
def test_that_dataset_with_no_prior_will_fail(test_data_root):
test_data_dir = os.path.join(test_data_root, "snake_oil")
