-
Notifications
You must be signed in to change notification settings - Fork 0
/
nnm_analysis.py
326 lines (254 loc) · 14 KB
/
nnm_analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
import os
import sys
import logging
# logging.disable(logging.WARNING)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import h5py
import numpy as np
import keras
import tensorflow as tf
from keras import layers
from itertools import repeat
import importlib
import contextlib
import KratosMultiphysics
import KratosMultiphysics.RomApplication as KratosROM
from KratosMultiphysics.RomApplication import kratos_io as kratos_io
from KratosMultiphysics.RomApplication import clustering as clustering
from KratosMultiphysics.RomApplication import new_python_solvers_wrapper_rom
from KratosMultiphysics.RomApplication import python_solvers_wrapper_rom as solver_wrapper
from KratosMultiphysics.RomApplication.networks import gradient_shallow as gradient_shallow_ae
from KratosMultiphysics.RomApplication.hrom_training_utility import HRomTrainingUtility
from KratosMultiphysics.RomApplication.calculate_rom_basis_output_process import CalculateRomBasisOutputProcess
from KratosMultiphysics.RomApplication.randomized_singular_value_decomposition import RandomizedSingularValueDecomposition
def custom_loss(y_true, y_pred):
y_diff = y_true-y_pred
y_diff = y_diff ** 2
return y_diff
def CreateRomAnalysisInstance(cls, global_model, parameters):
class RomAnalysis(cls):
def __init__(self,global_model, parameters):
super().__init__(global_model, parameters)
self.override_first_iter = False
self.snapshots_matrix = []
### NN ##
def CreateModel(self):
# List of files to read from
data_inputs = [
"training/bases/result_80000.h5",
"training/bases/result_90000.h5",
"training/bases/result_100000.h5",
"training/bases/result_110000.h5",
"training/bases/result_120000.h5",
]
residuals = [
"training/residuals/result_80000.npy",
"training/residuals/result_90000.npy",
"training/residuals/result_100000.npy",
"training/residuals/result_110000.npy",
"training/residuals/result_120000.npy",
]
# List of variables to run be used (VELOCITY, PRESSURE, etc...)
S = kratos_io.build_snapshot_grid(
data_inputs,
[
"DISPLACEMENT",
]
)
self.SORG = S
# Some configuration
self.config = {
"load_model": True,
"train_model": False,
"save_model": False,
"print_results": False,
"use_reduced": False,
}
# Select the netwrok to use
self.kratos_network = gradient_shallow_ae.GradientShallow()
print("=== Calculating Randomized Singular Value Decomposition ===")
with contextlib.redirect_stdout(None):
# Note: we redirect the output here because there is no option to reduce the log level of this method.
# self.U,sigma,_,error = RandomizedSingularValueDecomposition().Calculate(S,1e-16)
self.U,sigma,_ = np.linalg.svd(S, full_matrices=True, compute_uv=True, hermitian=False)
self.U = self.U[:,:20]
SPri = self.U @ self.U.T @ S
# Select the reduced snapshot or the full input
if self.config["use_reduced"]:
SReduced = self.U.T @ S
else:
SReduced = S
data_rows = SReduced.shape[0]
data_cols = SReduced.shape[1]
self.kratos_network.calculate_data_limits(SReduced)
# Set the properties for the clusters
num_clusters=1 # Number of different bases chosen
num_cluster_col=2 # If I use num_cluster_col = num_variables result should be exact.
num_encoding_var=num_cluster_col
# Load the model or train a new one. TODO: Fix custom_loss not being saved correctly
if self.config["load_model"]:
self.autoencoder = tf.keras.models.load_model(self.kratos_network.model_name, custom_objects={
"custom_loss":custom_loss,
"set_m_grad":gradient_shallow_ae.GradModel2.set_m_grad
})
self.kratos_network.encoder_model = tf.keras.models.load_model(self.kratos_network.model_name+"_encoder", custom_objects={
"custom_loss":custom_loss,
"set_m_grad":gradient_shallow_ae.GradModel2.set_m_grad
})
self.kratos_network.decoder_model = tf.keras.models.load_model(self.kratos_network.model_name+"_decoder", custom_objects={
"custom_loss":custom_loss,
"set_m_grad":gradient_shallow_ae.GradModel2.set_m_grad
})
else:
self.autoencoder, self.autoencoder_err = self.kratos_network.define_network(SReduced, custom_loss, num_encoding_var)
def _CreateSolver(self):
""" Create the Solver (and create and import the ModelPart if it is not alread in the model) """
# Get the ROM settings from the RomParameters.json input file
with open('RomParameters.json') as rom_parameters:
self.rom_parameters = KratosMultiphysics.Parameters(rom_parameters.read())
# Set the ROM settings in the "solver_settings" of the solver introducing the physics
# self.project_parameters["solver_settings"].AddValue("rom_settings", self.rom_parameters["rom_settings"])
# HROM operations flags
self.rom_basis_process_list_check = False
self.rom_basis_output_process_check = False
self.run_hrom = False # self.rom_parameters["run_hrom"].GetBool() if self.rom_parameters.Has("run_hrom") else False
self.train_hrom = False # self.rom_parameters["train_hrom"].GetBool() if self.rom_parameters.Has("train_hrom") else False
if self.run_hrom and self.train_hrom:
# Check that train an run HROM are not set at the same time
err_msg = "\'run_hrom\' and \'train_hrom\' are both \'true\'. Select either training or running (if training has been already done)."
raise Exception(err_msg)
# Create the ROM solver
return new_python_solvers_wrapper_rom.CreateSolver(
self.model,
self.project_parameters)
def _GetListOfProcesses(self):
# Get the already existent processes list
list_of_processes = super()._GetListOfProcesses()
# Check if there is any instance of ROM basis output
if self.rom_basis_process_list_check:
for process in list_of_processes:
if isinstance(process, KratosROM.calculate_rom_basis_output_process.CalculateRomBasisOutputProcess):
warn_msg = "\'CalculateRomBasisOutputProcess\' instance found in ROM stage. Basis must be already stored in \'RomParameters.json\'. Removing instance from processes list."
KratosMultiphysics.Logger.PrintWarning("RomAnalysis", warn_msg)
list_of_processes.remove(process)
self.rom_basis_process_list_check = False
return list_of_processes
def _GetListOfOutputProcesses(self):
# Get the already existent output processes list
list_of_output_processes = super()._GetListOfOutputProcesses()
# Check if there is any instance of ROM basis output
if self.rom_basis_output_process_check:
for process in list_of_output_processes:
if isinstance(process, KratosROM.calculate_rom_basis_output_process.CalculateRomBasisOutputProcess):
warn_msg = "\'CalculateRomBasisOutputProcess\' instance found in ROM stage. Basis must be already stored in \'RomParameters.json\'. Removing instance from output processes list."
KratosMultiphysics.Logger.PrintWarning("RomAnalysis", warn_msg)
list_of_output_processes.remove(process)
self.rom_basis_output_process_check = False
return list_of_output_processes
def _GetSimulationName(self):
return "::[ROM Simulation]:: "
def InitializeSolutionStep(self):
computing_model_part = self._GetSolver().GetComputingModelPart().GetRootModelPart()
# Reset the diplacement for this iteration
computing_model_part.SetValue(KratosROM.ROM_SOLUTION_INCREMENT, [0.0, 0.0])
# Generate the input snapshot
self.S = []
for node in computing_model_part.Nodes:
self.S.append(node.GetSolutionStepValue(KratosMultiphysics.DISPLACEMENT_X))
self.S.append(node.GetSolutionStepValue(KratosMultiphysics.DISPLACEMENT_Y))
self.S = np.array(self.S)
if self.config["use_reduced"]:
self.sc = self.U.T @ np.array([self.S]).T
else:
self.sc = np.array([self.S]).T
self.nsc = self.kratos_network.normalize_data(self.sc)
self.NSPredict_d = self.kratos_network.predict_snapshot(self.autoencoder, self.nsc)
self.q = self.kratos_network.predict_snapshot(self.kratos_network.encoder_model, self.nsc)
print("Shape of nsc:", self.nsc.shape)
# Check the prediction correctness:
s__norm = np.linalg.norm(self.nsc)
sp_norm = np.linalg.norm((self.NSPredict_d[0].T)-(self.nsc))
if s__norm != 0:
print("Norm error for this iteration", sp_norm/s__norm)
else:
print("Norm error is 0")
np.set_printoptions(threshold=sys.maxsize)
# Impose prediction into initial solution
# iter = 0
# for node in computing_model_part.Nodes:
# node.SetSolutionStepValue(KratosMultiphysics.DISPLACEMENT_X, self.NSPredict_d[iter+0])
# node.SetSolutionStepValue(KratosMultiphysics.DISPLACEMENT_Y, self.NSPredict_d[iter+1])
# iter+=2
grad_enc_input = np.array([self.nsc.T[0]], dtype="float32")
grad_dec_input = np.array([self.q.T[0]])
# nn_nodal_modes_enc = self.kratos_network.get_gradients(
# self.kratos_network.encoder_model,
# [self.kratos_network.encoder_model],
# grad_enc_input,
# None
# )
nn_nodal_modes_dec = self.kratos_network.get_gradients(
self.kratos_network.decoder_model,
[self.kratos_network.decoder_model],
grad_dec_input,
None
)
print("U shapes:", self.U.shape)
# print("gradient shapes:", nn_nodal_modes_enc.T.shape)
print("gradient shapes:", nn_nodal_modes_dec.T.shape)
# Set the nodal ROM basis
# nodal_modes_e = self.U @ nn_nodal_modes_enc.T
if self.config["use_reduced"]:
nodal_modes_d = self.U @ nn_nodal_modes_dec # This is the same as (nn_nodal_modes_dec.T @ self.U.T).T
else:
nodal_modes_d = nn_nodal_modes_dec
nodal_dofs = 2
rom_dofs = 2
# aux_e = KratosMultiphysics.Matrix(nodal_dofs, rom_dofs)
aux_d = KratosMultiphysics.Matrix(nodal_dofs, rom_dofs)
for node in computing_model_part.Nodes:
node_id = node.Id-1
for j in range(nodal_dofs):
for i in range(rom_dofs):
# aux_e[j,i] = nodal_modes_e[node_id*nodal_dofs+j][i]
aux_d[j,i] = nodal_modes_d[node_id*nodal_dofs+j][i]
node.SetValue(KratosROM.ROM_BASIS, aux_d)
# node.SetValue(KratosROM.ROM_BASIS_DEC, aux_d)
super().InitializeSolutionStep()
def FinalizeSolutionStep(self):
super().FinalizeSolutionStep()
snapshot = []
for node in self._GetSolver().GetComputingModelPart().Nodes:
snapshot.append(node.GetSolutionStepValue(KratosMultiphysics.DISPLACEMENT_X))
snapshot.append(node.GetSolutionStepValue(KratosMultiphysics.DISPLACEMENT_Y))
self.snapshots_matrix.append(snapshot)
def ModifyAfterSolverInitialize(self):
"""Here is where the ROM_BASIS is imposed to each node"""
super().ModifyAfterSolverInitialize()
# Get the model part where the ROM is to be applied
computing_model_part = self._GetSolver().GetComputingModelPart().GetRootModelPart()
# Matrix to compare results
self.snapshots_matrix = []
# Store 1st Level ROM residuals
self.residuals_matrix = []
# Set ROM basis
nodal_modes = self.rom_parameters["nodal_modes"]
nodal_dofs = len(self.project_parameters["solver_settings"]["rom_settings"]["nodal_unknowns"].GetStringArray())
rom_dofs = self.project_parameters["solver_settings"]["rom_settings"]["number_of_rom_dofs"].GetInt()
# Set the nodal ROM basis
aux = KratosMultiphysics.Matrix(nodal_dofs, rom_dofs)
for node in computing_model_part.Nodes:
node_id = str(node.Id)
for j in range(nodal_dofs):
for i in range(rom_dofs):
aux[j,i] = nodal_modes[node_id][j][i].GetDouble()
node.SetValue(KratosROM.ROM_BASIS, aux)
def Finalize(self):
# This calls the physics Finalize
super().Finalize()
np.save("NNM.npy",self.snapshots_matrix)
# Once simulation is completed, calculate and save the HROM weights
if self.train_hrom:
self.__hrom_training_utility.CalculateAndSaveHRomWeights()
self.__hrom_training_utility.CreateHRomModelParts()
return RomAnalysis(global_model, parameters)