Skip to content

Commit

Permalink
fix: logger context issues (#389)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Gulshan Bhatia <[email protected]>
  • Loading branch information
gulshan02 authored Jun 17, 2024
1 parent 8b8ae79 commit 9d4ea32
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 44 deletions.
51 changes: 20 additions & 31 deletions numalogic/udfs/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pandas import DataFrame
from pynumaflow.mapper import Message


from numalogic.registry import ArtifactManager, ArtifactData
from numalogic.tools.exceptions import RedisRegistryError
from numalogic.tools.types import KEYS, redis_client_t
Expand Down Expand Up @@ -168,7 +169,11 @@ def _load_artifact(
)

logger = _struct_log.bind(
uuid=payload.uuid, skeys=skeys, dkeys=dkeys, payload_metrics=payload.metrics
uuid=payload.uuid,
skeys=skeys,
dkeys=dkeys,
payload_metrics=payload.metrics,
udf_vertex=vertex,
)

version_to_load = "-1"
Expand Down Expand Up @@ -252,12 +257,11 @@ def __construct_train_key(keys: KEYS) -> str:
return f"TRAIN::{_key}"

def __fetch_ts(self, uuid: str, key: str) -> _DedupMetadata:
logger = _struct_log.bind(uuid=uuid, key=key)
try:
data = self.client.hgetall(key)
except Exception:
_struct_log.exception(
"Problem fetching ts information for the key", uuid=uuid, key=key
)
logger.exception("Problem fetching ts information for the key")
return _DedupMetadata(msg_read_ts=None, msg_train_ts=None, msg_train_records=None)
else:
# decode the key:value pair and update the values
Expand Down Expand Up @@ -286,16 +290,13 @@ def ack_insufficient_data(self, key: KEYS, uuid: str, train_records: int) -> boo
bool.
"""
_key = self.__construct_train_key(key)
logger = _struct_log.bind(uuid=uuid, key=key)
try:
self.client.hset(name=_key, key="_msg_train_records", value=str(train_records))
except Exception:
_struct_log.exception(
"Problem while updating _msg_train_records information for the key",
uuid=uuid,
key=key,
)
logger.exception("Problem while updating _msg_train_records information for the key")
return False
_struct_log.debug("Acknowledging insufficient data for the key", uuid=uuid, key=key)
logger.debug("Acknowledging insufficient data for the key")
return True

def ack_read(
Expand Down Expand Up @@ -324,6 +325,7 @@ def ack_read(
"""
_key = self.__construct_train_key(key)
metadata = self.__fetch_ts(uuid=uuid, key=_key)
logger = _struct_log.bind(uuid=uuid, key=key)
_msg_read_ts, _msg_train_ts, _msg_train_records = (
metadata.msg_read_ts,
metadata.msg_train_ts,
Expand All @@ -337,11 +339,9 @@ def ack_read(
and _curr_time - float(_msg_read_ts)
< (min_train_records - int(_msg_train_records)) * data_freq
):
_struct_log.debug(
logger.debug(
"There was insufficient data for the key in the past. Retrying fetching"
" and training after secs",
uuid=uuid,
key=key,
secs=((min_train_records - int(_msg_train_records)) * data_freq)
- _curr_time
+ float(_msg_read_ts),
Expand All @@ -351,30 +351,22 @@ def ack_read(

# Check if the model is being trained by another process
if _msg_read_ts and time.time() - float(_msg_read_ts) < retry:
_struct_log.debug(
"Model with key is being trained by another process", uuid=uuid, key=key
)
logger.debug("Model with key is being trained by another process")
return False

# This check is needed if there is backpressure in the pipeline
if _msg_train_ts and time.time() - float(_msg_train_ts) < retrain_freq * 60 * 60:
_struct_log.debug(
logger.debug(
"Model was saved for the key in less than retrain_freq hrs, skipping training",
uuid=uuid,
key=key,
retrain_freq=retrain_freq,
)
return False
try:
self.client.hset(name=_key, key="_msg_read_ts", value=str(time.time()))
except Exception:
_struct_log.exception(
"Problem while updating msg_read_ts information for the key",
uuid=uuid,
key=key,
)
logger.exception("Problem while updating msg_read_ts information for the key")
return False
_struct_log.debug("Acknowledging request for Training for key", uuid=uuid, key=key)
logger.debug("Acknowledging request for Training for key")
return True

def ack_train(self, key: KEYS, uuid: str) -> bool:
Expand All @@ -390,16 +382,13 @@ def ack_train(self, key: KEYS, uuid: str) -> bool:
bool
"""
_key = self.__construct_train_key(key)
logger = _struct_log.bind(uuid=uuid, key=key)
try:
self.client.hset(name=_key, key="_msg_train_ts", value=str(time.time()))
except Exception:
_struct_log.exception(
"Problem while updating msg_train_ts information for the key",
uuid=uuid,
key=key,
)
logger.exception("Problem while updating msg_train_ts information for the key")
return False
_struct_log.debug("Acknowledging model saving complete for the key", uuid=uuid, key=key)
logger.debug("Acknowledging model saving complete for the key")
return True


Expand Down
10 changes: 5 additions & 5 deletions tests/udfs/test_rds_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def test_trainer_do_not_train_3(mocker, udf1, datum_mock, payload):

keys = payload["composite_keys"]

TrainMsgDeduplicator(REDIS_CLIENT).ack_read([*keys, "pipeline1"], "some-uuid")
TrainMsgDeduplicator(REDIS_CLIENT).ack_read(key=[*keys, "pipeline1"], uuid="some-uuid")
ts = datetime.strptime("2022-05-24 10:00:00", "%Y-%m-%d %H:%M:%S")
with freeze_time(ts + timedelta(minutes=15)):
udf1(keys, datum_mock)
Expand Down Expand Up @@ -461,11 +461,11 @@ def test_TrainMsgDeduplicator_exception_1(mocker, caplog, payload):
mocker.patch("redis.Redis.hset", Mock(side_effect=RedisError))
train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)
keys = payload["composite_keys"]
train_dedup.ack_read([*keys, "pipeline1"], "some-uuid")
train_dedup.ack_read(key=[*keys, "pipeline1"], uuid="some-uuid")
assert "RedisError" in caplog.text
train_dedup.ack_train([*keys, "pipeline1"], "some-uuid")
train_dedup.ack_train(key=[*keys, "pipeline1"], uuid="some-uuid")
assert "RedisError" in caplog.text
train_dedup.ack_insufficient_data([*keys, "pipeline1"], "some-uuid", train_records=180)
train_dedup.ack_insufficient_data(key=[*keys, "pipeline1"], uuid="some-uuid", train_records=180)
assert "RedisError" in caplog.text


Expand All @@ -474,7 +474,7 @@ def test_TrainMsgDeduplicator_exception_2(mocker, caplog, payload):
mocker.patch("redis.Redis.hset", Mock(side_effect=RedisError))
train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)
keys = payload["composite_keys"]
train_dedup.ack_read([*keys, "pipeline1"], "some-uuid")
train_dedup.ack_read(key=[*keys, "pipeline1"], uuid="some-uuid")
assert "RedisError" in caplog.text


Expand Down
38 changes: 30 additions & 8 deletions tests/udfs/test_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ def test_trainer_do_not_train_2(self):
self.udf1(self.keys, self.datum)
ts = datetime.strptime("2022-05-24 10:00:00", "%Y-%m-%d %H:%M:%S")
with freeze_time(ts + timedelta(hours=25)):
TrainMsgDeduplicator(REDIS_CLIENT).ack_read([*self.keys, "pipeline1"], uuid="some-uuid")
TrainMsgDeduplicator(REDIS_CLIENT).ack_read(
key=[*self.keys, "pipeline1"], uuid="some-uuid"
)
with freeze_time(ts + timedelta(hours=25) + timedelta(minutes=15)):
self.udf1(self.keys, self.datum)
self.assertEqual(
Expand Down Expand Up @@ -277,7 +279,7 @@ def test_trainer_do_not_train_3(self):
}
),
)
TrainMsgDeduplicator(REDIS_CLIENT).ack_read([*self.keys, "pipeline1"], "some-uuid")
TrainMsgDeduplicator(REDIS_CLIENT).ack_read(key=[*self.keys, "pipeline1"], uuid="some-uuid")
ts = datetime.strptime("2022-05-24 10:00:00", "%Y-%m-%d %H:%M:%S")
with freeze_time(ts + timedelta(minutes=15)):
self.udf1(self.keys, self.datum)
Expand Down Expand Up @@ -418,28 +420,48 @@ def test_trainer_datafetcher_err_and_train(self):
@patch("redis.Redis.hset", Mock(side_effect=RedisError))
def test_TrainMsgDeduplicator_exception_1(self):
train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)
train_dedup.ack_read([*self.keys, "pipeline1"], "some-uuid")
train_dedup.ack_read(key=[*self.keys, "pipeline1"], uuid="some-uuid")
self.assertLogs("RedisError")
train_dedup.ack_train([*self.keys, "pipeline1"], "some-uuid")
train_dedup.ack_train(key=[*self.keys, "pipeline1"], uuid="some-uuid")
self.assertLogs("RedisError")
train_dedup.ack_insufficient_data([*self.keys, "pipeline1"], "some-uuid", train_records=180)
train_dedup.ack_insufficient_data(
key=[*self.keys, "pipeline1"], uuid="some-uuid", train_records=180
)
self.assertLogs("RedisError")

@patch("redis.Redis.hset", Mock(side_effect=mock_druid_fetch_data()))
def test_TrainMsgDeduplicator_insufficent_data(self):
with self.assertLogs(level="DEBUG") as log:
train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)
train_dedup.ack_insufficient_data(
[*self.keys, "pipeline1"], "some-uuid", train_records=180
key=[*self.keys, "pipeline1"], uuid="some-uuid", train_records=180
)
self.assertLogs("Acknowledging insufficient data for the key", log.output[-1])
self.assertLogs(
"DEBUG:numalogic.udfs._logger:uuid='some-uuid' "
"event='Acknowledging insufficient data for the key' "
"key=['5984175597303660107', 'pipeline1'] "
"level='debug' timestamp='2024-06-14T19:08:23.610016Z'",
log.output[-1],
)

@patch("redis.Redis.hgetall", Mock(side_effect=RedisError))
def test_TrainMsgDeduplicator_exception_2(self):
train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)
train_dedup.ack_read([*self.keys, "pipeline1"], "some-uuid")
train_dedup.ack_read(key=[*self.keys, "pipeline1"], uuid="some-uuid")
self.assertLogs("RedisError")

@patch("redis.Redis.hgetall", Mock(side_effect=RedisError))
def test_TrainMsgDeduplicator_test_debug_logs(self):
train_dedup = TrainMsgDeduplicator(REDIS_CLIENT)
with self.assertLogs(level="DEBUG") as log:
train_dedup.ack_read(key=[*self.keys, "pipeline1"], uuid="some-uuid")
self.assertLogs(
"DEBUG:numalogic.udfs._logger:uuid='some-uuid' "
"event='Acknowledging request for Training for key' key=['5984175597303660107', "
"'pipeline1'] level='debug' timestamp='2024-06-14T19:02:55.050604Z'",
log.output[-1],
)

def test_druid_from_config_1(self):
with self.assertLogs(level="WARN") as log:
self.udf1(self.keys, self.datum)
Expand Down

0 comments on commit 9d4ea32

Please sign in to comment.