Skip to content

Commit

Permalink
feat: ttl to config
Browse files Browse the repository at this point in the history
Signed-off-by: nkoppisetty <[email protected]>
  • Loading branch information
nkoppisetty committed Jul 27, 2023
1 parent 4cb3d61 commit 5a0ef08
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 17 deletions.
2 changes: 2 additions & 0 deletions udf/anomaly-detection/src/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class ReTrainConf:
min_train_size: int = 2000
retrain_freq_hr: int = 24
resume_training: bool = False
model_expiry_sec: int = 86400 # 24 hrs
dedup_expiry_sec: int = 1800 # 30 days


@dataclass
Expand Down
50 changes: 33 additions & 17 deletions udf/anomaly-detection/src/udf/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Trainer:
def fetch_prometheus_data(cls, payload: TrainerPayload) -> pd.DataFrame:
prometheus_conf = ConfigManager.get_prom_config()
if prometheus_conf is None:
_LOGGER.error("Prometheus config is not available")
_LOGGER.error("%s - Prometheus config is not available", payload.uuid)
return pd.DataFrame()
data_fetcher = PrometheusDataFetcher(prometheus_conf.server)
return data_fetcher.fetch_data(
Expand All @@ -61,7 +61,7 @@ def fetch_druid_data(cls, payload: TrainerPayload) -> pd.DataFrame:
druid_conf = ConfigManager.get_druid_config()
fetcher_conf = stream_config.druid_fetcher
if druid_conf is None:
_LOGGER.error("Druid config is not available")
_LOGGER.error("%s - Druid config is not available", payload.uuid)
return pd.DataFrame()
data_fetcher = DruidFetcher(url=druid_conf.url, endpoint=druid_conf.endpoint)

Expand All @@ -79,28 +79,43 @@ def fetch_druid_data(cls, payload: TrainerPayload) -> pd.DataFrame:

@classmethod
def fetch_data(cls, payload: TrainerPayload) -> pd.DataFrame:
_start_train = time.perf_counter()
stream_config = ConfigManager.get_stream_config(payload.config_id)

_df = pd.DataFrame()
if stream_config.source == DataSource.PROMETHEUS:
return cls.fetch_prometheus_data(payload)
_df = cls.fetch_prometheus_data(payload)
elif stream_config.source == DataSource.DRUID:
return cls.fetch_druid_data(payload)
_df = cls.fetch_druid_data(payload)
else:
_LOGGER.error(
"%s - Data source is not supported, source: %s, keys: %s",
payload.uuid,
stream_config.source,
payload.composite_keys,
)
return _df

_LOGGER.error(
"Data source is not supported, source: %s, keys: %s",
_LOGGER.debug(
"%s - Time taken to fetch data from %s: %.3f sec, df shape: %s",
payload.uuid,
stream_config.source,
payload.composite_keys,
time.perf_counter() - _start_train,
_df.shape,
)
return pd.DataFrame()
return _df

@classmethod
def _is_new_request(cls, redis_client: redis_client_t, payload: TrainerPayload) -> bool:
def _is_new_request(
cls, redis_client: redis_client_t, dedup_expiry: int, payload: TrainerPayload
) -> bool:
_ckeys = ":".join(payload.composite_keys)
r_key = f"train::{_ckeys}"
value = redis_client.get(r_key)
if value:
return False

redis_client.setex(r_key, time=REQUEST_EXPIRY, value=1)
redis_client.setex(r_key, time=dedup_expiry, value=1)
return True

@classmethod
Expand Down Expand Up @@ -157,6 +172,7 @@ def _train_and_save(

model_cfg = numalogic_conf.model
preproc_cfgs = numalogic_conf.preprocess
retrain_cfg = ConfigManager.get_retrain_config(payload.config_id)

# TODO: filter the metrics here

Expand All @@ -173,7 +189,7 @@ def _train_and_save(

# TODO if one of the models fail to save, delete the previously saved models and transition stage
# Save main model
model_registry = RedisRegistry(client=redis_client)
model_registry = RedisRegistry(client=redis_client, ttl=retrain_cfg.model_expiry_sec)
try:
version = model_registry.save(
skeys=skeys,
Expand Down Expand Up @@ -238,17 +254,17 @@ def _train_and_save(
def run(self, keys: List[str], datum: Datum) -> Messages:
messages = Messages()
redis_client = get_redis_client_from_conf()

payload = TrainerPayload(**orjson.loads(datum.value))
is_new = self._is_new_request(redis_client, payload)

retrain_config = ConfigManager.get_retrain_config(payload.config_id)
numalogic_config = ConfigManager.get_numalogic_config(payload.config_id)

is_new = self._is_new_request(redis_client, retrain_config.dedup_expiry_sec, payload)

if not is_new:
messages.append(Message.to_drop())
return messages

retrain_config = ConfigManager.get_retrain_config(payload.config_id)
numalogic_config = ConfigManager.get_numalogic_config(payload.config_id)

try:
df = self.fetch_data(payload)
except Exception as err:
Expand All @@ -275,6 +291,6 @@ def run(self, keys: List[str], datum: Datum) -> Messages:
train_df = get_feature_df(df, payload.metrics)
self._train_and_save(numalogic_config, payload, redis_client, train_df)

messages.append(Message(keys=keys, value=train_df.to_json()))
messages.append(Message(keys=keys, value=payload.to_json()))

return messages

0 comments on commit 5a0ef08

Please sign in to comment.