From 9d4ea327712e3b45afc1fc04830fc301e25b4fd3 Mon Sep 17 00:00:00 2001 From: Gulshan Bhatia Date: Mon, 17 Jun 2024 09:18:55 -0700 Subject: [PATCH] fix: logger context issues (#389) --------- Signed-off-by: Gulshan Bhatia --- numalogic/udfs/tools.py | 51 +++++++++++++--------------------- tests/udfs/test_rds_trainer.py | 10 +++---- tests/udfs/test_trainer.py | 38 +++++++++++++++++++------ 3 files changed, 55 insertions(+), 44 deletions(-) diff --git a/numalogic/udfs/tools.py b/numalogic/udfs/tools.py index 83075f59..c86b4d6a 100644 --- a/numalogic/udfs/tools.py +++ b/numalogic/udfs/tools.py @@ -8,6 +8,7 @@ from pandas import DataFrame from pynumaflow.mapper import Message + from numalogic.registry import ArtifactManager, ArtifactData from numalogic.tools.exceptions import RedisRegistryError from numalogic.tools.types import KEYS, redis_client_t @@ -168,7 +169,11 @@ def _load_artifact( ) logger = _struct_log.bind( - uuid=payload.uuid, skeys=skeys, dkeys=dkeys, payload_metrics=payload.metrics + uuid=payload.uuid, + skeys=skeys, + dkeys=dkeys, + payload_metrics=payload.metrics, + udf_vertex=vertex, ) version_to_load = "-1" @@ -252,12 +257,11 @@ def __construct_train_key(keys: KEYS) -> str: return f"TRAIN::{_key}" def __fetch_ts(self, uuid: str, key: str) -> _DedupMetadata: + logger = _struct_log.bind(uuid=uuid, key=key) try: data = self.client.hgetall(key) except Exception: - _struct_log.exception( - "Problem fetching ts information for the key", uuid=uuid, key=key - ) + logger.exception("Problem fetching ts information for the key") return _DedupMetadata(msg_read_ts=None, msg_train_ts=None, msg_train_records=None) else: # decode the key:value pair and update the values @@ -286,16 +290,13 @@ def ack_insufficient_data(self, key: KEYS, uuid: str, train_records: int) -> boo bool. """ _key = self.__construct_train_key(key) + logger = _struct_log.bind(uuid=uuid, key=key) try: self.client.hset(name=_key, key="_msg_train_records", value=str(train_records)) except Exception: - _struct_log.exception( - "Problem while updating _msg_train_records information for the key", - uuid=uuid, - key=key, - ) + logger.exception("Problem while updating _msg_train_records information for the key") return False - _struct_log.debug("Acknowledging insufficient data for the key", uuid=uuid, key=key) + logger.debug("Acknowledging insufficient data for the key") return True def ack_read( @@ -324,6 +325,7 @@ def ack_read( """ _key = self.__construct_train_key(key) metadata = self.__fetch_ts(uuid=uuid, key=_key) + logger = _struct_log.bind(uuid=uuid, key=key) _msg_read_ts, _msg_train_ts, _msg_train_records = ( metadata.msg_read_ts, metadata.msg_train_ts, @@ -337,11 +339,9 @@ def ack_read( and _curr_time - float(_msg_read_ts) < (min_train_records - int(_msg_train_records)) * data_freq ): - _struct_log.debug( + logger.debug( "There was insufficient data for the key in the past. Retrying fetching" " and training after secs", - uuid=uuid, - key=key, secs=((min_train_records - int(_msg_train_records)) * data_freq) - _curr_time + float(_msg_read_ts), @@ -351,30 +351,22 @@ def ack_read( # Check if the model is being trained by another process if _msg_read_ts and time.time() - float(_msg_read_ts) < retry: - _struct_log.debug( - "Model with key is being trained by another process", uuid=uuid, key=key - ) + logger.debug("Model with key is being trained by another process") return False # This check is needed if there is backpressure in the pipeline if _msg_train_ts and time.time() - float(_msg_train_ts) < retrain_freq * 60 * 60: - _struct_log.debug( + logger.debug( "Model was saved for the key in less than retrain_freq hrs, skipping training", - uuid=uuid, - key=key, retrain_freq=retrain_freq, ) return False try: self.client.hset(name=_key, key="_msg_read_ts", value=str(time.time())) except Exception: - _struct_log.exception( - "Problem while updating msg_read_ts information for the key", - uuid=uuid, - key=key, - ) + logger.exception("Problem while updating msg_read_ts information for the key") return False - _struct_log.debug("Acknowledging request for Training for key", uuid=uuid, key=key) + logger.debug("Acknowledging request for Training for key") return True def ack_train(self, key: KEYS, uuid: str) -> bool: @@ -390,16 +382,13 @@ def ack_train(self, key: KEYS, uuid: str) -> bool: bool """ _key = self.__construct_train_key(key) + logger = _struct_log.bind(uuid=uuid, key=key) try: self.client.hset(name=_key, key="_msg_train_ts", value=str(time.time())) except Exception: - _struct_log.exception( - "Problem while updating msg_train_ts information for the key", - uuid=uuid, - key=key, - ) + logger.exception("Problem while updating msg_train_ts information for the key") return False - _struct_log.debug("Acknowledging model saving complete for the key", uuid=uuid, key=key) + logger.debug("Acknowledging model saving complete for the key") return True diff --git a/tests/udfs/test_rds_trainer.py b/tests/udfs/test_rds_trainer.py index 842e4fa6..a9c520fb 100644 --- a/tests/udfs/test_rds_trainer.py +++ b/tests/udfs/test_rds_trainer.py @@ -310,7 +310,7 @@ def test_trainer_do_not_train_3(mocker, udf1, datum_mock, payload): keys = payload["composite_keys"] - TrainMsgDeduplicator(REDIS_CLIENT).ack_read([*keys, "pipeline1"], "some-uuid") + TrainMsgDeduplicator(REDIS_CLIENT).ack_read(key=[*keys, "pipeline1"], uuid="some-uuid") ts = datetime.strptime("2022-05-24 10:00:00", "%Y-%m-%d %H:%M:%S") with freeze_time(ts + timedelta(minutes=15)): udf1(keys, datum_mock) @@ -461,11 +461,11 @@ def test_TrainMsgDeduplicator_exception_1(mocker, caplog, payload): mocker.patch("redis.Redis.hset", Mock(side_effect=RedisError)) train_dedup = TrainMsgDeduplicator(REDIS_CLIENT) keys = payload["composite_keys"] - train_dedup.ack_read([*keys, "pipeline1"], "some-uuid") + train_dedup.ack_read(key=[*keys, "pipeline1"], uuid="some-uuid") assert "RedisError" in caplog.text - train_dedup.ack_train([*keys, "pipeline1"], "some-uuid") + train_dedup.ack_train(key=[*keys, "pipeline1"], uuid="some-uuid") assert "RedisError" in caplog.text - train_dedup.ack_insufficient_data([*keys, "pipeline1"], "some-uuid", train_records=180) + train_dedup.ack_insufficient_data(key=[*keys, "pipeline1"], uuid="some-uuid", train_records=180) assert "RedisError" in caplog.text @@ -474,7 +474,7 @@ def test_TrainMsgDeduplicator_exception_2(mocker, caplog, payload): mocker.patch("redis.Redis.hset", Mock(side_effect=RedisError)) train_dedup = TrainMsgDeduplicator(REDIS_CLIENT) keys = payload["composite_keys"] - train_dedup.ack_read([*keys, "pipeline1"], "some-uuid") + train_dedup.ack_read(key=[*keys, "pipeline1"], uuid="some-uuid") assert "RedisError" in caplog.text diff --git a/tests/udfs/test_trainer.py b/tests/udfs/test_trainer.py index 86e19d52..3ffcb773 100644 --- a/tests/udfs/test_trainer.py +++ b/tests/udfs/test_trainer.py @@ -243,7 +243,9 @@ def test_trainer_do_not_train_2(self): self.udf1(self.keys, self.datum) ts = datetime.strptime("2022-05-24 10:00:00", "%Y-%m-%d %H:%M:%S") with freeze_time(ts + timedelta(hours=25)): - TrainMsgDeduplicator(REDIS_CLIENT).ack_read([*self.keys, "pipeline1"], uuid="some-uuid") + TrainMsgDeduplicator(REDIS_CLIENT).ack_read( + key=[*self.keys, "pipeline1"], uuid="some-uuid" + ) with freeze_time(ts + timedelta(hours=25) + timedelta(minutes=15)): self.udf1(self.keys, self.datum) self.assertEqual( @@ -277,7 +279,7 @@ def test_trainer_do_not_train_3(self): } ), ) - TrainMsgDeduplicator(REDIS_CLIENT).ack_read([*self.keys, "pipeline1"], "some-uuid") + TrainMsgDeduplicator(REDIS_CLIENT).ack_read(key=[*self.keys, "pipeline1"], uuid="some-uuid") ts = datetime.strptime("2022-05-24 10:00:00", "%Y-%m-%d %H:%M:%S") with freeze_time(ts + timedelta(minutes=15)): self.udf1(self.keys, self.datum) @@ -418,11 +420,13 @@ def test_trainer_datafetcher_err_and_train(self): @patch("redis.Redis.hset", Mock(side_effect=RedisError)) def test_TrainMsgDeduplicator_exception_1(self): train_dedup = TrainMsgDeduplicator(REDIS_CLIENT) - train_dedup.ack_read([*self.keys, "pipeline1"], "some-uuid") + train_dedup.ack_read(key=[*self.keys, "pipeline1"], uuid="some-uuid") self.assertLogs("RedisError") - train_dedup.ack_train([*self.keys, "pipeline1"], "some-uuid") + train_dedup.ack_train(key=[*self.keys, "pipeline1"], uuid="some-uuid") self.assertLogs("RedisError") - train_dedup.ack_insufficient_data([*self.keys, "pipeline1"], "some-uuid", train_records=180) + train_dedup.ack_insufficient_data( + key=[*self.keys, "pipeline1"], uuid="some-uuid", train_records=180 + ) self.assertLogs("RedisError") @patch("redis.Redis.hset", Mock(side_effect=mock_druid_fetch_data())) @@ -430,16 +434,34 @@ def test_TrainMsgDeduplicator_insufficent_data(self): with self.assertLogs(level="DEBUG") as log: train_dedup = TrainMsgDeduplicator(REDIS_CLIENT) train_dedup.ack_insufficient_data( - [*self.keys, "pipeline1"], "some-uuid", train_records=180 + key=[*self.keys, "pipeline1"], uuid="some-uuid", train_records=180 ) - self.assertLogs("Acknowledging insufficient data for the key", log.output[-1]) + self.assertLogs( + "DEBUG:numalogic.udfs._logger:uuid='some-uuid' " + "event='Acknowledging insufficient data for the key' " + "key=['5984175597303660107', 'pipeline1'] " + "level='debug' timestamp='2024-06-14T19:08:23.610016Z'", + log.output[-1], + ) @patch("redis.Redis.hgetall", Mock(side_effect=RedisError)) def test_TrainMsgDeduplicator_exception_2(self): train_dedup = TrainMsgDeduplicator(REDIS_CLIENT) - train_dedup.ack_read([*self.keys, "pipeline1"], "some-uuid") + train_dedup.ack_read(key=[*self.keys, "pipeline1"], uuid="some-uuid") self.assertLogs("RedisError") + @patch("redis.Redis.hgetall", Mock(side_effect=RedisError)) + def test_TrainMsgDeduplicator_test_debug_logs(self): + train_dedup = TrainMsgDeduplicator(REDIS_CLIENT) + with self.assertLogs(level="DEBUG") as log: + train_dedup.ack_read(key=[*self.keys, "pipeline1"], uuid="some-uuid") + self.assertLogs( + "DEBUG:numalogic.udfs._logger:uuid='some-uuid' " + "event='Acknowledging request for Training for key' key=['5984175597303660107', " + "'pipeline1'] level='debug' timestamp='2024-06-14T19:02:55.050604Z'", + log.output[-1], + ) + def test_druid_from_config_1(self): with self.assertLogs(level="WARN") as log: self.udf1(self.keys, self.datum)