diff --git a/numaprom/udf/inference.py b/numaprom/udf/inference.py index 33574f1..08adee8 100644 --- a/numaprom/udf/inference.py +++ b/numaprom/udf/inference.py @@ -16,7 +16,7 @@ from numaprom.entities import PayloadFactory from numaprom.entities import Status, StreamPayload, Header from numaprom.tools import msg_forward -from numaprom.metrics import inc_redis_conn_success, inc_inference_count +from numaprom.metrics import inc_redis_conn_success, inc_inference_count, inc_redis_conn_failed from numaprom.watcher import ConfigManager @@ -89,7 +89,7 @@ def inference(_: list[str], datum: Datum) -> bytes: ) payload.set_header(Header.STATIC_INFERENCE) payload.set_status(Status.RUNTIME_ERROR) - inc_redis_conn_success(_VERTEX) + inc_redis_conn_failed(_VERTEX) return orjson.dumps(payload, option=orjson.OPT_SERIALIZE_NUMPY) inc_redis_conn_success(_VERTEX) diff --git a/numaprom/udf/postprocess.py b/numaprom/udf/postprocess.py index 577c5ad..3672243 100644 --- a/numaprom/udf/postprocess.py +++ b/numaprom/udf/postprocess.py @@ -156,7 +156,8 @@ def _publish(final_score: float, payload: StreamPayload) -> list[bytes]: unified_anomaly, anomalies = __save_to_redis( payload=payload, final_score=final_score, recreate=True, unified_config=unified_config ) - inc_redis_conn_success(_VERTEX) + else: + inc_redis_conn_success(_VERTEX) # If the unified anomaly is -1, we don't want to publish it if unified_anomaly >= 0: diff --git a/numaprom/udf/threshold.py b/numaprom/udf/threshold.py index 16844e7..d4bf6e1 100644 --- a/numaprom/udf/threshold.py +++ b/numaprom/udf/threshold.py @@ -100,8 +100,8 @@ def threshold(_: list[str], datum: Datum) -> list[tuple[str, bytes]]: (TRAIN_VTX_KEY, orjson.dumps(train_payload)), (POSTPROC_VTX_KEY, _get_static_thresh_payload(payload, metric_config)), ] - - inc_redis_conn_success(_VERTEX) + else: + inc_redis_conn_success(_VERTEX) if not thresh_artifact: LOGGER.info( diff --git a/numaprom/udf/window.py b/numaprom/udf/window.py index 9feda01..d157429 100644 --- a/numaprom/udf/window.py +++ b/numaprom/udf/window.py @@ -91,9 +91,9 @@ def window(_: list[str], datum: Datum) -> bytes | None: elements = __aggregate_window( unique_key, msg["timestamp"], value, win_size, buff_size, recreate=True ) - inc_redis_conn_success(_VERTEX) - else: inc_redis_conn_failed(_VERTEX) + else: + inc_redis_conn_success(_VERTEX) # Drop message if no of elements is less than sequence length needed if len(elements) < win_size: