diff --git a/examples/manual_play/test.py b/examples/manual_play/test.py
deleted file mode 100644
index 0c736c20e..000000000
--- a/examples/manual_play/test.py
+++ /dev/null
@@ -1,55 +0,0 @@
-import numpy as np
-from gym_csle_cyborg.dao.csle_cyborg_wrapper_config import CSLECyborgWrapperConfig
-from gym_csle_cyborg.envs.cyborg_scenario_two_wrapper import CyborgScenarioTwoWrapper
-from csle_common.metastore.metastore_facade import MetastoreFacade
-import csle_agents.constants.constants as constants
-import random
-import torch
-
-if __name__ == '__main__':
-    config = CSLECyborgWrapperConfig(maximum_steps=100, gym_env_name="", save_trace=False, reward_shaping=False)
-    env = CyborgScenarioTwoWrapper(config=config)
-    ppo_policy = MetastoreFacade.get_ppo_policy(id=58)
-    num_episodes = 1000
-    A = env.get_action_space()
-    returns = []
-    max_horizon = 100
-    seed = 1612312
-    random.seed(seed)
-    np.random.seed(seed)
-    torch.manual_seed(seed)
-    for ep in range(num_episodes):
-        done = False
-        o, info = env.reset()
-        R = 0
-        t = 0
-        zeros = []
-        ones = []
-        while t <= max_horizon:
-            # a = np.random.choice(A)
-            a = ppo_policy.action(o=o, deterministic=False)
-            if a == 0:
-                zeros.append(0)
-            if a == 1:
-                ones.append(1)
-            o, r, done, _, info = env.step(a)
-            # if ep == 132:
-            # if ep == 0:
-            #     print(f"t: {t}, a: {a}, r: {r}")
-            #     print(env.s)
-            #     print(env.last_obs)
-            #     print(f"{env.red_agent_target}, {env.get_red_agent_action_type_from_state(env.red_agent_state)}")
-            # if ep == 1:
-            #     import sys
-            #     sys.exit()
-
-            # print(o)
-            # print(o[14*1:14*1+14])
-            # print(r)
-            # print(a)
-            R += r
-            t += 1
-        returns.append(R)
-
-        print(f"{ep}/{num_episodes}, avg R: {np.mean(returns)}, R: {R}, ones: {len(ones)}, zeros: {len(zeros)}")
-
diff --git a/examples/manual_play/test2.py b/examples/manual_play/test2.py
deleted file mode 100644
index ab5dd9f9a..000000000
--- a/examples/manual_play/test2.py
+++ /dev/null
@@ -1,70 +0,0 @@
-import numpy as np
-from gym_csle_cyborg.dao.csle_cyborg_wrapper_config import CSLECyborgWrapperConfig
-from gym_csle_cyborg.envs.cyborg_scenario_two_wrapper import CyborgScenarioTwoWrapper
-import csle_agents.constants.constants as constants
-
-def rollout(rollout_a: int, env, s, num_rollouts = 100, max_horizon: int = 5, base_t: int = 0):
-    returns = []
-    for ep in range(num_rollouts):
-        o, info = env.reset()
-        R = 0
-        t = 0
-        # if base_t == 1 or base_t == 2 or base_t == 3:
-        #     print(f"Setting state: {s}")
-        env.set_state(s)
-        while t <= max_horizon:
-            if t == 0:
-                a = rollout_a
-            else:
-                a = 29
-            o, r, done, _, info = env.step(a)
-            R += r
-            t += 1
-        returns.append(R)
-    return np.mean(returns)
-
-if __name__ == '__main__':
-    config = CSLECyborgWrapperConfig(maximum_steps=100, gym_env_name="", save_trace=False)
-    env = CyborgScenarioTwoWrapper(config=config)
-
-    num_episodes = 1
-    A = env.get_action_space()
-    returns = []
-    max_horizon = 3
-    num_rollouts = 1000
-    for ep in range(num_episodes):
-        done = False
-        o, info = env.reset()
-        R = 0
-        t = 0
-        s = info[constants.COMMON.STATE]
-        while t <= 100:
-            if t >= 2:
-                print("longer horizon")
-                max_horizon = 20
-            # if t == 1:
-            #     A = [0]
-            a_values = []
-            for a in A:
-                a_values.append(rollout(rollout_a=a, env=env, s=s, num_rollouts=num_rollouts, max_horizon=max_horizon,
-                                        base_t=t))
-            print(a_values)
-            best_a = np.argmax(a_values)
-            print(f"best a: {best_a}, val: {a_values[best_a]}, , 31 val: {a_values[31]}, \ns: {s[0]}")
-            a = best_a
-            # a = np.random.choice(A)
-            # if t == 0:
-            #     a = 31
-            # elif t == 1:
-            #     a = 32
-            # else:
-            #     a = 29
-            env.set_state(s)
-            o, r, done, _, info = env.step(a)
-            s = info[constants.COMMON.STATE]
-            R += r
-            t += 1
-        returns.append(R)
-
-        print(f"{ep}/{num_episodes}, avg R: {np.mean(returns)}, R: {R}")
-
diff --git a/examples/manual_play/test3.py b/examples/manual_play/test3.py
deleted file mode 100644
index 02f95c82e..000000000
--- a/examples/manual_play/test3.py
+++ /dev/null
@@ -1,35 +0,0 @@
-import numpy as np
-from gym_csle_cyborg.dao.csle_cyborg_wrapper_config import CSLECyborgWrapperConfig
-from gym_csle_cyborg.envs.cyborg_scenario_two_wrapper import CyborgScenarioTwoWrapper
-import csle_agents.constants.constants as constants
-
-if __name__ == '__main__':
-    config = CSLECyborgWrapperConfig(maximum_steps=100, gym_env_name="", save_trace=False)
-    env = CyborgScenarioTwoWrapper(config=config)
-
-    num_episodes = 2
-    A = env.get_action_space()
-    returns = []
-    max_horizon = 5
-    for ep in range(num_episodes):
-        done = False
-        o, info = env.reset()
-        s = info[constants.COMMON.STATE]
-        R = 0
-        t = 0
-        while t <= max_horizon:
-            # a = np.random.choice(A)
-            if t == 0:
-                a = 31
-            elif t == 1:
-                a = 32
-            else:
-                a = 29
-            o, r, done, _, info = env.step(a)
-            s = info[constants.COMMON.STATE]
-            R += r
-            t += 1
-        returns.append(R)
-
-        print(f"{ep}/{num_episodes}, avg R: {np.mean(returns)}, R: {R}")
-
diff --git a/examples/manual_play/test4.py b/examples/manual_play/test4.py
deleted file mode 100644
index b9019bac8..000000000
--- a/examples/manual_play/test4.py
+++ /dev/null
@@ -1,91 +0,0 @@
-import numpy as np
-import math
-import torch
-import random
-from gym_csle_cyborg.dao.csle_cyborg_wrapper_config import CSLECyborgWrapperConfig
-from gym_csle_cyborg.envs.cyborg_scenario_two_wrapper import CyborgScenarioTwoWrapper
-import csle_agents.constants.constants as constants
-from csle_common.metastore.metastore_facade import MetastoreFacade
-
-def rollout(rollout_a: int, env, s, num_rollouts = 100, max_horizon: int = 5, base_t: int = 0):
-    returns = []
-    for ep in range(num_rollouts):
-        o, info = env.reset()
-        R = 0
-        t = 0
-        # if base_t == 1 or base_t == 2 or base_t == 3:
-        #     print(f"Setting state: {s}")
-        env.set_state(s)
-        while t <= max_horizon:
-            if t == 0:
-                a = rollout_a
-            else:
-                a = 29
-            o, r, done, _, info = env.step(a)
-            R += r
-            t += 1
-        returns.append(R)
-    return np.mean(returns)
-
-if __name__ == '__main__':
-    config = CSLECyborgWrapperConfig(maximum_steps=100, gym_env_name="", save_trace=False, reward_shaping=True)
-    env = CyborgScenarioTwoWrapper(config=config)
-    eval_config = CSLECyborgWrapperConfig(maximum_steps=100, gym_env_name="", save_trace=False, reward_shaping=False)
-    eval_env = CyborgScenarioTwoWrapper(config=eval_config)
-    rollout_policy = MetastoreFacade.get_ppo_policy(id=58)
-    rollout_policy.save_path = "/Users/kim/workspace/csle/examples/training/pomcp/cyborg_scenario_two_wrapper/ppo_test_1706439955.8221297/ppo_model2900_1706522984.6982665.zip"
-    rollout_policy.load()
-
-    num_episodes = 100
-    A = env.get_action_space()
-    returns = []
-    max_horizon = 4
-    num_rollouts = 100
-    seed = 98171
-    random.seed(seed)
-    np.random.seed(seed)
-    torch.manual_seed(seed)
-    torch.backends.cudnn.deterministic = True
-    for ep in range(num_episodes):
-        done = False
-        env.reset()
-        o, info = eval_env.reset()
-        R = 0
-        t = 0
-        s = info[constants.COMMON.STATE]
-        while t <= 100:
-            # dist = rollout_policy.model.policy.get_distribution(
-            #     obs=torch.tensor([o]).to(rollout_policy.model.device)).log_prob(
-            #     torch.tensor(A).to(rollout_policy.model.device)).cpu().detach().numpy()
-            # dist = list(map(lambda i: (math.exp(dist[i]), A[i]), list(range(len(dist)))))
-            # rollout_actions = list(map(lambda x: x[1], sorted(dist, reverse=True, key=lambda x: x[0])[:5]))
-            rollout_actions = A
-            # print(rollout_actions)
-            # if t >= 2:
-            #     print("longer horizon")
-            #     max_horizon = 20
-            # if t == 1:
-            #     A = [0]
-            a_values = []
-            for a in rollout_actions:
-                a_values.append(rollout(rollout_a=a, env=env, s=s, num_rollouts=num_rollouts, max_horizon=max_horizon,
-                                        base_t=t))
-            # print(a_values)
-            best_a = np.argmax(a_values)
-            print(f"t: {t}, best a: {rollout_actions[best_a]}, val: {a_values[best_a]}, R:{R}, \ns: {s[0]}")
-            a = rollout_actions[best_a]
-            # a = np.random.choice(A)
-            # if t == 0:
-            #     a = 31
-            # elif t == 1:
-            #     a = 32
-            # else:
-            #     a = 29
-            env.set_state(s)
-            o, r, done, _, info = eval_env.step(a)
-            s = info[constants.COMMON.STATE]
-            R += r
-            t += 1
-        returns.append(R)
-        print(f"{ep}/{num_episodes}, avg R: {np.mean(returns)}, R: {R}")
-
diff --git a/examples/manual_play/test5.py b/examples/manual_play/test5.py
deleted file mode 100644
index 40d0067ac..000000000
--- a/examples/manual_play/test5.py
+++ /dev/null
@@ -1,91 +0,0 @@
-import numpy as np
-import torch
-import random
-from csle_common.metastore.metastore_facade import MetastoreFacade
-from gym_csle_cyborg.dao.csle_cyborg_config import CSLECyborgConfig
-from gym_csle_cyborg.dao.red_agent_type import RedAgentType
-from gym_csle_cyborg.envs.cyborg_scenario_two_defender import CyborgScenarioTwoDefender
-from gym_csle_cyborg.util.cyborg_env_util import CyborgEnvUtil
-import gym_csle_cyborg.constants.constants as env_constants
-from gym_csle_cyborg.dao.blue_agent_action_type import BlueAgentActionType
-
-if __name__ == '__main__':
-    ppo_policy = MetastoreFacade.get_ppo_policy(id=58)
-    config = CSLECyborgConfig(
-        gym_env_name="csle-cyborg-scenario-two-v1", scenario=2, baseline_red_agents=[RedAgentType.B_LINE_AGENT],
-        maximum_steps=100, red_agent_distribution=[1.0], reduced_action_space=True, decoy_state=True,
-        scanned_state=True, decoy_optimization=False, cache_visited_states=False)
-    csle_cyborg_env = CyborgScenarioTwoDefender(config=config)
-    num_evaluations = 1000
-    max_horizon = 100
-    returns = []
-    seed = 412415
-    random.seed(seed)
-    np.random.seed(seed)
-    torch.manual_seed(seed)
-    A = csle_cyborg_env.get_action_space()
-    print("Starting policy evaluation")
-    for i in range(num_evaluations):
-        done = False
-        o, _ = csle_cyborg_env.reset()
-        R = 0
-        t = 0
-        ones = []
-        zeros = []
-        while t < max_horizon:
-            # a = np.random.choice(A)
-            # a = 31
-            # if t > 20:
-            #     a = 3
-            # a = 4
-            a = ppo_policy.action(o=o)
-            if a == 0:
-                zeros.append(0)
-            if a == 1:
-                ones.append(1)
-
-            # if t < 8:
-            #     a = 32
-            # else:
-            #     a = 18
-            # a = 27
-            # if t == 0:
-            #     a = 31
-            # if t == 1:
-            #     a = 30
-            # if a == 34:
-            #     import random
-            #     a = random.choice([27, 28, 29])
-            o, r, done, _, info = csle_cyborg_env.step(a)
-            # print("STEP DONE")
-            state_id = info[env_constants.ENV_METRICS.STATE]
-            oid = info[env_constants.ENV_METRICS.OBSERVATION]
-            s = CyborgEnvUtil.state_id_to_state_vector(state_id=state_id)
-            obs = CyborgEnvUtil.state_id_to_state_vector(state_id=oid, observation=True)
-            # print(f"t: {t}, r: {r}, a: {a}, {csle_cyborg_env.get_last_action(agent='Red')}")
-
-            # print(f"a: {a}")
-            # print(s)
-            # print(obs)
-            # print(csle_cyborg_env.get_last_action(agent='Red'))
-            # print(csle_cyborg_env.get_table())
-
-            # print(obs[11])
-            # print(o[14*10:14*10+14])
-            # print(o[14*11:14*11+14])
-            # print(o[14*12:14*12+14])
-            # print(o[14*13:14*13+14])
-            # print(o)
-            # print(o)
-            # if t == 0:
-            #     print(o)
-            #     print(type(o))
-            #     print(list(o.tolist()).index(1))
-
-            # print(f"t: {t}, r: {r}, a: {a}, s: {s}")
-            # print(f"a: {csle_cyborg_env.action_id_to_type_and_host[a]}")
-            # print(csle_cyborg_env.get_true_table())
-            R += r
-            t += 1
-        returns.append(R)
-        print(f"{i}/{num_evaluations}, avg R: {np.mean(returns)}, R: {R}, ones: {len(ones)}, zeros: {len(zeros)}")
\ No newline at end of file
diff --git a/examples/manual_play/test6.py b/examples/manual_play/test6.py
deleted file mode 100644
index aada40cf5..000000000
--- a/examples/manual_play/test6.py
+++ /dev/null
@@ -1,133 +0,0 @@
-import numpy as np
-import torch
-import random
-from csle_common.metastore.metastore_facade import MetastoreFacade
-from gym_csle_cyborg.dao.csle_cyborg_config import CSLECyborgConfig
-from gym_csle_cyborg.dao.red_agent_type import RedAgentType
-from gym_csle_cyborg.envs.cyborg_scenario_two_defender import CyborgScenarioTwoDefender
-from gym_csle_cyborg.util.cyborg_env_util import CyborgEnvUtil
-import gym_csle_cyborg.constants.constants as env_constants
-from gym_csle_cyborg.dao.csle_cyborg_wrapper_config import CSLECyborgWrapperConfig
-from gym_csle_cyborg.envs.cyborg_scenario_two_wrapper import CyborgScenarioTwoWrapper
-from gym_csle_cyborg.dao.blue_agent_action_type import BlueAgentActionType
-
-if __name__ == '__main__':
-    ppo_policy = MetastoreFacade.get_ppo_policy(id=58)
-    config = CSLECyborgConfig(
-        gym_env_name="csle-cyborg-scenario-two-v1", scenario=2, baseline_red_agents=[RedAgentType.B_LINE_AGENT],
-        maximum_steps=100, red_agent_distribution=[1.0], reduced_action_space=True, decoy_state=True,
-        scanned_state=True, decoy_optimization=False, cache_visited_states=False)
-    wrap_config = CSLECyborgWrapperConfig(maximum_steps=100, gym_env_name="", save_trace=False, reward_shaping=False)
-    wrap_env = CyborgScenarioTwoWrapper(config=wrap_config)
-    csle_cyborg_env = CyborgScenarioTwoDefender(config=config)
-    num_evaluations = 100
-    max_horizon = 100
-    returns = []
-    # seed = 346277
-    # seed = 18519
-    # seed = 871823
-    seed = 4151
-    random.seed(seed)
-    np.random.seed(seed)
-    torch.manual_seed(seed)
-    A = csle_cyborg_env.get_action_space()
-    print("Starting policy evaluation")
-    for i in range(num_evaluations):
-        done = False
-        o, info = csle_cyborg_env.reset()
-        state_id = info[env_constants.ENV_METRICS.STATE]
-        oid = info[env_constants.ENV_METRICS.OBSERVATION]
-        s = CyborgEnvUtil.state_id_to_state_vector(state_id=state_id)
-        obs = CyborgEnvUtil.state_id_to_state_vector(state_id=oid, observation=True)
-        o2, _ = wrap_env.reset()
-        R = 0
-        t = 0
-        match = False
-        while t < max_horizon:
-            # a = np.random.choice(A)
-            # a = 31
-            # if t > 20:
-            #     a = 3
-            # a = 4
-            # a = ppo_policy.action(o=o)
-            # if t < 8:
-            #     a = 32
-            # else:
-            #     a = 18
-            a = 4
-            # if t == 2:
-            #     a = 4
-            if t < 3:
-                a = 27
-            # if t == 3:
-            #     a = 1
-            #
-            # a = 4
-            # if t == 2:
-            #     a = 28
-            # if t == 4:
-            #     a = 3
-            # # else:
-            # #     a = 1
-            if t > 40:
-                import sys
-                sys.exit()
-            # # if t < 2:
-            # #     a = 4
-            # # else:
-            # #     a = 28
-            if s[1][2] > 0:
-                a = 0
-                print("RESTORE")
-                print(f"t: {t}")
-                # match = True
-            # if t > 12:
-            #     import sys
-            #     sys.exit()
-            # a = 27
-            # if t == 0:
-            #     a = 31
-            # if t == 1:
-            #     a = 30
-            # if a == 34:
-            #     import random
-            #     a = random.choice([27, 28, 29])
-            print(f"a: {a}")
-            o, r, done, _, info = csle_cyborg_env.step(a)
-            o2, _, _, _, _ = wrap_env.step(a)
-            state_id = info[env_constants.ENV_METRICS.STATE]
-            oid = info[env_constants.ENV_METRICS.OBSERVATION]
-            s = CyborgEnvUtil.state_id_to_state_vector(state_id=state_id)
-            obs = CyborgEnvUtil.state_id_to_state_vector(state_id=oid, observation=True)
-            if match:
-                print(f"t: {t}, r: {r}, a: {a}, {csle_cyborg_env.get_last_action(agent='Red')}")
-                print(s)
-                print(csle_cyborg_env.get_last_action(agent='Red'))
-                print(csle_cyborg_env.get_ip_to_host_mapping())
-            # else:
-            #     print(csle_cyborg_env.get_last_action(agent='Red'))
-            #     print(s)
-            # print(obs[11])
-            # print(wrap_env.last_obs[11])
-            # print(wrap_env.s[11])
-            # # print(o[14*10:14*10+14])
-            # print(o[14*11:14*11+14])
-            # print(o2[14*11:14*11+14])
-            # print(csle_cyborg_env.get_true_table())
-            # print(csle_cyborg_env.get_table())
-            # print(o[14*12:14*12+14])
-            # print(o[14*13:14*13+14])
-            # print(o)
-            # print(o)
-            # if t == 0:
-            #     print(o)
-            #     print(type(o))
-            #     print(list(o.tolist()).index(1))
-
-            # print(f"t: {t}, r: {r}, a: {a}, s: {s}")
-            # print(f"a: {csle_cyborg_env.action_id_to_type_and_host[a]}")
-            # print(csle_cyborg_env.get_true_table())
-            R += r
-            t += 1
-        returns.append(R)
-        print(f"{i}/{num_evaluations}, avg R: {np.mean(returns)}")
\ No newline at end of file
diff --git a/simulation-system/libs/gym-csle-cyborg/src/gym_csle_cyborg/dao/cyborg_wrapper_state.py b/simulation-system/libs/gym-csle-cyborg/src/gym_csle_cyborg/dao/cyborg_wrapper_state.py
new file mode 100644
index 000000000..30ae77f07
--- /dev/null
+++ b/simulation-system/libs/gym-csle-cyborg/src/gym_csle_cyborg/dao/cyborg_wrapper_state.py
@@ -0,0 +1,105 @@
+from typing import List, Dict, Any
+from csle_base.json_serializable import JSONSerializable
+
+
+class CyborgWrapperState(JSONSerializable):
+    """
+    A DAO for managing the state in the cyborg wrapper
+    """
+
+    def __init__(self, s: List[List[int]], scan_state: List[int], op_server_restored: bool, obs: List[List[int]],
+                 red_action_targets: Dict[int, int], privilege_escalation_detected: int, red_agent_state: int,
+                 red_agent_target: int, attacker_observed_decoy: List[int]) -> None:
+        """
+        Initializes the DAO
+
+        :param s: the vectorized state
+        :param scan_state: the scan state
+        :param op_server_restored: boolean flag indicating whether the op server has been restored or not
+        :param obs: the defender observation
+        :param red_action_targets: the history of red agent targets
+        :param privilege_escalation_detected: a boolean flag indicating whether a privilege escalation
+                                              has been detected
+        :param red_agent_state: the state of the red agent
+        :param red_agent_target: the target of the red agent
+        :param attacker_observed_decoy: the list of decoys observed by the attacker
+        """
+        self.s = s
+        self.scan_state = scan_state
+        self.op_server_restored = op_server_restored
+        self.obs = obs
+        self.red_action_targets = red_action_targets
+        self.privilege_escalation_detected = privilege_escalation_detected
+        self.red_agent_state = red_agent_state
+        self.red_agent_target = red_agent_target
+        self.attacker_observed_decoy = attacker_observed_decoy
+
+    def __str__(self) -> str:
+        """
+        :return: a string representation of the object
+        """
+        return (f"s: {self.s}, scan_state: {self.scan_state}, op_server_restored: {self.op_server_restored}, "
+                f"obs: {self.obs}, red_action_targets: {self.red_action_targets}, "
+                f"privilege_escalation_detected: {self.privilege_escalation_detected}, "
+                f"red_agent_state: {self.red_agent_state}, red_agent_target: {self.red_agent_target}, "
+                f"attacker_observed_decoy: {self.attacker_observed_decoy}")
+
+    @staticmethod
+    def from_dict(d: Dict[str, Any]) -> "CyborgWrapperState":
+        """
+        Converts a dict representation into an instance
+
+        :param d: the dict to convert
+        :return: the created instance
+        """
+        obj = CyborgWrapperState(
+            s=d["s"], scan_state=d["scan_state"], op_server_restored=d["op_server_restored"], obs=d["obs"],
+            red_action_targets=d["red_action_targets"],
+            privilege_escalation_detected=d["privilege_escalation_detected"], red_agent_state=d["red_agent_state"],
+            red_agent_target=d["red_agent_target"], attacker_observed_decoy=d["attacker_observed_decoy"]
+        )
+        return obj
+
+    def to_dict(self) -> Dict[str, Any]:
+        """
+        Converts the object to a dict representation
+
+        :return: a dict representation of the object
+        """
+        d: Dict[str, Any] = {}
+        d["s"] = self.s
+        d["scan_state"] = self.scan_state
+        d["op_server_restored"] = self.op_server_restored
+        d["obs"] = self.obs
+        d["red_action_targets"] = self.red_action_targets
+        d["privilege_escalation_detected"] = self.privilege_escalation_detected
+        d["red_agent_state"] = self.red_agent_state
+        d["red_agent_target"] = self.red_agent_target
+        d["attacker_observed_decoy"] = self.attacker_observed_decoy
+        return d
+
+    @staticmethod
+    def from_json_str(json_str: str) -> "CyborgWrapperState":
+        """
+        Converts a json string into a DTO
+
+        :param json_str: the json string representation
+        :return: the DTO instance
+        """
+        import json
+        dto: CyborgWrapperState = CyborgWrapperState.from_dict(json.loads(json_str))
+        return dto
+
+    @staticmethod
+    def from_json_file(json_file_path: str) -> "CyborgWrapperState":
+        """
+        Reads a json file and converts it into a DTO
+
+        :param json_file_path: the path to the json file with the DTO
+        :return: the DTO instance
+        """
+        import io
+        with io.open(json_file_path, 'r', encoding='utf-8') as f:
+            json_str = f.read()
+        dto = CyborgWrapperState.from_json_str(json_str=json_str)
+        return dto
diff --git a/simulation-system/libs/gym-csle-cyborg/src/gym_csle_cyborg/envs/cyborg_scenario_two_wrapper.py b/simulation-system/libs/gym-csle-cyborg/src/gym_csle_cyborg/envs/cyborg_scenario_two_wrapper.py
index 340146e0f..bcc498463 100644
--- a/simulation-system/libs/gym-csle-cyborg/src/gym_csle_cyborg/envs/cyborg_scenario_two_wrapper.py
+++ b/simulation-system/libs/gym-csle-cyborg/src/gym_csle_cyborg/envs/cyborg_scenario_two_wrapper.py
@@ -16,6 +16,7 @@
 from gym_csle_cyborg.dao.exploit_type import ExploitType
 from gym_csle_cyborg.util.cyborg_env_util import CyborgEnvUtil
 from gym_csle_cyborg.dao.csle_cyborg_wrapper_config import CSLECyborgWrapperConfig
+from gym_csle_cyborg.dao.cyborg_wrapper_state import CyborgWrapperState
 
 
 class CyborgScenarioTwoWrapper(BaseEnv):
@@ -194,12 +195,13 @@ def step(self, action: int) -> Tuple[npt.NDArray[Any], float, bool, bool, Dict[str, Any]]:
         r = self.reward_function(defender_action_type=defender_action_type, red_action_type=next_red_action_type,
                                  red_success=(is_red_action_feasible and exploit_successful))
         info: Dict[str, Any] = {}
-        info[env_constants.ENV_METRICS.STATE] = (
-            copy.deepcopy(s_prime), scan_state, self.op_server_restored,
-            obs, copy.deepcopy(self.red_action_targets),
-            self.privilege_escalation_detected, self.red_agent_state, self.red_agent_target,
-            copy.deepcopy(self.attacker_observed_decoy)
+        wrapper_state = CyborgWrapperState(
+            s=copy.deepcopy(s_prime), scan_state=scan_state, op_server_restored=self.op_server_restored,
+            obs=obs, red_action_targets=copy.deepcopy(self.red_action_targets),
+            privilege_escalation_detected=self.privilege_escalation_detected, red_agent_state=self.red_agent_state,
+            red_agent_target=self.red_agent_target, attacker_observed_decoy=copy.deepcopy(self.attacker_observed_decoy)
         )
+        info[env_constants.ENV_METRICS.STATE] = wrapper_state
         info[env_constants.ENV_METRICS.OBSERVATION] = CyborgEnvUtil.state_vector_to_state_id(
             state_vector=obs, observation=True)
         info[env_constants.ENV_METRICS.OBSERVATION_VECTOR] = obs
@@ -278,23 +280,22 @@ def reward_function(self, defender_action_type: BlueAgentActionType,
             r -= 10
         return r
 
-    def set_state(self, state: Tuple[
-            List[List[int]], List[int], bool, List[List[int]], Dict[int, int], bool, int, int, List[int]]) -> None:
+    def set_state(self, state: CyborgWrapperState) -> None:
         """
         Sets the state of the environment
 
         :param state: the new state
        :return: None
         """
-        self.s = copy.deepcopy(state[0])
-        self.scan_state = copy.deepcopy(state[1])
-        self.op_server_restored = state[2]
-        self.last_obs = copy.deepcopy(state[3])
-        self.red_action_targets = copy.deepcopy(state[4])
-        self.privilege_escalation_detected = state[5]
-        self.red_agent_state = state[6]
-        self.red_agent_target = state[7]
-        self.attacker_observed_decoy = copy.deepcopy(state[8])
+        self.s = copy.deepcopy(state.s)
+        self.scan_state = copy.deepcopy(state.scan_state)
+        self.op_server_restored = state.op_server_restored
+        self.last_obs = copy.deepcopy(state.obs)
+        self.red_action_targets = copy.deepcopy(state.red_action_targets)
+        self.privilege_escalation_detected = state.privilege_escalation_detected
+        self.red_agent_state = state.red_agent_state
+        self.red_agent_target = state.red_agent_target
+        self.attacker_observed_decoy = copy.deepcopy(state.attacker_observed_decoy)
 
     def get_observation_from_history(self, history: List[List[Any]]) -> List[Any]:
         """
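A minimal usage sketch of the new CyborgWrapperState DAO together with the updated set_state signature (not part of the diff; the action id 29 is only an illustrative value taken from the deleted test scripts, and the JSON round-trip uses the to_dict/from_json_str methods defined above rather than any assumed helpers):

    import json
    import gym_csle_cyborg.constants.constants as env_constants
    from gym_csle_cyborg.dao.csle_cyborg_wrapper_config import CSLECyborgWrapperConfig
    from gym_csle_cyborg.dao.cyborg_wrapper_state import CyborgWrapperState
    from gym_csle_cyborg.envs.cyborg_scenario_two_wrapper import CyborgScenarioTwoWrapper

    config = CSLECyborgWrapperConfig(maximum_steps=100, gym_env_name="", save_trace=False, reward_shaping=False)
    env = CyborgScenarioTwoWrapper(config=config)
    o, info = env.reset()
    o, r, done, _, info = env.step(29)  # illustrative action id

    # The step info dict now carries a CyborgWrapperState DTO instead of a raw tuple
    state: CyborgWrapperState = info[env_constants.ENV_METRICS.STATE]

    # Round-trip the state through JSON via the DAO's own to_dict/from_json_str
    json_str = json.dumps(state.to_dict())
    restored = CyborgWrapperState.from_json_str(json_str)

    # Restore the environment to the saved state through the new typed set_state
    env.set_state(restored)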