Replace mash with monitoring-api everywhere
Purposes:
* Consistent behavior across all machines.
* Monitoring API has a faster runtime than mash.
* Monitoring API is also supported on GCE VMs.
gargnitingoogle committed Oct 3, 2024
1 parent ed56ec3 commit 13c8592
Showing 5 changed files with 42 additions and 216 deletions.
58 changes: 19 additions & 39 deletions perfmetrics/scripts/testing_on_gke/examples/dlio/parse_logs.py
@@ -27,7 +27,7 @@
# local library imports
sys.path.append("../")
import dlio_workload
from utils.utils import get_memory, get_cpu, unix_to_timestamp, standard_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api, timestamp_to_epoch
from utils.utils import unix_to_timestamp, standard_timestamp, get_memory_from_monitoring_api, get_cpu_from_monitoring_api, timestamp_to_epoch
from utils.parse_logs_common import ensure_directory_exists, download_gcs_objects, parse_arguments, SUPPORTED_SCENARIOS, default_service_account_key_file
from utils.gsheet import append_data_to_gsheet, url

@@ -53,8 +53,6 @@
"gcsfuse_mount_options": "",
}

mash_installed = False

_HEADER = (
"File Size",
"File #",
@@ -205,38 +203,24 @@ def createOutputScenariosFromDownloadedFiles(args: dict) -> dict:

def fetch_cpu_memory_data():
if r["scenario"] != "local-ssd":
if mash_installed:
r["lowest_memory"], r["highest_memory"] = get_memory(
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
r["lowest_cpu"], r["highest_cpu"] = get_cpu(
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
else:
r["lowest_memory"], r["highest_memory"] = (
get_memory_from_monitoring_api(
pod_name=r["pod_name"],
start_epoch=r["start_epoch"],
end_epoch=r["end_epoch"],
project_id=args.project_id,
cluster_name=args.cluster_name,
namespace_name=args.namespace_name,
)
)
r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
pod_name=r["pod_name"],
start_epoch=r["start_epoch"],
end_epoch=r["end_epoch"],
project_id=args.project_id,
cluster_name=args.cluster_name,
namespace_name=args.namespace_name,
)
r["lowest_memory"], r["highest_memory"] = (
get_memory_from_monitoring_api(
pod_name=r["pod_name"],
start_epoch=r["start_epoch"],
end_epoch=r["end_epoch"],
project_id=args.project_id,
cluster_name=args.cluster_name,
namespace_name=args.namespace_name,
)
)
r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
pod_name=r["pod_name"],
start_epoch=r["start_epoch"],
end_epoch=r["end_epoch"],
project_id=args.project_id,
cluster_name=args.cluster_name,
namespace_name=args.namespace_name,
)

fetch_cpu_memory_data()

@@ -392,9 +376,5 @@ def exportToGsheet(
if ret != 0:
print(f"failed to download dlio outputs: {ret}")

mash_installed = is_mash_installed()
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")

output = createOutputScenariosFromDownloadedFiles(args)
writeOutput(output, args)
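
Note: the rendered diff above loses Python indentation. Reconstructed from the lines shown (indentation and enclosing context are assumptions on my part), the simplified nested helper presumably ends up as:

def fetch_cpu_memory_data():
  if r["scenario"] != "local-ssd":
    # Both helpers now come from utils.utils and query the Cloud Monitoring API
    # directly; the mash-based branch is gone.
    r["lowest_memory"], r["highest_memory"] = get_memory_from_monitoring_api(
        pod_name=r["pod_name"],
        start_epoch=r["start_epoch"],
        end_epoch=r["end_epoch"],
        project_id=args.project_id,
        cluster_name=args.cluster_name,
        namespace_name=args.namespace_name,
    )
    r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
        pod_name=r["pod_name"],
        start_epoch=r["start_epoch"],
        end_epoch=r["end_epoch"],
        project_id=args.project_id,
        cluster_name=args.cluster_name,
        namespace_name=args.namespace_name,
    )

As in the original script, r and args come from the enclosing scope. The fio parse_logs.py change below is identical in shape.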
58 changes: 19 additions & 39 deletions perfmetrics/scripts/testing_on_gke/examples/fio/parse_logs.py
@@ -27,7 +27,7 @@
# local library imports
sys.path.append("../")
import fio_workload
from utils.utils import get_memory, get_cpu, unix_to_timestamp, is_mash_installed, get_memory_from_monitoring_api, get_cpu_from_monitoring_api
from utils.utils import unix_to_timestamp, get_memory_from_monitoring_api, get_cpu_from_monitoring_api
from utils.parse_logs_common import ensure_directory_exists, download_gcs_objects, parse_arguments, SUPPORTED_SCENARIOS, default_service_account_key_file
from utils.gsheet import append_data_to_gsheet, url

@@ -55,8 +55,6 @@
"numThreads": 0,
}

mash_installed = False

_HEADER = (
"File Size",
"Read Type",
@@ -245,38 +243,24 @@ def createOutputScenariosFromDownloadedFiles(args: dict) -> dict:

def fetch_cpu_memory_data():
if r["scenario"] != "local-ssd":
if mash_installed:
r["lowest_memory"], r["highest_memory"] = get_memory(
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
r["lowest_cpu"], r["highest_cpu"] = get_cpu(
r["pod_name"],
r["start"],
r["end"],
project_number=args.project_number,
)
else:
r["lowest_memory"], r["highest_memory"] = (
get_memory_from_monitoring_api(
pod_name=r["pod_name"],
start_epoch=r["start_epoch"],
end_epoch=r["end_epoch"],
project_id=args.project_id,
cluster_name=args.cluster_name,
namespace_name=args.namespace_name,
)
)
r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
pod_name=r["pod_name"],
start_epoch=r["start_epoch"],
end_epoch=r["end_epoch"],
project_id=args.project_id,
cluster_name=args.cluster_name,
namespace_name=args.namespace_name,
)
r["lowest_memory"], r["highest_memory"] = (
get_memory_from_monitoring_api(
pod_name=r["pod_name"],
start_epoch=r["start_epoch"],
end_epoch=r["end_epoch"],
project_id=args.project_id,
cluster_name=args.cluster_name,
namespace_name=args.namespace_name,
)
)
r["lowest_cpu"], r["highest_cpu"] = get_cpu_from_monitoring_api(
pod_name=r["pod_name"],
start_epoch=r["start_epoch"],
end_epoch=r["end_epoch"],
project_id=args.project_id,
cluster_name=args.cluster_name,
namespace_name=args.namespace_name,
)

fetch_cpu_memory_data()

@@ -426,9 +410,5 @@ def exportToGsheet(
if ret != 0:
print(f"failed to download fio outputs: {ret}")

mash_installed = is_mash_installed()
if not mash_installed:
print("Mash is not installed, will skip parsing CPU and memory usage.")

output = createOutputScenariosFromDownloadedFiles(args)
writeOutput(output, args)
7 changes: 1 addition & 6 deletions perfmetrics/scripts/testing_on_gke/examples/run-gke-tests.sh
@@ -306,12 +306,7 @@ function installDependencies() {
apt-cache policy docker-ce
sudo apt install docker-ce -y
fi
# Install mash, as it is needed for fetching cpu/memory values for test runs
# in cloudtop. Even if mash install fails, don't panic, go ahead and install
# google-cloud-monitoring as an alternative.
which mash || sudo apt-get install -y monarch-tools || true
# Ensure that gcloud monitoring tools are installed. This is alternative to
# mash on gce vm.
# Ensure that gcloud monitoring tools are installed.
# pip install --upgrade google-cloud-storage
# pip install --ignore-installed --upgrade google-api-python-client
# pip install --ignore-installed --upgrade google-cloud
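
With the monarch-tools install removed, the parse scripts depend entirely on the google-cloud-monitoring client library. A minimal preflight check along these lines (an illustration, not part of this commit) would fail fast if that dependency is missing:

# Hypothetical preflight check, not part of this commit: the mash fallback is
# gone, so metrics parsing needs the google-cloud-monitoring package.
try:
  from google.cloud import monitoring_v3
except ImportError as e:
  raise SystemExit(
      "google-cloud-monitoring is not installed; install it with"
      " 'pip install google-cloud-monitoring' before parsing CPU/memory metrics."
  ) from e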
107 changes: 0 additions & 107 deletions perfmetrics/scripts/testing_on_gke/examples/utils/utils.py
@@ -24,113 +24,6 @@
_GCSFUSE_CONTAINER_NAME = "gke-gcsfuse-sidecar"


def is_mash_installed() -> bool:
try:
subprocess.run(
["mash", "--version"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
return True
except subprocess.CalledProcessError:
return False
except FileNotFoundError:
return False


def get_memory(
pod_name: str, start: str, end: str, project_number: int
) -> Tuple[int, int]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(
[
"mash",
"--namespace=cloud_prod",
"--output=csv",
(
"Query(Fetch(Raw('cloud.kubernetes.K8sContainer',"
" 'kubernetes.io/container/memory/used_bytes'), {'project':"
f" '{project_number}', 'metric:memory_type': 'non-evictable'}})|"
" Window(Align('10m'))| GroupBy(['pod_name', 'container_name'],"
f" Max()), TimeInterval('{start}', '{end}'), '5s')"
),
],
capture_output=True,
text=True,
)

data_points_int = []
data_points_by_pod_container = result.stdout.strip().split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
if len(data_points_split) < 6:
continue
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == _GCSFUSE_CONTAINER_NAME:
try:
data_points_int = [int(d) for d in data_points_split[7:]]
except:
print(
f"failed to parse memory for pod {pod_name}, {start}, {end}, data"
f" {data_points_int}"
)
break
if not data_points_int:
return 0, 0

return int(min(data_points_int) / 1024**2), int(
max(data_points_int) / 1024**2
)


def get_cpu(
pod_name: str, start: str, end: str, project_number: int
) -> Tuple[float, float]:
# for some reason, the mash filter does not always work, so we fetch all the metrics for all the pods and filter later.
result = subprocess.run(
[
"mash",
"--namespace=cloud_prod",
"--output=csv",
(
"Query(Fetch(Raw('cloud.kubernetes.K8sContainer',"
" 'kubernetes.io/container/cpu/core_usage_time'), {'project':"
f" '{project_number}'}})| Window(Rate('10m'))|"
" GroupBy(['pod_name', 'container_name'], Max()),"
f" TimeInterval('{start}', '{end}'), '5s')"
),
],
capture_output=True,
text=True,
)

data_points_float = []
data_points_by_pod_container = result.stdout.split("\n")
for data_points in data_points_by_pod_container[1:]:
data_points_split = data_points.split(",")
if len(data_points_split) < 6:
continue
pn = data_points_split[4]
container_name = data_points_split[5]
if pn == pod_name and container_name == _GCSFUSE_CONTAINER_NAME:
try:
data_points_float = [float(d) for d in data_points_split[6:]]
except:
print(
f"failed to parse CPU for pod {pod_name}, {start}, {end}, data"
f" {data_points_float}"
)

break

if not data_points_float:
return 0.0, 0.0

return round(min(data_points_float), 5), round(max(data_points_float), 5)


def unix_to_timestamp(unix_timestamp: int) -> str:
# Convert Unix timestamp to a datetime object (aware of UTC)
datetime_utc = datetime.datetime.fromtimestamp(
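
The removed mash helpers above are replaced by get_memory_from_monitoring_api and get_cpu_from_monitoring_api, which already live in utils.py and are not shown in this diff. Below is a minimal sketch of how such a helper can query the Cloud Monitoring API for the gcsfuse sidecar container's memory usage; it uses the public google-cloud-monitoring client, and the exact filter and post-processing are my assumptions rather than the repository's implementation:

from typing import Tuple

from google.cloud import monitoring_v3


def sketch_get_memory_from_monitoring_api(
    project_id: str,
    cluster_name: str,
    pod_name: str,
    namespace_name: str,
    start_epoch: int,
    end_epoch: int,
) -> Tuple[int, int]:
  """Illustrative only: returns (min, max) non-evictable memory in MiB."""
  client = monitoring_v3.MetricServiceClient()
  interval = monitoring_v3.TimeInterval({
      "start_time": {"seconds": int(start_epoch)},
      "end_time": {"seconds": int(end_epoch)},
  })
  # Restrict the query to the gcsfuse sidecar container of the given pod.
  metric_filter = (
      'metric.type = "kubernetes.io/container/memory/used_bytes"'
      ' AND metric.labels.memory_type = "non-evictable"'
      f' AND resource.labels.cluster_name = "{cluster_name}"'
      f' AND resource.labels.namespace_name = "{namespace_name}"'
      f' AND resource.labels.pod_name = "{pod_name}"'
      ' AND resource.labels.container_name = "gke-gcsfuse-sidecar"'
  )
  results = client.list_time_series(
      request={
          "name": f"projects/{project_id}",
          "filter": metric_filter,
          "interval": interval,
          "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
      }
  )
  values = [
      point.value.int64_value for series in results for point in series.points
  ]
  if not values:
    return 0, 0
  # Convert bytes to MiB, matching what the removed mash-based helper returned.
  return int(min(values) / 1024**2), int(max(values) / 1024**2)

A CPU variant would query kubernetes.io/container/cpu/core_usage_time (a cumulative counter, so it additionally needs a rate aggregation) and read double values instead of int64 values.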
28 changes: 3 additions & 25 deletions perfmetrics/scripts/testing_on_gke/examples/utils/utils_test.py
@@ -17,7 +17,7 @@

import unittest
import utils
from utils import get_cpu, get_cpu_from_monitoring_api, get_memory, get_memory_from_monitoring_api, timestamp_to_epoch
from utils import get_cpu_from_monitoring_api, get_memory_from_monitoring_api, timestamp_to_epoch


class UtilsTest(unittest.TestCase):
@@ -34,7 +34,7 @@ def setUpClass(self):
self.start = '2024-09-25 06:32:22 UTC'
self.end = '2024-09-25 07:06:22 UTC'

def test_get_memory_methods(self):
def test_get_memory(self):
low1, high1 = get_memory_from_monitoring_api(
project_id=self.project_id,
cluster_name=self.cluster_name,
@@ -46,18 +46,7 @@ def test_get_memory_methods(self):
self.assertLessEqual(low1, high1)
self.assertGreater(high1, 0)

low2, high2 = get_memory(
project_number=self.project_number,
pod_name=self.pod_name,
start=self.start,
end=self.end,
)
self.assertLessEqual(low2, high2)
self.assertGreater(high2, 0)

self.assertTrue(high1 >= 0.99 * high2 and high1 <= 1.01 * high2)

def test_get_cpu_methods(self):
def test_get_cpu(self):
low1, high1 = get_cpu_from_monitoring_api(
project_id=self.project_id,
cluster_name=self.cluster_name,
@@ -69,17 +58,6 @@ def test_get_cpu_methods(self):
self.assertLessEqual(low1, high1)
self.assertGreater(high1, 0)

low2, high2 = get_cpu(
project_number=self.project_number,
pod_name=self.pod_name,
start=self.start,
end=self.end,
)
self.assertLessEqual(low2, high2)
self.assertGreater(high2, 0)

self.assertTrue(high1 >= 0.99 * high2 and high1 <= 1.01 * high2)

def test_timestamp_to_epoch(self):
self.assertEqual(timestamp_to_epoch('2024-08-21T19:20:25'), 1724268025)

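
With the mash helpers gone, the removed cross-check (asserting the two sources agree within 1%) is no longer possible, so the remaining tests only sanity-check the monitoring-API values. A hypothetical runner for just those two tests (not part of the commit, and assuming utils_test is importable from the utils/ directory) could look like:

# Hypothetical: run only the two monitoring-API tests kept by this commit.
import unittest

from utils_test import UtilsTest

suite = unittest.TestSuite(
    [UtilsTest("test_get_memory"), UtilsTest("test_get_cpu")]
)
unittest.TextTestRunner(verbosity=2).run(suite)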
