From 0b622b632ddb8bb27ab20ef5f9c882faa9f39e55 Mon Sep 17 00:00:00 2001 From: foroogh shahab Date: Sat, 3 Aug 2024 20:00:49 +0200 Subject: [PATCH 1/2] Filebeats added. --- .../libs/csle-cli/src/csle_cli/cli.py | 155 +++++++++++++++++- 1 file changed, 151 insertions(+), 4 deletions(-) diff --git a/simulation-system/libs/csle-cli/src/csle_cli/cli.py b/simulation-system/libs/csle-cli/src/csle_cli/cli.py index a49bde4c0..381ad949d 100755 --- a/simulation-system/libs/csle-cli/src/csle_cli/cli.py +++ b/simulation-system/libs/csle-cli/src/csle_cli/cli.py @@ -686,7 +686,7 @@ def stop_shell_complete(ctx, param, incomplete) -> List[str]: "emulation-name | statsmanager | emulation_executions | pgadmin | all | nginx | postgresql " "| docker | clustermanager | hostmanagers | hostmanager | clientmanager | snortmanagers " "| snortmanager | elkmanager | trafficmanagers | trafficmanager | kafkamanager " - "| ossecmanagers | ossecmanager | ryumanager") + "| ossecmanagers | ossecmanager | ryumanager | filebeats | filebeat") def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str = "") -> None: """ Stops an entity @@ -754,6 +754,10 @@ def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str = stop_ossec_ids_manager(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id) elif entity == "ryumanager": stop_ryu_manager(ip=ip, emulation=name, ip_first_octet=id) + elif entity == "filebeats": + stop_filebeats(ip=ip, emulation=name, ip_first_octet=id) + elif entity == "filebeat": + stop_filebeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id) else: container_stopped = False for node in config.cluster_config.cluster_nodes: @@ -1224,6 +1228,58 @@ def stop_traffic_manager(ip: str, container_ip: str, emulation: str, ip_first_oc bold=False) +def stop_filebeats(ip: str, emulation: str, ip_first_octet: int) -> None: + """ + Utility function for stopping the filebeats + + :param ip: the ip of the node to stop the filebeats + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + stopped = ClusterController.stop_filebeats( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet) + if stopped.outcome: + click.secho(f"Stopping filebeats on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Filebeats are not stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + +def stop_filebeat(ip: str, container_ip: str, emulation: str, ip_first_octet: int) -> None: + """ + Utility function for stopping the filebeat + + :param ip: the ip of the node to stop the filebeat + :param container_ip: the ip of the host that traffic is running on + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + stopped = ClusterController.stop_filebeat( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet, container_ip=container_ip) + if stopped.outcome: + click.secho( + f"Stopping filebeat with ip {container_ip} on port:" + f"{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Filebeat with ip {container_ip} is not " + f"stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + @click.argument('max_workers', default=10, type=int) @click.argument('log_file', default="docker_statsmanager.log", type=str) @click.argument('log_dir', default="/var/log/csle", type=str) @@ -1405,6 +1461,7 @@ def start_shell_complete(ctx, param, incomplete) -> List[str]: @click.option('--ip', default="", type=str) @click.option('--container_ip', default="", type=str) +@click.option('--initial_start', default=False, type=bool) @click.option('--id', default=None, type=int) @click.option('--no_clients', is_flag=True, help='skip starting the client population') @click.option('--no_traffic', is_flag=True, help='skip starting the traffic generators') @@ -1417,9 +1474,9 @@ def start_shell_complete(ctx, param, incomplete) -> List[str]: "| system_id_job | nginx | postgresql | docker | clustermanager | hostmanagers " "| hostmanager | clientmanager | snortmanagers | snortmanager | elkmanager " "| trafficmanagers | trafficmanager | kafkamanager | ossecmanagers | ossecmanager " - "| ryumanager") + "| ryumanager | filebeats | filebeat") def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, no_network: bool, ip: str, - container_ip: str, no_beats: bool) -> None: + container_ip: str, no_beats: bool, initial_start: bool) -> None: """ Starts an entity, e.g., a container or the management system @@ -1496,6 +1553,11 @@ def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, n start_ossec_ids_manager(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id) elif entity == "ryumanager": start_ryu_manager(ip=ip, emulation=name, ip_first_octet=id) + elif entity == "filebeats": + start_filebeats(ip=ip, emulation=name, ip_first_octet=id, initial_start=initial_start) + elif entity == "filebeat": + start_filebeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id, + initial_start=initial_start) else: container_started = False for node in config.cluster_config.cluster_nodes: @@ -1718,6 +1780,57 @@ def start_ryu_manager(ip: str, emulation: str, ip_first_octet: int): bold=False) +def start_filebeats(ip: str, emulation: str, ip_first_octet: int, initial_start: bool): + """ + Utility function for starting filebeats + + :param ip: the ip of the node to start filebeats + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + operation_outcome = ClusterController.start_filebeats( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet, initial_start=initial_start) + if operation_outcome.outcome: + click.secho(f"Starting filebeats on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Filebeats are not started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + +def start_filebeat(ip: str, container_ip: str, emulation: str, ip_first_octet: int, initial_start: bool): + """ + Utility function for starting filebeat + + :param ip: the ip of the node to start filebeat + :param container_ip: the ip of the host to start + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + operation_outcome = ClusterController.start_filebeat( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet, container_ip=container_ip, initial_start=initial_start) + if operation_outcome.outcome: + click.secho(f"Started filebeat with ip {container_ip} on " + f"port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Filebeat with ip {container_ip} is not " + f"started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + def start_host_manager(ip: str, container_ip: str, emulation: str, ip_first_octet: int): """ Utility function for starting host manager @@ -2278,7 +2391,7 @@ def ls_shell_complete(ctx, param, incomplete) -> List[str]: "| node_exporter | cadvisor | pgadmin | statsmanager | flask | " "simulations | emulation_executions | cluster | nginx | postgresql | docker | hostmanagers | " "clientmanager | snortmanagers | elkmanager | trafficmanagers | kafkamanager | " - "ossecmanagers | ryumanager") + "ossecmanagers | ryumanager | filebeats") @click.argument('entity', default='all', type=str, shell_complete=ls_shell_complete) @click.option('--all', is_flag=True, help='list all') @click.option('--running', is_flag=True, help='list running only (default)') @@ -2359,6 +2472,8 @@ def ls(entity: str, all: bool, running: bool, stopped: bool, ip: str, name: str, list_ossec_ids_managers(ip=ip, emulation=name, ip_first_octet=id) elif entity == "ryumanager": list_ryu_manager(ip=ip, emulation=name, ip_first_octet=id) + elif entity == "filebeats": + list_filebeats(ip=ip, emulation=name, ip_first_octet=id) else: container = get_running_container(name=entity) if container is not None: @@ -2393,6 +2508,38 @@ def ls(entity: str, all: bool, running: bool, stopped: bool, ip: str, name: str, click.secho(f"entity: {entity} is not recognized", fg="red", bold=True) +def list_filebeats(ip: str, emulation: str, ip_first_octet: int) -> None: + """ + Utility function for listing filebeats + + :param ip: the ip of the node to list filebeats + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + filebeats_info = ClusterController.get_host_managers_info( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet) + click.secho('+' + '-' * 60 + '+', fg='white') + click.secho(f'|{"Host IP":^30}|{"Filebeats running Status":^29}|', fg='white') + click.secho('+' + '-' * 60 + '+', fg='white') + for i in range(len(filebeats_info.hostManagersStatuses)): + status = "Running" if filebeats_info.hostManagersStatuses[i].filebeat_running else "Stopped" + status_color = 'green' if filebeats_info.hostManagersStatuses[i].filebeat_running else 'red' + click.secho('|', nl=False, fg='white') + click.secho(f'{filebeats_info.ips[i]:<30}', nl=False, fg='white') + click.secho('|', nl=False, fg='white') + click.secho(f'{status:^29}', nl=False, fg=status_color) + click.secho('|', fg='white') + click.secho('+' + '-' * 60 + '+', fg='white') + + def list_ryu_manager(ip: str, emulation: str, ip_first_octet: int) -> None: """ Utility function for listing ryu manager From f7abc4f8fc305256d4a3f4b41e828b1dbe1d546f Mon Sep 17 00:00:00 2001 From: foroogh shahab Date: Sat, 3 Aug 2024 20:20:21 +0200 Subject: [PATCH 2/2] Metricbeats added. --- .../libs/csle-cli/src/csle_cli/cli.py | 153 +++++++++++++++++- 1 file changed, 150 insertions(+), 3 deletions(-) diff --git a/simulation-system/libs/csle-cli/src/csle_cli/cli.py b/simulation-system/libs/csle-cli/src/csle_cli/cli.py index 381ad949d..6ce28ca0d 100755 --- a/simulation-system/libs/csle-cli/src/csle_cli/cli.py +++ b/simulation-system/libs/csle-cli/src/csle_cli/cli.py @@ -686,7 +686,8 @@ def stop_shell_complete(ctx, param, incomplete) -> List[str]: "emulation-name | statsmanager | emulation_executions | pgadmin | all | nginx | postgresql " "| docker | clustermanager | hostmanagers | hostmanager | clientmanager | snortmanagers " "| snortmanager | elkmanager | trafficmanagers | trafficmanager | kafkamanager " - "| ossecmanagers | ossecmanager | ryumanager | filebeats | filebeat") + "| ossecmanagers | ossecmanager | ryumanager | filebeats | filebeat | metricbeat " + "| metricbeats") def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str = "") -> None: """ Stops an entity @@ -758,6 +759,10 @@ def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str = stop_filebeats(ip=ip, emulation=name, ip_first_octet=id) elif entity == "filebeat": stop_filebeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id) + elif entity == "metricbeats": + stop_metricbeats(ip=ip, emulation=name, ip_first_octet=id) + elif entity == "metricbeat": + stop_metricbeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id) else: container_stopped = False for node in config.cluster_config.cluster_nodes: @@ -1280,6 +1285,58 @@ def stop_filebeat(ip: str, container_ip: str, emulation: str, ip_first_octet: in bold=False) +def stop_metricbeats(ip: str, emulation: str, ip_first_octet: int) -> None: + """ + Utility function for stopping the metricbeats + + :param ip: the ip of the node to stop the metricbeats + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + stopped = ClusterController.stop_metricbeats( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet) + if stopped.outcome: + click.secho(f"Stopping metricbeats on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Metricbeats are not stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + +def stop_metricbeat(ip: str, container_ip: str, emulation: str, ip_first_octet: int) -> None: + """ + Utility function for stopping the metricbeat + + :param ip: the ip of the node to stop the metricbeat + :param container_ip: the ip of the host that traffic is running on + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + stopped = ClusterController.stop_metricbeat( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet, container_ip=container_ip) + if stopped.outcome: + click.secho( + f"Stopping metricbeat with ip {container_ip} on port:" + f"{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Metricbeat with ip {container_ip} is not " + f"stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + @click.argument('max_workers', default=10, type=int) @click.argument('log_file', default="docker_statsmanager.log", type=str) @click.argument('log_dir', default="/var/log/csle", type=str) @@ -1474,7 +1531,7 @@ def start_shell_complete(ctx, param, incomplete) -> List[str]: "| system_id_job | nginx | postgresql | docker | clustermanager | hostmanagers " "| hostmanager | clientmanager | snortmanagers | snortmanager | elkmanager " "| trafficmanagers | trafficmanager | kafkamanager | ossecmanagers | ossecmanager " - "| ryumanager | filebeats | filebeat") + "| ryumanager | filebeats | filebeat | metricbeats | metricbeat") def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, no_network: bool, ip: str, container_ip: str, no_beats: bool, initial_start: bool) -> None: """ @@ -1558,6 +1615,11 @@ def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, n elif entity == "filebeat": start_filebeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id, initial_start=initial_start) + elif entity == "metricbeats": + start_metricbeats(ip=ip, emulation=name, ip_first_octet=id, initial_start=initial_start) + elif entity == "metricbeat": + start_metricbeat(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id, + initial_start=initial_start) else: container_started = False for node in config.cluster_config.cluster_nodes: @@ -1831,6 +1893,57 @@ def start_filebeat(ip: str, container_ip: str, emulation: str, ip_first_octet: i bold=False) +def start_metricbeats(ip: str, emulation: str, ip_first_octet: int, initial_start: bool): + """ + Utility function for starting metricbeats + + :param ip: the ip of the node to start metricbeats + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + operation_outcome = ClusterController.start_metricbeats( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet, initial_start=initial_start) + if operation_outcome.outcome: + click.secho(f"Starting metricbeats on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Metricbeats are not started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + +def start_metricbeat(ip: str, container_ip: str, emulation: str, ip_first_octet: int, initial_start: bool): + """ + Utility function for starting metricbeat + + :param ip: the ip of the node to start metricbeat + :param container_ip: the ip of the host to start + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + operation_outcome = ClusterController.start_metricbeat( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet, container_ip=container_ip, initial_start=initial_start) + if operation_outcome.outcome: + click.secho(f"Started metricbeat with ip {container_ip} on " + f"port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Metricbeat with ip {container_ip} is not " + f"started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + def start_host_manager(ip: str, container_ip: str, emulation: str, ip_first_octet: int): """ Utility function for starting host manager @@ -2391,7 +2504,7 @@ def ls_shell_complete(ctx, param, incomplete) -> List[str]: "| node_exporter | cadvisor | pgadmin | statsmanager | flask | " "simulations | emulation_executions | cluster | nginx | postgresql | docker | hostmanagers | " "clientmanager | snortmanagers | elkmanager | trafficmanagers | kafkamanager | " - "ossecmanagers | ryumanager | filebeats") + "ossecmanagers | ryumanager | filebeats | metricbeats") @click.argument('entity', default='all', type=str, shell_complete=ls_shell_complete) @click.option('--all', is_flag=True, help='list all') @click.option('--running', is_flag=True, help='list running only (default)') @@ -2474,6 +2587,8 @@ def ls(entity: str, all: bool, running: bool, stopped: bool, ip: str, name: str, list_ryu_manager(ip=ip, emulation=name, ip_first_octet=id) elif entity == "filebeats": list_filebeats(ip=ip, emulation=name, ip_first_octet=id) + elif entity == "metricbeats": + list_metricbeats(ip=ip, emulation=name, ip_first_octet=id) else: container = get_running_container(name=entity) if container is not None: @@ -2540,6 +2655,38 @@ def list_filebeats(ip: str, emulation: str, ip_first_octet: int) -> None: click.secho('+' + '-' * 60 + '+', fg='white') +def list_metricbeats(ip: str, emulation: str, ip_first_octet: int) -> None: + """ + Utility function for listing filebeats + + :param ip: the ip of the node to list filebeats + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + metricbeats_info = ClusterController.get_host_managers_info( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet) + click.secho('+' + '-' * 60 + '+', fg='white') + click.secho(f'|{"Host IP":^30}|{"Metricbeats running Status":^29}|', fg='white') + click.secho('+' + '-' * 60 + '+', fg='white') + for i in range(len(metricbeats_info.hostManagersStatuses)): + status = "Running" if metricbeats_info.hostManagersStatuses[i].metricbeat_running else "Stopped" + status_color = 'green' if metricbeats_info.hostManagersStatuses[i].metricbeat_running else 'red' + click.secho('|', nl=False, fg='white') + click.secho(f'{metricbeats_info.ips[i]:<30}', nl=False, fg='white') + click.secho('|', nl=False, fg='white') + click.secho(f'{status:^29}', nl=False, fg=status_color) + click.secho('|', fg='white') + click.secho('+' + '-' * 60 + '+', fg='white') + + def list_ryu_manager(ip: str, emulation: str, ip_first_octet: int) -> None: """ Utility function for listing ryu manager