From 232d52032a2e10826ede85adbf82072dddd1ce76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C3=A9tan=20Collaud?= Date: Thu, 8 Feb 2024 09:36:59 +0100 Subject: [PATCH] implement regex replace for context (#5) * implement regex replace for context * better method name * improve regex replace --- .../stream/CachedContextDataManager.java | 21 +++-- .../stream/TelegrafDataWrapper.java | 22 ++++- .../aggregator/stream/MetricEnricherTest.java | 83 ++++++++++++++++++- config-sample/context-data.json | 20 +++-- 4 files changed, 126 insertions(+), 20 deletions(-) diff --git a/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/CachedContextDataManager.java b/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/CachedContextDataManager.java index 28c8244..e222603 100644 --- a/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/CachedContextDataManager.java +++ b/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/CachedContextDataManager.java @@ -10,10 +10,8 @@ import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; +import java.util.*; +import java.util.regex.Matcher; import java.util.regex.Pattern; /** @@ -68,10 +66,17 @@ public CachedContextData(ContextData contextData) { this.pattern = Pattern.compile(contextData.getRegex(), Pattern.CASE_INSENSITIVE); } - public boolean matches(Metric metric, Instant timestamp) { - return contextData.getEntityType().equals(metric.type()) - && isInContextDataTimeRange(timestamp, contextData) - && pattern.matcher(metric.objectName()).matches(); + /** + * Get the Matcher object, but only if the current metric is eligible and satisfy the regex + */ + public Optional getMatcher(Metric metric, Instant timestamp) { + if (contextData.getEntityType().equals(metric.type()) + && isInContextDataTimeRange(timestamp, contextData)) { + return Optional.of(pattern.matcher(metric.objectName())) + .filter(Matcher::matches);// only if there is a match; + } else { + return Optional.empty(); + } } private boolean isInContextDataTimeRange(Instant timestamp, ContextData contextData) { diff --git a/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/TelegrafDataWrapper.java b/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/TelegrafDataWrapper.java index f2c9e32..d685856 100644 --- a/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/TelegrafDataWrapper.java +++ b/aggregator/src/main/java/io/spoud/kcc/aggregator/stream/TelegrafDataWrapper.java @@ -1,5 +1,6 @@ package io.spoud.kcc.aggregator.stream; +import io.quarkus.logging.Log; import io.spoud.kcc.aggregator.data.Metric; import io.spoud.kcc.aggregator.data.RawTelegrafData; import io.spoud.kcc.data.EntityType; @@ -8,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; public class TelegrafDataWrapper { @@ -35,10 +37,22 @@ public Optional enrichWithContext(List context = new HashMap<>(); // This is the join with regex contextData.forEach(cache -> { - if (cache.matches(metric, telegrafData.timestamp())) { - // TODO add to a list instead of overriding the context - context.putAll(cache.getContextData().getContext()); - } + cache.getMatcher(metric, telegrafData.timestamp()) + .ifPresent(matcher -> { + // TODO add to a list instead of overriding the context + context.putAll(cache.getContextData().getContext().entrySet().stream() + // replace all the regex variable in the value + .map(entry -> { + try { + return Map.entry(entry.getKey(), matcher.replaceAll(entry.getValue())); + } catch (IndexOutOfBoundsException ex) { + Log.warnv(ex, "Unable to replace regex variable for the entry \"{0}\" with the regex \"{1}\" and the context \"{2}={3}\"", + metric.objectName(), cache.getContextData().getRegex(), entry.getKey(), entry.getValue()); + return entry; + } + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (l, r) -> r))); + }); }); return new AggregatedDataInfo(metric.type(), metric.objectName(), context); }); diff --git a/aggregator/src/test/java/io/spoud/kcc/aggregator/stream/MetricEnricherTest.java b/aggregator/src/test/java/io/spoud/kcc/aggregator/stream/MetricEnricherTest.java index e2ec3d2..f345938 100644 --- a/aggregator/src/test/java/io/spoud/kcc/aggregator/stream/MetricEnricherTest.java +++ b/aggregator/src/test/java/io/spoud/kcc/aggregator/stream/MetricEnricherTest.java @@ -1,4 +1,4 @@ -package io.spoud.kcc.aggregator.stream; + package io.spoud.kcc.aggregator.stream; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import io.quarkus.logging.Log; @@ -344,6 +344,87 @@ void should_window_per_hour() { )); } + @Test + void should_replace_regex_in_context() { + contextDataStore.put( + "id1", + new ContextData( + Instant.now(), + null, + null, + EntityType.TOPIC, + "^([a-z0-9-_]+)\\.([a-z0-9-_]+)\\.([a-z0-9-_]+)\\.([a-z0-9-_]+)$", + Map.of("stage", "$1" + , "app", "$2" + , "subject", "$3" + , "version", "$4"))); + rawTelegrafDataTopic.pipeInput("1", generateTopicRawTelegraf("stage_name.app_name.subject-name.v1", 1.0)); + final List list = aggregatedTopic.readValuesToList(); + assertThat(list).hasSize(1); + assertThat(list.get(0).getContext()).containsEntry("stage", "stage_name"); + assertThat(list.get(0).getContext()).containsEntry("app", "app_name"); + assertThat(list.get(0).getContext()).containsEntry("subject", "subject-name"); + assertThat(list.get(0).getContext()).containsEntry("version", "v1"); + } + + @Test + void should_ignore_regex_replacement_issue() { + contextDataStore.put( + "id1", + new ContextData( + Instant.now(), + null, + null, + EntityType.TOPIC, + "stage_(.+)", + Map.of("stage", "$1" + , "app", "$2" + , "subject", "$3" + , "version", "$4"))); + rawTelegrafDataTopic.pipeInput("1", generateTopicRawTelegraf("stage_name.app_name.subject-name.v1", 1.0)); + final List list = aggregatedTopic.readValuesToList(); + assertThat(list).hasSize(1); + assertThat(list.get(0).getContext()).containsEntry("stage", "name.app_name.subject-name.v1"); + assertThat(list.get(0).getContext()).containsEntry("app", "$2"); + assertThat(list.get(0).getContext()).containsEntry("subject", "$3"); + assertThat(list.get(0).getContext()).containsEntry("version", "$4"); + } + + @Test + void should_replace_multiple_regex() { + contextDataStore.put( + "id1", + new ContextData( + Instant.now(), + null, + null, + EntityType.TOPIC, + "berne-parkings.*", + Map.of("app", "bern-parking" + , "domain", "bern-parking" + , "cu", "spoud"))); + + contextDataStore.put( + "id2", + new ContextData( + Instant.now(), + null, + null, + EntityType.TOPIC, + ".*(avro|xml|json).*", + Map.of("format", "$1"))); + + rawTelegrafDataTopic.pipeInput("1", generateTopicRawTelegraf("berne-parkings-json", 1.0)); + + final List list = aggregatedTopic.readValuesToList(); + assertThat(list).hasSize(1); + assertThat(list.get(0).getContext()).containsEntry("format", "json"); + assertThat(list.get(0).getContext()).containsEntry("app", "bern-parking"); + assertThat(list.get(0).getContext()).containsEntry("domain", "bern-parking"); + assertThat(list.get(0).getContext()).containsEntry("cu", "spoud"); + + } + private RawTelegrafData generateTopicRawTelegraf(String topicName, double value) { return generateTopicRawTelegraf(Instant.now(), topicName, value); } diff --git a/config-sample/context-data.json b/config-sample/context-data.json index ab230a0..2a81ff0 100644 --- a/config-sample/context-data.json +++ b/config-sample/context-data.json @@ -2,6 +2,7 @@ { "id": "0e49079d-65f7-46a7-be7e-ada4faf51025", "entityType": "TOPIC", + "comment": "Simply look for a topic ending with a suffix and apply some context to it", "regex": ".*logistics", "context": [ { @@ -21,19 +22,24 @@ { "id": "323b603d-5b5f-48d2-84fc-4e784e942289", "entityType": "TOPIC", - "regex": ".*collaboration", + "comment": "Extract context from the topic name. This is useful if you enforce a strong naming convention.", + "regex": "^([a-z0-9-_]+)\\.([a-z0-9-_]+)\\.([a-z0-9-_]+)\\.([a-z0-9-_]+)$", "context": [ { - "key": "cost-unit", - "value": "spoud" + "key": "stage", + "value": "$1" }, { - "key": "domain", - "value": "collaboration" + "key": "env", + "value": "$2" }, { - "key": "app", - "value": "agoora" + "key": "subject", + "value": "$3" + }, + { + "key": "version", + "value": "$4" } ] }