fix: metrics emission (#163)
Signed-off-by: Avik Basu <[email protected]>
Co-authored-by: Avik Basu <[email protected]>
ab93 committed Jul 29, 2023
1 parent a899562 commit 4f6d21c
Showing 10 changed files with 85 additions and 53 deletions.
15 changes: 15 additions & 0 deletions numaprom/metrics/__init__.py
@@ -0,0 +1,15 @@
from numaprom.metrics._metrics import (
increase_redis_conn_status,
inc_inference_count,
start_metrics_server,
inc_redis_conn_success,
inc_redis_conn_failed,
)

__all__ = [
"increase_redis_conn_status",
"inc_inference_count",
"start_metrics_server",
"inc_redis_conn_success",
"inc_redis_conn_failed",
]
31 changes: 31 additions & 0 deletions numaprom/metrics/_metrics.py
@@ -0,0 +1,31 @@
from functools import partial

from prometheus_client import start_http_server
from prometheus_client import Counter

from numaprom import LOGGER

# Metrics
REDIS_CONN_STATUS_COUNT = Counter("numaprom_redis_conn_status_count", "", ["vertex", "status"])
INFERENCE_COUNT = Counter(
"numaprom_inference_count", "", ["model", "namespace", "app", "metric", "status"]
)


def increase_redis_conn_status(vertex: str, status: str) -> None:
global REDIS_CONN_STATUS_COUNT
REDIS_CONN_STATUS_COUNT.labels(vertex, status).inc()


inc_redis_conn_success = partial(increase_redis_conn_status, status="success")
inc_redis_conn_failed = partial(increase_redis_conn_status, status="failed")


def inc_inference_count(model: str, namespace: str, app: str, metric: str, status: str) -> None:
global INFERENCE_COUNT
INFERENCE_COUNT.labels(model, namespace, app, metric, status).inc()


def start_metrics_server(port: int) -> None:
LOGGER.info("Starting Prometheus metrics server on port: {port}", port=port)
start_http_server(port)
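For context, a minimal sketch of how a vertex might emit through these helpers; the vertex name, the stub lookup, and the use of port 8490 (taken from starter.py below) are illustrative, not part of this commit:

from numaprom.metrics import (
    inc_redis_conn_failed,
    inc_redis_conn_success,
    start_metrics_server,
)

_VERTEX = "example-vertex"  # hypothetical vertex name, for illustration only


def _fetch_artifact():
    # Stand-in for a Redis registry lookup that may fail.
    return {"model": "dummy"}


start_metrics_server(8490)  # serves /metrics; 8490 mirrors starter.py below

try:
    artifact = _fetch_artifact()
except Exception:
    inc_redis_conn_failed(_VERTEX)   # partial pre-bound status="failed"
else:
    inc_redis_conn_success(_VERTEX)  # partial pre-bound status="success"

Because functools.partial pre-binds the status label, each call site supplies only its vertex name, which the diffs below reduce to a single _VERTEX constant per module.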
5 changes: 0 additions & 5 deletions numaprom/metrics/metrics.py

This file was deleted.

24 changes: 13 additions & 11 deletions numaprom/udf/inference.py
@@ -1,5 +1,7 @@
import os
import time
from typing import Final

from numalogic.config import NumalogicConf
from numalogic.models.autoencoder import AutoencoderTrainer
from numalogic.registry import ArtifactData, RedisRegistry, LocalLRUCache
@@ -14,10 +16,11 @@
from numaprom.entities import PayloadFactory
from numaprom.entities import Status, StreamPayload, Header
from numaprom.tools import msg_forward
from numaprom.udf.metrics import increase_redis_conn_status, increase_interface_count
from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed, inc_inference_count
from numaprom.watcher import ConfigManager


_VERTEX: Final[str] = "inference"
REDIS_CLIENT = get_redis_client_from_conf(master_node=False)
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", 3600)) # default ttl set to 1 hour

@@ -86,10 +89,10 @@ def inference(_: list[str], datum: Datum) -> bytes:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
increase_redis_conn_status("inference", "failed")
inc_redis_conn_failed(_VERTEX)
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
else:
increase_redis_conn_status("inference", "success")

inc_redis_conn_success(_VERTEX)
if not artifact_data:
LOGGER.info(
"{uuid} - Inference artifact not found, "
@@ -127,13 +130,12 @@ def inference(_: list[str], datum: Datum) -> bytes:
payload.set_status(Status.RUNTIME_ERROR)
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)

increase_interface_count(
payload.header,
payload.composite_keys["namespace"],
payload.composite_keys["app"],
payload.composite_keys["namespace"],
payload.composite_keys["name"],
payload.metadata["version"],
inc_inference_count(
model=payload.get_metadata("version"),
namespace=payload.composite_keys.get("namespace"),
app=payload.composite_keys.get("app"),
metric=payload.composite_keys.get("name"),
status=payload.header,
)
LOGGER.info("{uuid} - Sending Payload: {payload} ", uuid=payload.uuid, payload=payload)
LOGGER.debug(
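Note the shape of this change: the deleted increase_interface_count call passed its labels positionally (with namespace duplicated and the header leading), while inc_inference_count binds every label by keyword. A small sketch of the difference, assuming the INFERENCE_COUNT counter from _metrics.py above; the label values are made up:

from prometheus_client import Counter

INFERENCE_COUNT = Counter(
    "numaprom_inference_count", "", ["model", "namespace", "app", "metric", "status"]
)

# Positional labels must match the declared order exactly; a transposed
# or duplicated argument still "works" but records wrong label values.
INFERENCE_COUNT.labels("vae", "sandbox", "demo-app", "latency", "ok").inc()

# Keyword labels are order-independent and fail loudly on a bad name.
INFERENCE_COUNT.labels(
    model="vae", namespace="sandbox", app="demo-app", metric="latency", status="ok"
).inc()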
17 changes: 0 additions & 17 deletions numaprom/udf/metrics.py

This file was deleted.

10 changes: 6 additions & 4 deletions numaprom/udf/postprocess.py
@@ -1,5 +1,6 @@
import os
import time
from typing import Final

import numpy as np
from orjson import orjson
@@ -11,10 +12,11 @@
from numaprom.clients.sentinel import get_redis_client_from_conf
from numaprom.entities import Status, PrometheusPayload, StreamPayload, Header
from numaprom.tools import msgs_forward, WindowScorer
from numaprom.udf.metrics import redis_conn_status_count
from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
from numaprom.watcher import ConfigManager


_VERTEX: Final[str] = "postprocess"
AUTH = os.getenv("REDIS_AUTH")
SCORE_PRECISION = int(os.getenv("SCORE_PRECISION", 3))
UNDEFINED_SCORE = -1.0
@@ -150,12 +152,12 @@ def _publish(final_score: float, payload: StreamPayload) -> list[bytes]:
uuid=payload.uuid,
warn=warn,
)
redis_conn_status_count("postprocess", "failed")
inc_redis_conn_failed(_VERTEX)
unified_anomaly, anomalies = __save_to_redis(
payload=payload, final_score=final_score, recreate=True, unified_config=unified_config
)
else:
redis_conn_status_count("postprocess", "success")
inc_redis_conn_success(_VERTEX)

# If the unified anomaly is -1, we don't want to publish it
if unified_anomaly >= 0:
unified_json = __construct_unified_payload(
12 changes: 6 additions & 6 deletions numaprom/udf/preprocess.py
@@ -1,5 +1,6 @@
import os
import time
from typing import Final

import orjson
from numalogic.registry import RedisRegistry, LocalLRUCache
@@ -10,10 +11,10 @@
from numaprom.clients.sentinel import get_redis_client
from numaprom.entities import Status, StreamPayload, Header
from numaprom.tools import msg_forward
from numaprom.udf.metrics import increase_redis_conn_status
from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
from numaprom.watcher import ConfigManager


_VERTEX: Final[str] = "preprocess"
AUTH = os.getenv("REDIS_AUTH")
REDIS_CONF = ConfigManager.get_redis_config()
REDIS_CLIENT = get_redis_client(
@@ -57,7 +58,7 @@ def preprocess(_: list[str], datum: Datum) -> bytes:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
increase_redis_conn_status("preprocess", "failed")
inc_redis_conn_failed(_VERTEX)
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
except Exception as ex:
LOGGER.exception(
@@ -69,10 +70,9 @@ def preprocess(_: list[str], datum: Datum) -> bytes:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
increase_redis_conn_status("preprocess", "failed")
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
else:
increase_redis_conn_status("preprocess", "success")

inc_redis_conn_success(_VERTEX)
if not preproc_artifact:
LOGGER.info(
"{uuid} - Preprocess artifact not found, forwarding for static thresholding. "
11 changes: 6 additions & 5 deletions numaprom/udf/threshold.py
@@ -1,6 +1,7 @@
import os
import time
from collections import OrderedDict
from typing import Final

from numalogic.registry import RedisRegistry, LocalLRUCache
from numalogic.tools.exceptions import RedisRegistryError
@@ -12,10 +13,11 @@
from numaprom.clients.sentinel import get_redis_client_from_conf
from numaprom.entities import Status, TrainerPayload, PayloadFactory, Header
from numaprom.tools import conditional_forward, calculate_static_thresh
from numaprom.udf.metrics import redis_conn_status_count
from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
from numaprom.watcher import ConfigManager


_VERTEX: Final[str] = "threshold"
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", 3600)) # default ttl set to 1 hour


@@ -82,7 +84,7 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
redis_conn_status_count("threshold", "failed")
inc_redis_conn_failed(_VERTEX)
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
except Exception as ex:
LOGGER.exception(
@@ -94,13 +96,12 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
redis_conn_status_count("threshold", "failed")
return [
(TRAIN_VTX_KEY, orjson.dumps(train_payload)),
(POSTPROC_VTX_KEY, _get_static_thresh_payload(payload, metric_config)),
]
else:
redis_conn_status_count("threshold", "success")

inc_redis_conn_success(_VERTEX)

if not thresh_artifact:
LOGGER.info(
9 changes: 6 additions & 3 deletions numaprom/udf/window.py
@@ -1,6 +1,7 @@
import os
import time
import uuid
from typing import Final

import numpy as np
import numpy.typing as npt
@@ -12,9 +13,11 @@
from numaprom.clients.sentinel import get_redis_client_from_conf
from numaprom.entities import StreamPayload, Status, Header
from numaprom.tools import msg_forward, create_composite_keys
from numaprom.udf.metrics import redis_conn_status_count
from numaprom.metrics import inc_redis_conn_success, inc_redis_conn_failed
from numaprom.watcher import ConfigManager

_VERTEX: Final[str] = "window"


# TODO get the replacement value from config
def _clean_arr(
@@ -88,9 +91,9 @@ def window(_: list[str], datum: Datum) -> bytes | None:
elements = __aggregate_window(
unique_key, msg["timestamp"], value, win_size, buff_size, recreate=True
)
redis_conn_status_count("windowing", "failed")
inc_redis_conn_failed(_VERTEX)
else:
redis_conn_status_count("windowing", "success")
inc_redis_conn_success(_VERTEX)

# Drop message if no of elements is less than sequence length needed
if len(elements) < win_size:
4 changes: 2 additions & 2 deletions starter.py
@@ -3,11 +3,11 @@

from pynumaflow.function import Server
from pynumaflow.sink import Sink
from numaprom.metrics import metrics

from numaprom._constants import CONFIG_PATHS
from numaprom.factory import HandlerFactory
from numaprom.watcher import Watcher, ConfigHandler
from numaprom.metrics import start_metrics_server


def run_watcher():
@@ -21,7 +21,7 @@ def run_watcher():
background_thread.start()

step_handler = HandlerFactory.get_handler(sys.argv[2])
metrics.start_metrics_server(8490)
start_metrics_server(8490)
server_type = sys.argv[1]

if server_type == "udsink":
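Once start_metrics_server(8490) is up, everything registered above is scrapeable over plain HTTP in the Prometheus exposition format. A quick local spot-check, assuming the server is reachable on localhost:

import urllib.request

# Print only the numaprom counters from the exposition output.
with urllib.request.urlopen("http://localhost:8490/metrics") as resp:
    for line in resp.read().decode().splitlines():
        if line.startswith("numaprom_"):
            print(line)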
