Skip to content

Commit

Permalink
fix: train hours bug
Browse files Browse the repository at this point in the history
Signed-off-by: nkoppisetty <[email protected]>
  • Loading branch information
nkoppisetty committed Aug 12, 2023
1 parent 04c23f6 commit f1df53e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
1 change: 0 additions & 1 deletion udf/anomaly-detection/src/connectors/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,3 @@ class DruidFetcherConf:
group_by: list[str] = field(default_factory=list)
pivot: Pivot = field(default_factory=lambda: Pivot())
granularity: str = "minute"
- hours: float = 36
15 changes: 8 additions & 7 deletions udf/anomaly-detection/src/udf/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_feature_df(data: pd.DataFrame, metrics: list):

class Trainer:
@classmethod
- def fetch_prometheus_data(cls, payload: TrainerPayload) -> pd.DataFrame:
+ def fetch_prometheus_data(cls, payload: TrainerPayload, hours: int) -> pd.DataFrame:
prometheus_conf = ConfigManager.get_prom_config()
if prometheus_conf is None:
_LOGGER.error("%s - Prometheus config is not available", payload.uuid)
Expand All @@ -53,10 +53,11 @@ def fetch_prometheus_data(cls, payload: TrainerPayload) -> pd.DataFrame:
metric=payload.metrics[0],
labels={"namespace": payload.composite_keys[1]},
return_labels=["rollouts_pod_template_hash"],
+ hours=hours
)

@classmethod
- def fetch_druid_data(cls, payload: TrainerPayload) -> pd.DataFrame:
+ def fetch_druid_data(cls, payload: TrainerPayload, hours: int) -> pd.DataFrame:
stream_config = ConfigManager.get_stream_config(payload.config_id)
druid_conf = ConfigManager.get_druid_config()
fetcher_conf = stream_config.druid_fetcher
Expand All @@ -74,19 +75,19 @@ def fetch_druid_data(cls, payload: TrainerPayload) -> pd.DataFrame:
aggregations=OmegaConf.to_container(fetcher_conf.aggregations),
group_by=OmegaConf.to_container(fetcher_conf.group_by),
pivot=fetcher_conf.pivot,
- hours=fetcher_conf.hours,
+ hours=hours,
)

@classmethod
- def fetch_data(cls, payload: TrainerPayload) -> pd.DataFrame:
+ def fetch_data(cls, payload: TrainerPayload, hours: int) -> pd.DataFrame:
_start_train = time.perf_counter()
stream_config = ConfigManager.get_stream_config(payload.config_id)

_df = pd.DataFrame()
if stream_config.source == DataSource.PROMETHEUS:
- _df = cls.fetch_prometheus_data(payload)
+ _df = cls.fetch_prometheus_data(payload, hours)
elif stream_config.source == DataSource.DRUID:
- _df = cls.fetch_druid_data(payload)
+ _df = cls.fetch_druid_data(payload, hours)
else:
_LOGGER.error(
"%s - Data source is not supported, source: %s, keys: %s",
Expand Down Expand Up @@ -267,7 +268,7 @@ def run(self, keys: List[str], datum: Datum) -> Messages:
return messages

try:
- df = self.fetch_data(payload)
+ df = self.fetch_data(payload, retrain_config.train_hours)
except Exception as err:
_LOGGER.error(
"%s - Error while fetching data for keys: %s, metrics: %s, err: %r",
Expand Down

0 comments on commit f1df53e

Please sign in to comment.