Skip to content

Commit

Permalink
implement regex replace for context (#5)
Browse files Browse the repository at this point in the history
* implement regex replace for context

* better method name

* improve regex replace
  • Loading branch information
gaetancollaud authored Feb 8, 2024
1 parent 67c40d2 commit 232d520
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
Expand Down Expand Up @@ -68,10 +66,17 @@ public CachedContextData(ContextData contextData) {
this.pattern = Pattern.compile(contextData.getRegex(), Pattern.CASE_INSENSITIVE);
}

public boolean matches(Metric metric, Instant timestamp) {
return contextData.getEntityType().equals(metric.type())
&& isInContextDataTimeRange(timestamp, contextData)
&& pattern.matcher(metric.objectName()).matches();
/**
* Get the Matcher object, but only if the current metric is eligible and satisfy the regex
*/
public Optional<Matcher> getMatcher(Metric metric, Instant timestamp) {
if (contextData.getEntityType().equals(metric.type())
&& isInContextDataTimeRange(timestamp, contextData)) {
return Optional.of(pattern.matcher(metric.objectName()))
.filter(Matcher::matches);// only if there is a match;
} else {
return Optional.empty();
}
}

private boolean isInContextDataTimeRange(Instant timestamp, ContextData contextData) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.spoud.kcc.aggregator.stream;

import io.quarkus.logging.Log;
import io.spoud.kcc.aggregator.data.Metric;
import io.spoud.kcc.aggregator.data.RawTelegrafData;
import io.spoud.kcc.data.EntityType;
Expand All @@ -8,6 +9,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class TelegrafDataWrapper {

Expand Down Expand Up @@ -35,10 +37,22 @@ public Optional<AggregatedDataInfo> enrichWithContext(List<CachedContextDataMana
Map<String, String> context = new HashMap<>();
// This is the join with regex
contextData.forEach(cache -> {
if (cache.matches(metric, telegrafData.timestamp())) {
// TODO add to a list instead of overriding the context
context.putAll(cache.getContextData().getContext());
}
cache.getMatcher(metric, telegrafData.timestamp())
.ifPresent(matcher -> {
// TODO add to a list instead of overriding the context
context.putAll(cache.getContextData().getContext().entrySet().stream()
// replace all the regex variable in the value
.map(entry -> {
try {
return Map.entry(entry.getKey(), matcher.replaceAll(entry.getValue()));
} catch (IndexOutOfBoundsException ex) {
Log.warnv(ex, "Unable to replace regex variable for the entry \"{0}\" with the regex \"{1}\" and the context \"{2}={3}\"",
metric.objectName(), cache.getContextData().getRegex(), entry.getKey(), entry.getValue());
return entry;
}
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (l, r) -> r)));
});
});
return new AggregatedDataInfo(metric.type(), metric.objectName(), context);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.spoud.kcc.aggregator.stream;
package io.spoud.kcc.aggregator.stream;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import io.quarkus.logging.Log;
Expand Down Expand Up @@ -344,6 +344,87 @@ void should_window_per_hour() {
));
}

@Test
void should_replace_regex_in_context() {
contextDataStore.put(
"id1",
new ContextData(
Instant.now(),
null,
null,
EntityType.TOPIC,
"^([a-z0-9-_]+)\\.([a-z0-9-_]+)\\.([a-z0-9-_]+)\\.([a-z0-9-_]+)$",
Map.of("stage", "$1"
, "app", "$2"
, "subject", "$3"
, "version", "$4")));
rawTelegrafDataTopic.pipeInput("1", generateTopicRawTelegraf("stage_name.app_name.subject-name.v1", 1.0));
final List<AggregatedDataWindowed> list = aggregatedTopic.readValuesToList();
assertThat(list).hasSize(1);
assertThat(list.get(0).getContext()).containsEntry("stage", "stage_name");
assertThat(list.get(0).getContext()).containsEntry("app", "app_name");
assertThat(list.get(0).getContext()).containsEntry("subject", "subject-name");
assertThat(list.get(0).getContext()).containsEntry("version", "v1");
}

@Test
void should_ignore_regex_replacement_issue() {
contextDataStore.put(
"id1",
new ContextData(
Instant.now(),
null,
null,
EntityType.TOPIC,
"stage_(.+)",
Map.of("stage", "$1"
, "app", "$2"
, "subject", "$3"
, "version", "$4")));
rawTelegrafDataTopic.pipeInput("1", generateTopicRawTelegraf("stage_name.app_name.subject-name.v1", 1.0));
final List<AggregatedDataWindowed> list = aggregatedTopic.readValuesToList();
assertThat(list).hasSize(1);
assertThat(list.get(0).getContext()).containsEntry("stage", "name.app_name.subject-name.v1");
assertThat(list.get(0).getContext()).containsEntry("app", "$2");
assertThat(list.get(0).getContext()).containsEntry("subject", "$3");
assertThat(list.get(0).getContext()).containsEntry("version", "$4");
}

@Test
void should_replace_multiple_regex() {
contextDataStore.put(
"id1",
new ContextData(
Instant.now(),
null,
null,
EntityType.TOPIC,
"berne-parkings.*",
Map.of("app", "bern-parking"
, "domain", "bern-parking"
, "cu", "spoud")));

contextDataStore.put(
"id2",
new ContextData(
Instant.now(),
null,
null,
EntityType.TOPIC,
".*(avro|xml|json).*",
Map.of("format", "$1")));

rawTelegrafDataTopic.pipeInput("1", generateTopicRawTelegraf("berne-parkings-json", 1.0));

final List<AggregatedDataWindowed> list = aggregatedTopic.readValuesToList();
assertThat(list).hasSize(1);
assertThat(list.get(0).getContext()).containsEntry("format", "json");
assertThat(list.get(0).getContext()).containsEntry("app", "bern-parking");
assertThat(list.get(0).getContext()).containsEntry("domain", "bern-parking");
assertThat(list.get(0).getContext()).containsEntry("cu", "spoud");

}

private RawTelegrafData generateTopicRawTelegraf(String topicName, double value) {
return generateTopicRawTelegraf(Instant.now(), topicName, value);
}
Expand Down
20 changes: 13 additions & 7 deletions config-sample/context-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
"id": "0e49079d-65f7-46a7-be7e-ada4faf51025",
"entityType": "TOPIC",
"comment": "Simply look for a topic ending with a suffix and apply some context to it",
"regex": ".*logistics",
"context": [
{
Expand All @@ -21,19 +22,24 @@
{
"id": "323b603d-5b5f-48d2-84fc-4e784e942289",
"entityType": "TOPIC",
"regex": ".*collaboration",
"comment": "Extract context from the topic name. This is useful if you enforce a strong naming convention.",
"regex": "^([a-z0-9-_]+)\\.([a-z0-9-_]+)\\.([a-z0-9-_]+)\\.([a-z0-9-_]+)$",
"context": [
{
"key": "cost-unit",
"value": "spoud"
"key": "stage",
"value": "$1"
},
{
"key": "domain",
"value": "collaboration"
"key": "env",
"value": "$2"
},
{
"key": "app",
"value": "agoora"
"key": "subject",
"value": "$3"
},
{
"key": "version",
"value": "$4"
}
]
}
Expand Down

0 comments on commit 232d520

Please sign in to comment.