Commit
feat: support multichannel (#233)
Support Multichannel

---------

Signed-off-by: kbatra <[email protected]>
s0nicboOm authored Jul 19, 2023
1 parent 8eb3ca6 commit 6fbff1c
Showing 20 changed files with 588 additions and 566 deletions.
206 changes: 103 additions & 103 deletions udf/anomaly-detection/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions udf/anomaly-detection/src/_config.py
@@ -25,8 +25,8 @@ class ReTrainConf:

@dataclass
class StaticThreshold:
upper_limit: int = 3
weight: float = 0.0
upper_limit: List[int] = field(default_factory=list)

Check failure (GitHub Actions / ruff): udf/anomaly-detection/src/_config.py:28:18: UP006 Use `list` instead of `List` for type annotation
weight: List[float] = field(default_factory=list)

Check failure (GitHub Actions / ruff): udf/anomaly-detection/src/_config.py:29:13: UP006 Use `list` instead of `List` for type annotation


class DataSource(str, Enum):
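
A minimal usage sketch of the new list-based StaticThreshold fields, with hypothetical values (one upper limit and one weight per metric); the ruff UP006 failures above would prefer `list[int]`/`list[float]` over `List[...]`:

from dataclasses import dataclass, field
from typing import List

@dataclass
class StaticThreshold:
    upper_limit: List[int] = field(default_factory=list)
    weight: List[float] = field(default_factory=list)

# Hypothetical config for two metrics: the first capped at 3 with a 0.3 static
# weight, the second at 5 with a 0.5 static weight.
conf = StaticThreshold(upper_limit=[3, 5], weight=[0.3, 0.5])
print(conf.weight)  # [0.3, 0.5]
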
17 changes: 4 additions & 13 deletions udf/anomaly-detection/src/connectors/druid.py
@@ -1,4 +1,6 @@
import time
from collections.abc import Hashable
from typing import Sequence

import pytz
import logging
@@ -78,6 +80,8 @@ def fetch_data(
columns=pivot.columns,
values=pivot.value,
)
df.columns = df.columns.map('{0[1]}'.format)
df.reset_index(inplace=True)

_LOGGER.info(
"Time taken to fetch data: %s, for keys: %s, for df shape: %s",
@@ -88,16 +92,3 @@ def fetch_data(
return df


fetcher = DruidFetcher("https://getafix.odldruid-prd.a.intuit.com/", "druid/v2")
df = fetcher.fetch_data(
datasource="tech-ip-customer-interaction-metrics",
filter_keys=["assetId"],
filter_values=["5984175597303660107"],
dimensions=["ciStatus"],
group_by=["timestamp", "ciStatus"],
pivot=Pivot(index="timestamp", columns=["ciStatus"], value=["count"]),
aggregations={"count": {"type": "doubleSum", "fieldName": "count", "name": "count"}},
hours=240,
)
df.to_csv("test.csv")
print(df)
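
A standalone pandas sketch (hypothetical data, not the Druid response) of the column-flattening step added to fetch_data above: pivoting with a list of value columns yields MultiIndex columns such as ('count', 'DEGRADED'), and mapping '{0[1]}'.format keeps only the second level.

import pandas as pd

raw = pd.DataFrame(
    {
        "timestamp": [1, 1, 2, 2],
        "ciStatus": ["DEGRADED", "HEALTHY", "DEGRADED", "HEALTHY"],
        "count": [3.0, 7.0, 1.0, 9.0],
    }
)
df = pd.pivot_table(raw, index="timestamp", columns="ciStatus", values=["count"])
df.columns = df.columns.map("{0[1]}".format)  # ('count', 'DEGRADED') -> 'DEGRADED'
df.reset_index(inplace=True)
print(df)  # columns: timestamp, DEGRADED, HEALTHY
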
4 changes: 2 additions & 2 deletions udf/anomaly-detection/src/entities.py
@@ -55,8 +55,8 @@ class StreamPayload(_BasePayload):
raw_data: Matrix
metrics: List[str]
timestamps: List[int]
status: Status = field(default_factory=lambda x: Status.RAW)
header: Header = field(default_factory=lambda x: Header.MODEL_INFERENCE)
status: Status = Status.RAW
header: Header = Header.MODEL_INFERENCE
metadata: Dict[str, Any] = field(default_factory=dict)

def get_df(self, original=False) -> pd.DataFrame:
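
The replaced defaults were broken: `default_factory` is called with no arguments, so `field(default_factory=lambda x: Status.RAW)` raises a TypeError the moment the default is needed. Enum members are immutable and can be assigned directly, as the new lines do. A self-contained sketch with placeholder enums:

from dataclasses import dataclass, field
from enum import Enum

class Status(str, Enum):
    RAW = "raw"

class Header(str, Enum):
    MODEL_INFERENCE = "model_inference"

@dataclass
class StreamPayload:
    status: Status = Status.RAW                   # enum members are safe as plain defaults
    header: Header = Header.MODEL_INFERENCE
    metadata: dict = field(default_factory=dict)  # mutable default still needs a factory

print(StreamPayload())
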
29 changes: 16 additions & 13 deletions udf/anomaly-detection/src/tools.py
@@ -75,10 +75,10 @@ def is_host_reachable(hostname: str, port=None, max_retries=5, sleep_sec=5) -> b


def fetch_data(
payload: TrainerPayload,
labels: dict,
return_labels=None,
hours: int = 36,
payload: TrainerPayload,
labels: dict,
return_labels=None,
hours: int = 36,
) -> pd.DataFrame:
_start_time = time.time()
prometheus_conf = ConfigManager.get_prom_config()
@@ -108,8 +108,10 @@ def calculate_static_thresh(x_arr: Matrix, static_threshold: StaticThreshold) ->
"""
Calculates anomaly scores using static thresholding.
"""
static_clf = SigmoidThreshold(upper_limit=static_threshold.upper_limit)
static_scores = static_clf.score_samples(x_arr)
static_scores = np.zeros(x_arr.shape)
for col in range(x_arr.shape[1]):
static_clf = SigmoidThreshold(upper_limit=static_threshold.upper_limit[col])
static_scores[:, col] = static_clf.score_samples(x_arr[:, col])
return static_scores


@@ -126,11 +128,11 @@ class WindowScorer:

def __init__(self, static_thresh: StaticThreshold, postprocess_conf: ModelInfo):
self.static_thresh = static_thresh
self.model_wt = 1.0 - self.static_thresh.weight
self.model_wt = 1.0 - np.array(self.static_thresh.weight)
postproc_factory = PostprocessFactory()
self.postproc_clf = postproc_factory.get_instance(postprocess_conf)

def get_ensemble_score(self, x_arr: Matrix) -> float:
def get_ensemble_score(self, x_arr: Matrix) -> np.ndarray:
"""
Returns the final normalized window score.
@@ -150,11 +152,12 @@ def get_ensemble_score(self, x_arr: Matrix) -> float:

norm_static_score = self.get_static_score(x_arr)
ensemble_score = (self.static_thresh.weight * norm_static_score) + (
self.model_wt * norm_score
self.model_wt * norm_score
)

return ensemble_score

def get_static_score(self, x_arr) -> float:
def get_static_score(self, x_arr) -> np.ndarray:
"""
Returns the normalized window score
calculated using the static threshold estimator.
@@ -166,10 +169,10 @@ def get_static_score(self, x_arr) -> float:
Score for the window
"""
static_scores = calculate_static_thresh(x_arr, self.static_thresh)
static_score = np.mean(static_scores)
static_score = np.mean(static_scores, axis=0)
return self.postproc_clf.transform(static_score)

def get_norm_score(self, x_arr: Matrix) -> float:
def get_norm_score(self, x_arr: Matrix) -> np.ndarray:
"""
Returns the normalized window score
@@ -180,7 +183,7 @@ def get_norm_score(self, x_arr: Matrix) -> float:
Score for the window
"""

win_score = np.mean(x_arr)
win_score = np.mean(x_arr, axis=0)
return self.postproc_clf.transform(win_score)

def adjust_weights(self):
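
A NumPy-only sketch of the shapes introduced above, with the per-metric limits and weights assumed and a trivial scaler standing in for SigmoidThreshold: static scores are computed column by column, averaged over the window, and blended with the model score using the per-metric weights, giving one score per metric instead of a single float.

import numpy as np

x_arr = np.random.default_rng(0).random((12, 3))  # window: 12 timestamps, 3 metrics
upper_limit = np.array([3, 5, 2], dtype=float)    # assumed: one limit per metric
weight = np.array([0.3, 0.0, 0.5])                # assumed: per-metric static weight

static_scores = np.zeros(x_arr.shape)
for col in range(x_arr.shape[1]):
    # stand-in for SigmoidThreshold(upper_limit=...).score_samples(...)
    static_scores[:, col] = x_arr[:, col] / upper_limit[col]

norm_static_score = np.mean(static_scores, axis=0)  # shape (3,)
norm_score = np.mean(x_arr, axis=0)                 # model score per metric, shape (3,)
model_wt = 1.0 - weight
ensemble_score = weight * norm_static_score + model_wt * norm_score
print(ensemble_score.shape)  # (3,): one ensemble score per metric
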
18 changes: 9 additions & 9 deletions udf/anomaly-detection/src/udf/inference.py
@@ -38,7 +38,7 @@ def _run_inference(
) -> np.ndarray:
model = artifact_data.artifact
win_size = ConfigManager.get_stream_config(config_name=keys[0]).window_size
data_arr = payload.get_data().reshape(-1, 1)
data_arr = payload.get_data()
stream_loader = DataLoader(StreamingDataset(data_arr, win_size))

trainer = AutoencoderTrainer()
@@ -54,7 +54,7 @@
)
raise RuntimeError("Failed to infer") from err
_LOGGER.info("%s - Successfully inferred: Keys: %s, Metric: %s", payload.uuid, keys, payload.metrics)
return recon_err.numpy().flatten()
return recon_err.numpy()

def inference(
self, keys: List[str], payload: StreamPayload
@@ -105,8 +105,8 @@ def inference(

_LOGGER.info(
"%s - Loaded artifact data from %s",
uuid=payload.uuid,
source=artifact_data.extras.get("source"),
payload.uuid,
artifact_data.extras.get("source"),
)
if RedisRegistry.is_artifact_stale(artifact_data, int(retrain_config.retrain_freq_hr)) \
and artifact_data.extras.get("source") == "registry":
@@ -120,7 +120,7 @@

# Generate predictions
try:
x_infered = self._run_inference(keys, payload, artifact_data)
x_inferred = self._run_inference(keys, payload, artifact_data)
except RuntimeError:
_LOGGER.info(
"%s - Failed to infer, forwarding for static thresholding. Keys: %s, Metric: %s",
@@ -130,7 +130,7 @@ def inference(
)
return None, Status.RUNTIME_ERROR, Header.STATIC_INFERENCE, -1

return x_infered, Status.INFERRED, header, int(artifact_data.extras.get("version"))
return x_inferred, Status.INFERRED, header, int(artifact_data.extras.get("version"))

def run(self, keys: List[str], datum: Datum) -> Messages:
_start_time = time.perf_counter()
@@ -145,13 +145,13 @@ def run(self, keys: List[str], datum: Datum) -> Messages:

messages = Messages()
# Perform inference
x_infered, status, header, version = self.inference(keys, payload)
x_inferred, status, header, version = self.inference(keys, payload)
payload.set_status(status=status)
payload.set_header(header=header)
payload.set_metadata(key="model_version", value=version)

if x_infered is not None:
payload.set_data(arr=x_infered)
if x_inferred is not None:
payload.set_data(arr=x_inferred)

messages.append(Message(keys=keys, value=payload.to_json()))
_LOGGER.info("%s - Sending Msg: { Keys: %s, Payload: %r }", payload.uuid, keys, payload)
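
A shape-only sketch of why the reshape(-1, 1) and flatten() calls were dropped in _run_inference: with multichannel payloads the data is already (n_timestamps, n_metrics), and the reconstruction error keeps that 2-D shape (the model call is stood in by a simple deviation here).

import numpy as np

n_timestamps, n_metrics = 12, 3
data_arr = np.random.default_rng(1).random((n_timestamps, n_metrics))

recon_err = np.abs(data_arr - data_arr.mean(axis=0))  # stand-in for the model's output
assert recon_err.shape == (n_timestamps, n_metrics)   # no .flatten() / .reshape(-1, 1)
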
14 changes: 4 additions & 10 deletions udf/anomaly-detection/src/udf/postprocess.py
@@ -18,14 +18,8 @@ class Postprocess:
def postprocess(cls, keys: list[str], payload: StreamPayload) -> (float, dict):
static_thresh = ConfigManager.get_static_threshold_config(config_name=keys[0])
postprocess_conf = ConfigManager.get_postprocess_config(config_name=keys[0])
unified_config = ConfigManager.get_stream_config(config_name=keys[0]).unified_config

# TODO: Implement weighted average or max strategy for unified anomaly
# if weights:
# weighted_anomalies = np.multiply(scores, unified_weights)
# unified_anomaly = float(np.sum(weighted_anomalies) / np.sum(unified_weights))

# Compute score using static thresholding
# Compute static threshold score if header is static inference
metric_arr = payload.get_data()
win_scorer = WindowScorer(static_thresh, postprocess_conf)
if payload.header == Header.STATIC_INFERENCE:
@@ -52,10 +46,10 @@ def postprocess(cls, keys: list[str], payload: StreamPayload) -> (float, dict):

# TODO: construct map
metric_scores = {}
for metric in payload.metrics:
metric_scores[metric] = 0
for i in range(len(payload.metrics)):
metric_scores[payload.metrics[i]] = final_score[i]

return final_score, metric_scores
return cls.get_unified_anomaly(keys, final_score.tolist(), payload), metric_scores

@classmethod
def get_unified_anomaly(
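
A small sketch (hypothetical metric names and scores) of the per-metric map built above: each entry of final_score lines up with the metric at the same position in payload.metrics, so dict(zip(...)) produces the same mapping as the index loop in the diff.

import numpy as np

metrics = ["cpu", "memory", "error_rate"]   # assumed metric names
final_score = np.array([0.12, 0.87, 0.40])  # assumed per-metric scores

metric_scores = dict(zip(metrics, final_score.tolist()))
print(metric_scores)  # {'cpu': 0.12, 'memory': 0.87, 'error_rate': 0.4}
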
4 changes: 2 additions & 2 deletions udf/anomaly-detection/src/udf/preprocess.py
@@ -79,9 +79,9 @@ def preprocess(
return None, Status.ARTIFACT_NOT_FOUND

# Perform preprocessing
x_raw = payload.get_data().reshape(-1, 1)
x_raw = payload.get_data()
preproc_clf = preproc_artifact.artifact
x_scaled = preproc_clf.transform(x_raw).flatten()
x_scaled = preproc_clf.transform(x_raw)
_LOGGER.info(
"%s - Successfully preprocessed, Keys: %s, Metrics: %s, x_scaled: %s",
payload.uuid,
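
A sketch with scikit-learn's MinMaxScaler standing in for the stored preprocessing artifact: scikit-learn transformers scale a 2-D (n_timestamps, n_metrics) array column-wise, which is why the reshape(-1, 1) and flatten() are no longer needed for multichannel payloads.

import numpy as np
from sklearn.preprocessing import MinMaxScaler

x_raw = np.random.default_rng(2).random((12, 3)) * 100  # hypothetical raw window
preproc_clf = MinMaxScaler().fit(x_raw)                 # stand-in for preproc_artifact.artifact

x_scaled = preproc_clf.transform(x_raw)
assert x_scaled.shape == x_raw.shape  # 2-D in, 2-D out, scaled per metric
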
48 changes: 23 additions & 25 deletions udf/anomaly-detection/src/udf/threshold.py
@@ -27,7 +27,7 @@ def __init__(self):
cache_registry=local_cache)

def threshold(
self, keys: List[str], metric: str, payload: StreamPayload
self, keys: List[str], payload: StreamPayload
) -> (np.ndarray, Status, Header, int):
metric_arr = payload.get_data()

@@ -98,30 +98,28 @@ def run(self, keys: List[str], datum: Datum) -> Messages:

messages = Messages()

# Perform threshold for each metric
for metric in payload.metrics:
y_score, status, header, version = self.threshold(keys, metric, payload)
payload.set_status(status=status)
payload.set_header(header=header)
payload.set_metadata(key="model_version", value=version)

if y_score is not None:
payload.set_data(arr=y_score)

if y_score is None or header == Header.MODEL_STALE or status == Status.ARTIFACT_NOT_FOUND:
train_payload = TrainerPayload(
uuid=payload.uuid, composite_keys=keys, metrics=payload.metrics
)
_LOGGER.info(
"%s - Sending Msg: { Keys: %s, Tags:%s, Payload: %s }",
payload.uuid,
keys,
[TRAIN_VTX_KEY],
train_payload,
)
messages.append(
Message(keys=keys, value=train_payload.to_json(), tags=[TRAIN_VTX_KEY])
)
y_score, status, header, version = self.threshold(keys, payload)
payload.set_status(status=status)
payload.set_header(header=header)
payload.set_metadata(key="model_version", value=version)

if y_score is not None:
payload.set_data(arr=y_score)

if y_score is None or header == Header.MODEL_STALE or status == Status.ARTIFACT_NOT_FOUND:
train_payload = TrainerPayload(
uuid=payload.uuid, composite_keys=keys, metrics=payload.metrics
)
_LOGGER.info(
"%s - Sending Msg: { Keys: %s, Tags:%s, Payload: %s }",
payload.uuid,
keys,
[TRAIN_VTX_KEY],
train_payload,
)
messages.append(
Message(keys=keys, value=train_payload.to_json(), tags=[TRAIN_VTX_KEY])
)

messages.append(Message(keys=keys, value=payload.to_json(), tags=[POSTPROC_VTX_KEY]))
_LOGGER.info(
25 changes: 14 additions & 11 deletions udf/anomaly-detection/src/udsink/train.py
@@ -1,5 +1,7 @@
import os
import time

import numpy as np
import orjson
import pandas as pd
from typing import List, Iterator
@@ -33,6 +35,14 @@
REQUEST_EXPIRY = int(os.getenv("REQUEST_EXPIRY", 300))


def get_feature_df(data: pd.DataFrame, metrics: list):
for col in metrics:
if col not in data:
data.loc[:, col] = 0
data.fillna(0, inplace=True)
return data[metrics]


class Train:
@classmethod
def fetch_prometheus_data(cls, payload: TrainerPayload) -> pd.DataFrame:
@@ -153,7 +163,6 @@ def _train_and_save(
# TODO: filter the metrics here

x_train, preproc_clf = self._preprocess(train_df.to_numpy(), preproc_cfgs)

trainer_cfg = numalogic_conf.trainer
x_reconerr, anomaly_model, trainer = self._train_model(
payload.uuid, x_train, model_cfg, trainer_cfg
@@ -237,20 +246,14 @@ def run(self, datums: Iterator[Datum]) -> Responses:
is_new = self._is_new_request(redis_client, payload)

if not is_new:
# _LOGGER.debug(
# "%s - Skipping train request with keys: %s, metric: %s",
# payload.uuid,
# payload.composite_keys,
# payload.metric,
# )
responses.append(Response.as_success(_datum.id))
continue

retrain_config = ConfigManager.get_retrain_config(payload.composite_keys[0])
numalogic_config = ConfigManager.get_numalogic_config(payload.composite_keys[0])

try:
train_df = self.fetch_data(payload)
df = self.fetch_data(payload)
except Exception as err:
_LOGGER.error(
"%s - Error while fetching data for keys: %s, metrics: %s, err: %r",
@@ -262,19 +265,19 @@
responses.append(Response.as_success(_datum.id))
continue

if len(train_df) < retrain_config.min_train_size:
if len(df) < retrain_config.min_train_size:
_LOGGER.warning(
"%s - Skipping training, train data less than minimum required: %s, df shape: %s",
payload.uuid,
retrain_config.min_train_size,
train_df.shape,
df.shape,
)
responses.append(Response.as_success(_datum.id))
continue

train_df = get_feature_df(df, payload.metrics)
self._train_and_save(numalogic_config, payload, redis_client, train_df)

responses.append(Response.as_success(_datum.id))

return responses
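
A usage sketch (hypothetical metrics and data) of the get_feature_df helper added above: metrics missing from the fetched frame are added as zero-filled columns, remaining NaNs are zeroed, and the columns come back in the order of payload.metrics, so the training matrix always has one column per configured metric.

import pandas as pd

def get_feature_df(data: pd.DataFrame, metrics: list) -> pd.DataFrame:
    # same logic as the helper added in train.py above
    for col in metrics:
        if col not in data:
            data.loc[:, col] = 0
    data.fillna(0, inplace=True)
    return data[metrics]

df = pd.DataFrame({"cpu": [1.0, None, 3.0], "memory": [0.5, 0.6, 0.7]})
train_df = get_feature_df(df, ["cpu", "memory", "error_rate"])
print(train_df.columns.tolist())  # ['cpu', 'memory', 'error_rate']
print(train_df.to_numpy())        # NaNs and the missing metric become 0.0
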
