Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for epoch timestamps and configurable output format #3860

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data-prepper-plugins/date-processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ valid key and at least one pattern is required if match is configured.
* `patterns`: List of possible patterns the timestamp value of key can have. The patterns are based on sequence of letters and symbols.
The `patterns` support all the patterns listed in Java
[DatetimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html).
and also supports `epoch_second`, `epoch_milli` and `epoch_nano` values which represents the timestamp as the number of seconds, milliseconds and nano seconds since epoch. Epoch values are always UTC time zone.
* Type: `List<String>`

The following example of date configuration will use `timestamp` key to match against given patterns and stores the timestamp in ISO 8601
Expand Down Expand Up @@ -106,6 +107,8 @@ processor:

* `to_origination_metadata` (Optional): When this option is used, matched time is put into the event's metadata as an instance of `Instant`.

* `output_format` (Optional): indicates the format of the `@timestamp`. Default is `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`.

## Metrics

* `dateProcessingMatchSuccessCounter`: Number of records that match with at least one pattern specified in match configuration option.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -33,12 +35,16 @@
public class DateProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(DateProcessor.class);
private static final String OUTPUT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
private static final int LENGTH_OF_EPOCH_IN_MILLIS = 13;
private static final int LENGTH_OF_EPOCH_SECONDS = 10;

static final String DATE_PROCESSING_MATCH_SUCCESS = "dateProcessingMatchSuccess";
static final String DATE_PROCESSING_MATCH_FAILURE = "dateProcessingMatchFailure";

private String keyToParse;
private List<DateTimeFormatter> dateTimeFormatters;
private Set<String> epochFormatters;
private String outputFormat;
private final DateProcessorConfig dateProcessorConfig;
private final ExpressionEvaluator expressionEvaluator;

Expand All @@ -50,6 +56,7 @@ public DateProcessor(PluginMetrics pluginMetrics, final DateProcessorConfig date
super(pluginMetrics);
this.dateProcessorConfig = dateProcessorConfig;
this.expressionEvaluator = expressionEvaluator;
this.outputFormat = dateProcessorConfig.getOutputFormat();

dateProcessingMatchSuccessCounter = pluginMetrics.counter(DATE_PROCESSING_MATCH_SUCCESS);
dateProcessingMatchFailureCounter = pluginMetrics.counter(DATE_PROCESSING_MATCH_FAILURE);
Expand All @@ -68,10 +75,10 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {

String zonedDateTime = null;

if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived()))
if (Boolean.TRUE.equals(dateProcessorConfig.getFromTimeReceived())) {
zonedDateTime = getDateTimeFromTimeReceived(record);

else if (keyToParse != null && !keyToParse.isEmpty()) {
} else if (keyToParse != null && !keyToParse.isEmpty()) {
Pair<String, Instant> result = getDateTimeFromMatch(record);
if (result != null) {
zonedDateTime = result.getLeft();
Expand All @@ -85,8 +92,9 @@ else if (keyToParse != null && !keyToParse.isEmpty()) {
populateDateProcessorMetrics(zonedDateTime);
}

if (zonedDateTime != null)
if (zonedDateTime != null) {
record.getData().put(dateProcessorConfig.getDestination(), zonedDateTime);
}
}
return records;
}
Expand All @@ -101,7 +109,8 @@ private void populateDateProcessorMetrics(final String zonedDateTime) {
private void extractKeyAndFormatters() {
for (DateProcessorConfig.DateMatch entry: dateProcessorConfig.getMatch()) {
keyToParse = entry.getKey();
dateTimeFormatters = entry.getPatterns().stream().map(this::getSourceFormatter).collect(Collectors.toList());
epochFormatters = entry.getPatterns().stream().filter(pattern -> pattern.contains("epoch")).collect(Collectors.toSet());
dateTimeFormatters = entry.getPatterns().stream().filter(pattern -> !pattern.contains("epoch")).map(this::getSourceFormatter).collect(Collectors.toList());
}
}

Expand Down Expand Up @@ -146,11 +155,71 @@ private String getSourceTimestamp(final Record<Event> record) {
}
}

private Pair<String, Instant> getEpochFormatOutput(Instant time) {
if (outputFormat.equals("epoch_second")) {
return Pair.of(Long.toString(time.getEpochSecond()), time);
} else if (outputFormat.equals("epoch_milli")) {
return Pair.of(Long.toString(time.toEpochMilli()), time);
} else { // epoch_nano. validation for valid epoch_ should be
// done at init time
long nano = (long)time.getEpochSecond() * 1000_000_000 + (long) time.getNano();
return Pair.of(Long.toString(nano), time);
}
}

private Pair<String, Instant> getFormattedDateTimeString(final String sourceTimestamp) {
ZoneId srcZoneId = dateProcessorConfig.getSourceZoneId();
ZoneId dstZoneId = dateProcessorConfig.getDestinationZoneId();
Long numberValue = null;
Instant epochTime;

if (epochFormatters.size() > 0) {
try {
numberValue = Long.parseLong(sourceTimestamp);
} catch (NumberFormatException e) {
numberValue = null;
}
}
if (numberValue != null) {
int timestampLength = sourceTimestamp.length();
if (timestampLength > LENGTH_OF_EPOCH_IN_MILLIS) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should invert these conditionals. Let's look at the list of epoch_ values the user configures and then find the best fit from there.

This currently starts by inferring it and then seeing if the user has that. But, what if the value is very far in time from what we would typically expect?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is similar to providing two patterns like ["dd-mm-YYYY", "YYYY-mm-dd"] and the input not matching either one. Similarly, if the input has 13 digits and the pattern is ["epoch_second"] we should not match it. Basically, the pattern "epoch_second" should be treated as matching for 10-digit number, "epoch_milli" should be treated as matching for 13-digit number, and "epoch_nano" should be treated as matching 19-digit number. Because any other matching could result in silent errors.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed offline. We will be more lenient on going back in time, but not forward in time. And we will only allow the selection of only one of the epoch_ configurations. Users won't select multiple of these for now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current code already allows epoch times from the past but not from the future. Added check to make sure only one of the three "epoch_" patterns are allowed.

if (epochFormatters.contains("epoch_nano")) {
epochTime = Instant.ofEpochSecond(numberValue/1000_000_000, numberValue % 1000_000_000);
} else {
LOG.warn("Source time value is larger than epoch pattern configured. epoch_nano is expected but not present in the patterns list");
return null;
}
} else if (timestampLength > LENGTH_OF_EPOCH_SECONDS) {
if (epochFormatters.contains("epoch_milli")) {
epochTime = Instant.ofEpochMilli(numberValue);
} else {
LOG.warn("Source time value is larger than epoch pattern configured. epoch_milli is expected but not present in the patterns list");
return null;
}
} else {
epochTime = Instant.ofEpochSecond(numberValue);
}
// Epochs are always UTC zone
srcZoneId = ZoneId.of("UTC");
try {
if (outputFormat.startsWith("epoch_")) {
return getEpochFormatOutput(epochTime);
} else {
DateTimeFormatter outputFormatter = getOutputFormatter().withZone(dstZoneId);
ZonedDateTime tmp = ZonedDateTime.ofInstant(epochTime, srcZoneId);
return Pair.of(tmp.format(outputFormatter.withZone(dstZoneId)), tmp.toInstant());
}
} catch (Exception ignored) {
}
}

for (DateTimeFormatter formatter : dateTimeFormatters) {
try {
ZonedDateTime tmp = ZonedDateTime.parse(sourceTimestamp, formatter);
return Pair.of(tmp.format(getOutputFormatter().withZone(dateProcessorConfig.getDestinationZoneId())), tmp.toInstant());
if (outputFormat.startsWith("epoch_")) {
return getEpochFormatOutput(tmp.toInstant());
}
return Pair.of(tmp.format(getOutputFormatter().withZone(dstZoneId)), tmp.toInstant());
} catch (Exception ignored) {
}
}
Expand All @@ -160,7 +229,7 @@ private Pair<String, Instant> getFormattedDateTimeString(final String sourceTime
}

private DateTimeFormatter getOutputFormatter() {
return DateTimeFormatter.ofPattern(OUTPUT_FORMAT);
return DateTimeFormatter.ofPattern(outputFormat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import java.time.ZoneId;
import java.util.List;
import java.util.Locale;
import java.time.format.DateTimeFormatter;

public class DateProcessorConfig {
static final Boolean DEFAULT_FROM_TIME_RECEIVED = false;
static final Boolean DEFAULT_TO_ORIGINATION_METADATA = false;
static final String DEFAULT_DESTINATION = "@timestamp";
static final String DEFAULT_OUTPUT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";
static final String DEFAULT_SOURCE_TIMEZONE = ZoneId.systemDefault().toString();
static final String DEFAULT_DESTINATION_TIMEZONE = ZoneId.systemDefault().toString();

Expand All @@ -41,6 +43,41 @@ public String getKey() {
public List<String> getPatterns() {
return patterns;
}

@JsonIgnore
public boolean isValidPatterns() {
// For now, allow only one of the three "epoch_" pattern
int count = 0;
for (final String pattern: patterns) {
if (pattern.startsWith("epoch_")) {
count++;
}
if (count > 1) {
return false;
}
}
for (final String pattern: patterns) {
if (!isValidPattern(pattern)) {
return false;
}
}
return true;
}

public static boolean isValidPattern(final String pattern) {
if (pattern.equals("epoch_second") ||
pattern.equals("epoch_milli") ||
pattern.equals("epoch_nano")) {
return true;
}
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
return true;
} catch (Exception e) {
return false;
}
}

}

@JsonProperty("from_time_received")
Expand All @@ -55,6 +92,9 @@ public List<String> getPatterns() {
@JsonProperty("destination")
private String destination = DEFAULT_DESTINATION;

@JsonProperty("output_format")
private String outputFormat = DEFAULT_OUTPUT_FORMAT;

@JsonProperty("source_timezone")
private String sourceTimezone = DEFAULT_SOURCE_TIMEZONE;

Expand All @@ -76,6 +116,10 @@ public List<String> getPatterns() {
@JsonIgnore
private Locale sourceLocale;

public String getOutputFormat() {
return outputFormat;
}

public Boolean getFromTimeReceived() {
return fromTimeReceived;
}
Expand Down Expand Up @@ -160,15 +204,20 @@ boolean isValidMatch() {
if (match.size() != 1)
return false;

return match.get(0).getPatterns() != null && !match.get(0).getPatterns().isEmpty();
return match.get(0).getPatterns() != null && !match.get(0).getPatterns().isEmpty() && match.get(0).isValidPatterns();
}
return true;
}

@AssertTrue(message = "Invalid output format.")
boolean isValidOutputFormat() {
return DateMatch.isValidPattern(outputFormat);
}

@AssertTrue(message = "Invalid source_timezone provided.")
boolean isSourceTimezoneValid() {
try {
sourceZoneId = buildZoneId(sourceTimezone);
sourceZoneId = buildZoneId(sourceTimezone);
return true;
} catch (Exception e) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -42,6 +43,7 @@ class Validation {
void setUp() {
random = UUID.randomUUID().toString();
mockDateMatch = mock(DateProcessorConfig.DateMatch.class);
when(mockDateMatch.isValidPatterns()).thenReturn(true);
}

@Test
Expand All @@ -67,6 +69,23 @@ void isValidMatchAndFromTimestampReceived_should_return_false_if_from_time_recei
assertThat(dateProcessorConfig.isValidMatchAndFromTimestampReceived(), equalTo(false));
}

@Test
void testValidAndInvalidOutputFormats() throws NoSuchFieldException, IllegalAccessException {
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", random);
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false));

setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_second");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_milli");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_nano");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "epoch_xyz");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(false));
setField(DateProcessorConfig.class, dateProcessorConfig, "outputFormat", "yyyy-MM-dd'T'HH:mm:ss.nnnnnnnnnXXX");
assertThat(dateProcessorConfig.isValidOutputFormat(), equalTo(true));
}

@Test
void isValidMatch_should_return_true_if_match_has_single_entry() throws NoSuchFieldException, IllegalAccessException {
when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(random));
Expand All @@ -77,6 +96,16 @@ void isValidMatch_should_return_true_if_match_has_single_entry() throws NoSuchFi
assertThat(dateProcessorConfig.isValidMatch(), equalTo(true));
}

@Test
void isValidMatch_should_return_false_if_match_has_multiple_epoch_patterns() throws NoSuchFieldException, IllegalAccessException {
when(mockDateMatch.getPatterns()).thenReturn(List.of("epoch_second", "epoch_milli"));

List<DateProcessorConfig.DateMatch> dateMatches = Arrays.asList(mockDateMatch, mockDateMatch);
reflectivelySetField(dateProcessorConfig, "match", dateMatches);

assertThat(dateProcessorConfig.isValidMatch(), equalTo(false));
}

@Test
void isValidMatch_should_return_false_if_match_has_multiple_entries() throws NoSuchFieldException, IllegalAccessException {
when(mockDateMatch.getPatterns()).thenReturn(Collections.singletonList(random));
Expand Down
Loading
Loading