Skip to content

Commit

Permalink
Merge pull request #92 from robusta-dev/mem_test_v2
Browse files Browse the repository at this point in the history
Fixed prometheus query memory issue
  • Loading branch information
Avi-Robusta authored Jul 4, 2023
2 parents 26f9f70 + 14c4ae5 commit 682c2e5
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from typing import Any, Optional

from robusta_krr.core.abstract.strategies import Metric

from .base_metric import BaseMetricLoader
from .base_metric import BaseMetricLoader, QueryType

PrometheusSeries = Any

Expand Down Expand Up @@ -57,6 +56,6 @@ def filter_prom_jobs_results(
return_list.append(sorted_relevant_series[0])
return return_list

async def query_prometheus(self, metric: Metric) -> list[PrometheusSeries]:
result = await super().query_prometheus(metric)
async def query_prometheus(self, metric: Metric, query_type: QueryType) -> list[PrometheusSeries]:
result = await super().query_prometheus(metric, query_type)
return self.filter_prom_jobs_results(result)
101 changes: 85 additions & 16 deletions robusta_krr/core/integrations/prometheus/metrics/base_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
from concurrent.futures import ThreadPoolExecutor
import datetime
from typing import TYPE_CHECKING, Callable, Optional, TypeVar

import enum
import numpy as np

from robusta_krr.core.abstract.strategies import Metric, ResourceHistoryData
from robusta_krr.core.models.config import Config
from robusta_krr.core.models.objects import K8sObjectData
Expand All @@ -16,8 +15,17 @@
if TYPE_CHECKING:
from .. import CustomPrometheusConnect

MetricsDictionary = dict[str, type[BaseMetricLoader]]


class QueryType(str, enum.Enum):
    """
    The kind of Prometheus query a metric loader issues.

    Members also subclass ``str``, so each one compares equal to its raw string value.
    """

    # Instant query: a single evaluation of the expression.
    Query = "query"
    # Range query: the expression evaluated over a start/end window with a step.
    QueryRange = "query_range"


# A registry of metrics that can be used to fetch a corresponding metric loader.
REGISTERED_METRICS: dict[str, type[BaseMetricLoader]] = {}
REGISTERED_METRICS: MetricsDictionary = {}
STRATEGY_METRICS_OVERRIDES: dict[str, MetricsDictionary] = {}


class BaseMetricLoader(Configurable, abc.ABC):
Expand Down Expand Up @@ -50,19 +58,33 @@ def get_prometheus_cluster_label(self) -> str:
return f', {self.config.prometheus_label}="{self.config.prometheus_cluster_label}"'

@abc.abstractmethod
def get_query(self, object: K8sObjectData) -> str:
def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
"""
This method should be implemented by all subclasses to provide a query string to fetch metrics.
Args:
object (K8sObjectData): The object for which metrics need to be fetched.
resolution (Optional[str]): a string for configurable resolution to the query.
Returns:
str: The query string.
"""

pass

def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
    """
    Provide the query string that is put in the metric metadata to produce relevant graphs.

    Subclasses may override this when the graph should be drawn from a different query than
    the one used to fetch the data. The default implementation returns the same query as
    `get_query`.

    Args:
        object (K8sObjectData): The object for which metrics need to be fetched.
        resolution (Optional[str]): a string for configurable resolution to the query.

    Returns:
        str: The query string.
    """
    return self.get_query(object, resolution)

def _step_to_string(self, step: datetime.timedelta) -> str:
    """
    Converts step in datetime.timedelta format to a string format used by Prometheus.

    Steps longer than one day are expressed in whole days, anything else in whole minutes
    (both truncated). A step shorter than one minute is expressed in whole seconds instead,
    because truncating it to minutes would yield "0m", i.e. a zero-width step.

    Args:
        step (datetime.timedelta): Step size in datetime.timedelta format.

    Returns:
        str: Step size in string format used by Prometheus.
    """
    seconds_per_day = 60 * 60 * 24
    whole_seconds = int(step.total_seconds())

    # The day check deliberately uses the untruncated value, exactly like the minute/day split
    # always did: exactly one day is still reported as "1440m".
    if step.total_seconds() > seconds_per_day:
        return f"{whole_seconds // seconds_per_day}d"

    whole_minutes = whole_seconds // 60
    if whole_minutes == 0:
        # Sub-minute step: fall back to seconds rather than emitting "0m".
        return f"{whole_seconds}s"
    return f"{whole_minutes}m"

async def query_prometheus(self, metric: Metric) -> list[dict]:
def query_prometheus_thread(self, metric: Metric, query_type: QueryType) -> list[dict]:
    """
    Synchronously run a single query against Prometheus.

    Args:
        metric (Metric): The query to run; for range queries its start time, end time and
            step are used as well.
        query_type (QueryType): Whether to run a range query or a regular (instant) query.

    Returns:
        list[dict]: The series returned by Prometheus, always shaped like the output of
            `custom_query_range` (samples stored under the "values" key).
    """
    if query_type == QueryType.QueryRange:
        return self.prometheus.custom_query_range(
            query=metric.query,
            start_time=metric.start_time,
            end_time=metric.end_time,
            step=metric.step,
        )

    # regular query, lighter on performance
    results = self.prometheus.custom_query(query=metric.query)
    # format the results to return the same format as custom_query_range
    for result in results:
        # Only entries holding a single sample under "value" need converting; anything else
        # is passed through untouched instead of failing with a KeyError.
        if "value" in result:
            result["values"] = [result.pop("value")]
    return results

async def query_prometheus(self, metric: Metric, query_type: QueryType) -> list[dict]:
"""
Asynchronous method that queries Prometheus to fetch metrics.
Expand All @@ -90,12 +130,7 @@ async def query_prometheus(self, metric: Metric) -> list[dict]:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
self.executor,
lambda: self.prometheus.custom_query_range(
query=metric.query,
start_time=metric.start_time,
end_time=metric.end_time,
step=metric.step,
),
lambda: self.query_prometheus_thread(metric=metric, query_type=query_type),
)

async def load_data(
Expand All @@ -112,16 +147,19 @@ async def load_data(
Returns:
ResourceHistoryData: An instance of the ResourceHistoryData class representing the loaded metrics.
"""

query = self.get_query(object)
resolution = f"{self._step_to_string(period)}:{self._step_to_string(step)}"
query = self.get_query(object, resolution)
query_type = self.get_query_type()
end_time = datetime.datetime.now().astimezone()
metric = Metric(
query=query,
start_time=end_time - period,
end_time=end_time,
step=self._step_to_string(step),
)
result = await self.query_prometheus(metric)
result = await self.query_prometheus(metric=metric, query_type=query_type)
# adding the query in the results for a graph
metric.query = self.get_graph_query(object, resolution)

if result == []:
self.warning(f"{service_name} returned no {self.__class__.__name__} metrics for {object}")
Expand All @@ -135,12 +173,13 @@ async def load_data(
)

@staticmethod
def get_by_resource(resource: str, strategy: Optional[str] = None) -> type[BaseMetricLoader]:
    """
    Fetches the metric loader corresponding to the specified resource.

    Args:
        resource (str): The name of the resource.
        strategy (Optional[str]): The name of the strategy. When given, a loader registered
            for this strategy (matched case-insensitively) in STRATEGY_METRICS_OVERRIDES
            takes precedence over the one in REGISTERED_METRICS.

    Returns:
        type[BaseMetricLoader]: The class of the metric loader corresponding to the resource.

    Raises:
        KeyError: If the specified resource is not registered.
    """
    # The strategy is optional, so it must not be lowered unconditionally: None has no
    # .lower() and the resulting AttributeError would escape the KeyError handling below.
    if strategy:
        strategy_overrides = STRATEGY_METRICS_OVERRIDES.get(strategy.lower(), {})
        if resource in strategy_overrides:
            return strategy_overrides[resource]

    try:
        return REGISTERED_METRICS[resource]
    except KeyError as e:
        raise KeyError(f"Resource {resource} was not registered by `@bind_metric(...)`") from e
Expand All @@ -174,3 +220,26 @@ def decorator(cls: type[Self]) -> type[Self]:
return cls

return decorator


# This is a temporary solution; metric loaders will be moved to strategy in the future
def override_metric(strategy: str, resource: str) -> Callable[[type[Self]], type[Self]]:
    """
    Build a class decorator that registers a metric loader as the override for a resource
    on one specific strategy.

    Args:
        strategy (str): The name of the strategy for this metric (stored lower-cased).
        resource (str): The name of the resource.

    Returns:
        Callable[[type[Self]], type[Self]]: The decorator that does the binding.
    """

    def decorator(cls: type[Self]) -> type[Self]:
        # Create the per-strategy mapping on first use, then record the loader class in it.
        per_strategy = STRATEGY_METRICS_OVERRIDES.setdefault(strategy.lower(), {})
        per_strategy[resource] = cls
        return cls

    return decorator
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Optional
from robusta_krr.core.models.allocations import ResourceType
from robusta_krr.core.models.objects import K8sObjectData

from .base_filtered_metric import BaseFilteredMetricLoader
from .base_metric import bind_metric
from .base_metric import bind_metric, QueryType


@bind_metric(ResourceType.CPU)
class CPUMetricLoader(BaseFilteredMetricLoader):
def get_query(self, object: K8sObjectData) -> str:
def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return (
Expand All @@ -18,3 +19,6 @@ def get_query(self, object: K8sObjectData) -> str:
f"{cluster_label}"
"}[5m])) by (container, pod, job)"
)

def get_query_type(self) -> QueryType:
    """Return the type of Prometheus query this loader runs: a range query."""
    return QueryType.QueryRange
37 changes: 34 additions & 3 deletions robusta_krr/core/integrations/prometheus/metrics/memory_metric.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Optional
from robusta_krr.core.models.allocations import ResourceType
from robusta_krr.core.models.objects import K8sObjectData

from .base_filtered_metric import BaseFilteredMetricLoader
from .base_metric import bind_metric
from .base_metric import bind_metric, QueryType, override_metric


@bind_metric(ResourceType.Memory)
class MemoryMetricLoader(BaseFilteredMetricLoader):
def get_query(self, object: K8sObjectData) -> str:
def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
pods_selector = "|".join(pod.name for pod in object.pods)
cluster_label = self.get_prometheus_cluster_label()
return (
Expand All @@ -16,5 +17,35 @@ def get_query(self, object: K8sObjectData) -> str:
f'pod=~"{pods_selector}", '
f'container="{object.container}"'
f"{cluster_label}"
"}) by (container, pod, job)"
"}) by (container, pod, job, id)"
)

def get_query_type(self) -> QueryType:
    """Return the type of Prometheus query this loader runs: a range query."""
    return QueryType.QueryRange

# This is a temporary solution; metric loaders will be moved to strategy in the future
@override_metric("simple", ResourceType.Memory)
class MemoryMetricLoader(MemoryMetricLoader):
    """
    Memory metric loader used in place of the default one on the "simple" strategy.

    It deliberately reuses the name of the class it extends. It fetches data with a single
    regular query over `max_over_time`, while still exposing the parent's query for graphs.
    """

    def get_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
        """
        Build the query returning the maximum working-set memory of the object's container.

        Args:
            object (K8sObjectData): The object for which metrics need to be fetched.
            resolution (Optional[str]): Range placed in square brackets after the selector;
                omitted entirely when empty or None.

        Returns:
            str: The query string.
        """
        pod_names_regex = "|".join(pod.name for pod in object.pods)
        cluster_label = self.get_prometheus_cluster_label()
        range_selector = f"[{resolution}]" if resolution else ""
        label_matchers = (
            f'namespace="{object.namespace}", '
            f'pod=~"{pod_names_regex}", '
            f'container="{object.container}"'
            f"{cluster_label}"
        )
        return (
            "max(max_over_time(container_memory_working_set_bytes{"
            f"{label_matchers}"
            "}"
            f"{range_selector}"
            ")) by (container, pod, job, id)"
        )

    def get_query_type(self) -> QueryType:
        """Return the type of Prometheus query this loader runs: a regular (non-range) query."""
        return QueryType.Query

    def get_graph_query(self, object: K8sObjectData, resolution: Optional[str]) -> str:
        """Return the parent loader's query, which is the one used to produce graphs."""
        return super().get_query(object, resolution)
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ async def gather_data(
"""
self.debug(f"Gathering data for {object} and {resource}")

MetricLoaderType = BaseMetricLoader.get_by_resource(resource, self.config.strategy)
await self.add_historic_pods(object, period)

MetricLoaderType = BaseMetricLoader.get_by_resource(resource)

metric_loader = MetricLoaderType(self.config, self.prometheus, self.executor)
return await metric_loader.load_data(object, period, step, self.name())

Expand All @@ -202,7 +202,7 @@ async def add_historic_pods(self, object: K8sObjectData, period: datetime.timede
f'owner_name="{object.name}", '
f'owner_kind="Deployment", '
f'namespace="{object.namespace}"'
f'{cluster_label}'
f"{cluster_label}"
"}"
f"[{period_literal}]"
)
Expand All @@ -218,7 +218,7 @@ async def add_historic_pods(self, object: K8sObjectData, period: datetime.timede
f'owner_name=~"{owners_regex}", '
f'owner_kind="{pod_owner_kind}", '
f'namespace="{object.namespace}"'
f'{cluster_label}'
f"{cluster_label}"
"}"
f"[{period_literal}]"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from kubernetes.client import ApiClient
from requests.exceptions import ConnectionError, HTTPError

from concurrent.futures import ThreadPoolExecutor
from robusta_krr.core.models.config import Config
from robusta_krr.utils.service_discovery import ServiceDiscovery

Expand Down Expand Up @@ -48,9 +48,14 @@ def __init__(
*,
cluster: Optional[str] = None,
api_client: Optional[ApiClient] = None,
executor: Optional[ThreadPoolExecutor] = None,
) -> None:
super().__init__(
config=config, cluster=cluster, api_client=api_client, service_discovery=ThanosMetricsDiscovery
config=config,
cluster=cluster,
api_client=api_client,
service_discovery=ThanosMetricsDiscovery,
executor=executor,
)

def check_connection(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Optional

from concurrent.futures import ThreadPoolExecutor
from kubernetes.client import ApiClient
from requests.exceptions import ConnectionError, HTTPError

Expand Down Expand Up @@ -47,9 +47,14 @@ def __init__(
*,
cluster: Optional[str] = None,
api_client: Optional[ApiClient] = None,
executor: Optional[ThreadPoolExecutor] = None,
) -> None:
super().__init__(
config=config, cluster=cluster, api_client=api_client, service_discovery=VictoriaMetricsDiscovery
config=config,
cluster=cluster,
api_client=api_client,
service_discovery=VictoriaMetricsDiscovery,
executor=executor,
)

def check_connection(self):
Expand Down
2 changes: 1 addition & 1 deletion robusta_krr/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async def _gather_objects_recommendations(

async def _collect_result(self) -> Result:
clusters = await self._k8s_loader.list_clusters()
if len(clusters) > 1 and self.config.prometheus_url:
if clusters and len(clusters) > 1 and self.config.prometheus_url:
# this can only happen for multi-cluster querying a single centralized prometheus
# In this scenario we don't yet support determining which metrics belong to which cluster so the recommendation can be incorrect
raise ClusterNotSpecifiedException(
Expand Down

0 comments on commit 682c2e5

Please sign in to comment.