From 59d6af4eeb3765c22b7f12325b9f58b8cf7fccf0 Mon Sep 17 00:00:00 2001
From: Avik Basu
Date: Mon, 16 Sep 2024 18:34:58 -0700
Subject: [PATCH] feat: wavefront fetcher

Signed-off-by: Avik Basu
---
 numalogic/connectors/__init__.py  |   2 +
 numalogic/connectors/wavefront.py | 158 +++++++++++++++++++++++
 numalogic/tools/exceptions.py     |   6 +
 tests/connectors/test_wf.py       | 198 ++++++++++++++++++++++++++++++
 4 files changed, 364 insertions(+)
 create mode 100644 numalogic/connectors/wavefront.py
 create mode 100644 tests/connectors/test_wf.py

diff --git a/numalogic/connectors/__init__.py b/numalogic/connectors/__init__.py
index 8c8e2d19..2ca81b57 100644
--- a/numalogic/connectors/__init__.py
+++ b/numalogic/connectors/__init__.py
@@ -11,6 +11,7 @@
     RDSFetcherConf,
 )
 from numalogic.connectors.prometheus import PrometheusFetcher
+from numalogic.connectors.wavefront import WavefrontFetcher
 
 __all__ = [
     "RedisConf",
@@ -23,6 +24,7 @@
     "RDSFetcher",
     "RDSConf",
     "RDSFetcherConf",
+    "WavefrontFetcher",
 ]
 
 if find_spec("boto3"):
diff --git a/numalogic/connectors/wavefront.py b/numalogic/connectors/wavefront.py
new file mode 100644
index 00000000..c3ee7eb1
--- /dev/null
+++ b/numalogic/connectors/wavefront.py
@@ -0,0 +1,158 @@
+import logging
+import os
+from datetime import datetime
+from typing import Optional
+
+import pandas as pd
+from wavefront_api_client import ApiClient, Configuration, QueryApi, QueryResult
+
+from numalogic.connectors._base import DataFetcher
+from numalogic.tools.exceptions import WavefrontFetcherError
+
+LOGGER = logging.getLogger(__name__)
+
+
+class WavefrontFetcher(DataFetcher):
+    """
+    Fetches data from Wavefront.
+
+    Args:
+        url (str): Wavefront URL.
+        api_token (str): Wavefront API token. Falls back to the WAVEFRONT_API_TOKEN
+            environment variable when not given.
+
+    Raises:
+        ValueError: If API token is not provided.
+        WavefrontFetcherError: If there is an error fetching data from Wavefront.
+    """
+
+    def __init__(self, url: str, api_token: Optional[str] = None):
+        super().__init__(url)
+        api_token = api_token or os.getenv("WAVEFRONT_API_TOKEN")
+        if not api_token:
+            raise ValueError("WAVEFRONT API token is not provided")
+        configuration = Configuration()
+        configuration.host = url
+        configuration.api_key["X-AUTH-TOKEN"] = api_token
+        self.api_client = QueryApi(
+            ApiClient(
+                configuration,
+                header_name="Authorization",
+                header_value=f"Bearer {api_token}",
+            )
+        )
+
+    def _call_api(
+        self, query: str, start: int, end: Optional[int], granularity: str
+    ) -> QueryResult:
+        """Calls the Wavefront API to fetch data."""
+        return self.api_client.query_api(
+            query, start, granularity, e=end, include_obsolete_metrics=True, use_raw_qk=True
+        )
+
+    @staticmethod
+    def _format_results(res: dict) -> pd.DataFrame:
+        """Validates and formats the results from the API."""
+        if res.get("error_type") is not None:
+            raise WavefrontFetcherError(
+                f"Error fetching data from Wavefront: {res.get('error_type')}: {res.get('error_message')}"
+            )
+        # An empty list would make pd.concat raise a bare ValueError, so treat it like None.
+        if not res.get("timeseries"):
+            raise WavefrontFetcherError("No timeseries data found for the query")
+        dfs = [
+            pd.DataFrame(ts["data"], columns=["timestamp", "value"]) for ts in res["timeseries"]
+        ]
+        df = pd.concat(dfs)
+        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
+        df = df.set_index("timestamp").sort_index()
+        return df
+
+    def fetch(
+        self,
+        metric: str,
+        start: datetime,
+        filters: Optional[dict] = None,
+        end: Optional[datetime] = None,
+        granularity: str = "m",
+    ) -> pd.DataFrame:
+        """
+        Fetches data from Wavefront as a single metric.
+
+        Args:
+            metric (str): Metric to fetch. Example: 'system.cpu.usage'. Do not include the 'ts()' function.
+            start (datetime): Start time.
+            filters (dict): Filters to apply to the query.
+            end (datetime): End time. Set to None to fetch data until now.
+            granularity (str): Granularity of the data. Default is 'm' (minute).
+
+        Returns:
+            Dataframe with the fetched data in the format: timestamp (index), value (column).
+
+        Raises:
+            WavefrontFetcherError: If there is an error fetching data from Wavefront
+        """
+        start = int(start.timestamp())
+        if end:
+            end = int(end.timestamp())
+        if filters:
+            _filters = " and ".join([f'{key}="{value}"' for key, value in filters.items()])
+            query = f"ts({metric}, {_filters})"
+        else:
+            query = f"ts({metric})"
+        LOGGER.info("Fetching data from Wavefront for query: %s", query)
+        res = self._call_api(query, start, end, granularity)
+        return self._format_results(res.to_dict())
+
+    def raw_fetch(
+        self,
+        query: str,
+        start: datetime,
+        filters: Optional[dict] = None,
+        end: Optional[datetime] = None,
+        granularity: str = "m",
+    ) -> pd.DataFrame:
+        """
+        Fetches data from Wavefront using a raw query, allowing for more complex queries.
+
+        Args:
+            query (str): Raw query to fetch data.
+            start (datetime): Start time.
+            filters (dict): Values substituted into the '{name}' placeholders of the query.
+                The query is sent unchanged when filters is None.
+            end (datetime): End time. Set to None to fetch data until now.
+            granularity (str): Granularity of the data. Default is 'm' (minute).
+
+        Returns:
+            Dataframe with the fetched data in the format: timestamp (index), value (column).
+
+        Raises:
+            WavefrontFetcherError:
+                - If there is an error fetching data from Wavefront
+                - If there is a key error in the query.
+
+        >>> from datetime import datetime, timedelta
+        ...
+        >>> fetcher = WavefrontFetcher(url="https://miata.wavefront.com", api_token="6spd-manual")
+        >>> df = fetcher.raw_fetch(
+        ...     query="rawsum(ts(engine.rpm, gear='{gear}' and track='{track}'))",
+        ...     start=datetime.now() - timedelta(minutes=5),
+        ...     filters={"gear": "1", "track": "laguna_seca"},
+        ...     end=datetime.now(),
+        ... )
+        """
+
+        start = int(start.timestamp())
+        if end:
+            end = int(end.timestamp())
+
+        if filters is not None:
+            try:
+                query = query.format(**filters)
+            except KeyError as key_err:
+                raise WavefrontFetcherError(f"Key error in query: {key_err}") from key_err
+
+        LOGGER.info("Fetching data from Wavefront for query: %s", query)
+        # _call_api takes (query, start, end, granularity), in that order.
+        qres = self._call_api(query, start, end, granularity)
+        return self._format_results(qres.to_dict())
diff --git a/numalogic/tools/exceptions.py b/numalogic/tools/exceptions.py
index bd7d001d..6688f5c8 100644
--- a/numalogic/tools/exceptions.py
+++ b/numalogic/tools/exceptions.py
@@ -122,3 +122,9 @@ class RDSFetcherError(Exception):
     """Base class for all exceptions raised by the RDSFetcher class."""
 
     pass
+
+
+class WavefrontFetcherError(Exception):
+    """Base class for all exceptions raised by the WavefrontFetcher class."""
+
+    pass
diff --git a/tests/connectors/test_wf.py b/tests/connectors/test_wf.py
new file mode 100644
index 00000000..10648b30
--- /dev/null
+++ b/tests/connectors/test_wf.py
@@ -0,0 +1,198 @@
+from copy import copy
+from datetime import datetime, timedelta
+
+import pytest
+from wavefront_api_client import QueryResult
+
+from numalogic.connectors import WavefrontFetcher
+from numalogic.tools.exceptions import WavefrontFetcherError
+
+DUMMY_URL = "https://dummy.wavefront.com"
+DUMMY_TOKEN = "dummy_token"
+DUMMY_OUT = QueryResult(
+    **{
+        "dimensions": None,
+        "error_message": None,
+        "error_type": None,
+        "events": None,
+        "granularity": 60,
+        "name": "ts(iks.namespace.kube.hpa.status.desired.replicas, "
+        "cluster='fdp-prd-usw2-k8s' and "
+        "namespace='fdp-documentservice-usw2-prd') - "
+        "ts(iks.namespace.app.pod.count, cluster='fdp-prd-usw2-k8s' and "
+        "namespace='fdp-documentservice-usw2-prd')",
+        "query": "ts(iks.namespace.kube.hpa.status.desired.replicas, "
+        "cluster='fdp-prd-usw2-k8s' and "
+        "namespace='fdp-documentservice-usw2-prd') - "
+        "ts(iks.namespace.app.pod.count, cluster='fdp-prd-usw2-k8s' and "
+        "namespace='fdp-documentservice-usw2-prd')",
+        "spans": None,
+        "stats": {
+            "buffer_keys": 72,
+            "cached_compacted_keys": None,
+            "compacted_keys": 3,
+            "compacted_points": 357,
+            "cpu_ns": 398618692,
+            "distributions": 0,
+            "edges": 0,
+            "hosts_used": None,
+            "keys": 73,
+            "latency": 413,
+            "metrics": 427,
+            "metrics_used": None,
+            "points": 427,
+            "queries": 17,
+            "query_tasks": 0,
+            "s3_keys": 0,
+            "skipped_compacted_keys": 4,
+            "spans": 0,
+            "summaries": 427,
+        },
+        "timeseries": [
+            {
+                "data": [
+                    [1726533000.0, 0.0],
+                    [1726533060.0, 0.0],
+                    [1726533120.0, 0.0],
+                    [1726533180.0, 0.0],
+                    [1726533240.0, 0.0],
+                    [1726533300.0, 0.0],
+                    [1726533360.0, 0.0],
+                    [1726533420.0, 0.0],
+                    [1726533480.0, 0.0],
+                    [1726533540.0, 0.0],
+                    [1726533600.0, 0.0],
+                    [1726533660.0, 0.0],
+                    [1726533720.0, 0.0],
+                    [1726533780.0, 0.0],
+                    [1726533840.0, 0.0],
+                    [1726533900.0, 0.0],
+                    [1726533960.0, 0.0],
+                    [1726534020.0, 0.0],
+                ],
+                "host": "10.176.157.157:8080",
+                "label": "iks.namespace.kube.hpa.status.desired.replicas",
+                "tags": {
+                    "assetId": "4615081310646958673",
+                    "bu": "ip",
+                    "cluster": "fdp-prd-usw2-k8s",
+                    "container": "kube-state-metrics",
+                    "endpoint": "http-metrics",
+                    "env": "prod",
+                    "horizontalpodautoscaler": "document-service-rollout-hpa",
+                    "job": "kube-state-metrics-v2",
+                    "namespace": "fdp-documentservice-usw2-prd",
+                    "pod": "kube-state-metrics-v2-fc68fc5fb-kjzdc",
+                    "prometheus": "addon-metricset-ns/k8s-prometheus",
+                    "prometheus.replica": "prometheus-k8s-prometheus-0",
+                    "service": "kube-state-metrics-v2",
+                },
+            }
+        ],
+        "trace_dimensions": [],
+        "traces": None,
+        "warnings": None,
+    }
+)
+
+DUMMY_OUT_ERR = copy(DUMMY_OUT)
+DUMMY_OUT_ERR.error_type = "QuerySyntaxError"
+DUMMY_OUT_ERR.error_message = "Invalid query"
+
+DUMMY_OUT_NO_TS = copy(DUMMY_OUT)
+DUMMY_OUT_NO_TS.timeseries = None
+
+
+@pytest.fixture
+def wavefront_fetcher():
+    return WavefrontFetcher(
+        url=DUMMY_URL,
+        api_token=DUMMY_TOKEN,
+    )
+
+
+def test_init():
+    with pytest.raises(ValueError):
+        WavefrontFetcher(url=DUMMY_URL)
+
+
+def test_fetch_01(wavefront_fetcher, mocker):
+    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)
+
+    df = wavefront_fetcher.fetch(
+        metric="iks.namespace.kube.hpa.status.desired.replicas",
+        start=datetime.now() - timedelta(days=1),
+        filters={"cluster": "fdp-prd-usw2-k8s", "namespace": "fdp-documentservice-usw2-prd"},
+        end=datetime.now(),
+    )
+    assert df.shape == (18, 1)
+    assert df.columns == ["value"]
+    assert df.index.name == "timestamp"
+
+
+def test_fetch_02(wavefront_fetcher, mocker):
+    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)
+
+    df = wavefront_fetcher.fetch(
+        metric="iks.namespace.kube.hpa.status.desired.replicas",
+        start=datetime.now() - timedelta(days=1),
+        end=datetime.now(),
+    )
+    assert df.shape == (18, 1)
+    assert df.columns == ["value"]
+    assert df.index.name == "timestamp"
+
+
+def test_raw_fetch(wavefront_fetcher, mocker):
+    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)
+
+    df = wavefront_fetcher.raw_fetch(
+        query="ts(iks.namespace.kube.hpa.status.desired.replicas, cluster='{cluster}' and "
+        "namespace='{namespace}') - ts(iks.namespace.app.pod.count, cluster='{cluster}' and "
+        "namespace='{namespace}')",
+        start=datetime.now() - timedelta(minutes=5),
+        filters={"cluster": "fdp-prd-usw2-k8s", "namespace": "fdp-documentservice-usw2-prd"},
+        end=datetime.now(),
+    )
+    assert df.shape == (18, 1)
+    assert df.columns == ["value"]
+    assert df.index.name == "timestamp"
+
+
+def test_fetch_err_01(wavefront_fetcher, mocker):
+    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT_ERR)
+
+    with pytest.raises(WavefrontFetcherError):
+        wavefront_fetcher.fetch(
+            metric="some_metric",
+            start=datetime.now() - timedelta(days=1),
+            filters={"cluster": "some_cluster", "namespace": "some_ns"},
+            end=datetime.now(),
+        )
+
+
+def test_fetch_err_02(wavefront_fetcher, mocker):
+
+    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT_NO_TS)
+
+    with pytest.raises(WavefrontFetcherError):
+        wavefront_fetcher.fetch(
+            metric="some_metric",
+            start=datetime.now() - timedelta(days=1),
+            filters={"cluster": "some_cluster", "namespace": "some_ns"},
+            end=datetime.now(),
+        )
+
+
+def test_raw_fetch_err(wavefront_fetcher, mocker):
+    mocker.patch.object(wavefront_fetcher, "_call_api", return_value=DUMMY_OUT)
+
+    with pytest.raises(WavefrontFetcherError):
+        wavefront_fetcher.raw_fetch(
+            query="ts(iks.namespace.kube.hpa.status.desired.replicas, cluster='{cluster}' and "
+            "namespace='{namespace}') - ts(iks.namespace.app.pod.count, cluster='{cluster}' and "
+            "namespace='{namespace}')",
+            start=datetime.now() - timedelta(minutes=5),
+            filters={"randomkey": "fdp-prd-usw2-k8s", "namespace": "fdp-documentservice-usw2-prd"},
+            end=datetime.now(),
+        )