From 918daab050c219e3232cfe102595b9a552ceb04f Mon Sep 17 00:00:00 2001
From: Nitin Garg
Date: Thu, 3 Oct 2024 07:22:27 +0000
Subject: [PATCH] Replace mash with monitoring-api everywhere

Reasons for this change:
* Consistent behavior across all machines.
* The Monitoring API has a faster runtime than mash.
* The Monitoring API is also supported on GCE VMs.
---
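Note for reviewers: get_memory_from_monitoring_api and get_cpu_from_monitoring_api already
live in utils/utils.py, so their implementation is not part of this diff. The sketch below
illustrates roughly how such a helper can query the Cloud Monitoring API with the
google-cloud-monitoring Python client. The function name (the "_sketch" suffix), the exact
metric filter, and the MiB conversion are illustrative assumptions, not code copied from
utils.py.

# Illustrative sketch only -- not the code that lives in utils/utils.py.
from typing import Tuple

from google.cloud import monitoring_v3

_GCSFUSE_CONTAINER_NAME = "gke-gcsfuse-sidecar"


def get_memory_from_monitoring_api_sketch(
    pod_name: str,
    start_epoch: int,
    end_epoch: int,
    project_id: str,
    cluster_name: str,
    namespace_name: str,
) -> Tuple[int, int]:
  """Returns (lowest, highest) memory usage of the gcsfuse sidecar in MiB."""
  client = monitoring_v3.MetricServiceClient()
  interval = monitoring_v3.TimeInterval({
      "start_time": {"seconds": int(start_epoch)},
      "end_time": {"seconds": int(end_epoch)},
  })
  series = client.list_time_series(
      request={
          "name": f"projects/{project_id}",
          # GKE system metric for container memory usage; a
          # metric.labels.memory_type = "non-evictable" clause could be added
          # to mirror the old mash query more closely.
          "filter": (
              'metric.type = "kubernetes.io/container/memory/used_bytes"'
              f' AND resource.labels.cluster_name = "{cluster_name}"'
              f' AND resource.labels.namespace_name = "{namespace_name}"'
              f' AND resource.labels.pod_name = "{pod_name}"'
              f' AND resource.labels.container_name = "{_GCSFUSE_CONTAINER_NAME}"'
          ),
          "interval": interval,
          "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
      }
  )
  points = [point.value.int64_value for ts in series for point in ts.points]
  if not points:
    return 0, 0
  return min(points) // 1024**2, max(points) // 1024**2

The CPU helper can follow the same pattern against the
kubernetes.io/container/cpu/core_usage_time metric; since that metric is a cumulative
counter, the query would additionally apply a rate aligner (the removed mash query used
Window(Rate('10m')) for the same reason).
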
 .../examples/dlio/parse_logs.py               |  58 ++++------
 .../testing_on_gke/examples/fio/parse_logs.py |  58 ++++------
 .../testing_on_gke/examples/run-gke-tests.sh  |   7 +-
 .../testing_on_gke/examples/utils/utils.py    | 107 ------------------
 .../examples/utils/utils_test.py              |  28 +----
 5 files changed, 42 insertions(+), 216 deletions(-)

diff --git a/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py b/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
index 66146090df..58f76223cd 100644
--- a/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
+++ b/perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -27,8 +27,8 @@
 # local library imports
 sys.path.append("../")
 import dlio_workload
-from utils.utils import get_memory, get_cpu, unix_to_timestamp, standard_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api, timestamp_to_epoch
 from utils.parse_logs_common import ensure_directory_exists, download_gcs_objects, parse_arguments, SUPPORTED_SCENARIOS, default_service_account_key_file, export_to_csv, export_to_gsheet
+from utils.utils import unix_to_timestamp, standard_timestamp, get_memory_from_monitoring_api, get_cpu_from_monitoring_api, timestamp_to_epoch
 
 _LOCAL_LOGS_LOCATION = "../../bin/dlio-logs/logs"
 
@@ -52,8 +52,6 @@
     "gcsfuse_mount_options": "",
 }
 
-mash_installed = False
-
 _HEADER = (
     "File Size",
     "File #",
@@ -204,38 +202,24 @@ def createOutputScenariosFromDownloadedFiles(args: dict) -> dict:
 
         def fetch_cpu_memory_data():
           if r["scenario"] != "local-ssd":
-            if mash_installed:
-              r["lowest_memory"], r["highest_memory"] = get_memory(
-                  r["pod_name"],
-                  r["start"],
-                  r["end"],
-                  project_number=args.project_number,
-              )
-              r["lowest_cpu"], r["highest_cpu"] = get_cpu(
-                  r["pod_name"],
-                  r["start"],
-                  r["end"],
-                  project_number=args.project_number,
-              )
-            else:
-              r["lowest_memory"], r["highest_memory"] = (
-                  get_memory_from_monitoring_api(
-                      pod_name=r["pod_name"],
-                      start_epoch=r["start_epoch"],
-                      end_epoch=r["end_epoch"],
-                      project_id=args.project_id,
-                      cluster_name=args.cluster_name,
-                      namespace_name=args.namespace_name,
-                  )
-              )
-              r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
-                  pod_name=r["pod_name"],
-                  start_epoch=r["start_epoch"],
-                  end_epoch=r["end_epoch"],
-                  project_id=args.project_id,
-                  cluster_name=args.cluster_name,
-                  namespace_name=args.namespace_name,
-              )
+            r["lowest_memory"], r["highest_memory"] = (
+                get_memory_from_monitoring_api(
+                    pod_name=r["pod_name"],
+                    start_epoch=r["start_epoch"],
+                    end_epoch=r["end_epoch"],
+                    project_id=args.project_id,
+                    cluster_name=args.cluster_name,
+                    namespace_name=args.namespace_name,
+                )
+            )
+            r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
+                pod_name=r["pod_name"],
+                start_epoch=r["start_epoch"],
+                end_epoch=r["end_epoch"],
+                project_id=args.project_id,
+                cluster_name=args.cluster_name,
+                namespace_name=args.namespace_name,
+            )
 
         fetch_cpu_memory_data()
 
@@ -349,9 +333,5 @@ def writeOutput(
   if ret != 0:
     print(f"failed to download dlio outputs: {ret}")
 
-  mash_installed = is_mash_installed()
-  if not mash_installed:
-    print("Mash is not installed, will skip parsing CPU and memory usage.")
-
   output = createOutputScenariosFromDownloadedFiles(args)
   writeOutput(output, args)
diff --git a/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py b/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py
index 1cef365805..8424a30adf 100644
--- a/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py
+++ b/perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py
@@ -27,8 +27,8 @@
 # local library imports
 sys.path.append("../")
 import fio_workload
-from utils.utils import get_memory, get_cpu, unix_to_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api
 from utils.parse_logs_common import ensure_directory_exists, download_gcs_objects, parse_arguments, SUPPORTED_SCENARIOS, default_service_account_key_file, export_to_csv, export_to_gsheet
+from utils.utils import unix_to_timestamp, get_memory_from_monitoring_api, get_cpu_from_monitoring_api
 
 _LOCAL_LOGS_LOCATION = "../../bin/fio-logs"
 
@@ -54,8 +54,6 @@
     "numThreads": 0,
 }
 
-mash_installed = False
-
 _HEADER = (
     "File Size",
     "Read Type",
@@ -244,38 +242,24 @@ def createOutputScenariosFromDownloadedFiles(args: dict) -> dict:
 
         def fetch_cpu_memory_data():
           if r["scenario"] != "local-ssd":
-            if mash_installed:
-              r["lowest_memory"], r["highest_memory"] = get_memory(
-                  r["pod_name"],
-                  r["start"],
-                  r["end"],
-                  project_number=args.project_number,
-              )
-              r["lowest_cpu"], r["highest_cpu"] = get_cpu(
-                  r["pod_name"],
-                  r["start"],
-                  r["end"],
-                  project_number=args.project_number,
-              )
-            else:
-              r["lowest_memory"], r["highest_memory"] = (
-                  get_memory_from_monitoring_api(
-                      pod_name=r["pod_name"],
-                      start_epoch=r["start_epoch"],
-                      end_epoch=r["end_epoch"],
-                      project_id=args.project_id,
-                      cluster_name=args.cluster_name,
-                      namespace_name=args.namespace_name,
-                  )
-              )
-              r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
-                  pod_name=r["pod_name"],
-                  start_epoch=r["start_epoch"],
-                  end_epoch=r["end_epoch"],
-                  project_id=args.project_id,
-                  cluster_name=args.cluster_name,
-                  namespace_name=args.namespace_name,
-              )
+            r["lowest_memory"], r["highest_memory"] = (
+                get_memory_from_monitoring_api(
+                    pod_name=r["pod_name"],
+                    start_epoch=r["start_epoch"],
+                    end_epoch=r["end_epoch"],
+                    project_id=args.project_id,
+                    cluster_name=args.cluster_name,
+                    namespace_name=args.namespace_name,
+                )
+            )
+            r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
+                pod_name=r["pod_name"],
+                start_epoch=r["start_epoch"],
+                end_epoch=r["end_epoch"],
+                project_id=args.project_id,
+                cluster_name=args.cluster_name,
+                namespace_name=args.namespace_name,
+            )
 
         fetch_cpu_memory_data()
 
@@ -383,9 +367,5 @@ def writeOutput(
   if ret != 0:
     print(f"failed to download fio outputs: {ret}")
 
-  mash_installed = is_mash_installed()
-  if not mash_installed:
-    print("Mash is not installed, will skip parsing CPU and memory usage.")
-
   output = createOutputScenariosFromDownloadedFiles(args)
   writeOutput(output, args)
diff --git a/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh b/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh
index 6cfb32c6db..38e15ec32f 100755
--- a/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh
+++ b/perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh
@@ -306,12 +306,7 @@ function installDependencies() {
     apt-cache policy docker-ce
     sudo apt install docker-ce -y
   fi
-  # Install mash, as it is needed for fetching cpu/memory values for test runs
-  # in cloudtop. Even if mash install fails, don't panic, go ahead and install
-  # google-cloud-monitoring as an alternative.
-  which mash || sudo apt-get install -y monarch-tools || true
-  # Ensure that gcloud monitoring tools are installed. This is alternative to
-  # mash on gce vm.
+  # Ensure that gcloud monitoring tools are installed.
   # pip install --upgrade google-cloud-storage
   # pip install --ignore-installed --upgrade google-api-python-client
   # pip install --ignore-installed --upgrade google-cloud
diff --git a/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py b/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py
index 140096b689..90561a1b12 100644
--- a/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py
+++ b/perfmetrics/scripts/testing_on_gke/examples/utils/utils.py
@@ -24,113 +24,6 @@
 _GCSFUSE_CONTAINER_NAME = "gke-gcsfuse-sidecar"
 
 
-def is_mash_installed() -> bool:
-  try:
-    subprocess.run(
-        ["mash", "--version"],
-        stdout=subprocess.DEVNULL,
-        stderr=subprocess.DEVNULL,
-        check=True,
-    )
-    return True
-  except subprocess.CalledProcessError:
-    return False
-  except FileNotFoundError:
-    return False
-
-
-def get_memory(
-    pod_name: str, start: str, end: str, project_number: int
-) -> Tuple[int, int]:
-  # for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
-  result = subprocess.run(
-      [
-          "mash",
-          "--namespace=cloud_prod",
-          "--output=csv",
-          (
-              "Query(Fetch(Raw('cloud.kubernetes.K8sContainer',"
-              " 'kubernetes.io/container/memory/used_bytes'), {'project':"
-              f" '{project_number}', 'metric:memory_type': 'non-evictable'}})|"
-              " Window(Align('10m'))| GroupBy(['pod_name', 'container_name'],"
-              f" Max()), TimeInterval('{start}', '{end}'), '5s')"
-          ),
-      ],
-      capture_output=True,
-      text=True,
-  )
-
-  data_points_int = []
-  data_points_by_pod_container = result.stdout.strip().split("\n")
-  for data_points in data_points_by_pod_container[1:]:
-    data_points_split = data_points.split(",")
-    if len(data_points_split) < 6:
-      continue
-    pn = data_points_split[4]
-    container_name = data_points_split[5]
-    if pn == pod_name and container_name == _GCSFUSE_CONTAINER_NAME:
-      try:
-        data_points_int = [int(d) for d in data_points_split[7:]]
-      except:
-        print(
-            f"failed to parse memory for pod {pod_name}, {start}, {end}, data"
-            f" {data_points_int}"
-        )
-      break
-  if not data_points_int:
-    return 0, 0
-
-  return int(min(data_points_int) / 1024**2), int(
-      max(data_points_int) / 1024**2
-  )
-
-
-def get_cpu(
-    pod_name: str, start: str, end: str, project_number: int
-) -> Tuple[float, float]:
-  # for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
-  result = subprocess.run(
-      [
-          "mash",
-          "--namespace=cloud_prod",
-          "--output=csv",
-          (
-              "Query(Fetch(Raw('cloud.kubernetes.K8sContainer',"
-              " 'kubernetes.io/container/cpu/core_usage_time'), {'project':"
-              f" '{project_number}'}})| Window(Rate('10m'))|"
-              " GroupBy(['pod_name', 'container_name'], Max()),"
-              f" TimeInterval('{start}', '{end}'), '5s')"
-          ),
-      ],
-      capture_output=True,
-      text=True,
-  )
-
-  data_points_float = []
-  data_points_by_pod_container = result.stdout.split("\n")
-  for data_points in data_points_by_pod_container[1:]:
-    data_points_split = data_points.split(",")
-    if len(data_points_split) < 6:
-      continue
-    pn = data_points_split[4]
-    container_name = data_points_split[5]
-    if pn == pod_name and container_name == _GCSFUSE_CONTAINER_NAME:
-      try:
-        data_points_float = [float(d) for d in data_points_split[6:]]
-      except:
-        print(
-            f"failed to parse CPU for pod {pod_name}, {start}, {end}, data"
-            f" {data_points_float}"
-        )
-
-      break
-
-  if not data_points_float:
-    return 0.0, 0.0
-
-  return round(min(data_points_float), 5), round(max(data_points_float), 5)
-
-
 def unix_to_timestamp(unix_timestamp: int) -> str:
   # Convert Unix timestamp to a datetime object (aware of UTC)
   datetime_utc = datetime.datetime.fromtimestamp(
diff --git a/perfmetrics/scripts/testing_on_gke/examples/utils/utils_test.py b/perfmetrics/scripts/testing_on_gke/examples/utils/utils_test.py
index dae89cd41a..1c4d2b3b57 100644
--- a/perfmetrics/scripts/testing_on_gke/examples/utils/utils_test.py
+++ b/perfmetrics/scripts/testing_on_gke/examples/utils/utils_test.py
@@ -17,7 +17,7 @@
 import unittest
 
 import utils
-from utils import get_cpu, get_cpu_from_monitoring_api, get_memory, get_memory_from_monitoring_api, timestamp_to_epoch
+from utils import get_cpu_from_monitoring_api, get_memory_from_monitoring_api, timestamp_to_epoch
 
 
 class UtilsTest(unittest.TestCase):
@@ -34,7 +34,7 @@ def setUpClass(self):
     self.start = '2024-09-25 06:32:22 UTC'
     self.end = '2024-09-25 07:06:22 UTC'
 
-  def test_get_memory_methods(self):
+  def test_get_memory(self):
     low1, high1 = get_memory_from_monitoring_api(
         project_id=self.project_id,
         cluster_name=self.cluster_name,
@@ -46,18 +46,7 @@ def test_get_memory_methods(self):
     self.assertLessEqual(low1, high1)
     self.assertGreater(high1, 0)
 
-    low2, high2 = get_memory(
-        project_number=self.project_number,
-        pod_name=self.pod_name,
-        start=self.start,
-        end=self.end,
-    )
-    self.assertLessEqual(low2, high2)
-    self.assertGreater(high2, 0)
-
-    self.assertTrue(high1 >= 0.99 * high2 and high1 <= 1.01 * high2)
-
-  def test_get_cpu_methods(self):
+  def test_get_cpu(self):
     low1, high1 = get_cpu_from_monitoring_api(
         project_id=self.project_id,
         cluster_name=self.cluster_name,
@@ -69,17 +58,6 @@ def test_get_cpu_methods(self):
     self.assertLessEqual(low1, high1)
     self.assertGreater(high1, 0)
 
-    low2, high2 = get_cpu(
-        project_number=self.project_number,
-        pod_name=self.pod_name,
-        start=self.start,
-        end=self.end,
-    )
-    self.assertLessEqual(low2, high2)
-    self.assertGreater(high2, 0)
-
-    self.assertTrue(high1 >= 0.99 * high2 and high1 <= 1.01 * high2)
-
   def test_timestamp_to_epoch(self):
     self.assertEqual(timestamp_to_epoch('2024-08-21T19:20:25'), 1724268025)