diff --git a/numaprom/metrics/__init__.py b/numaprom/metrics/__init__.py
index e69de29..4c023b6 100644
--- a/numaprom/metrics/__init__.py
+++ b/numaprom/metrics/__init__.py
@@ -0,0 +1,15 @@
+from numaprom.metrics._metrics import (
+    increase_redis_conn_status,
+    inc_inference_count,
+    start_metrics_server,
+    inc_redis_conn_success,
+    inc_redis_conn_failed,
+)
+
+__all__ = [
+    "increase_redis_conn_status",
+    "inc_inference_count",
+    "start_metrics_server",
+    "inc_redis_conn_success",
+    "inc_redis_conn_failed",
+]
diff --git a/numaprom/metrics/_metrics.py b/numaprom/metrics/_metrics.py
new file mode 100644
index 0000000..ded1228
--- /dev/null
+++ b/numaprom/metrics/_metrics.py
@@ -0,0 +1,31 @@
+from functools import partial
+
+from prometheus_client import start_http_server
+from prometheus_client import Counter
+
+from numaprom import LOGGER
+
+# Metrics
+REDIS_CONN_STATUS_COUNT = Counter("numaprom_redis_conn_status_count", "", ["vertex", "status"])
+INFERENCE_COUNT = Counter(
+    "numaprom_inference_count", "", ["model", "namespace", "app", "metric", "status"]
+)
+
+
+def increase_redis_conn_status(vertex: str, status: str) -> None:
+    global REDIS_CONN_STATUS_COUNT
+    REDIS_CONN_STATUS_COUNT.labels(vertex, status).inc()
+
+
+inc_redis_conn_success = partial(increase_redis_conn_status, status="success")
+inc_redis_conn_failed = partial(increase_redis_conn_status, status="failed")
+
+
+def inc_inference_count(model: str, namespace: str, app: str, metric: str, status: str) -> None:
+    global INFERENCE_COUNT
+    INFERENCE_COUNT.labels(model, namespace, app, metric, status).inc()
+
+
+def start_metrics_server(port: int) -> None:
+    LOGGER.info("Starting Prometheus metrics server on port: {port}", port=port)
+    start_http_server(port)
diff --git a/numaprom/metrics/metrics.py b/numaprom/metrics/metrics.py
deleted file mode 100644
index 9a41e16..0000000
--- a/numaprom/metrics/metrics.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from prometheus_client import start_http_server
-
-
-def start_metrics_server(port):
-    start_http_server(port)
diff --git a/numaprom/udf/inference.py b/numaprom/udf/inference.py
index 706345a..33574f1 100644
--- a/numaprom/udf/inference.py
+++ b/numaprom/udf/inference.py
@@ -1,5 +1,7 @@
 import os
 import time
+from typing import Final
+
 from numalogic.config import NumalogicConf
 from numalogic.models.autoencoder import AutoencoderTrainer
 from numalogic.registry import ArtifactData, RedisRegistry, LocalLRUCache
@@ -14,10 +16,11 @@
 from numaprom.entities import PayloadFactory
 from numaprom.entities import Status, StreamPayload, Header
 from numaprom.tools import msg_forward
-from numaprom.udf.metrics import increase_redis_conn_status, increase_interface_count
+from numaprom.metrics import inc_redis_conn_failed, inc_redis_conn_success, inc_inference_count
 from numaprom.watcher import ConfigManager
 
 
+_VERTEX: Final[str] = "inference"
 REDIS_CLIENT = get_redis_client_from_conf(master_node=False)
 LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", 3600))  # default ttl set to 1 hour
 
@@ -86,10 +89,10 @@ def inference(_: list[str], datum: Datum) -> bytes:
         )
         payload.set_header(Header.STATIC_INFERENCE)
         payload.set_status(Status.RUNTIME_ERROR)
-        increase_redis_conn_status("inference", "failed")
+        inc_redis_conn_failed(_VERTEX)
         return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
-    else:
-        increase_redis_conn_status("inference", "success")
+
+    inc_redis_conn_success(_VERTEX)
     if not artifact_data:
         LOGGER.info(
             "{uuid} - Inference artifact not found, "
@@ -127,13 +130,12 @@ def inference(_: list[str], datum: Datum) -> bytes:
         payload.set_status(Status.RUNTIME_ERROR)
         return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
 
-    increase_interface_count(
-        payload.header,
-        payload.composite_keys["namespace"],
-        payload.composite_keys["app"],
-        payload.composite_keys["namespace"],
-        payload.composite_keys["name"],
-        payload.metadata["version"],
+    inc_inference_count(
+        model=payload.get_metadata("version"),
+        namespace=payload.composite_keys.get("namespace"),
+        app=payload.composite_keys.get("app"),
+        metric=payload.composite_keys.get("name"),
+        status=payload.header,
     )
     LOGGER.info("{uuid} - Sending Payload: {payload} ", uuid=payload.uuid, payload=payload)
     LOGGER.debug(
diff --git a/numaprom/udf/metrics.py b/numaprom/udf/metrics.py
deleted file mode 100644
index e93d626..0000000
--- a/numaprom/udf/metrics.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from prometheus_client import Counter
-
-# Metrics
-redis_conn_status_count = Counter("numaprom_redis_conn_status_count", "", ["vertex", "status"])
-
-
-def increase_redis_conn_status(vertex, status):
-    redis_conn_status_count.labels(vertex, status).inc()
-
-
-inference_count = Counter(
-    "numaprom_inference_count", "", ["model", "namespace", "app", "metric", "status"]
-)
-
-
-def increase_interface_count(model, namespace, app, metric, status):
-    inference_count.labels(model, namespace, app, metric, status).inc()
diff --git a/numaprom/udf/postprocess.py b/numaprom/udf/postprocess.py
index ef9c049..577c5ad 100644
--- a/numaprom/udf/postprocess.py
+++ b/numaprom/udf/postprocess.py
@@ -1,5 +1,6 @@
 import os
 import time
+from typing import Final
 
 import numpy as np
 from orjson import orjson
@@ -11,10 +12,11 @@
 from numaprom.clients.sentinel import get_redis_client_from_conf
 from numaprom.entities import Status, PrometheusPayload, StreamPayload, Header
 from numaprom.tools import msgs_forward, WindowScorer
-from numaprom.udf.metrics import redis_conn_status_count
+from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
 from numaprom.watcher import ConfigManager
 
 
+_VERTEX: Final[str] = "postprocess"
 AUTH = os.getenv("REDIS_AUTH")
 SCORE_PRECISION = int(os.getenv("SCORE_PRECISION", 3))
 UNDEFINED_SCORE = -1.0
@@ -150,12 +152,12 @@ def _publish(final_score: float, payload: StreamPayload) -> list[bytes]:
             uuid=payload.uuid,
             warn=warn,
         )
-        redis_conn_status_count("postprocess", "failed")
+        inc_redis_conn_failed(_VERTEX)
         unified_anomaly, anomalies = __save_to_redis(
             payload=payload, final_score=final_score, recreate=True, unified_config=unified_config
         )
-    else:
-        redis_conn_status_count("postprocess", "success")
+    inc_redis_conn_success(_VERTEX)
+
     # If the unified anomaly is -1, we don't want to publish it
     if unified_anomaly >= 0:
         unified_json = __construct_unified_payload(
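Reviewer note (not part of the patch): the functools.partial helpers in _metrics.py pin the status label so call sites only pass the vertex name. A minimal sketch of how the helpers can be exercised against prometheus_client's default registry — assuming a fresh interpreter, since the counters are process-global:

    from prometheus_client import REGISTRY

    from numaprom.metrics import inc_redis_conn_failed, inc_redis_conn_success

    inc_redis_conn_success("inference")
    inc_redis_conn_success("inference")
    inc_redis_conn_failed("inference")

    # prometheus_client exports Counter samples with a "_total" suffix.
    assert REGISTRY.get_sample_value(
        "numaprom_redis_conn_status_count_total",
        {"vertex": "inference", "status": "success"},
    ) == 2.0
    assert REGISTRY.get_sample_value(
        "numaprom_redis_conn_status_count_total",
        {"vertex": "inference", "status": "failed"},
    ) == 1.0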
diff --git a/numaprom/udf/preprocess.py b/numaprom/udf/preprocess.py
index 6595c08..376cdc5 100644
--- a/numaprom/udf/preprocess.py
+++ b/numaprom/udf/preprocess.py
@@ -1,5 +1,6 @@
 import os
 import time
+from typing import Final
 
 import orjson
 from numalogic.registry import RedisRegistry, LocalLRUCache
@@ -10,10 +11,10 @@
 from numaprom.clients.sentinel import get_redis_client
 from numaprom.entities import Status, StreamPayload, Header
 from numaprom.tools import msg_forward
-from numaprom.udf.metrics import increase_redis_conn_status
+from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
 from numaprom.watcher import ConfigManager
 
-
+_VERTEX: Final[str] = "preprocess"
 AUTH = os.getenv("REDIS_AUTH")
 REDIS_CONF = ConfigManager.get_redis_config()
 REDIS_CLIENT = get_redis_client(
@@ -57,7 +58,7 @@ def preprocess(_: list[str], datum: Datum) -> bytes:
         )
         payload.set_header(Header.STATIC_INFERENCE)
         payload.set_status(Status.RUNTIME_ERROR)
-        increase_redis_conn_status("preprocess", "failed")
+        inc_redis_conn_failed(_VERTEX)
         return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
     except Exception as ex:
         LOGGER.exception(
@@ -69,10 +70,9 @@
         )
         payload.set_header(Header.STATIC_INFERENCE)
         payload.set_status(Status.RUNTIME_ERROR)
-        increase_redis_conn_status("preprocess", "failed")
         return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
-    else:
-        increase_redis_conn_status("preprocess", "success")
+
+    inc_redis_conn_success(_VERTEX)
     if not preproc_artifact:
         LOGGER.info(
             "{uuid} - Preprocess artifact not found, forwarding for static thresholding. "
diff --git a/numaprom/udf/threshold.py b/numaprom/udf/threshold.py
index 0b991fa..16844e7 100644
--- a/numaprom/udf/threshold.py
+++ b/numaprom/udf/threshold.py
@@ -1,6 +1,7 @@
 import os
 import time
 from collections import OrderedDict
+from typing import Final
 
 from numalogic.registry import RedisRegistry, LocalLRUCache
 from numalogic.tools.exceptions import RedisRegistryError
@@ -12,10 +13,11 @@
 from numaprom.clients.sentinel import get_redis_client_from_conf
 from numaprom.entities import Status, TrainerPayload, PayloadFactory, Header
 from numaprom.tools import conditional_forward, calculate_static_thresh
-from numaprom.udf.metrics import redis_conn_status_count
+from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
 from numaprom.watcher import ConfigManager
 
 
+_VERTEX: Final[str] = "threshold"
 LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", 3600))  # default ttl set to 1 hour
 
 
@@ -82,7 +84,7 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]:
         )
         payload.set_header(Header.STATIC_INFERENCE)
         payload.set_status(Status.RUNTIME_ERROR)
-        redis_conn_status_count("threshold", "failed")
+        inc_redis_conn_failed(_VERTEX)
         return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
     except Exception as ex:
         LOGGER.exception(
@@ -94,13 +96,12 @@
         )
         payload.set_header(Header.STATIC_INFERENCE)
         payload.set_status(Status.RUNTIME_ERROR)
-        redis_conn_status_count("threshold", "failed")
         return [
             (TRAIN_VTX_KEY, orjson.dumps(train_payload)),
             (POSTPROC_VTX_KEY, _get_static_thresh_payload(payload, metric_config)),
         ]
-    else:
-        redis_conn_status_count("threshold", "success")
+
+    inc_redis_conn_success(_VERTEX)
 
     if not thresh_artifact:
         LOGGER.info(
diff --git a/numaprom/udf/window.py b/numaprom/udf/window.py
index 1aa8b66..9feda01 100644
--- a/numaprom/udf/window.py
+++ b/numaprom/udf/window.py
@@ -1,6 +1,7 @@
 import os
 import time
 import uuid
+from typing import Final
 
 import numpy as np
 import numpy.typing as npt
@@ -12,9 +13,11 @@
 from numaprom.clients.sentinel import get_redis_client_from_conf
 from numaprom.entities import StreamPayload, Status, Header
 from numaprom.tools import msg_forward, create_composite_keys
-from numaprom.udf.metrics import redis_conn_status_count
+from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
 from numaprom.watcher import ConfigManager
 
+_VERTEX: Final[str] = "window"
+
 
 # TODO get the replacement value from config
 def _clean_arr(
@@ -88,9 +91,9 @@ def window(_: list[str], datum: Datum) -> bytes | None:
             elements = __aggregate_window(
                 unique_key, msg["timestamp"], value, win_size, buff_size, recreate=True
             )
-            redis_conn_status_count("windowing", "failed")
+            inc_redis_conn_failed(_VERTEX)
         else:
-            redis_conn_status_count("windowing", "success")
+            inc_redis_conn_success(_VERTEX)
 
     # Drop message if no of elements is less than sequence length needed
     if len(elements) < win_size:
diff --git a/starter.py b/starter.py
index 90683d1..fba040d 100644
--- a/starter.py
+++ b/starter.py
@@ -3,11 +3,11 @@
 from pynumaflow.function import Server
 from pynumaflow.sink import Sink
 
-from numaprom.metrics import metrics
 from numaprom._constants import CONFIG_PATHS
 from numaprom.factory import HandlerFactory
 from numaprom.watcher import Watcher, ConfigHandler
+from numaprom.metrics import start_metrics_server
 
 
 def run_watcher():
@@ -21,7 +21,7 @@ def run_watcher():
     background_thread.start()
 
     step_handler = HandlerFactory.get_handler(sys.argv[2])
-    metrics.start_metrics_server(8490)
+    start_metrics_server(8490)
     server_type = sys.argv[1]
 
     if server_type == "udsink":