From 199300943d1b0a376cbb73e309ba6855dc0f66d2 Mon Sep 17 00:00:00 2001 From: bermaker <495571751@qq.com> Date: Fri, 9 Sep 2022 14:20:42 +0800 Subject: [PATCH] [Metrics] Fix metrics and docs (#3233) * Fix metrics and docs * Fix ray storage metrics * Add valid check for metrics * Fix mars usage demo * Fix metrics check * Remove unused log * Update mars/metrics/api.py Co-authored-by: Shawn * Update mars/metrics/api.py Co-authored-by: Shawn * Support lazy initialize metrics * Restore op_executed_number * Fix log Co-authored-by: buhe Co-authored-by: Shawn (cherry picked from commit 6c6fc48bf229639be8870dd42227b36312c6a450) --- docs/source/development/oscar/usage.rst | 4 +- mars/core/operand/core.py | 6 +- mars/deploy/oscar/local.py | 6 + mars/metrics/api.py | 97 +++++++++++++++- .../console/tests/test_console_metric.py | 8 +- mars/metrics/backends/metric.py | 12 +- .../tests/test_prometheus_metric.py | 8 +- .../backends/ray/tests/test_ray_metric.py | 15 +-- mars/metrics/backends/tests/test_metric.py | 8 +- mars/metrics/tests/test_metric_api.py | 36 ++++-- mars/storage/ray.py | 107 +++++++++--------- 11 files changed, 211 insertions(+), 96 deletions(-) diff --git a/docs/source/development/oscar/usage.rst b/docs/source/development/oscar/usage.rst index 72e48665fb..325ec3712c 100644 --- a/docs/source/development/oscar/usage.rst +++ b/docs/source/development/oscar/usage.rst @@ -26,7 +26,7 @@ Actor Definition def method_a(self, arg_1, arg_2, **kw_1): # user-defined function pass - async def method_b(self, arg_1, arg_2, **kw_1) # user-defined async function + async def method_b(self, arg_1, arg_2, **kw_1): # user-defined async function pass @@ -38,7 +38,7 @@ Creating Actors import mars.oscar as mo actor_ref = await mo.create_actor( - MyActor, 1, 2, a=1, b=2 + MyActor, 1, 2, a=1, b=2, address='<ip>:<port>', uid='UniqueActorName') diff --git a/mars/core/operand/core.py b/mars/core/operand/core.py index 7454615e86..b2862cb2d4 100644 --- a/mars/core/operand/core.py +++ 
b/mars/core/operand/core.py @@ -490,7 +490,11 @@ def execute(results: Dict[str, Any], op: OperandType): try: result = executor(results, op) succeeded = True - op_executed_number.record(1, {"op": op.__class__.__name__}) + if op.stage is not None: + op_name = f"{op.__class__.__name__}:{op.stage.name}" + else: + op_name = op.__class__.__name__ + op_executed_number.record(1, {"op": op_name}) return result except UFuncTypeError as e: # pragma: no cover raise TypeError(str(e)).with_traceback(sys.exc_info()[2]) from None diff --git a/mars/deploy/oscar/local.py b/mars/deploy/oscar/local.py index 471b66acbd..69e6aa230e 100644 --- a/mars/deploy/oscar/local.py +++ b/mars/deploy/oscar/local.py @@ -25,6 +25,7 @@ from ... import oscar as mo from ...core.entrypoints import init_extension_entrypoints from ...lib.aio import get_isolation, stop_isolation +from ...metrics import init_metrics from ...oscar.backends.router import Router from ...resource import cpu_count, cuda_count, mem_total from ...services import NodeRole @@ -218,6 +219,11 @@ async def start(self): # start service await self._start_service() + # init metrics to guarantee metrics use in driver + metric_configs = self._config.get("metrics", {}) + metric_backend = metric_configs.get("backend") + init_metrics(metric_backend, config=metric_configs.get(metric_backend)) + if self._web: from ...services.web.supervisor import WebActor diff --git a/mars/metrics/api.py b/mars/metrics/api.py index b1d1818850..74f7fc2598 100644 --- a/mars/metrics/api.py +++ b/mars/metrics/api.py @@ -14,6 +14,7 @@ import logging import time +import weakref from contextlib import contextmanager from enum import Enum @@ -21,6 +22,7 @@ from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple from .backends.console import console_metric +from .backends.metric import AbstractMetric from .backends.prometheus import prometheus_metric from .backends.ray import ray_metric @@ -35,6 +37,9 @@ } +_metrics_to_be_initialized = weakref.WeakSet() 
+ + def init_metrics(backend="console", config: Dict[str, Any] = None): global _init if _init is True: @@ -61,7 +66,11 @@ def init_metrics(backend="console", config: Dict[str, Any] = None): "Failed to start prometheus http server because there is no prometheus_client" ) _init = True - logger.info("Finished initialize the metrics with backend %s", _metric_backend) + for m in _metrics_to_be_initialized: + cls = getattr(_backends_cls[_metric_backend], m.type) + metric = cls(m.name, m.description, m.tag_keys) + m.set_metric(metric) + logger.info("Finished initialize the metrics of backend: %s.", _metric_backend) def shutdown_metrics(): @@ -69,6 +78,64 @@ def shutdown_metrics(): _metric_backend = "console" global _init _init = False + logger.info("Shutdown metrics of backend: %s.", _metric_backend) + + +class _MetricWrapper(AbstractMetric): + _metric: AbstractMetric + + def __init__( + self, + name: str, + description: str = "", + tag_keys: Optional[Tuple[str]] = None, + metric_type: str = "Counter", + ): + self._name = name + self._description = description + self._tag_keys = tag_keys or tuple() + self._type = metric_type + self._metric = None + + @property + def type(self): + return self._type + + @property + def value(self): + assert ( + self._metric is not None + ), "Metric is not initialized, please call `init_metrics()` before using metrics." + return self._metric.value + + def set_metric(self, metric): + assert metric is not None, "Argument metric is None, please check it." + self._metric = metric + + def record(self, value=1, tags: Optional[Dict[str, str]] = None): + if self._metric is not None: + self._metric.record(value, tags) + else: + logger.warning( + "Metric is not initialized, please call `init_metrics()` before using metrics." 
+ ) + + +def gen_metric(func): + def wrapper(name, descriptions: str = "", tag_keys: Optional[Tuple[str]] = None): + if _init is True: + return func(name, descriptions, tag_keys) + else: + logger.info( + "Metric %s will be initialized when invoking `init_metrics()`.", name + ) + metric = _MetricWrapper( + name, descriptions, tag_keys, func.__name__.capitalize() + ) + _metrics_to_be_initialized.add(metric) + return metric + + return wrapper class Metrics: @@ -97,19 +164,47 @@ class Metrics: """ @staticmethod + @gen_metric def counter(name, description: str = "", tag_keys: Optional[Tuple[str]] = None): + logger.info( + "Initializing a counter with name: %s, tag keys: %s, backend: %s", + name, + tag_keys, + _metric_backend, + ) return _backends_cls[_metric_backend].Counter(name, description, tag_keys) @staticmethod + @gen_metric def gauge(name, description: str = "", tag_keys: Optional[Tuple[str]] = None): + logger.info( + "Initializing a gauge whose name: %s, tag keys: %s, backend: %s", + name, + tag_keys, + _metric_backend, + ) return _backends_cls[_metric_backend].Gauge(name, description, tag_keys) @staticmethod + @gen_metric def meter(name, description: str = "", tag_keys: Optional[Tuple[str]] = None): + logger.info( + "Initializing a meter whose name: %s, tag keys: %s, backend: %s", + name, + tag_keys, + _metric_backend, + ) return _backends_cls[_metric_backend].Meter(name, description, tag_keys) @staticmethod + @gen_metric def histogram(name, description: str = "", tag_keys: Optional[Tuple[str]] = None): + logger.info( + "Initializing a histogram whose name: %s, tag keys: %s, backend: %s", + name, + tag_keys, + _metric_backend, + ) return _backends_cls[_metric_backend].Histogram(name, description, tag_keys) diff --git a/mars/metrics/backends/console/tests/test_console_metric.py b/mars/metrics/backends/console/tests/test_console_metric.py index 7c86461e6f..66f07ed3e8 100644 --- a/mars/metrics/backends/console/tests/test_console_metric.py +++ 
b/mars/metrics/backends/console/tests/test_console_metric.py @@ -20,7 +20,7 @@ def test_counter(): assert c.name == "test_counter" assert c.description == "A test counter" assert c.tag_keys == ("service", "tenant") - assert c.type == "counter" + assert c.type == "Counter" c.record(1, {"service": "mars", "tenant": "test"}) c.record(2, {"service": "mars", "tenant": "test"}) assert c.value == 3 @@ -31,7 +31,7 @@ def test_gauge(): assert g.name == "test_gauge" assert g.description == "A test gauge" assert g.tag_keys == () - assert g.type == "gauge" + assert g.type == "Gauge" g.record(1) assert g.value == 1 g.record(2) @@ -43,7 +43,7 @@ def test_meter(): assert m.name == "test_meter" assert m.description == "" assert m.tag_keys == () - assert m.type == "meter" + assert m.type == "Meter" m.record(1) assert m.value == 0 m.record(2001) @@ -55,7 +55,7 @@ def test_histogram(): assert h.name == "test_histogram" assert h.description == "" assert h.tag_keys == () - assert h.type == "histogram" + assert h.type == "Histogram" h.record(1) assert h.value == 0 for i in range(2002): diff --git a/mars/metrics/backends/metric.py b/mars/metrics/backends/metric.py index 0e611bc1b0..c0000add21 100644 --- a/mars/metrics/backends/metric.py +++ b/mars/metrics/backends/metric.py @@ -14,7 +14,7 @@ import time -from abc import ABC, abstractmethod +from abc import ABC from typing import Dict, Optional, Tuple _THRESHOLD = 2000 @@ -56,7 +56,6 @@ def description(self): def tag_keys(self): return self._tag_keys - @abstractmethod def _init(self): """Some initialization in subclass.""" pass @@ -65,7 +64,6 @@ def record(self, value=1, tags: Optional[Dict[str, str]] = None): """A public method called by users.""" pass - @abstractmethod def _record(self, value: float = 1.0, tags: Optional[Dict[str, str]] = None): """An internal method called by record() and should be implemented by different metric backends. 
@@ -76,7 +74,7 @@ def _record(self, value: float = 1.0, tags: Optional[Dict[str, str]] = None): class AbstractCounter(AbstractMetric): """A counter records the counts of events.""" - _type = "counter" + _type = "Counter" def __init__( self, name: str, description: str = "", tag_keys: Optional[Tuple[str]] = None @@ -94,7 +92,7 @@ class AbstractGauge(AbstractMetric): arbitrarily set. """ - _type = "gauge" + _type = "Gauge" def record(self, value=1, tags: Optional[Dict[str, str]] = None): self._record(value, tags) @@ -103,7 +101,7 @@ def record(self, value=1, tags: Optional[Dict[str, str]] = None): class AbstractMeter(AbstractMetric): """A meter measures the rate at which a set of events occur.""" - _type = "meter" + _type = "Meter" def __init__( self, name: str, description: str = "", tag_keys: Optional[Tuple[str]] = None @@ -126,7 +124,7 @@ def record(self, value=1, tags: Optional[Dict[str, str]] = None): class AbstractHistogram(AbstractMetric): """A histogram measures the distribution of values in a stream of data.""" - _type = "histogram" + _type = "Histogram" def __init__( self, name: str, description: str = "", tag_keys: Optional[Tuple[str]] = None diff --git a/mars/metrics/backends/prometheus/tests/test_prometheus_metric.py b/mars/metrics/backends/prometheus/tests/test_prometheus_metric.py index 8653b5d812..da4813f2ac 100644 --- a/mars/metrics/backends/prometheus/tests/test_prometheus_metric.py +++ b/mars/metrics/backends/prometheus/tests/test_prometheus_metric.py @@ -56,7 +56,7 @@ def test_counter(start_prometheus_http_server): assert c.description == "A test counter" assert set(["host", "pid"]).issubset(set(c.tag_keys)) assert set(["service", "tenant"]).issubset(set(c.tag_keys)) - assert c.type == "counter" + assert c.type == "Counter" c.record(1, {"service": "mars", "tenant": "test"}) verify_metric("test_counter", 1.0) c.record(2, {"service": "mars", "tenant": "test"}) @@ -68,7 +68,7 @@ def test_gauge(start_prometheus_http_server): assert g.name == 
"test_gauge" assert g.description == "A test gauge" assert set(["host", "pid"]).issubset(set(g.tag_keys)) - assert g.type == "gauge" + assert g.type == "Gauge" g.record(0.1) verify_metric("test_gauge", 0.1) g.record(1.1) @@ -80,7 +80,7 @@ def test_meter(start_prometheus_http_server): assert m.name == "test_meter" assert m.description == "" assert set(["host", "pid"]).issubset(set(m.tag_keys)) - assert m.type == "meter" + assert m.type == "Meter" num = 3 while num > 0: m.record(1) @@ -94,7 +94,7 @@ def test_histogram(start_prometheus_http_server): assert h.name == "test_histogram" assert h.description == "" assert set(["host", "pid"]).issubset(set(h.tag_keys)) - assert h.type == "histogram" + assert h.type == "Histogram" num = 3 while num > 0: h.record(1) diff --git a/mars/metrics/backends/ray/tests/test_ray_metric.py b/mars/metrics/backends/ray/tests/test_ray_metric.py index 2d2e0ee57a..2feea20071 100644 --- a/mars/metrics/backends/ray/tests/test_ray_metric.py +++ b/mars/metrics/backends/ray/tests/test_ray_metric.py @@ -19,14 +19,7 @@ @require_ray def test_record(): c = Counter("test_counter") - from .. 
import ray_metric - - original_value = ray_metric._ray_gauge_set_available - ray_metric._ray_gauge_set_available = True - assert c.record(1) is None - ray_metric._ray_gauge_set_available = False assert c.record(1) is None - ray_metric._ray_gauge_set_available = original_value @require_ray @@ -35,7 +28,7 @@ def test_counter(): assert c.name == "test_counter" assert c.description == "A test counter" assert c.tag_keys == ("service", "tenant") - assert c.type == "counter" + assert c.type == "Counter" assert c.record(1, {"service": "mars", "tenant": "test"}) is None @@ -45,7 +38,7 @@ def test_gauge(): assert g.name == "test_gauge" assert g.description == "A test gauge" assert g.tag_keys == () - assert g.type == "gauge" + assert g.type == "Gauge" assert g.record(1) is None @@ -55,7 +48,7 @@ def test_meter(): assert m.name == "test_meter" assert m.description == "" assert m.tag_keys == () - assert m.type == "meter" + assert m.type == "Meter" assert m.record(1) is None @@ -65,5 +58,5 @@ def test_histogram(): assert h.name == "test_histogram" assert h.description == "" assert h.tag_keys == () - assert h.type == "histogram" + assert h.type == "Histogram" assert h.record(1) is None diff --git a/mars/metrics/backends/tests/test_metric.py b/mars/metrics/backends/tests/test_metric.py index 5dae8b38fc..0ead196d63 100644 --- a/mars/metrics/backends/tests/test_metric.py +++ b/mars/metrics/backends/tests/test_metric.py @@ -66,7 +66,7 @@ class DummyCounter(AbstractCounter): assert c.name == "test_counter" assert c.description == "A test counter" assert c.tag_keys == ("service", "tenant") - assert c.type == "counter" + assert c.type == "Counter" assert c.record(1, {"service": "mars", "tenant": "test"}) is None @@ -79,7 +79,7 @@ class DummyGauge(AbstractGauge): assert g.name == "test_gauge" assert g.description == "A test gauge" assert g.tag_keys == () - assert g.type == "gauge" + assert g.type == "Gauge" assert g.record(1) is None @@ -92,7 +92,7 @@ class DummyMeter(AbstractMeter): 
assert m.name == "test_meter" assert m.description == "" assert m.tag_keys == () - assert m.type == "meter" + assert m.type == "Meter" assert m.record(1) is None @@ -105,5 +105,5 @@ class DummyHistogram(AbstractHistogram): assert h.name == "test_histogram" assert h.description == "" assert h.tag_keys == () - assert h.type == "histogram" + assert h.type == "Histogram" assert h.record(1) is None diff --git a/mars/metrics/tests/test_metric_api.py b/mars/metrics/tests/test_metric_api.py index bf8e1d6391..9fe1ba727a 100644 --- a/mars/metrics/tests/test_metric_api.py +++ b/mars/metrics/tests/test_metric_api.py @@ -54,47 +54,67 @@ def test_init_metrics(): init_metrics("not_exist") -def test_counter(init): +@pytest.mark.parametrize("init_firstly", [True, False]) +def test_counter(init_firstly): + if init_firstly: + init_metrics() c = Metrics.counter("test_counter", "A test counter", ("service", "tenant")) assert c.name == "test_counter" assert c.description == "A test counter" assert c.tag_keys == ("service", "tenant") - assert c.type == "counter" + assert c.type == "Counter" + if not init_firstly: + init_metrics() c.record(1, {"service": "mars", "tenant": "test"}) c.record(2, {"service": "mars", "tenant": "test"}) assert c.value == 3 -def test_gauge(): +@pytest.mark.parametrize("init_firstly", [True, False]) +def test_gauge(init_firstly): + if init_firstly: + init_metrics() g = Metrics.gauge("test_gauge", "A test gauge") assert g.name == "test_gauge" assert g.description == "A test gauge" assert g.tag_keys == () - assert g.type == "gauge" + assert g.type == "Gauge" + if not init_firstly: + init_metrics() g.record(1) assert g.value == 1 g.record(2) assert g.value == 2 -def test_meter(): +@pytest.mark.parametrize("init_firstly", [True, False]) +def test_meter(init_firstly): + if init_firstly: + init_metrics() m = Metrics.meter("test_meter") assert m.name == "test_meter" assert m.description == "" assert m.tag_keys == () - assert m.type == "meter" + assert m.type == "Meter" 
+ if not init_firstly: + init_metrics() m.record(1) assert m.value == 0 m.record(2001) assert m.value > 0 -def test_histogram(): +@pytest.mark.parametrize("init_firstly", [True, False]) +def test_histogram(init_firstly): + if init_firstly: + init_metrics() h = Metrics.histogram("test_histogram") assert h.name == "test_histogram" assert h.description == "" assert h.tag_keys == () - assert h.type == "histogram" + assert h.type == "Histogram" + if not init_firstly: + init_metrics() h.record(1) assert h.value == 0 for i in range(2002): diff --git a/mars/storage/ray.py b/mars/storage/ray.py index f6eccbcaa7..b39a839e8c 100644 --- a/mars/storage/ray.py +++ b/mars/storage/ray.py @@ -123,63 +123,62 @@ def support_specify_owner(): class RayStorage(StorageBackend): name = "ray" - _storage_get_metrics = [ - ( - Percentile.PercentileType.P99, - Metrics.gauge( - "mars.storage.ray.get_cost_time_p99_seconds", - "P99 time consuming in seconds to get object, every 1000 times report once.", - ).record, - 1000, - ), - ( - Percentile.PercentileType.P95, - Metrics.gauge( - "mars.storage.ray.get_cost_time_p95_seconds", - "P95 time consuming in seconds to get object, every 1000 times report once.", - ).record, - 1000, - ), - ( - Percentile.PercentileType.P90, - Metrics.gauge( - "mars.storage.ray.get_cost_time_p90_seconds", - "P90 time consuming in seconds to get object, every 1000 times report once.", - ).record, - 1000, - ), - ] - - _storage_put_metrics = [ - ( - Percentile.PercentileType.P99, - Metrics.gauge( - "mars.storage.ray.put_cost_time_p99_seconds", - "P99 time consuming in seconds to put object, every 1000 times report once.", - ).record, - 1000, - ), - ( - Percentile.PercentileType.P95, - Metrics.gauge( - "mars.storage.ray.put_cost_time_p95_seconds", - "P95 time consuming in seconds to put object, every 1000 times report once.", - ).record, - 1000, - ), - ( - Percentile.PercentileType.P90, - Metrics.gauge( - "mars.storage.ray.put_cost_time_p90_seconds", - "P90 time consuming 
in seconds to put object, every 1000 times report once.", - ).record, - 1000, - ), - ] - def __init__(self, *args, **kwargs): self._owner_address = kwargs.get("owner") self._owner = None # A ray actor which will own the objects put by workers. + self._storage_get_metrics = [ + ( + Percentile.PercentileType.P99, + Metrics.gauge( + "mars.storage.ray.get_cost_time_p99_seconds", + "P99 time consuming in seconds to get object, every 1000 times report once.", + ).record, + 1000, + ), + ( + Percentile.PercentileType.P95, + Metrics.gauge( + "mars.storage.ray.get_cost_time_p95_seconds", + "P95 time consuming in seconds to get object, every 1000 times report once.", + ).record, + 1000, + ), + ( + Percentile.PercentileType.P90, + Metrics.gauge( + "mars.storage.ray.get_cost_time_p90_seconds", + "P90 time consuming in seconds to get object, every 1000 times report once.", + ).record, + 1000, + ), + ] + + self._storage_put_metrics = [ + ( + Percentile.PercentileType.P99, + Metrics.gauge( + "mars.storage.ray.put_cost_time_p99_seconds", + "P99 time consuming in seconds to put object, every 1000 times report once.", + ).record, + 1000, + ), + ( + Percentile.PercentileType.P95, + Metrics.gauge( + "mars.storage.ray.put_cost_time_p95_seconds", + "P95 time consuming in seconds to put object, every 1000 times report once.", + ).record, + 1000, + ), + ( + Percentile.PercentileType.P90, + Metrics.gauge( + "mars.storage.ray.put_cost_time_p90_seconds", + "P90 time consuming in seconds to put object, every 1000 times report once.", + ).record, + 1000, + ), + ] @classmethod @implements(StorageBackend.setup)