Skip to content

Commit

Permalink
fix: redis conn
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Jul 6, 2023
1 parent 9fa4d90 commit 45536e6
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 306 deletions.
22 changes: 13 additions & 9 deletions numaprom/clients/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def get_redis_client(
port: int,
password: str,
mastername: str,
decode_responses: bool = False,
recreate: bool = False,
) -> redis_client_t:
"""Return a master redis client for sentinel connections, with retry.
Expand All @@ -43,7 +42,7 @@ def get_redis_client(
return SENTINEL_MASTER_CLIENT

retry = Retry(
ExponentialBackoff(cap=2, base=1),
ExponentialBackoff(),
3,
supported_errors=(
ConnectionError,
Expand All @@ -53,18 +52,23 @@ def get_redis_client(
MasterNotFoundError,
),
)
sentinel_args = {
"sentinels": [(host, port)],
"socket_timeout": 0.1,
"decode_responses": decode_responses,
}

LOGGER.info("Sentinel redis params: {args}", args=sentinel_args)
conn_kwargs = {
"socket_timeout": 1,
"socket_connect_timeout": 1,
"socket_keepalive": True,
"health_check_interval": 10,
}

sentinel = Sentinel(
**sentinel_args, sentinel_kwargs=dict(password=password), password=password, retry=retry
[(host, port)],
sentinel_kwargs=dict(password=password, **conn_kwargs),
retry=retry,
password=password,
**conn_kwargs
)
SENTINEL_MASTER_CLIENT = sentinel.master_for(mastername)
LOGGER.info("Sentinel redis params: {args}", args=conn_kwargs)
return SENTINEL_MASTER_CLIENT


Expand Down
5 changes: 1 addition & 4 deletions numaprom/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@
from pynumaflow.function import Messages
from pynumaflow.sink import Responses

from numaprom.udf import preprocess, postprocess, window, metric_filter, inference, threshold
from numaprom.udf import preprocess, postprocess, window, inference, threshold
from numaprom.udsink import train, train_rollout


class HandlerFactory:
@classmethod
def get_handler(cls, step: str) -> Callable[..., Messages | Responses]:
if step == "metric_filter":
return metric_filter

if step == "window":
return window

Expand Down
3 changes: 1 addition & 2 deletions numaprom/udf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from numaprom.udf.filter import metric_filter
from numaprom.udf.inference import inference
from numaprom.udf.postprocess import postprocess
from numaprom.udf.preprocess import preprocess
from numaprom.udf.window import window
from numaprom.udf.threshold import threshold


__all__ = ["preprocess", "metric_filter", "inference", "window", "postprocess", "threshold"]
__all__ = ["preprocess", "inference", "window", "postprocess", "threshold"]
2 changes: 1 addition & 1 deletion numaprom/udf/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __save_to_redis(

metric_name = payload.composite_keys["name"]

r_keys = payload.composite_keys
r_keys = payload.composite_keys.copy()
r_keys.pop("name")
r_key = f"{':'.join(r_keys.values())}:{payload.end_ts}"

Expand Down
486 changes: 210 additions & 276 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "numalogic-prometheus"
version = "0.4.11"
version = "0.4.12"
description = "ML inference on numaflow using numalogic on Prometheus metrics"
authors = ["Numalogic developers"]
packages = [{ include = "numaprom" }]
Expand All @@ -21,7 +21,7 @@ repository = "https://github.com/numaproj/numalogic-prometheus"
python = "~3.10"
redis = {extras = ["hiredis"], version = "^4.5" }
pynumaflow = "~0.4.1"
numalogic = {version = "~0.4", extras = ["redis"]}
numalogic = {version = "~0.5", extras = ["redis"]}
orjson = "^3.8.4"
omegaconf = "^2.3.0"
watchdog = "^3.0.0"
Expand Down
6 changes: 1 addition & 5 deletions tests/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@

from numaprom.factory import HandlerFactory

from numaprom.udf import metric_filter, preprocess, postprocess, inference, threshold
from numaprom.udf import preprocess, postprocess, inference, threshold
from numaprom.udsink import train, train_rollout


class TestFactory(unittest.TestCase):
def test_metric_filter(self):
func = HandlerFactory.get_handler("metric_filter")
self.assertEqual(func, metric_filter)

def test_preprocess(self):
func = HandlerFactory.get_handler("preprocess")
self.assertEqual(func, preprocess)
Expand Down
15 changes: 8 additions & 7 deletions tests/test_prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ class TestPrometheus(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
end = datetime.datetime.now()
start = end - datetime.timedelta(hours=36)
start = end - datetime.timedelta(hours=72)
cls.start = start.timestamp()
cls.end = end.timestamp()
cls.prom = Prometheus(prometheus_server="http://localhost:8490")
cls.prom = Prometheus(prometheus_server="http://localhost:8090")

@patch.object(Prometheus, "query_range", Mock(return_value=mock_query_range()))
def test_query_metric1(self):
Expand Down Expand Up @@ -117,17 +117,18 @@ def test_query_metric3(self):
)
self.assertEqual(_out.shape, (2, 3))

@patch.object(Prometheus, "query_range", Mock(return_value=mock_multiple_metrics()))
# @patch.object(Prometheus, "query_range", Mock(return_value=mock_multiple_metrics()))
def test_query_metric4(self):
_out = self.prom.query_metric(
metric_name="namespace_app_pod_http_server_requests_errors",
labels_map={"namespace": "sandbox-numalogic-demo"},
metric_name="namespace_app_rollouts_http_request_error_rate",
labels_map={"namespace": "dev-devx-o11yfuzzygqlfederation-usw2-prd"},
return_labels=["rollouts_pod_template_hash"],
start=self.start,
end=self.end,
)
self.assertEqual(_out.shape, (4, 3))
self.assertEqual(_out["rollouts_pod_template_hash"].unique().shape[0], 2)
print(_out)
# self.assertEqual(_out.shape, (4, 3))
# self.assertEqual(_out["rollouts_pod_template_hash"].unique().shape[0], 2)

@patch.object(requests, "get", Mock(return_value=mock_response()))
def test_query_range(self):
Expand Down

0 comments on commit 45536e6

Please sign in to comment.