Skip to content

Commit

Permalink
Enable custom metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Saravanan Balasubramanian <[email protected]>
  • Loading branch information
sarabala1979 committed Jul 28, 2023
1 parent 1eafb0d commit 8dd47e1
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 2 deletions.
Empty file added numaprom/metrics/__init__.py
Empty file.
6 changes: 6 additions & 0 deletions numaprom/metrics/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import time

Check failure on line 1 in numaprom/metrics/metrics.py

View workflow job for this annotation

GitHub Actions / Ruff lint

Ruff (F401)

numaprom/metrics/metrics.py:1:8: F401 `time` imported but unused

from prometheus_client import start_http_server

def start_metrics_server(port: int, addr: str = "0.0.0.0") -> None:
    """Expose Prometheus metrics over HTTP.

    Thin wrapper around ``prometheus_client.start_http_server``, which spawns
    a daemon thread serving the default registry.

    Args:
        port: TCP port to listen on.
        addr: interface to bind to; defaults to all interfaces, matching
            prometheus_client's own default, so existing callers are unaffected.
    """
    start_http_server(port, addr=addr)
23 changes: 22 additions & 1 deletion numaprom/udf/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,25 @@
from numaprom.tools import msg_forward
from numaprom.watcher import ConfigManager

from prometheus_client import Counter

REDIS_CLIENT = get_redis_client_from_conf(master_node=False)
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", 3600)) # default ttl set to 1 hour

# Metrics
redis_conn_status_count = Counter('numaprom_redis_conn_status_count', '', ['vertex', 'status'])


def increase_redis_conn_status(status):
    """Increment the Redis connection status counter for the inference vertex.

    Args:
        status: outcome label, e.g. "success" or "failed".
    """
    redis_conn_status_count.labels(vertex="inference", status=status).inc()


inference_count = Counter('numaprom_inference_count', '', ['model', 'namespace', 'app', 'metric', 'status'])

Check failure on line 32 in numaprom/udf/inference.py

View workflow job for this annotation

GitHub Actions / Ruff lint

Ruff (E501)

numaprom/udf/inference.py:32:101: E501 Line too long (108 > 100 characters)


def increase_interface_count(model, namespace, app, metric, status):
    """Increment the inference counter for one label combination.

    Args:
        model: model/header identifier.
        namespace: source namespace of the payload.
        app: application name.
        metric: metric name being scored.
        status: outcome label for the inference run.
    """
    labeled_child = inference_count.labels(
        model=model, namespace=namespace, app=app, metric=metric, status=status
    )
    labeled_child.inc()


def _run_inference(
payload: StreamPayload, artifact_data: ArtifactData, numalogic_conf: NumalogicConf
Expand Down Expand Up @@ -84,8 +100,10 @@ def inference(_: list[str], datum: Datum) -> bytes:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
increase_redis_conn_status("failed")
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)

else:
increase_redis_conn_status("success")
if not artifact_data:
LOGGER.info(
"{uuid} - Inference artifact not found, "
Expand Down Expand Up @@ -123,6 +141,9 @@ def inference(_: list[str], datum: Datum) -> bytes:
payload.set_status(Status.RUNTIME_ERROR)
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)

increase_interface_count(payload.header, payload.composite_keys["namespace"], payload.composite_keys["app"],

Check failure on line 144 in numaprom/udf/inference.py

View workflow job for this annotation

GitHub Actions / Ruff lint

Ruff (E501)

numaprom/udf/inference.py:144:101: E501 Line too long (112 > 100 characters)
payload.composite_keys["namespace"], payload.composite_keys["name"],
payload.metadata['version'])
LOGGER.info("{uuid} - Sending Payload: {payload} ", uuid=payload.uuid, payload=payload)
LOGGER.debug(
"{uuid} - Time taken in inference: {time} sec",
Expand Down
13 changes: 12 additions & 1 deletion numaprom/udf/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@
from numaprom.tools import msgs_forward, WindowScorer
from numaprom.watcher import ConfigManager

from prometheus_client import Counter

AUTH = os.getenv("REDIS_AUTH")
SCORE_PRECISION = int(os.getenv("SCORE_PRECISION", 3))
UNDEFINED_SCORE = -1.0

# Metrics
redis_conn_status_count = Counter('numaprom_redis_conn_status_count', '', ['vertex', 'status'])


def increase_redis_conn_status(status):
    """Increment the Redis connection status counter for the postprocess vertex.

    Args:
        status: outcome label, e.g. "success" or "failed".
    """
    child = redis_conn_status_count.labels(vertex="postprocess", status=status)
    child.inc()


def __save_to_redis(
payload: StreamPayload, final_score: float, recreate: bool, unified_config: UnifiedConf
Expand Down Expand Up @@ -148,10 +157,12 @@ def _publish(final_score: float, payload: StreamPayload) -> list[bytes]:
uuid=payload.uuid,
warn=warn,
)
redis_conn_status_count("failed")
unified_anomaly, anomalies = __save_to_redis(
payload=payload, final_score=final_score, recreate=True, unified_config=unified_config
)

else:
redis_conn_status_count("success")
# If the unified anomaly is -1, we don't want to publish it
if unified_anomaly >= 0:
unified_json = __construct_unified_payload(
Expand Down
12 changes: 12 additions & 0 deletions numaprom/udf/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from numaprom.tools import msg_forward
from numaprom.watcher import ConfigManager

from prometheus_client import Counter

AUTH = os.getenv("REDIS_AUTH")
REDIS_CONF = ConfigManager.get_redis_config()
Expand All @@ -25,6 +26,14 @@
)
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", 3600)) # default ttl set to 1 hour

# Metrics
redis_conn_status_count = Counter('numaprom_redis_conn_status_count', '', ['vertex', 'status'])


def increase_redis_conn_status(status):
    """Increment the Redis connection status counter for the preprocess vertex.

    Args:
        status: outcome label, e.g. "success" or "failed".
    """
    redis_conn_status_count.labels(vertex="preprocess", status=status).inc()



@msg_forward
def preprocess(_: list[str], datum: Datum) -> bytes:
Expand Down Expand Up @@ -56,7 +65,10 @@ def preprocess(_: list[str], datum: Datum) -> bytes:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
increase_redis_conn_status("failed")
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
else:
increase_redis_conn_status("success")
except Exception as ex:

Check failure on line 72 in numaprom/udf/preprocess.py

View workflow job for this annotation

GitHub Actions / Ruff lint

Ruff (E999)

numaprom/udf/preprocess.py:72:5: E999 SyntaxError: Unexpected token 'except'
LOGGER.exception(
"{uuid} - Unhandled exception while fetching preproc artifact, "
Expand Down
15 changes: 15 additions & 0 deletions numaprom/udf/threshold.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,20 @@
from numaprom.tools import conditional_forward, calculate_static_thresh
from numaprom.watcher import ConfigManager

from prometheus_client import Counter

LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", 3600)) # default ttl set to 1 hour


# Metrics
redis_conn_status_count = Counter('numaprom_redis_conn_status_count', '', ['vertex', 'status'])


def increase_redis_conn_status(status):
    """Increment the Redis connection status counter for the threshold vertex.

    Args:
        status: outcome label, e.g. "success" or "failed".
    """
    child = redis_conn_status_count.labels(vertex="threshold", status=status)
    child.inc()



def _get_static_thresh_payload(payload, metric_config) -> bytes:
"""Calculates static thresholding, and returns a serialized json bytes payload."""
static_scores = calculate_static_thresh(payload, metric_config.static_threshold)
Expand Down Expand Up @@ -80,6 +91,7 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
redis_conn_status_count("failed")
return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY)
except Exception as ex:
LOGGER.exception(
Expand All @@ -91,10 +103,13 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]:
)
payload.set_header(Header.STATIC_INFERENCE)
payload.set_status(Status.RUNTIME_ERROR)
redis_conn_status_count("failed")
return [
(TRAIN_VTX_KEY, orjson.dumps(train_payload)),
(POSTPROC_VTX_KEY, _get_static_thresh_payload(payload, metric_config)),
]
else:
redis_conn_status_count("success")
if not thresh_artifact:
LOGGER.info(
"{uuid} - Threshold artifact not found, performing static thresholding. Keys: {keys}",
Expand Down
13 changes: 13 additions & 0 deletions numaprom/udf/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@
from numaprom.tools import msg_forward, create_composite_keys
from numaprom.watcher import ConfigManager

from prometheus_client import Counter

# Metrics
redis_conn_status_count = Counter('numaprom_redis_conn_status_count', '', ['vertex', 'status'])


def increase_redis_conn_status(status):
    """Increment the Redis connection status counter for the window vertex.

    Args:
        status: outcome label, e.g. "success" or "failed".
    """
    redis_conn_status_count.labels(vertex="window", status=status).inc()



# TODO get the replacement value from config
def _clean_arr(
Expand Down Expand Up @@ -87,6 +97,9 @@ def window(_: list[str], datum: Datum) -> bytes | None:
elements = __aggregate_window(
unique_key, msg["timestamp"], value, win_size, buff_size, recreate=True
)
redis_conn_status_count("failed")
else:
redis_conn_status_count("success")

# Drop message if no of elements is less than sequence length needed
if len(elements) < win_size:
Expand Down
2 changes: 2 additions & 0 deletions starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from pynumaflow.function import Server
from pynumaflow.sink import Sink
from numaprom.metrics import metrics

from numaprom._constants import CONFIG_PATHS
from numaprom.factory import HandlerFactory
Expand All @@ -20,6 +21,7 @@ def run_watcher():
background_thread.start()

step_handler = HandlerFactory.get_handler(sys.argv[2])
metrics.start_metrics_server(8490)
server_type = sys.argv[1]

if server_type == "udsink":
Expand Down

0 comments on commit 8dd47e1

Please sign in to comment.