Skip to content

Commit

Permalink
feat: wavefront fetcher
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Sep 17, 2024
1 parent f6ce546 commit 59d6af4
Show file tree
Hide file tree
Showing 4 changed files with 360 additions and 0 deletions.
2 changes: 2 additions & 0 deletions numalogic/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
RDSFetcherConf,
)
from numalogic.connectors.prometheus import PrometheusFetcher
from numalogic.connectors.wavefront import WavefrontFetcher

__all__ = [
"RedisConf",
Expand All @@ -23,6 +24,7 @@
"RDSFetcher",
"RDSConf",
"RDSFetcherConf",
"WavefrontFetcher",
]

if find_spec("boto3"):
Expand Down
154 changes: 154 additions & 0 deletions numalogic/connectors/wavefront.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import os
from datetime import datetime
from typing import Optional

import pandas as pd
from wavefront_api_client import Configuration, QueryApi, ApiClient

from numalogic.connectors._base import DataFetcher
from numalogic.tools.exceptions import WavefrontFetcherError

import logging

LOGGER = logging.getLogger(__name__)


class WavefrontFetcher(DataFetcher):
    """
    Fetches data from Wavefront.

    Args:
        url (str): Wavefront URL.
        api_token (str): Wavefront API token. Falls back to the
            WAVEFRONT_API_TOKEN environment variable if not provided.

    Raises
    ------
        ValueError: If API token is not provided.
        WavefrontFetcherError: If there is an error fetching data from Wavefront.
    """

    def __init__(self, url: str, api_token: Optional[str] = None):
        super().__init__(url)
        api_token = api_token or os.getenv("WAVEFRONT_API_TOKEN")
        if not api_token:
            raise ValueError("WAVEFRONT API token is not provided")
        configuration = Configuration()
        configuration.host = url
        configuration.api_key["X-AUTH-TOKEN"] = api_token
        self.api_client = QueryApi(
            ApiClient(
                configuration,
                header_name="Authorization",
                header_value=f"Bearer {api_token}",
            )
        )

    def _call_api(
        self, query: str, start: int, end: Optional[int], granularity: str
    ) -> pd.DataFrame:
        """Calls the Wavefront API to fetch data."""
        return self.api_client.query_api(
            query, start, granularity, e=end, include_obsolete_metrics=True, use_raw_qk=True
        )

    @staticmethod
    def _format_results(res: dict) -> pd.DataFrame:
        """Validates and formats the results from the API."""
        if res.get("error_type") is not None:
            raise WavefrontFetcherError(
                f"Error fetching data from Wavefront: "
                f"{res.get('error_type')}: {res.get('error_message')}"
            )
        # Covers both a missing key and an empty timeseries list; either way
        # there is nothing to concatenate below.
        if not res.get("timeseries"):
            raise WavefrontFetcherError("No timeseries data found for the query")
        dfs = [
            pd.DataFrame(ts["data"], columns=["timestamp", "value"])
            for ts in res["timeseries"]
        ]
        df = pd.concat(dfs)
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
        return df.set_index("timestamp").sort_index()

    def fetch(
        self,
        metric: str,
        start: datetime,
        filters: Optional[dict] = None,
        end: Optional[datetime] = None,
        granularity: str = "m",
    ) -> pd.DataFrame:
        """
        Fetches data from Wavefront as a single metric.

        Args:
            metric (str): Metric to fetch, e.g. 'system.cpu.usage'.
                Do not include the 'ts()' function.
            start (datetime): Start time.
            filters (dict): Filters to apply to the query.
            end (datetime): End time. Set to None to fetch data until now.
            granularity (str): Granularity of the data. Default is 'm' (minute).

        Returns
        -------
            Dataframe with the fetched data in the format: timestamp (index), value (column).

        Raises
        ------
            WavefrontFetcherError: If there is an error fetching data from Wavefront.
        """
        start = int(start.timestamp())
        end = int(end.timestamp()) if end else None
        if filters:
            _filters = " and ".join(f'{key}="{value}"' for key, value in filters.items())
            query = f"ts({metric}, {_filters})"
        else:
            # Fix: the unfiltered query was previously missing its closing parenthesis.
            query = f"ts({metric})"
        LOGGER.info("Fetching data from Wavefront for query: %s", query)
        res = self._call_api(query, start, end, granularity)
        return self._format_results(res.to_dict())

    def raw_fetch(
        self,
        query: str,
        start: datetime,
        filters: Optional[dict] = None,
        end: Optional[datetime] = None,
        granularity: str = "m",
    ) -> pd.DataFrame:
        """
        Fetches data from Wavefront using a raw query, allowing for more complex queries.

        Args:
            query (str): Raw query to fetch data. May contain '{placeholders}'
                that are substituted from ``filters``.
            start (datetime): Start time.
            filters (dict): Filters to substitute into the query template.
            end (datetime): End time. Set to None to fetch data until now.
            granularity (str): Granularity of the data. Default is 'm' (minute).

        Returns
        -------
            Dataframe with the fetched data in the format: timestamp (index), value (column).

        Raises
        ------
            WavefrontFetcherError:
                - If there is an error fetching data from Wavefront
                - If there is a key error in the query.

        >>> from datetime import datetime, timedelta
        ...
        >>> fetcher = WavefrontFetcher(url="https://miata.wavefront.com", api_token="6spd-manual")
        >>> df = fetcher.raw_fetch(
        ...     query="rawsum(ts(engine.rpm, gear='{gear}' and track='{track}'))",
        ...     start=datetime.now() - timedelta(minutes=5),
        ...     filters={"gear": "1", "track": "laguna_seca"},
        ...     end=datetime.now(),
        ... )
        """
        # Cast to int epoch seconds, consistent with fetch().
        start = int(start.timestamp())
        end = int(end.timestamp()) if end else None

        # Only substitute placeholders when filters are supplied; the original
        # unconditionally called query.format(**None) -> TypeError.
        if filters:
            try:
                query = query.format(**filters)
            except KeyError as key_err:
                raise WavefrontFetcherError(f"Key error in query: {key_err}") from key_err

        LOGGER.info("Fetching data from Wavefront for query: %s", query)
        # Fix: arguments were previously passed as (query, start, granularity, end),
        # swapping the 'end' and 'granularity' parameters of _call_api.
        qres = self._call_api(query, start, end, granularity)
        return self._format_results(qres.to_dict())
6 changes: 6 additions & 0 deletions numalogic/tools/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,9 @@ class RDSFetcherError(Exception):
"""Base class for all exceptions raised by the RDSFetcher class."""

pass


class WavefrontFetcherError(Exception):
    """Raised for any failure inside the WavefrontFetcher class."""
198 changes: 198 additions & 0 deletions tests/connectors/test_wf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
from copy import copy
from datetime import datetime, timedelta

import pytest
from wavefront_api_client import QueryResult

from numalogic.connectors import WavefrontFetcher
from numalogic.tools.exceptions import WavefrontFetcherError

# Placeholder credentials; the API client is never actually called in these tests.
DUMMY_URL = "https://dummy.wavefront.com"
DUMMY_TOKEN = "dummy_token"
# Canned Wavefront ``QueryResult`` mimicking a successful query response.
# ``timeseries[0]["data"]`` holds 18 (epoch-seconds, value) pairs, which is
# why the happy-path tests assert a dataframe shape of (18, 1).
DUMMY_OUT = QueryResult(
    **{
        "dimensions": None,
        "error_message": None,
        "error_type": None,
        "events": None,
        "granularity": 60,
        "name": "ts(iks.namespace.kube.hpa.status.desired.replicas, "
        "cluster='fdp-prd-usw2-k8s' and "
        "namespace='fdp-documentservice-usw2-prd') - "
        "ts(iks.namespace.app.pod.count, cluster='fdp-prd-usw2-k8s' and "
        "namespace='fdp-documentservice-usw2-prd')",
        "query": "ts(iks.namespace.kube.hpa.status.desired.replicas, "
        "cluster='fdp-prd-usw2-k8s' and "
        "namespace='fdp-documentservice-usw2-prd') - "
        "ts(iks.namespace.app.pod.count, cluster='fdp-prd-usw2-k8s' and "
        "namespace='fdp-documentservice-usw2-prd')",
        "spans": None,
        "stats": {
            "buffer_keys": 72,
            "cached_compacted_keys": None,
            "compacted_keys": 3,
            "compacted_points": 357,
            "cpu_ns": 398618692,
            "distributions": 0,
            "edges": 0,
            "hosts_used": None,
            "keys": 73,
            "latency": 413,
            "metrics": 427,
            "metrics_used": None,
            "points": 427,
            "queries": 17,
            "query_tasks": 0,
            "s3_keys": 0,
            "skipped_compacted_keys": 4,
            "spans": 0,
            "summaries": 427,
        },
        "timeseries": [
            {
                "data": [
                    [1726533000.0, 0.0],
                    [1726533060.0, 0.0],
                    [1726533120.0, 0.0],
                    [1726533180.0, 0.0],
                    [1726533240.0, 0.0],
                    [1726533300.0, 0.0],
                    [1726533360.0, 0.0],
                    [1726533420.0, 0.0],
                    [1726533480.0, 0.0],
                    [1726533540.0, 0.0],
                    [1726533600.0, 0.0],
                    [1726533660.0, 0.0],
                    [1726533720.0, 0.0],
                    [1726533780.0, 0.0],
                    [1726533840.0, 0.0],
                    [1726533900.0, 0.0],
                    [1726533960.0, 0.0],
                    [1726534020.0, 0.0],
                ],
                "host": "10.176.157.157:8080",
                "label": "iks.namespace.kube.hpa.status.desired.replicas",
                "tags": {
                    "assetId": "4615081310646958673",
                    "bu": "ip",
                    "cluster": "fdp-prd-usw2-k8s",
                    "container": "kube-state-metrics",
                    "endpoint": "http-metrics",
                    "env": "prod",
                    "horizontalpodautoscaler": "document-service-rollout-hpa",
                    "job": "kube-state-metrics-v2",
                    "namespace": "fdp-documentservice-usw2-prd",
                    "pod": "kube-state-metrics-v2-fc68fc5fb-kjzdc",
                    "prometheus": "addon-metricset-ns/k8s-prometheus",
                    "prometheus.replica": "prometheus-k8s-prometheus-0",
                    "service": "kube-state-metrics-v2",
                },
            }
        ],
        "trace_dimensions": [],
        "traces": None,
        "warnings": None,
    }
)

# Error-path variants. Shallow copies are sufficient: the tests only override
# top-level attributes, so the shared nested payload is never mutated.
DUMMY_OUT_ERR = copy(DUMMY_OUT)
DUMMY_OUT_ERR.error_type = "QuerySyntaxError"
DUMMY_OUT_ERR.error_message = "Invalid query"

# Response where the query matched nothing (no timeseries payload).
DUMMY_OUT_NO_TS = copy(DUMMY_OUT)
DUMMY_OUT_NO_TS.timeseries = None


@pytest.fixture
def wavefront_fetcher():
    """Provide a WavefrontFetcher wired to a dummy endpoint and token."""
    return WavefrontFetcher(url=DUMMY_URL, api_token=DUMMY_TOKEN)


def test_init(monkeypatch):
    """Constructing without a token must raise ValueError."""
    # Fix: isolate from the host environment — with WAVEFRONT_API_TOKEN set
    # locally, the original test would wrongly fail.
    monkeypatch.delenv("WAVEFRONT_API_TOKEN", raising=False)
    with pytest.raises(ValueError):
        WavefrontFetcher(url=DUMMY_URL)


def test_fetch_01(wavefront_fetcher, mocker):
    """fetch() with filters yields a timestamp-indexed, single-column frame."""
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)
    frame = wavefront_fetcher.fetch(
        metric="iks.namespace.kube.hpa.status.desired.replicas",
        start=datetime.now() - timedelta(days=1),
        filters={"cluster": "fdp-prd-usw2-k8s", "namespace": "fdp-documentservice-usw2-prd"},
        end=datetime.now(),
    )
    assert frame.shape == (18, 1)
    assert frame.columns == ["value"]
    assert frame.index.name == "timestamp"


def test_fetch_02(wavefront_fetcher, mocker):
    """fetch() without filters yields the same well-formed frame."""
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)
    frame = wavefront_fetcher.fetch(
        metric="iks.namespace.kube.hpa.status.desired.replicas",
        start=datetime.now() - timedelta(days=1),
        end=datetime.now(),
    )
    assert frame.shape == (18, 1)
    assert frame.columns == ["value"]
    assert frame.index.name == "timestamp"


def test_raw_fetch(wavefront_fetcher, mocker):
    """raw_fetch() substitutes filters into the template and formats results."""
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)
    raw_query = (
        "ts(iks.namespace.kube.hpa.status.desired.replicas, cluster='{cluster}' and "
        "namespace='{namespace}') - ts(iks.namespace.app.pod.count, cluster='{cluster}' and "
        "namespace='{namespace}')"
    )
    frame = wavefront_fetcher.raw_fetch(
        query=raw_query,
        start=datetime.now() - timedelta(minutes=5),
        filters={"cluster": "fdp-prd-usw2-k8s", "namespace": "fdp-documentservice-usw2-prd"},
        end=datetime.now(),
    )
    assert frame.shape == (18, 1)
    assert frame.columns == ["value"]
    assert frame.index.name == "timestamp"


def test_fetch_err_01(wavefront_fetcher, mocker):
    """An error payload from the API surfaces as WavefrontFetcherError."""
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT_ERR)
    with pytest.raises(WavefrontFetcherError):
        wavefront_fetcher.fetch(
            metric="some_metric",
            start=datetime.now() - timedelta(days=1),
            filters={"cluster": "some_cluster", "namespace": "some_ns"},
            end=datetime.now(),
        )


def test_fetch_err_02(wavefront_fetcher, mocker):
    """A response without timeseries data surfaces as WavefrontFetcherError."""
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT_NO_TS)
    with pytest.raises(WavefrontFetcherError):
        wavefront_fetcher.fetch(
            metric="some_metric",
            start=datetime.now() - timedelta(days=1),
            filters={"cluster": "some_cluster", "namespace": "some_ns"},
            end=datetime.now(),
        )


def test_raw_fetch_err(wavefront_fetcher, mocker):
    """A filters dict missing a template key raises WavefrontFetcherError."""
    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)
    raw_query = (
        "ts(iks.namespace.kube.hpa.status.desired.replicas, cluster='{cluster}' and "
        "namespace='{namespace}') - ts(iks.namespace.app.pod.count, cluster='{cluster}' and "
        "namespace='{namespace}')"
    )
    # 'cluster' is referenced by the template but absent from the filters.
    with pytest.raises(WavefrontFetcherError):
        wavefront_fetcher.raw_fetch(
            query=raw_query,
            start=datetime.now() - timedelta(minutes=5),
            filters={"randomkey": "fdp-prd-usw2-k8s", "namespace": "fdp-documentservice-usw2-prd"},
            end=datetime.now(),
        )

0 comments on commit 59d6af4

Please sign in to comment.