Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for multiple source timestamp formats #16

Merged
merged 6 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ jobs:
- name: Build
run: mvn clean package -B

- name: Test
run: mvn test

- name: Create JAR
run: mvn jar:jar

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class InsertRollingFieldTimestampHeaders<R extends ConnectRecord<R>>
extends InsertRollingTimestampHeaders<R> {
private RecordFieldTimestamp<R> fieldTimestamp;

public static ConfigDef CONFIG_DEF;
public static final ConfigDef CONFIG_DEF;

static {
// The code would be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
abstract class InsertRollingTimestampHeaders<R extends ConnectRecord<R>>
extends InsertTimestampHeaders<R> {

public static ConfigDef CONFIG_DEF =
public static final ConfigDef CONFIG_DEF =
InsertTimestampHeaders.CONFIG_DEF
.define(
ConfigName.ROLLING_WINDOW_SIZE_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package io.lenses.connect.smt.header;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;

class MultiDateTimeFormatter {

private final List<DateTimeFormatter> formatters;
private final List<String> patterns;
private final Boolean returnNowIfNull;

public MultiDateTimeFormatter(
List<String> patterns, List<DateTimeFormatter> formatters, Boolean returnNowIfNull) {
this.patterns = patterns;
this.formatters = formatters;
this.returnNowIfNull = returnNowIfNull;
}

public Instant format(String value, ZoneId zoneId) {
if (value == null && returnNowIfNull) {
return LocalDateTime.now().atZone(zoneId).toInstant();
} else if (value == null) {
throw new DateTimeParseException("No valid date time provided", "null", 0);
}
for (DateTimeFormatter formatter : formatters) {
try {
LocalDateTime localDateTime = LocalDateTime.parse(value, formatter);
return localDateTime.atZone(zoneId).toInstant();
} catch (DateTimeParseException dtpe) {
// ignore exception and use fallback
}
}
throw new DateTimeParseException("Cannot parse date with any formats", value, 0);
}

public String getDisplayPatterns() {
return String.join(", ", patterns);
}

private static DateTimeFormatter createFormatter(
String pattern, String configName, Locale locale, ZoneId zoneId) {
try {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
if (locale != null) {
formatter = formatter.withLocale(locale);
}
if (zoneId != null) {
formatter = formatter.withZone(zoneId);
}
return formatter;
} catch (IllegalArgumentException e) {
throw new ConfigException("Configuration '" + configName + "' is not a valid date format.");
}
}

public static MultiDateTimeFormatter createDateTimeFormatter(
List<String> patternConfigs, String configName, Locale locale) {

return new MultiDateTimeFormatter(
patternConfigs,
patternConfigs.stream()
.map(patternConfig -> createFormatter(patternConfig, configName, locale, null))
.collect(Collectors.toUnmodifiableList()),
false);
}

public static MultiDateTimeFormatter createDateTimeFormatter(
List<String> patternConfigs, String configName, ZoneId zoneId) {

return new MultiDateTimeFormatter(
patternConfigs,
patternConfigs.stream()
.map(patternConfig -> createFormatter(patternConfig, configName, null, zoneId))
.collect(Collectors.toUnmodifiableList()),
true);
}
}
55 changes: 30 additions & 25 deletions src/main/java/io/lenses/connect/smt/header/PropsFormatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,41 @@
*/
package io.lenses.connect.smt.header;

import java.util.Map;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
* This class is responsible for formatting properties from a SimpleConfig object.
* It converts the properties into a string representation in a json-like format.
* This class is responsible for formatting properties from a SimpleConfig object. It converts the
* properties into a string representation in a json-like format.
*/
public class PropsFormatter {

private final SimpleConfig simpleConfig;
private final SimpleConfig simpleConfig;

/**
* Constructs a new PropsFormatter with the given SimpleConfig.
*
* @param simpleConfig the SimpleConfig object containing the properties to be formatted
*/
public PropsFormatter(SimpleConfig simpleConfig) {
this.simpleConfig = simpleConfig;
}
/**
* Constructs a new PropsFormatter with the given SimpleConfig.
*
* @param simpleConfig the SimpleConfig object containing the properties to be formatted
*/
public PropsFormatter(SimpleConfig simpleConfig) {
this.simpleConfig = simpleConfig;
}

/**
* Formats the properties from the SimpleConfig object into a string.
* The properties are represented as key-value pairs in the format: "key: "value"".
* All properties are enclosed in curly braces.
*
* @return a string representation of the properties
*/
public String apply() {
StringBuilder sb = new StringBuilder("{");
simpleConfig.originalsStrings().forEach((k, v) -> sb.append(k).append(": \"").append(v).append("\", "));
sb.delete(sb.length() - 2, sb.length());
return sb.append("}").toString();
}
}
/**
* Formats the properties from the SimpleConfig object into a string. The properties are
* represented as key-value pairs in the format: "key: "value"". All properties are enclosed in
* curly braces.
*
* @return a string representation of the properties
*/
public String apply() {
StringBuilder sb = new StringBuilder("{");
simpleConfig.originalsStrings().entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.forEach(
entry ->
sb.append(entry.getKey()).append(": \"").append(entry.getValue()).append("\", "));
sb.delete(sb.length() - 2, sb.length());
return sb.append("}").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
Expand All @@ -40,15 +39,15 @@ class RecordFieldTimestamp<R extends ConnectRecord<R>> {
public static final String UNIX_PRECISION_CONFIG = "unix.precision";
private static final String UNIX_PRECISION_DEFAULT = "milliseconds";
private final FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields;
private final Optional<DateTimeFormatter> fromPattern;
private final Optional<MultiDateTimeFormatter> fromPattern;
private final String unixPrecision;
private final ZoneId timeZone;

private final Optional<PropsFormatter> propsFormatter;

private RecordFieldTimestamp(
FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields,
Optional<DateTimeFormatter> fromPattern,
Optional<MultiDateTimeFormatter> fromPattern,
String unixPrecision,
ZoneId timeZone,
Optional<PropsFormatter> propsFormatter) {
Expand All @@ -64,10 +63,6 @@ public FieldTypeUtils.FieldTypeAndFields getFieldTypeAndFields() {
return fieldTypeAndFields;
}

public Optional<DateTimeFormatter> getFromPattern() {
return fromPattern;
}

public String getUnixPrecision() {
return unixPrecision;
}
Expand Down Expand Up @@ -114,7 +109,8 @@ public Instant getInstant(R r) {
+ " instead.");
}

return convertToTimestamp(extractedValue, unixPrecision, fromPattern, timeZone, propsFormatter);
return convertToTimestamp(
extractedValue, unixPrecision, fromPattern, timeZone, propsFormatter);
}
}

Expand Down Expand Up @@ -152,14 +148,19 @@ public static <R extends ConnectRecord<R>> RecordFieldTimestamp<R> create(
final String unixPrecision =
Optional.ofNullable(config.getString(UNIX_PRECISION_CONFIG)).orElse(UNIX_PRECISION_DEFAULT);

final Optional<DateTimeFormatter> fromPattern =
Optional.ofNullable(config.getString(FORMAT_FROM_CONFIG))
final Optional<MultiDateTimeFormatter> fromPattern =
Optional.ofNullable(config.getList(FORMAT_FROM_CONFIG))
.map(
pattern ->
InsertTimestampHeaders.createDateTimeFormatter(
pattern, FORMAT_FROM_CONFIG, locale));

return new RecordFieldTimestamp<>(fieldTypeAndFields, fromPattern, unixPrecision, zoneId, Optional.of(new PropsFormatter(config)));
patterns ->
MultiDateTimeFormatter.createDateTimeFormatter(
patterns, FORMAT_FROM_CONFIG, locale));

return new RecordFieldTimestamp<>(
fieldTypeAndFields,
fromPattern,
unixPrecision,
zoneId,
Optional.of(new PropsFormatter(config)));
}

public static ConfigDef extendConfigDef(ConfigDef from) {
Expand All @@ -179,7 +180,7 @@ public static ConfigDef extendConfigDef(ConfigDef from) {
+ "'.")
.define(
FORMAT_FROM_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.MEDIUM,
"A DateTimeFormatter-compatible format for the timestamp. Used to parse the"
Expand Down
36 changes: 15 additions & 21 deletions src/main/java/io/lenses/connect/smt/header/TimestampConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -117,7 +116,7 @@ public final class TimestampConverter<R extends ConnectRecord<R>> implements Tra
"The desired timestamp representation: string, unix, Date, Time, or Timestamp")
.define(
FORMAT_FROM_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.MEDIUM,
"A DateTimeFormatter-compatible format for the timestamp. Used to parse the"
Expand Down Expand Up @@ -196,15 +195,13 @@ public Date toRaw(Config config, Object orig) {
+ "' configuration property.");
}
try {
final LocalDateTime localDateTime =
LocalDateTime.parse((String) orig, config.fromFormat);
return Date.from(localDateTime.atZone(ZoneOffset.UTC).toInstant());
return Date.from(config.fromFormat.format((String) orig, ZoneOffset.UTC));
} catch (DateTimeParseException e) {
throw new DataException(
"Could not parse timestamp: value ("
+ orig
+ ") does not match pattern ("
+ config.fromFormatPattern
+ ") does not match any patterns ("
+ config.fromFormat.getDisplayPatterns()
+ ")",
e);
}
Expand Down Expand Up @@ -387,19 +384,15 @@ private static class Config {
Config(
String[] fields,
String type,
DateTimeFormatter fromFormat,
String fromFormatPattern,
MultiDateTimeFormatter fromFormat,
DateTimeFormatter toFormat,
String toFormatPattern,
String unixPrecision,
String header,
Optional<RollingWindowDetails> rollingWindow,
TimeZone targetTimeZone) {
this.fields = fields;
this.type = type;
this.fromFormat = fromFormat;
this.fromFormatPattern = fromFormatPattern;
this.toFormatPattern = toFormatPattern;
this.toFormat = toFormat;
this.unixPrecision = unixPrecision;
this.header = header;
Expand All @@ -411,9 +404,7 @@ private static class Config {
String[] fields;
String header;
String type;
String fromFormatPattern;
String toFormatPattern;
final DateTimeFormatter fromFormat;
final MultiDateTimeFormatter fromFormat;
final DateTimeFormatter toFormat;
String unixPrecision;

Expand All @@ -438,7 +429,15 @@ public void configure(Map<String, ?> configs) {
if (header == null || header.isEmpty()) {
throw new ConfigException("TimestampConverter requires header key to be specified");
}
String fromFormatPattern = simpleConfig.getString(FORMAT_FROM_CONFIG);

MultiDateTimeFormatter fromPattern =
Optional.ofNullable(simpleConfig.getList(FORMAT_FROM_CONFIG))
.map(
fromFormatPattern ->
MultiDateTimeFormatter.createDateTimeFormatter(
fromFormatPattern, FORMAT_FROM_CONFIG, Constants.UTC.toZoneId()))
.orElse(null);

String toFormatPattern = simpleConfig.getString(FORMAT_TO_CONFIG);

final String unixPrecision = simpleConfig.getString(UNIX_PRECISION_CONFIG);
Expand All @@ -450,9 +449,6 @@ public void configure(Map<String, ?> configs) {
"TimestampConverter requires format option to be specified "
+ "when using string timestamps");
}
DateTimeFormatter fromPattern =
io.lenses.connect.smt.header.Utils.getDateFormat(
fromFormatPattern, Constants.UTC.toZoneId());
DateTimeFormatter toPattern =
io.lenses.connect.smt.header.Utils.getDateFormat(toFormatPattern, timeZone.toZoneId());

Expand Down Expand Up @@ -483,9 +479,7 @@ public void configure(Map<String, ?> configs) {
fieldTypeAndFields.getFields(),
type,
fromPattern,
fromFormatPattern,
toPattern,
toFormatPattern,
unixPrecision,
header,
rollingWindowDetails,
Expand Down
Loading
Loading