Skip to content

Commit

Permalink
Rewrite victoriametrics client to pure promql (#1882)
Browse files Browse the repository at this point in the history
* Initial rewriting of victoriametrics client to the pure PromQl

* Get rid of hacky code related to handling status codes query in one requests

* Add few fixes

* Add more tests for caching unavailable metrics

* Debug reason of wiremock server timestous

* Fix failing prometheus metrics tests

* Add code refactor

* Add metrics for prometheus requests

* Improve creating prometheus client dependencies

* Improve creating prometheus client dependencies

* Fix handling errors when one of the requests fails

* Remove unnecessary log

* CR fixes

* CR fixes

* CR fixes
  • Loading branch information
faderskd authored Aug 1, 2024
1 parent d9d92e9 commit 4292174
Show file tree
Hide file tree
Showing 34 changed files with 741 additions and 590 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ allprojects {
curator : '5.4.0',
dropwizard_metrics: '4.2.25',
micrometer_metrics: '1.12.5',
wiremock : '3.5.2',
wiremock : '3.9.0',
spock : '2.4-M4-groovy-4.0',
groovy : '4.0.21',
avro : '1.11.3',
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/configuration/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ Option | Description
{modulePrefix}.metrics.prometheus.step | The step size to use in computing windowed statistics | 60s
{modulePrefix}.metrics.prometheus.descriptions | If meter descriptions should be sent to Prometheus | true

In order to be able to access basic metrics via Management API, it needs to be configured to reach VictoriaMetrics API:
In order to be able to access basic metrics via Management API, it needs to be configured to reach Prometheus API:

Option | Description | Default value
------------------------------------------|-----------------------------------------------| -------------
prometheus.client.enabled | Should fetch external metrics from Prometheus | true
prometheus.client.externalMonitoringUrl | URI to VictoriaMetrics HTTP API | http://localhost:18090
prometheus.client.externalMonitoringUrl | URI to Prometheus HTTP API | http://localhost:18090
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
public class MetricDecimalValue {
private static final String UNAVAILABLE_STRING = "unavailable";
private static final MetricDecimalValue UNAVAILABLE = new MetricDecimalValue(false, "-1.0");
private static final MetricDecimalValue DEFAULT_VALUE = new MetricDecimalValue(true, "0.0");

private final boolean available;
private final String value;
Expand All @@ -23,6 +24,10 @@ public static MetricDecimalValue unavailable() {
return UNAVAILABLE;
}

public static MetricDecimalValue defaultValue() {
return DEFAULT_VALUE;
}

public static MetricDecimalValue of(String value) {
return new MetricDecimalValue(true, value);
}
Expand Down Expand Up @@ -65,4 +70,12 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(available, value);
}

@Override
public String toString() {
return "MetricDecimalValue{"
+ "available=" + available
+ ", value='" + value + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ public class ExternalMonitoringClientProperties {

private int maxConnections = 100;

private int maxConnectionsPerRoute = 10;
private int maxConnectionsPerRoute = 100;

private int cacheTtlSeconds = 55;

private int cacheSize = 100_000;

private int fetchingTimeoutMillis = 5000;
private int fetchingThreads = 30;

private String externalMonitoringUrl = "http://localhost:18090";


public int getConnectionTimeoutMillis() {
return connectionTimeoutMillis;
}
Expand Down Expand Up @@ -71,4 +75,20 @@ public String getExternalMonitoringUrl() {
public void setExternalMonitoringUrl(String externalMonitoringUrl) {
this.externalMonitoringUrl = externalMonitoringUrl;
}

public int getFetchingThreads() {
return fetchingThreads;
}

public void setFetchingThreads(int fetchingThreads) {
this.fetchingThreads = fetchingThreads;
}

public int getFetchingTimeoutMillis() {
return fetchingTimeoutMillis;
}

public void setFetchingTimeoutMillis(int fetchingTimeoutMillis) {
this.fetchingTimeoutMillis = fetchingTimeoutMillis;
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package pl.allegro.tech.hermes.management.config;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.core5.util.Timeout;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -15,31 +18,40 @@
import org.springframework.web.client.RestTemplate;
import pl.allegro.tech.hermes.management.infrastructure.prometheus.CachingPrometheusClient;
import pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusClient;
import pl.allegro.tech.hermes.management.infrastructure.prometheus.PrometheusMetricsProvider;
import pl.allegro.tech.hermes.management.infrastructure.prometheus.RestTemplatePrometheusClient;
import pl.allegro.tech.hermes.management.infrastructure.prometheus.VictoriaMetricsMetricsProvider;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.google.common.base.Ticker.systemTicker;

@Configuration
@ConditionalOnProperty(value = "prometheus.client.enabled", havingValue = "true")
public class ExternalMonitoringConfiguration {

@Bean
@ConditionalOnProperty(value = "prometheus.client.enabled", havingValue = "true")
public VictoriaMetricsMetricsProvider prometheusMetricsProvider(PrometheusClient prometheusClient,
PrometheusMonitoringClientProperties properties) {
return new VictoriaMetricsMetricsProvider(prometheusClient,
public PrometheusMetricsProvider prometheusMetricsProvider(PrometheusClient prometheusClient,
PrometheusMonitoringClientProperties properties) {
return new PrometheusMetricsProvider(prometheusClient,
properties.getConsumersMetricsPrefix(), properties.getFrontendMetricsPrefix(),
properties.getAdditionalFilters());
}

@Bean
@ConditionalOnProperty(value = "prometheus.client.enabled", havingValue = "true")
public PrometheusClient prometheusClient(@Qualifier("monitoringRestTemplate") RestTemplate monitoringRestTemplate,
PrometheusMonitoringClientProperties clientProperties) {
PrometheusMonitoringClientProperties clientProperties,
@Qualifier("prometheusFetcherExecutorService") ExecutorService executorService,
MeterRegistry meterRegistry) {
RestTemplatePrometheusClient underlyingPrometheusClient =
new RestTemplatePrometheusClient(monitoringRestTemplate, URI.create(clientProperties.getExternalMonitoringUrl()));
new RestTemplatePrometheusClient(
monitoringRestTemplate,
URI.create(clientProperties.getExternalMonitoringUrl()),
executorService,
Duration.ofMillis(clientProperties.getFetchingTimeoutMillis()),
meterRegistry);
return new CachingPrometheusClient(
underlyingPrometheusClient,
systemTicker(),
Expand All @@ -49,6 +61,7 @@ public PrometheusClient prometheusClient(@Qualifier("monitoringRestTemplate") Re
}

@Bean("monitoringRestTemplate")
@ConditionalOnMissingBean(name = "monitoringRestTemplate")
public RestTemplate restTemplate(ExternalMonitoringClientProperties clientProperties) {
PoolingHttpClientConnectionManager connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
.setMaxConnTotal(clientProperties.getMaxConnections())
Expand All @@ -66,7 +79,14 @@ public RestTemplate restTemplate(ExternalMonitoringClientProperties clientProper
.build();

ClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(client);

return new RestTemplate(clientHttpRequestFactory);
}

@Bean("prometheusFetcherExecutorService")
@ConditionalOnMissingBean(name = "prometheusFetcherExecutorService")
public ExecutorService executorService(ExternalMonitoringClientProperties clientProperties) {
return Executors.newFixedThreadPool(clientProperties.getFetchingThreads(),
new ThreadFactoryBuilder().setNameFormat("prometheus-metrics-fetcher-%d").build()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,10 @@ private List<UnhealthySubscription> getUnhealthyList(Collection<SubscriptionName
)
.get(subscriptionHealthCheckTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
logger.error("Timeout occurred while fetching unhealthy subscriptions...", e);
throw new UnhealthySubscriptionGetException("Fetching unhealthy subscriptions timed out.");
} catch (Exception e) {
logger.error("Fetching unhealthy subscriptions failed...", e);
throw new UnhealthySubscriptionGetException("Fetching unhealthy subscriptions failed.", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,22 @@ public static MonitoringMetricsContainer unavailable() {
return new MonitoringMetricsContainer(false, new HashMap<>());
}

public MonitoringMetricsContainer addMetricValue(String metricPath, MetricDecimalValue value) {
public MonitoringMetricsContainer addMetricValue(String query, MetricDecimalValue value) {
if (!isAvailable) {
throw new IllegalStateException("Adding value to unavailable metrics container");
}
this.metrics.put(metricPath, value);
this.metrics.put(query, value);
return this;
}

public MetricDecimalValue metricValue(String metricPath) {
public MetricDecimalValue metricValue(String query) {
if (!isAvailable) {
return MetricDecimalValue.unavailable();
}
return metrics.getOrDefault(metricPath, DEFAULT_VALUE);
return metrics.getOrDefault(query, DEFAULT_VALUE);
}

public boolean hasUnavailableMetrics() {
return !isAvailable || metrics.entrySet().stream().anyMatch(e -> !e.getValue().isAvailable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,25 @@
import com.google.common.cache.LoadingCache;
import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringMetricsContainer;

import java.util.List;
import java.util.concurrent.ExecutionException;

import static java.util.concurrent.TimeUnit.SECONDS;


public class CachingPrometheusClient implements PrometheusClient {

private final PrometheusClient underlyingPrometheusClient;
private final LoadingCache<String, MonitoringMetricsContainer> prometheusMetricsCache;
/*
Metrics will always be requested in the context of a single subscription/topic. The single sub/topic will
always result in the same list of metrics queries. There is no overlapping between metrics used in the context of
topic or subscriptions. That's why it is safe to use a list of queries as a caching key.
Maybe it will be worth to cache it per query except of queries when there will be too much overhead
of refreshing all sub/topic metrics if the single fetch fails (currently we invalidate whole metrics container
when one of the sub metric is unavailable)
*/
private final LoadingCache<List<String>, MonitoringMetricsContainer> prometheusMetricsCache;

public CachingPrometheusClient(PrometheusClient underlyingPrometheusClient, Ticker ticker,
long cacheTtlInSeconds, long cacheSize) {
Expand All @@ -26,19 +37,24 @@ public CachingPrometheusClient(PrometheusClient underlyingPrometheusClient, Tick
}

@Override
public MonitoringMetricsContainer readMetrics(String query) {
public MonitoringMetricsContainer readMetrics(List<String> queries) {
try {
return prometheusMetricsCache.get(query);
MonitoringMetricsContainer monitoringMetricsContainer = prometheusMetricsCache.get(List.copyOf(queries));
if (monitoringMetricsContainer.hasUnavailableMetrics()) {
// try to reload the on the next fetch
prometheusMetricsCache.invalidate(queries);
}
return monitoringMetricsContainer;
} catch (ExecutionException e) {
// should never happen because the loader does not throw any checked exceptions
// should never happen because the loader does not throw any exceptions
throw new RuntimeException(e);
}
}

private class PrometheusMetricsCacheLoader extends CacheLoader<String, MonitoringMetricsContainer> {
private class PrometheusMetricsCacheLoader extends CacheLoader<List<String>, MonitoringMetricsContainer> {
@Override
public MonitoringMetricsContainer load(String query) {
return underlyingPrometheusClient.readMetrics(query);
public MonitoringMetricsContainer load(List<String> queries) {
return underlyingPrometheusClient.readMetrics(queries);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,44 @@
package pl.allegro.tech.hermes.management.infrastructure.prometheus;

import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.management.infrastructure.metrics.MonitoringMetricsContainer;

import java.util.List;


public interface PrometheusClient {
MonitoringMetricsContainer readMetrics(String query);
String SUBSCRIPTION_QUERY_FORMAT = "sum by (group, topic, subscription)"
+ " (irate({__name__='%s', group='%s', topic='%s', subscription='%s', %s}[1m]))";

String SUBSCRIPTION_QUERY_FORMAT_STATUS_CODE = "sum by (group, topic, subscription)"
+ " (irate({__name__='%s', group='%s', topic='%s', subscription='%s', status_code=~'%s', %s}[1m]))";

String TOPIC_QUERY_FORMAT = "sum by (group, topic) (irate({__name__='%s', group='%s', "
+ "topic='%s', %s}[1m]))";

default MonitoringMetricsContainer readMetrics(String... query) {
return readMetrics(List.of(query));
}

MonitoringMetricsContainer readMetrics(List<String> queries);

static String forSubscription(String name, SubscriptionName subscriptionName, String additionalFilters) {
return String.format(SUBSCRIPTION_QUERY_FORMAT, name,
subscriptionName.getTopicName().getGroupName(), subscriptionName.getTopicName().getName(),
subscriptionName.getName(), additionalFilters);
}

static String forSubscriptionStatusCode(String name, SubscriptionName subscriptionName,
String regex, String additionalFilters) {
return String.format(SUBSCRIPTION_QUERY_FORMAT_STATUS_CODE, name,
subscriptionName.getTopicName().getGroupName(), subscriptionName.getTopicName().getName(),
subscriptionName.getName(), regex, additionalFilters);
}


static String forTopic(String name, TopicName topicName, String additionalFilters) {
return String.format(TOPIC_QUERY_FORMAT, name,
topicName.getGroupName(), topicName.getName(), additionalFilters);
}
}
Loading

0 comments on commit 4292174

Please sign in to comment.