diff --git a/aggregator/src/main/java/io/spoud/kcc/aggregator/repository/GaugeRepository.java b/aggregator/src/main/java/io/spoud/kcc/aggregator/repository/GaugeRepository.java index bb90e8d..eb4a187 100644 --- a/aggregator/src/main/java/io/spoud/kcc/aggregator/repository/GaugeRepository.java +++ b/aggregator/src/main/java/io/spoud/kcc/aggregator/repository/GaugeRepository.java @@ -5,6 +5,7 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tags; import jakarta.enterprise.context.ApplicationScoped; +import lombok.Getter; import lombok.RequiredArgsConstructor; import java.util.Map; @@ -26,5 +27,11 @@ public void updateGauge(String name, Tags tags, double value) { }).set(value); } - private record GaugeKey(String name, Tags tags) {} + public Map getGaugeValues() { + var result = new ConcurrentHashMap(); + gauges.forEach((key, value) -> result.put(key, value.get())); + return result; + } + + public record GaugeKey(String name, Tags tags) {} } diff --git a/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/MetricEnricher.java b/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/MetricEnricher.java index 43ba309..1732269 100644 --- a/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/MetricEnricher.java +++ b/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/MetricEnricher.java @@ -24,6 +24,7 @@ import java.time.Duration; import java.util.Collections; import java.util.List; +import java.util.stream.Stream; @ApplicationScoped @RequiredArgsConstructor @@ -93,8 +94,8 @@ public Topology metricEnricherTopology() { value.getName(), value.getInitialMetricName()), Named.as("rekey-to-metric-name")) .peek((key, value) -> { try { - var tags = Tags.of(value.getContext().entrySet().stream() - .map(e -> Tag.of(e.getKey(), e.getValue())) + var tags = Tags.of(Stream.concat(value.getContext().keySet().stream(), value.getTags().keySet().stream()) + .map(k -> Tag.of(k, value.getContext().getOrDefault(k, value.getTags().get(k)))) .toList()); gaugeRepository.updateGauge("kcc_" + value.getInitialMetricName(), tags, value.getValue()); } catch (Exception e) { diff --git a/aggregator/src/test/java/io/spoud/kcc/aggregator/stream/MetricEnricherTest.java b/aggregator/src/test/java/io/spoud/kcc/aggregator/stream/MetricEnricherTest.java index c7cb26b..7c43423 100644 --- a/aggregator/src/test/java/io/spoud/kcc/aggregator/stream/MetricEnricherTest.java +++ b/aggregator/src/test/java/io/spoud/kcc/aggregator/stream/MetricEnricherTest.java @@ -1,6 +1,8 @@ package io.spoud.kcc.aggregator.stream; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.quarkus.logging.Log; import io.spoud.kcc.aggregator.data.MetricNameEntity; import io.spoud.kcc.aggregator.data.RawTelegrafData; @@ -49,6 +51,7 @@ class MetricEnricherTest { private TestOutputTopic aggregatedTopic; private TestOutputTopic aggregatedTableFriendlyTopic; private MetricNameRepository metricRepository; + private GaugeRepository gaugeRepository; private MetricReducer metricReducer; private ResultCaptor reducerResult; @@ -78,6 +81,7 @@ void setup() { .build(); metricReducer = Mockito.spy(new MetricReducer(configProperties)); metricRepository = new MetricNameRepository(metricReducer); + gaugeRepository = new GaugeRepository(new SimpleMeterRegistry()); ContextDataRepository contextDataRepository = Mockito.mock(ContextDataRepository.class); final CachedContextDataManager cachedContextDataManager = new CachedContextDataManager(contextDataRepository); @@ -85,7 +89,7 @@ void setup() { SerdeFactory serdeFactory = new SerdeFactory(new HashMap(kafkaProperties)); reducerResult = new ResultCaptor<>(); Mockito.doAnswer(reducerResult).when(metricReducer).apply(Mockito.any(), Mockito.any()); - MetricEnricher metricEnricher = new MetricEnricher(metricRepository, cachedContextDataManager, configProperties, serdeFactory, Mockito.mock(GaugeRepository.class), metricReducer); + MetricEnricher metricEnricher = new MetricEnricher(metricRepository, cachedContextDataManager, configProperties, serdeFactory, gaugeRepository, metricReducer); final Topology topology = metricEnricher.metricEnricherTopology(); System.out.println(topology.describe()); @@ -151,6 +155,30 @@ void should_add_context() throws InterruptedException { assertThat(aggregated.getTags()).containsEntry("env", "dev"); } + @Test + void gauges_should_not_overlap() throws InterruptedException { + contextDataStore.put( + "id1", + new ContextData( + Instant.now(), + null, + null, + EntityType.TOPIC, + "spoud_.*", + Map.of("cost-unit", "my-cost-unit"))); + Thread.sleep(1000); + + // generate metrics for two different topics that will be mapped to the same context + rawTelegrafDataTopic.pipeInput(generateTopicRawTelegraf("spoud_topic_1", 1.5)); + rawTelegrafDataTopic.pipeInput(generateTopicRawTelegraf("spoud_topic_2", 2.5)); + + // make sure that each topic spawns its own gauge even if they share the same context + assertThat(gaugeRepository.getGaugeValues()).containsKeys( + new GaugeRepository.GaugeKey("kcc_confluent_kafka_server_sent_bytes", Tags.of("cost-unit", "my-cost-unit", "env", "dev", "topic", "spoud_topic_1")), + new GaugeRepository.GaugeKey("kcc_confluent_kafka_server_sent_bytes", Tags.of("cost-unit", "my-cost-unit", "env", "dev", "topic", "spoud_topic_2")) + ); + } + @Test void should_end_up_in_friendly_table() { contextDataStore.put(