feat: multi timeseries models

Signed-off-by: nkoppisetty <[email protected]>
nkoppisetty committed Jul 17, 2023
1 parent 25e99b4 commit 132cc25

Showing 27 changed files with 14,628 additions and 2,604 deletions.
48 changes: 0 additions & 48 deletions udf/anomaly-detection/config/default-configs/config.yaml

This file was deleted.

21 changes: 0 additions & 21 deletions udf/anomaly-detection/config/default-configs/numalogic_config.yaml

This file was deleted.

13 changes: 0 additions & 13 deletions udf/anomaly-detection/config/user-configs/config.yaml

This file was deleted.

4 changes: 2 additions & 2 deletions udf/anomaly-detection/src/__init__.py
@@ -1,7 +1,7 @@
 import logging
 import os
 
-from src._config import UnifiedConf, MetricConf, DataStreamConf, PipelineConf, Configs
+from src._config import UnifiedConf, StreamConf, PipelineConf, Configs
 
 
 def get_logger(name):
@@ -25,4 +25,4 @@ def get_logger(name):
     return logger
 
 
-__all__ = ["get_logger", "UnifiedConf", "MetricConf", "DataStreamConf", "Configs", "PipelineConf"]
+__all__ = ["get_logger", "UnifiedConf", "StreamConf", "Configs", "PipelineConf"]
16 changes: 5 additions & 11 deletions udf/anomaly-detection/src/_config.py
@@ -29,34 +29,28 @@ class StaticThreshold:
     weight: float = 0.0
 
 
-@dataclass
-class MetricConf:
-    metric: str
-    retrain_conf: ReTrainConf = field(default_factory=lambda: ReTrainConf())
-    static_threshold: StaticThreshold = field(default_factory=lambda: StaticThreshold())
-    numalogic_conf: NumalogicConf = MISSING
-
-
 class DataSource(str, Enum):
     PROMETHEUS = "prometheus"
     DRUID = "druid"
 
 
 @dataclass
-class DataStreamConf:
+class StreamConf:
     name: str = "default"
     source: str = DataSource.PROMETHEUS.value
     window_size: int = 12
     composite_keys: List[str] = field(default_factory=list)
     metrics: List[str] = field(default_factory=list)
-    metric_configs: List[MetricConf] = field(default_factory=list)
+    retrain_conf: ReTrainConf = field(default_factory=lambda: ReTrainConf())
+    static_threshold: StaticThreshold = field(default_factory=lambda: StaticThreshold())
+    numalogic_conf: NumalogicConf = MISSING
     unified_config: UnifiedConf = field(default_factory=lambda: UnifiedConf())
     druid_fetcher: DruidFetcherConf = MISSING
 
 
 @dataclass
 class Configs:
-    configs: List[DataStreamConf]
+    configs: List[StreamConf]
 
 
 @dataclass
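With MetricConf gone, the retraining, static-threshold, and numalogic settings live directly on the stream, so all metrics of a stream share one model configuration. A minimal sketch of constructing the new config; field values are illustrative, and it assumes ReTrainConf and StaticThreshold are importable from the same module:

from src._config import ReTrainConf, StaticThreshold, StreamConf

# Illustrative multi-metric stream: one shared model config for both metrics.
stream = StreamConf(
    name="service-mesh",                    # hypothetical stream name
    source="druid",
    window_size=12,
    composite_keys=["namespace", "app"],
    metrics=["error_rate", "error_count"],  # modeled jointly now, not per metric
    retrain_conf=ReTrainConf(),
    static_threshold=StaticThreshold(weight=0.0),
)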
1 change: 1 addition & 0 deletions udf/anomaly-detection/src/connectors/_config.py
@@ -7,6 +7,7 @@
 class PrometheusConf:
     server: str
     pushgateway: str
+    scrape_interval: int = 30
 
 
 @dataclass
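The new scrape_interval field supplies the query step when pulling training data (see the tools.py change below). A small sketch with illustrative endpoints:

from src.connectors._config import PrometheusConf

prom_conf = PrometheusConf(
    server="http://prometheus:9090",        # illustrative server URL
    pushgateway="http://pushgateway:9091",  # illustrative pushgateway URL
    scrape_interval=30,                     # seconds; used as the query step
)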
39 changes: 27 additions & 12 deletions udf/anomaly-detection/src/connectors/druid.py
@@ -21,16 +21,16 @@ def __init__(self, url: str, endpoint: str):
         self.client = PyDruid(url, endpoint)
 
     def fetch_data(
-            self,
-            datasource: str,
-            filter_keys: list[str],
-            filter_values: list[str],
-            dimensions: list[str],
-            granularity: str = "minute",
-            aggregations: dict = None,
-            group_by: list[str] = None,
-            pivot: Pivot = None,
-            hours: float = 24,
+        self,
+        datasource: str,
+        filter_keys: list[str],
+        filter_values: list[str],
+        dimensions: list[str],
+        granularity: str = "minute",
+        aggregations: dict = None,
+        group_by: list[str] = None,
+        pivot: Pivot = None,
+        hours: float = 24,
     ) -> pd.DataFrame:
         _start_time = time.time()
         filter_pairs = {}
_start_time = time.time()
filter_pairs = {}
@@ -67,7 +67,7 @@ def fetch_data(
             logging.warning("No data found for keys %s", filter_pairs)
             return pd.DataFrame()
 
-        df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6
+        df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10 ** 6
 
         if group_by:
            df = df.groupby(by=group_by).sum().reset_index()
Expand All @@ -85,4 +85,19 @@ def fetch_data(
filter_pairs,
df.shape,
)
return df
return df


fetcher = DruidFetcher("https://getafix.odldruid-prd.a.intuit.com/", "druid/v2")
df = fetcher.fetch_data(
datasource="tech-ip-customer-interaction-metrics",
filter_keys=["assetId"],
filter_values=["5984175597303660107"],
dimensions=["ciStatus"],
group_by=["timestamp", "ciStatus"],
pivot=Pivot(index="timestamp", columns=["ciStatus"], value=["count"]),
aggregations={"count": {"type": "doubleSum", "fieldName": "count", "name": "count"}},
hours=240,
)
df.to_csv("test.csv")
print(df)
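The module-level calls added above execute on every import of the connector. A sketch of the same smoke test kept inert behind a main guard, reusing the DruidFetcher and Pivot names already in scope in this module; datasource and filter values are copied from the diff:

if __name__ == "__main__":
    # Smoke test only: runs when the module is executed directly, not on import.
    fetcher = DruidFetcher("https://getafix.odldruid-prd.a.intuit.com/", "druid/v2")
    df = fetcher.fetch_data(
        datasource="tech-ip-customer-interaction-metrics",
        filter_keys=["assetId"],
        filter_values=["5984175597303660107"],
        dimensions=["ciStatus"],
        group_by=["timestamp", "ciStatus"],
        pivot=Pivot(index="timestamp", columns=["ciStatus"], value=["count"]),
        aggregations={"count": {"type": "doubleSum", "fieldName": "count", "name": "count"}},
        hours=240,
    )
    print(df.shape)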
25 changes: 8 additions & 17 deletions udf/anomaly-detection/src/entities.py
@@ -42,7 +42,7 @@ class _BasePayload:
 
 @dataclass
 class TrainerPayload(_BasePayload):
-    metric: str
+    metrics: List[str]
     header: Header = Header.TRAIN_REQUEST
 
     def to_json(self):
@@ -55,9 +55,9 @@ class StreamPayload(_BasePayload):
     raw_data: Matrix
     metrics: List[str]
     timestamps: List[int]
-    status: Dict[str, Status] = field(default_factory=dict)
-    header: Dict[str, Header] = field(default_factory=dict)
-    metadata: Dict[str, Dict[str, Any]] = field(default_factory=dict)
+    status: Status = field(default_factory=lambda: Status.RAW)
+    header: Header = field(default_factory=lambda: Header.MODEL_INFERENCE)
+    metadata: Dict[str, Any] = field(default_factory=dict)
 
     def get_df(self, original=False) -> pd.DataFrame:
         return pd.DataFrame(self.get_data(original), columns=self.metrics)
@@ -70,25 +70,16 @@ def set_metric_data(self, metric: str, arr: Matrix) -> None:
         _df[metric] = arr
         self.set_data(np.asarray(_df.values.tolist()))
 
-    def get_metric_arr(self, metric: str) -> npt.NDArray[float]:
-        return self.get_df()[metric].values
-
     def get_data(self, original=False) -> npt.NDArray[float]:
         if original:
             return np.asarray(self.raw_data)
         return np.asarray(self.data)
 
-    def set_status(self, metric: str, status: Status) -> None:
-        self.status[metric] = status
-
-    def set_header(self, metric: str, header: Header) -> None:
-        self.header[metric] = header
+    def set_status(self, status: Status) -> None:
+        self.status = status
 
-    def set_metric_metadata(self, metric: str, key: str, value) -> None:
-        if metric in self.metadata.keys():
-            self.metadata[metric][key] = value
-        else:
-            self.metadata[metric] = {key: value}
+    def set_header(self, header: Header) -> None:
+        self.header = header
 
     def set_metadata(self, key: str, value) -> None:
         self.metadata[key] = value
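Status, header, and metadata now describe the whole multivariate window rather than carrying one entry per metric. A hedged sketch of the new surface; it assumes uuid and composite_keys come from _BasePayload and that Status and Header resolve from src.entities, with all values illustrative:

from src.entities import Header, Status, StreamPayload

payload = StreamPayload(
    uuid="abc-123",                        # assumed _BasePayload field
    composite_keys=["sandbox", "app-1"],   # assumed _BasePayload field
    data=[[0.1, 0.2], [0.3, 0.4]],         # one column per metric
    raw_data=[[10.0, 20.0], [30.0, 40.0]],
    metrics=["error_rate", "error_count"],
    timestamps=[1689500000, 1689500060],
)
payload.set_status(Status.RAW)              # one status for the whole window
payload.set_header(Header.MODEL_INFERENCE)  # likewise for the header
payload.set_metadata("model_version", 3)    # metadata dict is now flat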
10 changes: 5 additions & 5 deletions udf/anomaly-detection/src/tools.py
@@ -14,7 +14,7 @@
 from numalogic.config import PostprocessFactory, ModelInfo
 from numalogic.models.threshold import SigmoidThreshold
 
-from src import get_logger, MetricConf
+from src import get_logger
 from src._config import StaticThreshold
 from src.connectors.prometheus import Prometheus
 from src.entities import TrainerPayload, Matrix
@@ -76,7 +76,6 @@ def is_host_reachable(hostname: str, port=None, max_retries=5, sleep_sec=5) -> bool:
 
 def fetch_data(
     payload: TrainerPayload,
-    metric_config: MetricConf,
     labels: dict,
     return_labels=None,
     hours: int = 36,
@@ -89,12 +88,12 @@ def fetch_data(
     start_dt = end_dt - timedelta(hours=hours)
 
     df = datafetcher.query_metric(
-        metric_name=payload.composite_keys["name"],
+        metric_name=payload.composite_keys[1],
         labels_map=labels,
         return_labels=return_labels,
         start=start_dt.timestamp(),
         end=end_dt.timestamp(),
-        step=metric_config.scrape_interval,
+        step=prometheus_conf.scrape_interval,
     )
     _LOGGER.info(
         "%s - Time taken to fetch data: %s, for df shape: %s",
@@ -170,7 +169,7 @@ def get_static_score(self, x_arr) -> float:
         static_score = np.mean(static_scores)
         return self.postproc_clf.transform(static_score)
 
-    def get_norm_score(self, x_arr: Matrix):
+    def get_norm_score(self, x_arr: Matrix) -> float:
         """
         Returns the normalized window score
@@ -180,6 +179,7 @@ def get_norm_score(self, x_arr: Matrix):
         Returns:
             Score for the window
         """
+
         win_score = np.mean(x_arr)
         return self.postproc_clf.transform(win_score)
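With MetricConf removed, fetch_data reads the metric name from position 1 of the payload's composite keys and takes the query step from PrometheusConf.scrape_interval. A sketch of a call site under the new signature; payload fields are illustrative, and it assumes uuid and composite_keys come from _BasePayload:

from src.entities import TrainerPayload
from src.tools import fetch_data

payload = TrainerPayload(
    uuid="abc-123",                            # assumed _BasePayload field
    composite_keys=["sandbox", "error_rate"],  # index 1 is the metric name
    metrics=["error_rate"],
)
df = fetch_data(
    payload,
    labels={"namespace": "sandbox"},  # illustrative Prometheus label filter
    hours=36,
)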