Skip to content

Commit

Permalink
Add tags of scraped metrics to Micrometer gauges for that metric (#110)
Browse files Browse the repository at this point in the history
* add tags of scraped metrics to micrometer gauges for that metric

If the context and the tags contain entries with the same key, the value from the context supersedes the tag value.

* add test
  • Loading branch information
arov00 authored May 24, 2024
1 parent 6066299 commit 10404b5
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.Map;
Expand All @@ -26,5 +27,11 @@ public void updateGauge(String name, Tags tags, double value) {
}).set(value);
}

private record GaugeKey(String name, Tags tags) {}
/**
 * Returns a point-in-time snapshot of every tracked gauge and its current value.
 * The returned map is a fresh copy; mutating it does not affect the repository.
 */
public Map<GaugeKey, Double> getGaugeValues() {
    var snapshot = new ConcurrentHashMap<GaugeKey, Double>();
    for (var entry : gauges.entrySet()) {
        // entry value is the live holder backing the Micrometer gauge; read its current value
        snapshot.put(entry.getKey(), entry.getValue().get());
    }
    return snapshot;
}

public record GaugeKey(String name, Tags tags) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

@ApplicationScoped
@RequiredArgsConstructor
Expand Down Expand Up @@ -93,8 +94,8 @@ public Topology metricEnricherTopology() {
value.getName(), value.getInitialMetricName()), Named.as("rekey-to-metric-name"))
.peek((key, value) -> {
try {
var tags = Tags.of(value.getContext().entrySet().stream()
.map(e -> Tag.of(e.getKey(), e.getValue()))
var tags = Tags.of(Stream.concat(value.getContext().keySet().stream(), value.getTags().keySet().stream())
.map(k -> Tag.of(k, value.getContext().getOrDefault(k, value.getTags().get(k))))
.toList());
gaugeRepository.updateGauge("kcc_" + value.getInitialMetricName(), tags, value.getValue());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.spoud.kcc.aggregator.stream;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.quarkus.logging.Log;
import io.spoud.kcc.aggregator.data.MetricNameEntity;
import io.spoud.kcc.aggregator.data.RawTelegrafData;
Expand Down Expand Up @@ -49,6 +51,7 @@ class MetricEnricherTest {
private TestOutputTopic<AggregatedDataKey, AggregatedDataWindowed> aggregatedTopic;
private TestOutputTopic<AggregatedDataKey, AggregatedDataTableFriendly> aggregatedTableFriendlyTopic;
private MetricNameRepository metricRepository;
private GaugeRepository gaugeRepository;
private MetricReducer metricReducer;
private ResultCaptor<AggregatedData> reducerResult;

Expand Down Expand Up @@ -78,14 +81,15 @@ void setup() {
.build();
metricReducer = Mockito.spy(new MetricReducer(configProperties));
metricRepository = new MetricNameRepository(metricReducer);
gaugeRepository = new GaugeRepository(new SimpleMeterRegistry());
ContextDataRepository contextDataRepository = Mockito.mock(ContextDataRepository.class);

final CachedContextDataManager cachedContextDataManager = new CachedContextDataManager(contextDataRepository);
Properties kafkaProperties = createKafkaProperties();
SerdeFactory serdeFactory = new SerdeFactory(new HashMap(kafkaProperties));
reducerResult = new ResultCaptor<>();
Mockito.doAnswer(reducerResult).when(metricReducer).apply(Mockito.any(), Mockito.any());
MetricEnricher metricEnricher = new MetricEnricher(metricRepository, cachedContextDataManager, configProperties, serdeFactory, Mockito.mock(GaugeRepository.class), metricReducer);
MetricEnricher metricEnricher = new MetricEnricher(metricRepository, cachedContextDataManager, configProperties, serdeFactory, gaugeRepository, metricReducer);
final Topology topology = metricEnricher.metricEnricherTopology();
System.out.println(topology.describe());

Expand Down Expand Up @@ -151,6 +155,30 @@ void should_add_context() throws InterruptedException {
assertThat(aggregated.getTags()).containsEntry("env", "dev");
}

/**
 * Verifies that two topics sharing the same context data still produce two
 * separate gauges, because the topic tag is part of the gauge identity.
 */
@Test
void gauges_should_not_overlap() throws InterruptedException {
// Register a context that matches both test topics via the "spoud_.*" regex.
contextDataStore.put(
"id1",
new ContextData(
Instant.now(),
null,
null,
EntityType.TOPIC,
"spoud_.*",
Map.of("cost-unit", "my-cost-unit")));
// NOTE(review): sleep presumably lets the cached context data refresh before
// metrics arrive — TODO confirm against CachedContextDataManager's cache TTL.
Thread.sleep(1000);

// generate metrics for two different topics that will be mapped to the same context
rawTelegrafDataTopic.pipeInput(generateTopicRawTelegraf("spoud_topic_1", 1.5));
rawTelegrafDataTopic.pipeInput(generateTopicRawTelegraf("spoud_topic_2", 2.5));

// make sure that each topic spawns its own gauge even if they share the same context
// (the gauge key includes the topic tag, so the two inputs must not collapse into one)
assertThat(gaugeRepository.getGaugeValues()).containsKeys(
new GaugeRepository.GaugeKey("kcc_confluent_kafka_server_sent_bytes", Tags.of("cost-unit", "my-cost-unit", "env", "dev", "topic", "spoud_topic_1")),
new GaugeRepository.GaugeKey("kcc_confluent_kafka_server_sent_bytes", Tags.of("cost-unit", "my-cost-unit", "env", "dev", "topic", "spoud_topic_2"))
);
}

@Test
void should_end_up_in_friendly_table() {
contextDataStore.put(
Expand Down

0 comments on commit 10404b5

Please sign in to comment.