From 4dced00f69ae492715e8a0fc1245a836556d16ea Mon Sep 17 00:00:00 2001
From: stheppi
Date: Thu, 28 Mar 2024 16:43:30 +0000
Subject: [PATCH] Support record field timestamp

This PR introduces two new SMTs, InsertFieldTimestampHeaders and
InsertRollingFieldTimestampHeaders, which allow picking a field from the
payload and using it as the source for the headers that drive time-based
object key values in the Lenses datalake connectors.

---
 .../lenses/connect/smt/header/FieldType.java  |   7 +
 .../smt/header/FieldTypeConstants.java        |   7 +
 .../connect/smt/header/FieldTypeUtils.java    |  54 +++
 .../header/InsertFieldTimestampHeaders.java   |  51 +++
 .../InsertRollingFieldTimestampHeaders.java   |  51 +++
 .../smt/header/InsertTimestampHeaders.java    |  18 +-
 .../smt/header/RecordFieldTimestamp.java      | 186 ++++++++
 .../smt/header/TimestampConverter.java        |  61 +--
 .../smt/header/UnixPrecisionConstants.java    |   8 +
 .../io/lenses/connect/smt/header/Utils.java   | 120 +++++
 .../smt/header/ConvertToTimestampTest.java    |  99 +++++
 .../InsertFieldTimestampHeadersTest.java      | 126 ++++++
 ...nsertRollingFieldTimestampHeadersTest.java | 420 ++++++++++++++++++
 .../smt/header/RecordFieldTimestampTest.java  | 384 ++++++++++++++++
 .../smt/header/UtilsExtractValueTest.java     |  95 ++++
 .../connect/smt/header/UtilsIsBlankTest.java  |  23 +
 16 files changed, 1655 insertions(+), 55 deletions(-)
 create mode 100644 src/main/java/io/lenses/connect/smt/header/FieldType.java
 create mode 100644 src/main/java/io/lenses/connect/smt/header/FieldTypeConstants.java
 create mode 100644 src/main/java/io/lenses/connect/smt/header/FieldTypeUtils.java
 create mode 100644 src/main/java/io/lenses/connect/smt/header/InsertFieldTimestampHeaders.java
 create mode 100644 src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java
 create mode 100644 src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java
 create mode 100644 src/main/java/io/lenses/connect/smt/header/UnixPrecisionConstants.java
 create mode 100644 src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java
 create mode 100644 src/test/java/io/lenses/connect/smt/header/InsertFieldTimestampHeadersTest.java
 create mode 100644 src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java
 create mode 100644 src/test/java/io/lenses/connect/smt/header/RecordFieldTimestampTest.java
 create mode 100644 src/test/java/io/lenses/connect/smt/header/UtilsExtractValueTest.java
 create mode 100644 src/test/java/io/lenses/connect/smt/header/UtilsIsBlankTest.java

diff --git a/src/main/java/io/lenses/connect/smt/header/FieldType.java b/src/main/java/io/lenses/connect/smt/header/FieldType.java
new file mode 100644
index 0000000..c1c8574
--- /dev/null
+++ b/src/main/java/io/lenses/connect/smt/header/FieldType.java
@@ -0,0 +1,7 @@
+package io.lenses.connect.smt.header;
+
+enum FieldType {
+  KEY,
+  VALUE,
+  TIMESTAMP
+}
diff --git a/src/main/java/io/lenses/connect/smt/header/FieldTypeConstants.java b/src/main/java/io/lenses/connect/smt/header/FieldTypeConstants.java
new file mode 100644
index 0000000..289cfb8
--- /dev/null
+++ b/src/main/java/io/lenses/connect/smt/header/FieldTypeConstants.java
@@ -0,0 +1,7 @@
+package io.lenses.connect.smt.header;
+
+public class FieldTypeConstants {
+  public static final String KEY_FIELD = "_key";
+  public static final String VALUE_FIELD = "_value";
+  public static final String TIMESTAMP_FIELD = "_timestamp";
+}
diff --git a/src/main/java/io/lenses/connect/smt/header/FieldTypeUtils.java b/src/main/java/io/lenses/connect/smt/header/FieldTypeUtils.java
new file mode 100644
index 0000000..8ea9be5
---
/dev/null
+++ b/src/main/java/io/lenses/connect/smt/header/FieldTypeUtils.java
@@ -0,0 +1,54 @@
+package io.lenses.connect.smt.header;
+
+import java.util.Arrays;
+import org.apache.kafka.common.config.ConfigException;
+
+class FieldTypeUtils {
+  public static FieldTypeAndFields extractFieldTypeAndFields(String from) {
+    FieldType fieldType;
+    String[] fields = from.split("\\.");
+    if (fields.length > 0) {
+      if (fields[0].equalsIgnoreCase(FieldTypeConstants.KEY_FIELD)) {
+        fieldType = FieldType.KEY;
+        // drop the first element
+        fields = Arrays.copyOfRange(fields, 1, fields.length);
+      } else if (fields[0].equalsIgnoreCase(FieldTypeConstants.TIMESTAMP_FIELD)) {
+        fieldType = FieldType.TIMESTAMP;
+        // if fields length is > 1, then it is an error since the timestamp is a primitive
+        if (fields.length > 1) {
+          throw new ConfigException(
+              "When using the record timestamp field, the field path should only be '_timestamp'.");
+        }
+        fields = new String[0];
+      } else {
+        fieldType = FieldType.VALUE;
+        if (fields[0].equalsIgnoreCase(FieldTypeConstants.VALUE_FIELD)) {
+          // drop the first element
+          fields = Arrays.copyOfRange(fields, 1, fields.length);
+        }
+      }
+    } else {
+      fieldType = FieldType.VALUE;
+    }
+
+    return new FieldTypeAndFields(fieldType, fields);
+  }
+
+  static class FieldTypeAndFields {
+    private final FieldType fieldType;
+    private final String[] fields;
+
+    public FieldTypeAndFields(FieldType fieldType, String[] fields) {
+      this.fieldType = fieldType;
+      this.fields = fields;
+    }
+
+    public FieldType getFieldType() {
+      return fieldType;
+    }
+
+    public String[] getFields() {
+      return fields;
+    }
+  }
+}
diff --git a/src/main/java/io/lenses/connect/smt/header/InsertFieldTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertFieldTimestampHeaders.java
new file mode 100644
index 0000000..843f387
--- /dev/null
+++ b/src/main/java/io/lenses/connect/smt/header/InsertFieldTimestampHeaders.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package io.lenses.connect.smt.header;
+
+import java.time.Instant;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A transformer which takes a record field of type timestamp and inserts year, month, day, hour,
+ * minute, second and date headers.
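+ *
+ * <p>A sketch of a possible connector configuration (the transform alias {@code fieldTs} and the
+ * payload field {@code created_at} are illustrative, not part of this patch; the property keys are
+ * the ones defined below and exercised by the tests):
+ *
+ * <pre>
+ * transforms=fieldTs
+ * transforms.fieldTs.type=io.lenses.connect.smt.header.InsertFieldTimestampHeaders
+ * transforms.fieldTs.header.prefix.name=wallclock.
+ * transforms.fieldTs.field=_value.created_at
+ * transforms.fieldTs.format.from.pattern=yyyy-MM-dd'T'HH:mm:ss
+ * transforms.fieldTs.unix.precision=milliseconds
+ * </pre>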
+ *
+ * @param <R> the record type
+ */
+public class InsertFieldTimestampHeaders<R extends ConnectRecord<R>>
+    extends InsertTimestampHeaders<R> {
+
+  public static ConfigDef CONFIG_DEF =
+      RecordFieldTimestamp.extendConfigDef(InsertTimestampHeaders.CONFIG_DEF);
+  private RecordFieldTimestamp<R> fieldTimestamp;
+
+  public InsertFieldTimestampHeaders() {
+    super(CONFIG_DEF);
+  }
+
+  @Override
+  protected Instant getInstant(R r) {
+    return fieldTimestamp.getInstant(r);
+  }
+
+  @Override
+  public ConfigDef config() {
+    return CONFIG_DEF;
+  }
+
+  @Override
+  protected void configureInternal(SimpleConfig config) {
+    super.configureInternal(config);
+    fieldTimestamp = RecordFieldTimestamp.create(config, timeZone, locale);
+  }
+}
diff --git a/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java
new file mode 100644
index 0000000..d4a44ea
--- /dev/null
+++ b/src/main/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeaders.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package io.lenses.connect.smt.header;
+
+import java.time.Instant;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+/**
+ * A transformer which takes a record field of type timestamp and inserts year, month, day, hour,
+ * minute, second and date headers, rounded down to a rolling time window.
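+ *
+ * <p>A sketch of a possible connector configuration (the alias {@code rollingTs} and the payload
+ * field {@code created_at} are illustrative; the rolling window keys are the ones exercised by the
+ * tests in this patch):
+ *
+ * <pre>
+ * transforms=rollingTs
+ * transforms.rollingTs.type=io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders
+ * transforms.rollingTs.header.prefix.name=wallclock_
+ * transforms.rollingTs.field=_value.created_at
+ * transforms.rollingTs.rolling.window.type=minutes
+ * transforms.rollingTs.rolling.window.size=15
+ * </pre>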
+ * + * @param the record type + */ +public class InsertRollingFieldTimestampHeaders> + extends InsertRollingTimestampHeaders { + + private RecordFieldTimestamp fieldTimestamp; + + public static ConfigDef CONFIG_DEF = + RecordFieldTimestamp.extendConfigDef(InsertRollingTimestampHeaders.CONFIG_DEF); + + public InsertRollingFieldTimestampHeaders() { + super(); + } + + @Override + protected Instant getInstantInternal(R r) { + return fieldTimestamp.getInstant(r); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + protected void configureInternal(SimpleConfig config) { + super.configureInternal(config); + fieldTimestamp = RecordFieldTimestamp.create(config, timeZone, locale); + } +} diff --git a/src/main/java/io/lenses/connect/smt/header/InsertTimestampHeaders.java b/src/main/java/io/lenses/connect/smt/header/InsertTimestampHeaders.java index 95535ca..55a0593 100644 --- a/src/main/java/io/lenses/connect/smt/header/InsertTimestampHeaders.java +++ b/src/main/java/io/lenses/connect/smt/header/InsertTimestampHeaders.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.util.SimpleConfig; @@ -46,10 +47,9 @@ abstract class InsertTimestampHeaders> implements Tra private String minuteHeader; private String secondHeader; private String dateHeader; + protected Locale locale; + protected ZoneId timeZone = ZoneId.of("UTC"); - private ZoneId timeZone = ZoneId.of("UTC"); - - private final ConfigDef configDef; public static ConfigDef CONFIG_DEF = new ConfigDef() .define( @@ -153,9 +153,7 @@ interface ConfigName { String DEFAULT_LOCALE = "en"; } - protected InsertTimestampHeaders(ConfigDef configDef) { - this.configDef = configDef; - } + protected InsertTimestampHeaders(ConfigDef configDef) {} protected abstract Instant getInstant(R r); @@ -167,6 +165,10 @@ public R apply(R r) { // instant from epoch final Instant now = getInstant(r); + if (now == null) { + throw new DataException( + "The timestamp value could not be extracted or is null. 
Please check the configuration and the record structure."); + } final ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, timeZone); r.headers().addString(yearHeader, yearFormat.format(zonedDateTime)); r.headers().addString(monthHeader, monthFormat.format(zonedDateTime)); @@ -201,7 +203,7 @@ public void configure(Map props) { minuteHeader = prefixName + "minute"; secondHeader = prefixName + "second"; dateHeader = prefixName + "date"; - Locale locale = new Locale(config.getString(ConfigName.LOCALE)); + locale = new Locale(config.getString(ConfigName.LOCALE)); yearFormat = createDateTimeFormatter( config.getString(ConfigName.YEAR_FORMAT_CONFIG), @@ -247,7 +249,7 @@ public void configure(Map props) { configureInternal(config); } - private static DateTimeFormatter createDateTimeFormatter( + public static DateTimeFormatter createDateTimeFormatter( String patternConfig, String configName, Locale locale) { try { return DateTimeFormatter.ofPattern(patternConfig, locale); diff --git a/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java b/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java new file mode 100644 index 0000000..fda0ab6 --- /dev/null +++ b/src/main/java/io/lenses/connect/smt/header/RecordFieldTimestamp.java @@ -0,0 +1,186 @@ +package io.lenses.connect.smt.header; + +import static io.lenses.connect.smt.header.FieldTypeConstants.KEY_FIELD; +import static io.lenses.connect.smt.header.FieldTypeConstants.VALUE_FIELD; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_MICROS; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_MILLIS; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_NANOS; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_SECONDS; +import static io.lenses.connect.smt.header.Utils.convertToTimestamp; +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +class RecordFieldTimestamp> { + private static final String FIELD_CONFIG = "field"; + private static final String FORMAT_FROM_CONFIG = "format.from.pattern"; + + public static final String UNIX_PRECISION_CONFIG = "unix.precision"; + private static final String UNIX_PRECISION_DEFAULT = "milliseconds"; + private final FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields; + private final Optional fromPattern; + private final String unixPrecision; + private final ZoneId timeZone; + + private RecordFieldTimestamp( + FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields, + Optional fromPattern, + String unixPrecision, + ZoneId timeZone) { + + this.fieldTypeAndFields = fieldTypeAndFields; + this.fromPattern = fromPattern; + this.unixPrecision = unixPrecision; + this.timeZone = timeZone; + } + + public FieldTypeUtils.FieldTypeAndFields getFieldTypeAndFields() { + return fieldTypeAndFields; + } + + public Optional getFromPattern() { + return fromPattern; + } + + public String getUnixPrecision() { + return 
unixPrecision; + } + + /** + * The check for null happens in InsertTimestampHeaders + * + * @param r the record + * @return the instant + */ + public Instant getInstant(R r) { + FieldType fieldType = fieldTypeAndFields.getFieldType(); + if (fieldType == FieldType.TIMESTAMP) { + return r.timestamp() == null ? Instant.now() : Instant.ofEpochMilli(r.timestamp()); + } else { + + final Object value = operatingValue(r); + if (value == null) { + return null; + } + if (fieldTypeAndFields.getFields().length == 0) { + return convertToTimestamp(value, unixPrecision, fromPattern, timeZone); + } + final Schema schema = operatingSchema(r); + Object extractedValue; + if (schema == null) { + // there's schemaless data; the input expected is a Map + extractedValue = + Utils.extractValue( + requireMap( + value, + "Extracting field value for:" + + Arrays.toString(fieldTypeAndFields.getFields())), + fieldTypeAndFields.getFields()); + } else if (value instanceof Struct) { + extractedValue = Utils.extractValue((Struct) value, fieldTypeAndFields.getFields()); + } else { + + throw new DataException( + "The SMT is configured to extract the data from: " + + Arrays.toString(fieldTypeAndFields.getFields()) + + " thus it requires a Struct value. Found: " + + value.getClass().getName() + + " instead."); + } + + return convertToTimestamp(extractedValue, unixPrecision, fromPattern, timeZone); + } + } + + private Schema operatingSchema(R record) { + switch (fieldTypeAndFields.getFieldType()) { + case KEY: + return record.keySchema(); + case TIMESTAMP: + return Timestamp.SCHEMA; + default: + return record.valueSchema(); + } + } + + private Object operatingValue(R record) { + if (fieldTypeAndFields.getFieldType() == FieldType.TIMESTAMP) { + return record.timestamp(); + } + if (fieldTypeAndFields.getFieldType() == FieldType.KEY) { + return record.key(); + } + return record.value(); + } + + public static > RecordFieldTimestamp create( + SimpleConfig config, ZoneId zoneId, Locale locale) { + String fieldConfig = config.getString(FIELD_CONFIG); + if (fieldConfig == null || fieldConfig.isEmpty()) { + fieldConfig = FieldTypeConstants.VALUE_FIELD; + } + + final FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields = + FieldTypeUtils.extractFieldTypeAndFields(fieldConfig); + + final String unixPrecision = + Optional.ofNullable(config.getString(UNIX_PRECISION_CONFIG)).orElse(UNIX_PRECISION_DEFAULT); + + final Optional fromPattern = + Optional.ofNullable(config.getString(FORMAT_FROM_CONFIG)) + .map( + pattern -> + InsertTimestampHeaders.createDateTimeFormatter( + pattern, FORMAT_FROM_CONFIG, locale)); + + return new RecordFieldTimestamp<>(fieldTypeAndFields, fromPattern, unixPrecision, zoneId); + } + + public static ConfigDef extendConfigDef(ConfigDef from) { + return from.define( + FIELD_CONFIG, + ConfigDef.Type.STRING, + "", + ConfigDef.Importance.HIGH, + "The field path containing the timestamp, or empty if the entire value" + + " is a timestamp. Prefix the path with the literal string '" + + KEY_FIELD + + "' or '" + + VALUE_FIELD + + "' to specify the record key or value." + + "If no prefix is specified, the default is '" + + VALUE_FIELD + + "'.") + .define( + FORMAT_FROM_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.MEDIUM, + "A DateTimeFormatter-compatible format for the timestamp. 
Used to parse the" + + " input if the input is a string.") + .define( + UNIX_PRECISION_CONFIG, + ConfigDef.Type.STRING, + UNIX_PRECISION_DEFAULT, + ConfigDef.ValidString.in( + UNIX_PRECISION_NANOS, UNIX_PRECISION_MICROS, + UNIX_PRECISION_MILLIS, UNIX_PRECISION_SECONDS), + ConfigDef.Importance.LOW, + "The desired Unix precision for the timestamp: seconds, milliseconds, microseconds, " + + "or nanoseconds. Used to generate the output when type=unix or used to parse " + + "the input if the input is a Long. This SMT will cause precision loss during " + + "conversions from, and to, values with sub-millisecond components."); + } +} diff --git a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java index 6f281a5..cb58c21 100644 --- a/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java +++ b/src/main/java/io/lenses/connect/smt/header/TimestampConverter.java @@ -10,6 +10,10 @@ */ package io.lenses.connect.smt.header; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_MICROS; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_MILLIS; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_NANOS; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_SECONDS; import static io.lenses.connect.smt.header.Utils.isBlank; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull; @@ -23,7 +27,6 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -66,10 +69,6 @@ public final class TimestampConverter> implements Tra public static final String TARGET_TIMEZONE_CONFIG = "target.timezone"; - private static final String KEY_FIELD = "_key"; - private static final String VALUE_FIELD = "_value"; - private static final String TIMESTAMP_FIELD = "_timestamp"; - public static final String UNIX_PRECISION_CONFIG = "unix.precision"; private static final String UNIX_PRECISION_DEFAULT = "milliseconds"; @@ -81,11 +80,6 @@ public final class TimestampConverter> implements Tra private static final String TYPE_TIME = "Time"; private static final String TYPE_TIMESTAMP = "Timestamp"; - private static final String UNIX_PRECISION_MILLIS = "milliseconds"; - private static final String UNIX_PRECISION_MICROS = "microseconds"; - private static final String UNIX_PRECISION_NANOS = "nanoseconds"; - private static final String UNIX_PRECISION_SECONDS = "seconds"; - public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema(); public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema(); @@ -100,12 +94,12 @@ public final class TimestampConverter> implements Tra ConfigDef.Importance.HIGH, "The field path containing the timestamp, or empty if the entire value" + " is a timestamp. Prefix the path with the literal string '" - + KEY_FIELD + + FieldTypeConstants.KEY_FIELD + "' or '" - + VALUE_FIELD + + FieldTypeConstants.VALUE_FIELD + "' to specify the record key or value." 
+ "If no prefix is specified, the default is '" - + VALUE_FIELD + + FieldTypeConstants.VALUE_FIELD + "'.") .define( HEADER_NAME_CONFIG, @@ -429,12 +423,6 @@ private static class Config { final ZoneId targetTimeZoneId; } - private static enum FieldType { - KEY, - VALUE, - TIMESTAMP - } - private FieldType fieldType; private Config config; @@ -443,7 +431,7 @@ public void configure(Map configs) { final SimpleConfig simpleConfig = new SimpleConfig(CONFIG_DEF, configs); String fieldConfig = simpleConfig.getString(FIELD_CONFIG); if (fieldConfig == null || fieldConfig.isEmpty()) { - fieldConfig = VALUE_FIELD; + fieldConfig = FieldTypeConstants.VALUE_FIELD; } final String type = simpleConfig.getString(TARGET_TYPE_CONFIG); final String header = simpleConfig.getString(HEADER_NAME_CONFIG); @@ -468,31 +456,9 @@ public void configure(Map configs) { DateTimeFormatter toPattern = io.lenses.connect.smt.header.Utils.getDateFormat(toFormatPattern, timeZone.toZoneId()); - String[] fields = fieldConfig.split("\\."); - if (fields.length > 0) { - if (fields[0].equalsIgnoreCase(KEY_FIELD)) { - fieldType = FieldType.KEY; - // drop the first element - fields = Arrays.copyOfRange(fields, 1, fields.length); - } else if (fields[0].equalsIgnoreCase(TIMESTAMP_FIELD)) { - fieldType = FieldType.TIMESTAMP; - // if fields length is > 1, then it is an error since the timestamp is a primitive - if (fields.length > 1) { - throw new ConfigException( - "When using the record timestamp field, the field path should only be '_timestamp'."); - } - fields = new String[0]; - } else { - fieldType = FieldType.VALUE; - if (fields[0].equalsIgnoreCase(VALUE_FIELD)) { - // drop the first element - fields = Arrays.copyOfRange(fields, 1, fields.length); - } - } - } else { - fieldType = FieldType.VALUE; - } - + FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields = + FieldTypeUtils.extractFieldTypeAndFields(fieldConfig); + fieldType = fieldTypeAndFields.getFieldType(); // ignore NONE as a rolling window type final HashSet ignoredRollingWindowTypes = new HashSet<>(Collections.singletonList("NONE")); @@ -514,7 +480,7 @@ public void configure(Map configs) { config = new Config( - fields, + fieldTypeAndFields.getFields(), type, fromPattern, fromFormatPattern, @@ -574,9 +540,10 @@ private Object operatingValue(R record) { } private SchemaAndValue fromRecordWithSchema(R record) { - final Schema schema = operatingSchema(record); + if (config.fields.length == 0) { Object value = operatingValue(record); + final Schema schema = operatingSchema(record); return convertTimestamp(value, timestampTypeFromSchema(schema)); } else { final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); diff --git a/src/main/java/io/lenses/connect/smt/header/UnixPrecisionConstants.java b/src/main/java/io/lenses/connect/smt/header/UnixPrecisionConstants.java new file mode 100644 index 0000000..a52d6cd --- /dev/null +++ b/src/main/java/io/lenses/connect/smt/header/UnixPrecisionConstants.java @@ -0,0 +1,8 @@ +package io.lenses.connect.smt.header; + +public class UnixPrecisionConstants { + public static final String UNIX_PRECISION_MILLIS = "milliseconds"; + public static final String UNIX_PRECISION_MICROS = "microseconds"; + public static final String UNIX_PRECISION_NANOS = "nanoseconds"; + public static final String UNIX_PRECISION_SECONDS = "seconds"; +} diff --git a/src/main/java/io/lenses/connect/smt/header/Utils.java b/src/main/java/io/lenses/connect/smt/header/Utils.java index 55f6ad1..bb56fa4 100644 --- 
a/src/main/java/io/lenses/connect/smt/header/Utils.java +++ b/src/main/java/io/lenses/connect/smt/header/Utils.java @@ -10,12 +10,132 @@ */ package io.lenses.connect.smt.header; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_MICROS; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_MILLIS; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_NANOS; +import static io.lenses.connect.smt.header.UnixPrecisionConstants.UNIX_PRECISION_SECONDS; +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; + +import java.time.Instant; +import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Date; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; class Utils { + static Instant convertToTimestamp( + Object value, String unixPrecision, Optional fromPattern, ZoneId zoneId) { + if (value == null) { + return Instant.now(); + } + if (value instanceof Instant) { + return (Instant) value; + } + if (value instanceof Long) { + switch (unixPrecision) { + case UNIX_PRECISION_SECONDS: + return new Date(TimeUnit.SECONDS.toMillis((Long) value)).toInstant(); + case UNIX_PRECISION_MICROS: + return new Date(TimeUnit.MICROSECONDS.toMillis((Long) value)).toInstant(); + case UNIX_PRECISION_NANOS: + return new Date(TimeUnit.NANOSECONDS.toMillis((Long) value)).toInstant(); + case UNIX_PRECISION_MILLIS: + default: + return new Date((Long) value).toInstant(); + } + } + if (value instanceof String) { + return fromPattern + .map( + pattern -> { + try { + final LocalDateTime localDateTime = LocalDateTime.parse((String) value, pattern); + return localDateTime.atZone(zoneId).toInstant(); + } catch (Exception e) { + throw new DataException( + "Could not parse the string timestamp: " + + value + + " with pattern: " + + pattern, + e); + } + }) + .orElseGet( + () -> { + try { + return Instant.ofEpochMilli(Long.parseLong((String) value)); + } catch (NumberFormatException e) { + throw new DataException("Expected a long, but found " + value); + } + }); + } + if (value instanceof Date) { + return ((Date) value).toInstant(); + } + throw new DataException( + "Expected an epoch, date or string date, but found " + value.getClass()); + } + + /** + * Extracts the value for the given field path from a Map + * + * @param from the map to extract the value from + * @param fields the field path + * @return the value for the given field path + */ + static Object extractValue(Map from, String[] fields) { + Map updatedValue = + requireMap(from, "Extracting value from: " + Arrays.toString(fields)); + for (int i = 0; i < fields.length - 1; i++) { + updatedValue = + requireMap( + updatedValue.get(fields[i]), "Extracting value from: " + Arrays.toString(fields)); + if (updatedValue == null) { + return null; + } + } + // updatedValue is now the map containing the field to be updated + // config.fields[config.fields.length-1] is the name of the field to be updated + return updatedValue.get(fields[fields.length - 1]); + } + + /** + * Extracts the value of a given field from the given Struct + * + * @param from Struct to extract the value from + * @param fields the path to the field to extract + * @return the value of the 
field, or null if the field does not exist + */ + static Object extractValue(Struct from, String[] fields) { + if (from == null) { + return null; + } + Struct updatedValue = from; + + for (int i = 0; i < fields.length - 1; i++) { + updatedValue = updatedValue.getStruct(fields[i]); + if (updatedValue == null) { + return null; + } + } + // updatedValue is now the struct containing the field to be updated + // config.fields[config.fields.length-1] is the name of the field to be updated + Field field = updatedValue.schema().field(fields[fields.length - 1]); + if (field == null) { + return null; + } + return updatedValue.get(fields[fields.length - 1]); + } + static DateTimeFormatter getDateFormat(String formatPattern, ZoneId zoneId) { if (formatPattern == null) { return null; diff --git a/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java b/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java new file mode 100644 index 0000000..c9c077f --- /dev/null +++ b/src/test/java/io/lenses/connect/smt/header/ConvertToTimestampTest.java @@ -0,0 +1,99 @@ +package io.lenses.connect.smt.header; + +import static java.time.ZoneOffset.UTC; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Optional; +import org.apache.kafka.connect.errors.DataException; +import org.junit.jupiter.api.Test; + +public class ConvertToTimestampTest { + + @Test + public void convertToTimestampReturnsCurrentTimeWhenValueIsNull() { + Instant result = + Utils.convertToTimestamp(null, "seconds", Optional.empty(), ZoneId.systemDefault()); + assertNotNull(result); + } + + @Test + public void convertToTimestampReturnsSameInstantWhenValueIsInstant() { + Instant instant = Instant.now(); + Instant result = + Utils.convertToTimestamp(instant, "seconds", Optional.empty(), ZoneId.systemDefault()); + assertEquals(instant, result); + } + + @Test + public void convertToTimestampReturnsCorrectInstantWhenValueIsLong() { + Long value = 1633097000L; // corresponds to 2021-10-01T11:30:00Z + Instant result = + Utils.convertToTimestamp(value, "seconds", Optional.empty(), ZoneId.systemDefault()); + assertEquals(Instant.ofEpochSecond(value), result); + } + + @Test + public void convertToTimestampReturnsCorrectInstantWhenValueIsString() { + String value = "2021-10-01T11:30:00Z"; + Instant result = + Utils.convertToTimestamp( + value, + "seconds", + Optional.of(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZZZZZ").withZone(UTC)), + UTC); + assertEquals(Instant.parse(value), result); + } + + @Test + public void convertToTimestampThrowsDataExceptionWhenValueIsInvalidString() { + String value = "invalid"; + assertThrows( + DataException.class, + () -> + Utils.convertToTimestamp( + value, + "seconds", + Optional.of( + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZZZZZ").withZone(UTC)), + UTC)); + } + + @Test + public void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsMicros() { + Long value = 1633097000000000L; // corresponds to 2021-10-01T11:30:00Z + Instant result = + Utils.convertToTimestamp(value, "microseconds", Optional.empty(), ZoneId.systemDefault()); + assertEquals(Instant.ofEpochSecond(1633097000L, 0), result); + } + + @Test + public void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsNanos() { + Long value = 
1633097000000000L; // corresponds to 2021-10-01T11:30:00Z + Instant result = + Utils.convertToTimestamp(value, "nanoseconds", Optional.empty(), ZoneId.systemDefault()); + // Convert nanoseconds to seconds and add to epoch second + Instant expected = Instant.ofEpochSecond(value / 1_000_000_000L); + assertEquals(expected, result); + } + + @Test + public void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsMillis() { + Long value = 1633097000000L; // corresponds to 2021-10-01T11:30:00Z + Instant result = + Utils.convertToTimestamp(value, "milliseconds", Optional.empty(), ZoneId.systemDefault()); + assertEquals(Instant.ofEpochSecond(1633097000L, 0), result); + } + + @Test + public void convertToTimestampReturnsCorrectInstantWhenValueIsEpochAndPrecisionIsSeconds() { + Long value = 1633097000L; // corresponds to 2021-10-01T11:30:00Z + Instant result = + Utils.convertToTimestamp(value, "seconds", Optional.empty(), ZoneId.systemDefault()); + assertEquals(Instant.ofEpochSecond(1633097000L, 0), result); + } +} diff --git a/src/test/java/io/lenses/connect/smt/header/InsertFieldTimestampHeadersTest.java b/src/test/java/io/lenses/connect/smt/header/InsertFieldTimestampHeadersTest.java new file mode 100644 index 0000000..f3b41b3 --- /dev/null +++ b/src/test/java/io/lenses/connect/smt/header/InsertFieldTimestampHeadersTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.connect.smt.header; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.jupiter.api.Test; + +/** Unit tests for {@link InsertFieldTimestampHeadersTest}. 
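+ * <p>All tests below configure {@code field=_value}, i.e. the whole record value (an epoch-millis
+ * long) is taken as the timestamp source.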
*/ +public class InsertFieldTimestampHeadersTest { + + @Test + public void testAllHeaders() { + InsertFieldTimestampHeaders transformer = new InsertFieldTimestampHeaders<>(); + Map configs = new HashMap<>(); + configs.put("header.prefix.name", "wallclock."); + configs.put("field", "_value"); + transformer.configure(configs); + + Long expected = Instant.parse("2020-01-05T11:21:04.000Z").toEpochMilli(); + final Headers headers = new ConnectHeaders(); + final SinkRecord record = + new SinkRecord( + "topic", + 0, + Schema.STRING_SCHEMA, + null, + Schema.INT64_SCHEMA, + expected, + 0, + expected, + TimestampType.LOG_APPEND_TIME, + headers); + final SinkRecord transformed = transformer.apply(record); + assertEquals("2020", transformed.headers().lastWithName("wallclock.year").value()); + assertEquals("01", transformed.headers().lastWithName("wallclock.month").value()); + assertEquals("05", transformed.headers().lastWithName("wallclock.day").value()); + assertEquals("11", transformed.headers().lastWithName("wallclock.hour").value()); + assertEquals("21", transformed.headers().lastWithName("wallclock.minute").value()); + assertEquals("04", transformed.headers().lastWithName("wallclock.second").value()); + assertEquals("2020-01-05", transformed.headers().lastWithName("wallclock.date").value()); + } + + @Test + public void testUsingKalkotaTimezone() { + InsertFieldTimestampHeaders transformer = new InsertFieldTimestampHeaders<>(); + Map configs = new HashMap<>(); + configs.put("header.prefix.name", "wallclock."); + configs.put("timezone", "Asia/Kolkata"); + configs.put("field", "_value"); + transformer.configure(configs); + + long expected = Instant.parse("2020-01-05T11:21:04.000Z").toEpochMilli(); + final Headers headers = new ConnectHeaders(); + final SourceRecord record = + new SourceRecord( + null, + null, + "topic", + 0, + Schema.STRING_SCHEMA, + "key", + Schema.INT64_SCHEMA, + expected, + expected, + headers); + final SourceRecord transformed = transformer.apply(record); + assertEquals("2020", transformed.headers().lastWithName("wallclock.year").value()); + assertEquals("01", transformed.headers().lastWithName("wallclock.month").value()); + assertEquals("05", transformed.headers().lastWithName("wallclock.day").value()); + assertEquals("16", transformed.headers().lastWithName("wallclock.hour").value()); + assertEquals("51", transformed.headers().lastWithName("wallclock.minute").value()); + assertEquals("04", transformed.headers().lastWithName("wallclock.second").value()); + assertEquals("2020-01-05", transformed.headers().lastWithName("wallclock.date").value()); + } + + @Test + public void changeDatePattern() { + InsertFieldTimestampHeaders transformer = new InsertFieldTimestampHeaders<>(); + Map configs = new HashMap<>(); + configs.put("header.prefix.name", "wallclock."); + configs.put("date.format", "yyyy-dd-MM"); + configs.put("field", "_value"); + transformer.configure(configs); + + final Headers headers = new ConnectHeaders(); + long expected = Instant.parse("2020-01-05T11:21:04.000Z").toEpochMilli(); + final SourceRecord record = + new SourceRecord( + null, + null, + "topic", + 0, + Schema.STRING_SCHEMA, + "key", + Schema.INT64_SCHEMA, + expected, + expected, + headers); + final SourceRecord transformed = transformer.apply(record); + assertEquals("2020", transformed.headers().lastWithName("wallclock.year").value()); + assertEquals("01", transformed.headers().lastWithName("wallclock.month").value()); + assertEquals("05", transformed.headers().lastWithName("wallclock.day").value()); + 
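+    // Note: the custom date.format (yyyy-dd-MM) only changes the "date" header asserted at the
+    // end of this test; the hour/minute/second headers keep their default formats.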
assertEquals("11", transformed.headers().lastWithName("wallclock.hour").value()); + assertEquals("21", transformed.headers().lastWithName("wallclock.minute").value()); + assertEquals("04", transformed.headers().lastWithName("wallclock.second").value()); + assertEquals("2020-05-01", transformed.headers().lastWithName("wallclock.date").value()); + } +} diff --git a/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java b/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java new file mode 100644 index 0000000..e6f2161 --- /dev/null +++ b/src/test/java/io/lenses/connect/smt/header/InsertRollingFieldTimestampHeadersTest.java @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package io.lenses.connect.smt.header; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.jupiter.api.Test; + +/** Unit tests for {@link InsertRollingRecordTimestampHeaders}. 
*/ +public class InsertRollingFieldTimestampHeadersTest { + @Test + public void testRollingWindowEvery15Minutes() { + ArrayList> scenarios = new ArrayList<>(); + + scenarios.add(new Tuple5<>(("2020-01-01T01:00:00.999Z"), 15, "2020-01-01 01:00", "01", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:00:01.000Z"), 15, "2020-01-01 01:00", "01", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:14:59.000Z"), 15, "2020-01-01 01:00", "01", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:15:00.000Z"), 15, "2020-01-01 01:15", "01", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:15:01.000Z"), 15, "2020-01-01 01:15", "01", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:29:59.000Z"), 15, "2020-01-01 01:15", "01", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:30:00.000Z"), 15, "2020-01-01 01:30", "01", "30")); + scenarios.add(new Tuple5<>(("2020-01-01T01:30:01.000Z"), 15, "2020-01-01 01:30", "01", "30")); + scenarios.add(new Tuple5<>(("2020-01-01T01:44:59.000Z"), 15, "2020-01-01 01:30", "01", "30")); + scenarios.add(new Tuple5<>(("2020-01-01T01:45:00.000Z"), 15, "2020-01-01 01:45", "01", "45")); + scenarios.add(new Tuple5<>(("2020-01-01T01:45:01.000Z"), 15, "2020-01-01 01:45", "01", "45")); + scenarios.add(new Tuple5<>(("2020-01-01T01:59:59.000Z"), 15, "2020-01-01 01:45", "01", "45")); + + scenarios.forEach( + scenario -> { + Map configs = new HashMap<>(); + configs.put("header.prefix.name", "wallclock_"); + configs.put("date.format", "yyyy-MM-dd HH:mm"); + configs.put("window.size", scenario.second.toString()); + configs.put("window.type", "minutes"); + configs.put("field", "_value"); + + final InsertRollingFieldTimestampHeaders transformer = + new InsertRollingFieldTimestampHeaders<>(); + transformer.configure(configs); + + final Headers headers = new ConnectHeaders(); + long expected = Instant.parse(scenario.first).toEpochMilli(); + final SourceRecord record = + new SourceRecord( + null, null, "topic", 0, Schema.STRING_SCHEMA, "key", null, expected, 0L, headers); + final SourceRecord transformed = transformer.apply(record); + final String actualDate = + transformed.headers().lastWithName("wallclock_date").value().toString(); + assertEquals(actualDate, scenario.third); + + final String actualHour = + transformed.headers().lastWithName("wallclock_hour").value().toString(); + assertEquals(actualHour, scenario.fourth); + final String actualMinute = + transformed.headers().lastWithName("wallclock_minute").value().toString(); + assertEquals(actualMinute, scenario.fifth); + }); + } + + @Test + public void testRollingWindowEvery15MinutesAndTimezoneSetToKalkota() { + ArrayList> scenarios = new ArrayList<>(); + + // the first param to the Tuple5 is UTC. 
the third, fourth and figth arguments should be adapted + // to the Kalkota timezone + scenarios.add(new Tuple5<>(("2020-01-01T01:00:00.999Z"), 15, "2020-01-01 06:30", "06", "30")); + scenarios.add(new Tuple5<>(("2020-01-01T01:00:01.000Z"), 15, "2020-01-01 06:30", "06", "30")); + scenarios.add(new Tuple5<>(("2020-01-01T01:14:59.000Z"), 15, "2020-01-01 06:30", "06", "30")); + scenarios.add(new Tuple5<>(("2020-01-01T01:15:00.000Z"), 15, "2020-01-01 06:45", "06", "45")); + scenarios.add(new Tuple5<>(("2020-01-01T01:15:01.000Z"), 15, "2020-01-01 06:45", "06", "45")); + scenarios.add(new Tuple5<>(("2020-01-01T01:29:59.000Z"), 15, "2020-01-01 06:45", "06", "45")); + scenarios.add(new Tuple5<>(("2020-01-01T01:30:00.000Z"), 15, "2020-01-01 07:00", "07", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:30:01.000Z"), 15, "2020-01-01 07:00", "07", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:44:59.000Z"), 15, "2020-01-01 07:00", "07", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:45:00.000Z"), 15, "2020-01-01 07:15", "07", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:45:01.000Z"), 15, "2020-01-01 07:15", "07", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:59:59.000Z"), 15, "2020-01-01 07:15", "07", "15")); + + scenarios.forEach( + scenario -> { + Map configs = new HashMap<>(); + configs.put("header.prefix.name", "wallclock_"); + configs.put("date.format", "yyyy-MM-dd HH:mm"); + configs.put("window.size", scenario.second.toString()); + configs.put("window.type", "minutes"); + configs.put("timezone", "Asia/Kolkata"); + configs.put("field", "_value"); + final InsertRollingFieldTimestampHeaders transformer = + new InsertRollingFieldTimestampHeaders<>(); + transformer.configure(configs); + + final Headers headers = new ConnectHeaders(); + long expected = Instant.parse(scenario.first).toEpochMilli(); + final SourceRecord record = + new SourceRecord( + null, null, "topic", 0, Schema.STRING_SCHEMA, "key", null, expected, 0L, headers); + final SourceRecord transformed = transformer.apply(record); + final String actualDate = + transformed.headers().lastWithName("wallclock_date").value().toString(); + assertEquals(actualDate, scenario.third); + + final String actualHour = + transformed.headers().lastWithName("wallclock_hour").value().toString(); + assertEquals(actualHour, scenario.fourth); + final String actualMinute = + transformed.headers().lastWithName("wallclock_minute").value().toString(); + assertEquals(actualMinute, scenario.fifth); + }); + } + + @Test + public void testRollingWindowEvery15MinutesAndTimezoneIsParis() { + ArrayList> scenarios = new ArrayList<>(); + + scenarios.add(new Tuple5<>(("2020-01-01T01:00:00.999Z"), 15, "2020-01-01 02:00", "02", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:00:01.000Z"), 15, "2020-01-01 02:00", "02", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:14:59.000Z"), 15, "2020-01-01 02:00", "02", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:15:00.000Z"), 15, "2020-01-01 02:15", "02", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:15:01.000Z"), 15, "2020-01-01 02:15", "02", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:29:59.000Z"), 15, "2020-01-01 02:15", "02", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:30:00.000Z"), 15, "2020-01-01 02:30", "02", "30")); + scenarios.add(new Tuple5<>(("2020-01-01T01:30:01.000Z"), 15, "2020-01-01 02:30", "02", "30")); + scenarios.add(new Tuple5<>(("2020-01-01T01:44:59.000Z"), 15, "2020-01-01 02:30", "02", "30")); + scenarios.add(new 
Tuple5<>(("2020-01-01T01:45:00.000Z"), 15, "2020-01-01 02:45", "02", "45")); + scenarios.add(new Tuple5<>(("2020-01-01T01:45:01.000Z"), 15, "2020-01-01 02:45", "02", "45")); + scenarios.add(new Tuple5<>(("2020-01-01T01:59:59.000Z"), 15, "2020-01-01 02:45", "02", "45")); + + scenarios.forEach( + scenario -> { + Map configs = new HashMap<>(); + configs.put("header.prefix.name", "_"); + configs.put("date.format", "yyyy-MM-dd HH:mm"); + configs.put("window.size", scenario.second.toString()); + configs.put("window.type", "minutes"); + configs.put("timezone", "Europe/Paris"); + configs.put("field", "_value"); + final InsertRollingFieldTimestampHeaders transformer = + new InsertRollingFieldTimestampHeaders<>(); + transformer.configure(configs); + + final Headers headers = new ConnectHeaders(); + long expected = Instant.parse(scenario.first).toEpochMilli(); + final SourceRecord record = + new SourceRecord( + null, + null, + "topic", + 0, + Schema.STRING_SCHEMA, + "key", + Schema.INT64_SCHEMA, + expected, + 0L, + headers); + final SourceRecord transformed = transformer.apply(record); + final String actualDate = transformed.headers().lastWithName("_date").value().toString(); + assertEquals(actualDate, scenario.third); + + final String actualHour = transformed.headers().lastWithName("_hour").value().toString(); + assertEquals(actualHour, scenario.fourth); + final String actualMinute = + transformed.headers().lastWithName("_minute").value().toString(); + assertEquals(actualMinute, scenario.fifth); + }); + } + + @Test + public void testRollingWindowEvery5Minutes() { + ArrayList> scenarios = new ArrayList<>(); + + scenarios.add(new Tuple5<>(("2020-01-01T01:00:00.999Z"), 5, "2020-01-01 01:00", "01", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:00:01.000Z"), 5, "2020-01-01 01:00", "01", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:04:59.000Z"), 5, "2020-01-01 01:00", "01", "00")); + scenarios.add(new Tuple5<>(("2020-01-01T01:05:00.000Z"), 5, "2020-01-01 01:05", "01", "05")); + scenarios.add(new Tuple5<>(("2020-01-01T01:05:01.000Z"), 5, "2020-01-01 01:05", "01", "05")); + scenarios.add(new Tuple5<>(("2020-01-01T01:09:59.000Z"), 5, "2020-01-01 01:05", "01", "05")); + scenarios.add(new Tuple5<>(("2020-01-01T01:10:00.000Z"), 5, "2020-01-01 01:10", "01", "10")); + scenarios.add(new Tuple5<>(("2020-01-01T01:10:01.000Z"), 5, "2020-01-01 01:10", "01", "10")); + scenarios.add(new Tuple5<>(("2020-01-01T01:14:59.000Z"), 5, "2020-01-01 01:10", "01", "10")); + scenarios.add(new Tuple5<>(("2020-01-01T01:15:00.000Z"), 5, "2020-01-01 01:15", "01", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:15:01.000Z"), 5, "2020-01-01 01:15", "01", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:19:59.000Z"), 5, "2020-01-01 01:15", "01", "15")); + scenarios.add(new Tuple5<>(("2020-01-01T01:20:00.000Z"), 5, "2020-01-01 01:20", "01", "20")); + scenarios.add(new Tuple5<>(("2020-01-01T01:20:01.000Z"), 5, "2020-01-01 01:20", "01", "20")); + scenarios.add(new Tuple5<>(("2020-01-01T01:24:59.000Z"), 5, "2020-01-01 01:20", "01", "20")); + scenarios.add(new Tuple5<>(("2020-01-01T01:25:00.000Z"), 5, "2020-01-01 01:25", "01", "25")); + scenarios.add(new Tuple5<>(("2020-01-01T01:25:01.000Z"), 5, "2020-01-01 01:25", "01", "25")); + scenarios.add(new Tuple5<>(("2020-01-01T01:29:59.000Z"), 5, "2020-01-01 01:25", "01", "25")); + scenarios.add(new Tuple5<>(("2020-01-01T01:30:00.000Z"), 5, "2020-01-01 01:30", "01", "30")); + scenarios.add(new Tuple5<>(("2020-01-01T01:30:01.000Z"), 5, "2020-01-01 01:30", "01", "30")); + 
+    scenarios.add(new Tuple5<>("2020-01-01T01:34:59.000Z", 5, "2020-01-01 01:30", "01", "30"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:35:00.000Z", 5, "2020-01-01 01:35", "01", "35"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:35:01.000Z", 5, "2020-01-01 01:35", "01", "35"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:39:59.000Z", 5, "2020-01-01 01:35", "01", "35"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:40:00.000Z", 5, "2020-01-01 01:40", "01", "40"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:40:01.000Z", 5, "2020-01-01 01:40", "01", "40"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:44:59.000Z", 5, "2020-01-01 01:40", "01", "40"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:45:00.000Z", 5, "2020-01-01 01:45", "01", "45"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:45:01.000Z", 5, "2020-01-01 01:45", "01", "45"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:49:59.000Z", 5, "2020-01-01 01:45", "01", "45"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:50:00.000Z", 5, "2020-01-01 01:50", "01", "50"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:50:01.000Z", 5, "2020-01-01 01:50", "01", "50"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:54:59.000Z", 5, "2020-01-01 01:50", "01", "50"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:55:00.000Z", 5, "2020-01-01 01:55", "01", "55"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:55:01.000Z", 5, "2020-01-01 01:55", "01", "55"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:59:59.000Z", 5, "2020-01-01 01:55", "01", "55"));
+    scenarios.add(new Tuple5<>("2020-01-01T02:00:00.000Z", 5, "2020-01-01 02:00", "02", "00"));
+
+    scenarios.forEach(
+        scenario -> {
+          Map<String, String> configs = new HashMap<>();
+          configs.put("header.prefix.name", "wallclock_");
+          configs.put("date.format", "yyyy-MM-dd HH:mm");
+          configs.put("rolling.window.size", scenario.second.toString());
+          configs.put("rolling.window.type", "minutes");
+          configs.put("field", "_value");
+          final InsertRollingFieldTimestampHeaders<SourceRecord> transformer =
+              new InsertRollingFieldTimestampHeaders<>();
+          transformer.configure(configs);
+
+          final Headers headers = new ConnectHeaders();
+          long expected = Instant.parse(scenario.first).toEpochMilli();
+          final SourceRecord record =
+              new SourceRecord(
+                  null, null, "topic", 0, Schema.STRING_SCHEMA, "key", null, expected, 0L, headers);
+          final SourceRecord transformed = transformer.apply(record);
+          final String actualDate =
+              transformed.headers().lastWithName("wallclock_date").value().toString();
+          assertEquals(scenario.third, actualDate);
+
+          final String actualHour =
+              transformed.headers().lastWithName("wallclock_hour").value().toString();
+          assertEquals(scenario.fourth, actualHour);
+
+          final String actualMinute =
+              transformed.headers().lastWithName("wallclock_minute").value().toString();
+          assertEquals(scenario.fifth, actualMinute);
+        });
+  }
+
+  @Test
+  public void testFormattedWithRollingWindowOf1Hour() {
+    ArrayList<Tuple5<String, Integer, String, String, String>> scenarios = new ArrayList<>();
+    scenarios.add(new Tuple5<>("2020-01-01T01:19:59.999Z", 1, "2020-01-01 01:00", "01", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:00.000Z", 1, "2020-01-01 01:00", "01", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:01.000Z", 1, "2020-01-01 01:00", "01", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:59:59.000Z", 1, "2020-01-01 01:00", "01", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T02:00:00.000Z", 1, "2020-01-01 02:00", "02", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T02:15:00.000Z", 1, "2020-01-01 02:00", "02", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T02:59:59.999Z", 1, "2020-01-01 02:00", "02", "00"));
+
+    scenarios.forEach(
+        scenario -> {
+          Map<String, String> configs = new HashMap<>();
+          configs.put("header.prefix.name", "wallclock_");
+          configs.put("date.format", "yyyy-MM-dd HH:mm");
+          configs.put("rolling.window.size", scenario.second.toString());
+          configs.put("rolling.window.type", "hours");
+          configs.put("field", "_value");
+          final InsertRollingFieldTimestampHeaders<SourceRecord> transformer =
+              new InsertRollingFieldTimestampHeaders<>();
+          transformer.configure(configs);
+
+          final Headers headers = new ConnectHeaders();
+          long expected = Instant.parse(scenario.first).toEpochMilli();
+          final SourceRecord record =
+              new SourceRecord(
+                  null, null, "topic", 0, Schema.STRING_SCHEMA, "key", null, expected, 0L, headers);
+          final SourceRecord transformed = transformer.apply(record);
+          final String actualDate =
+              transformed.headers().lastWithName("wallclock_date").value().toString();
+          assertEquals(scenario.third, actualDate);
+
+          final String actualHour =
+              transformed.headers().lastWithName("wallclock_hour").value().toString();
+          assertEquals(scenario.fourth, actualHour);
+        });
+  }
+
+  @Test
+  public void testRollingWindowOf3Hours() {
+    ArrayList<Tuple5<String, Integer, String, String, String>> scenarios = new ArrayList<>();
+    scenarios.add(new Tuple5<>("2020-01-01T01:19:59.999Z", 3, "2020-01-01 00:00", "00", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:00.000Z", 3, "2020-01-01 00:00", "00", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:01.000Z", 3, "2020-01-01 00:00", "00", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T02:19:59.000Z", 3, "2020-01-01 00:00", "00", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T02:20:00.000Z", 3, "2020-01-01 00:00", "00", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T02:20:01.000Z", 3, "2020-01-01 00:00", "00", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T03:19:59.000Z", 3, "2020-01-01 03:00", "03", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T03:20:00.000Z", 3, "2020-01-01 03:00", "03", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T03:20:01.000Z", 3, "2020-01-01 03:00", "03", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T04:19:59.000Z", 3, "2020-01-01 03:00", "03", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T04:20:00.000Z", 3, "2020-01-01 03:00", "03", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T04:20:01.000Z", 3, "2020-01-01 03:00", "03", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T05:19:59.000Z", 3, "2020-01-01 03:00", "03", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T05:20:00.000Z", 3, "2020-01-01 03:00", "03", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T05:59:59.000Z", 3, "2020-01-01 03:00", "03", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T06:00:00.000Z", 3, "2020-01-01 06:00", "06", "00"));
+
+    scenarios.forEach(
+        scenario -> {
+          Map<String, String> configs = new HashMap<>();
+          configs.put("date.format", "yyyy-MM-dd HH:mm");
+          configs.put("rolling.window.size", scenario.second.toString());
+          configs.put("rolling.window.type", "hours");
+          configs.put("field", "_value");
+          final InsertRollingFieldTimestampHeaders<SourceRecord> transformer =
+              new InsertRollingFieldTimestampHeaders<>();
+          transformer.configure(configs);
+
+          final Headers headers = new ConnectHeaders();
+          long expected = Instant.parse(scenario.first).toEpochMilli();
+          final SourceRecord record =
+              new SourceRecord(
+                  null, null, "topic", 0, Schema.STRING_SCHEMA, "key", null, expected, 0L, headers);
+          final SourceRecord transformed = transformer.apply(record);
+          final String actualDate = transformed.headers().lastWithName("date").value().toString();
+          assertEquals(scenario.third, actualDate);
+
+          final String actualHour = transformed.headers().lastWithName("hour").value().toString();
+          assertEquals(scenario.fourth, actualHour);
+        });
+  }
+
+  @Test
+  public void testRollingWindowEvery12Seconds() {
+    ArrayList<Tuple5<String, Integer, String, String, String>> scenarios = new ArrayList<>();
+    scenarios.add(new Tuple5<>("2020-01-01T01:19:59.000Z", 12, "2020-01-01 01:19:48", "19", "48"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:00.000Z", 12, "2020-01-01 01:20:00", "20", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:01.000Z", 12, "2020-01-01 01:20:00", "20", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:02.000Z", 12, "2020-01-01 01:20:00", "20", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:11.999Z", 12, "2020-01-01 01:20:00", "20", "00"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:12.000Z", 12, "2020-01-01 01:20:12", "20", "12"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:13.000Z", 12, "2020-01-01 01:20:12", "20", "12"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:23.999Z", 12, "2020-01-01 01:20:12", "20", "12"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:24.000Z", 12, "2020-01-01 01:20:24", "20", "24"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:25.000Z", 12, "2020-01-01 01:20:24", "20", "24"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:35.999Z", 12, "2020-01-01 01:20:24", "20", "24"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:36.000Z", 12, "2020-01-01 01:20:36", "20", "36"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:37.000Z", 12, "2020-01-01 01:20:36", "20", "36"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:47.999Z", 12, "2020-01-01 01:20:36", "20", "36"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:48.000Z", 12, "2020-01-01 01:20:48", "20", "48"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:49.000Z", 12, "2020-01-01 01:20:48", "20", "48"));
+    scenarios.add(new Tuple5<>("2020-01-01T01:20:59.999Z", 12, "2020-01-01 01:20:48", "20", "48"));
+
+    scenarios.forEach(
+        scenario -> {
+          Map<String, String> configs = new HashMap<>();
+          configs.put("date.format", "yyyy-MM-dd HH:mm:ss");
+          configs.put("rolling.window.size", scenario.second.toString());
+          configs.put("rolling.window.type", "seconds");
+          configs.put("field", "_value");
+          final InsertRollingFieldTimestampHeaders<SourceRecord> transformer =
+              new InsertRollingFieldTimestampHeaders<>();
+          transformer.configure(configs);
+
+          final Headers headers = new ConnectHeaders();
+          long expected = Instant.parse(scenario.first).toEpochMilli();
+          final SourceRecord record =
+              new SourceRecord(
+                  null, null, "topic", 0, Schema.STRING_SCHEMA, "key", null, expected, 0L, headers);
+          final SourceRecord transformed = transformer.apply(record);
+          final String actualDate = transformed.headers().lastWithName("date").value().toString();
+          assertEquals(scenario.third, actualDate);
+
+          final String actualSecond =
+              transformed.headers().lastWithName("second").value().toString();
+          assertEquals(scenario.fifth, actualSecond);
+        });
+  }
+
+  static class Tuple5<A, B, C, D, E> {
+    private final A first;
+    private final B second;
+    private final C third;
+    private final D fourth;
+    private final E fifth;
+
+    public Tuple5(A first, B second, C third, D fourth, E fifth) {
+      this.first = first;
+      this.second = second;
+      this.third = third;
+      this.fourth = fourth;
+      this.fifth = fifth;
+    }
+  }
+}
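Note for reviewers: every scenario above reduces to flooring the extracted epoch-millisecond timestamp to the start of its rolling window, with windows aligned to the Unix epoch (which coincides with midnight UTC, so the expected dates line up). A minimal sketch of that arithmetic, illustrative only and not the transform's actual internals:

    import java.time.Duration;
    import java.time.Instant;

    final class RollingWindowSketch {
      // Floors an epoch-millis timestamp to its window start, assuming epoch-aligned windows.
      static Instant floorToWindow(Instant ts, Duration window) {
        long windowMillis = window.toMillis();
        return Instant.ofEpochMilli((ts.toEpochMilli() / windowMillis) * windowMillis);
      }

      public static void main(String[] args) {
        // Mirrors the 12-second scenarios: ...11.999Z stays in the old window,
        // while ...12.000Z opens a new one.
        System.out.println(
            floorToWindow(Instant.parse("2020-01-01T01:20:11.999Z"), Duration.ofSeconds(12)));
        System.out.println(
            floorToWindow(Instant.parse("2020-01-01T01:20:12.000Z"), Duration.ofSeconds(12)));
      }
    }
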
diff --git a/src/test/java/io/lenses/connect/smt/header/RecordFieldTimestampTest.java b/src/test/java/io/lenses/connect/smt/header/RecordFieldTimestampTest.java
new file mode 100644
index 0000000..289633f
--- /dev/null
+++ b/src/test/java/io/lenses/connect/smt/header/RecordFieldTimestampTest.java
@@ -0,0 +1,384 @@
+package io.lenses.connect.smt.header;
+
+import static java.time.ZoneOffset.UTC;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link RecordFieldTimestamp}. Tests for the unix.precision configuration are not
+ * required, since Connect already covers that behaviour.
+ */
+class RecordFieldTimestampTest {
+
+  @Test
+  void nullValueReturnsNullInstant() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "value");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    Instant actual =
+        recordFieldTimestamp.getInstant(new SinkRecord("topic", 0, null, null, null, null, 0));
+    assertNull(actual);
+  }
+
+  @Test
+  void fieldKeySetsTheFieldTypeToKeyTest() {
+    String[] fields = new String[] {"_key", "_key.a", "_key.a.b"};
+    Arrays.stream(fields)
+        .forEach(
+            field -> {
+              Map<String, String> props = new HashMap<>();
+              props.put("field", field);
+              props.put("format.from.pattern", "yyyy-MM-dd");
+              props.put("unix.precision", "milliseconds");
+
+              SimpleConfig config =
+                  new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+              RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+                  RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+              assertEquals(
+                  FieldType.KEY, recordFieldTimestamp.getFieldTypeAndFields().getFieldType());
+            });
+  }
+
+  @Test
+  void unixPrecisionDefaultsWhenNotSet() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "_key");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    assertEquals("milliseconds", recordFieldTimestamp.getUnixPrecision());
+  }
+
+  @Test
+  void fieldValueSetsTheFieldTypeToValueTest() {
+    String[] fields = new String[] {"_value", "a", "a.b"};
+    Arrays.stream(fields)
+        .forEach(
+            field -> {
+              Map<String, String> props = new HashMap<>();
+              props.put("field", field);
+              props.put("format.from.pattern", "yyyy-MM-dd");
+              props.put("unix.precision", "milliseconds");
+
+              SimpleConfig config =
+                  new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+              RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+                  RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+              assertEquals(
+                  FieldType.VALUE, recordFieldTimestamp.getFieldTypeAndFields().getFieldType());
+            });
+  }
+
+  @Test
+  void fieldTimestampSetsTheFieldTypeToTimestamp() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "_timestamp");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    assertEquals(FieldType.TIMESTAMP, recordFieldTimestamp.getFieldTypeAndFields().getFieldType());
+  }
+
+  @Test
+  void extendConfigDef() {
+    ConfigDef configDef = RecordFieldTimestamp.extendConfigDef(new ConfigDef());
+    assertEquals(3, configDef.configKeys().size());
+    assertTrue(configDef.configKeys().containsKey("field"));
+    assertTrue(configDef.configKeys().containsKey("format.from.pattern"));
+    assertTrue(configDef.configKeys().containsKey("unix.precision"));
+  }
+
+  @Test
+  void whenFieldIsSetToTimestampTheRecordTimestampIsReturned() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "_timestamp");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedTs = 123456789L;
+    Instant actual =
+        recordFieldTimestamp.getInstant(
+            new SinkRecord(
+                "topic", 0, null, null, null, null, 0, expectedTs, TimestampType.LOG_APPEND_TIME));
+    assertEquals(expectedTs, actual.toEpochMilli());
+  }
+
+  @Test
+  void whenFieldIsSetToKeyItReturnsTheRecordKeyValueTest() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "_key");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedKey = 123456789L;
+
+    SinkRecord[] records =
+        new SinkRecord[] {
+          new SinkRecord("topic", 0, null, expectedKey, null, null, 0),
+          new SinkRecord(
+              "topic",
+              0,
+              Schema.INT64_SCHEMA,
+              expectedKey,
+              null,
+              null,
+              0,
+              0L,
+              TimestampType.LOG_APPEND_TIME)
+        };
+    Arrays.stream(records)
+        .forEach(
+            record -> {
+              Instant actual = recordFieldTimestamp.getInstant(record);
+              assertEquals(expectedKey, actual.toEpochMilli());
+            });
+  }
+
+  @Test
+  void whenFieldIsSetToValueItReturnsTheRecordValueTest() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "_value");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedValue = 123456789L;
+
+    SinkRecord[] records =
+        new SinkRecord[] {
+          new SinkRecord("topic", 0, null, null, null, expectedValue, 0),
+          new SinkRecord(
+              "topic",
+              0,
+              null,
+              null,
+              Schema.INT64_SCHEMA,
+              expectedValue,
+              0,
+              0L,
+              TimestampType.LOG_APPEND_TIME)
+        };
+    Arrays.stream(records)
+        .forEach(
+            record -> {
+              Instant actual = recordFieldTimestamp.getInstant(record);
+              assertEquals(expectedValue, actual.toEpochMilli());
+            });
+  }
+
+  @Test
+  void returnsTheKeyStructFieldWhenTheValueIsALongTest() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "_key.a");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedValue = 123456789L;
+
+    Schema schema =
+        SchemaBuilder.struct()
+            .field("a", Schema.INT64_SCHEMA)
+            .field("b", Schema.STRING_SCHEMA)
+            .build();
+    Struct struct = new Struct(schema).put("a", expectedValue).put("b", "value");
+
+    SinkRecord input = new SinkRecord("topic", 0, schema, struct, null, expectedValue, 0);
+
+    Instant actual = recordFieldTimestamp.getInstant(input);
+    assertEquals(expectedValue, actual.toEpochMilli());
+  }
+
+  @Test
+  void returnsTheValueStructFieldWhenTheValueIsALongTest() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "a");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedValue = 123456789L;
+
+    Schema schema =
+        SchemaBuilder.struct()
+            .field("a", Schema.INT64_SCHEMA)
+            .field("b", Schema.STRING_SCHEMA)
+            .build();
+    Struct struct = new Struct(schema).put("a", expectedValue).put("b", "value");
+
+    SinkRecord input = new SinkRecord("topic", 0, null, null, schema, struct, 0);
+
+    Instant actual = recordFieldTimestamp.getInstant(input);
+    assertEquals(expectedValue, actual.toEpochMilli());
+  }
+
+  @Test
+  void returnsAKeyFieldWhenTheFieldValueIsAStringDateTimeRepresentationTest() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "_key.a");
+    props.put("format.from.pattern", "yyyy-MM-dd HH:mm:ss");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedValue = 1704067260000L;
+    String dateTimeValue = "2024-01-01 00:01:00";
+
+    Schema schema =
+        SchemaBuilder.struct()
+            .field("a", Schema.STRING_SCHEMA)
+            .field("b", Schema.STRING_SCHEMA)
+            .build();
+    Struct struct = new Struct(schema).put("a", dateTimeValue).put("b", "value");
+
+    SinkRecord input = new SinkRecord("topic", 0, schema, struct, null, expectedValue, 0);
+
+    Instant actual = recordFieldTimestamp.getInstant(input);
+    assertEquals(expectedValue, actual.toEpochMilli());
+  }
+
+  // The following tests mirror the Struct cases above, using a Map payload with a null schema.
+
+  @Test
+  void returnsTheKeyMapFieldWhenTheValueIsALongTest() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "_key.a");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedValue = 123456789L;
+
+    Map<String, Object> map = new HashMap<>();
+    map.put("a", expectedValue);
+    map.put("b", "value");
+
+    SinkRecord input = new SinkRecord("topic", 0, null, map, null, expectedValue, 0);
+
+    Instant actual = recordFieldTimestamp.getInstant(input);
+    assertEquals(expectedValue, actual.toEpochMilli());
+  }
+
+  @Test
+  void returnsTheValueMapFieldWhenTheValueIsALongTest() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "a");
+    props.put("format.from.pattern", "yyyy-MM-dd");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedValue = 123456789L;
+
+    Map<String, Object> map = new HashMap<>();
+    map.put("a", expectedValue);
+    map.put("b", "value");
+
+    SinkRecord input = new SinkRecord("topic", 0, null, null, null, map, 0);
+
+    Instant actual = recordFieldTimestamp.getInstant(input);
+    assertEquals(expectedValue, actual.toEpochMilli());
+  }
+
+  @Test
+  void returnsTheValueMapFieldWhenTheValueIsADateTimeStringTest() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "a");
+    props.put("format.from.pattern", "yyyy-MM-dd HH:mm:ss");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedValue = 1704067260000L;
+    String dateTimeValue = "2024-01-01 00:01:00";
+
+    Map<String, Object> map = new HashMap<>();
+    map.put("a", dateTimeValue);
+    map.put("b", "value");
+
+    SinkRecord input = new SinkRecord("topic", 0, null, null, null, map, 0);
+
+    Instant actual = recordFieldTimestamp.getInstant(input);
+    assertEquals(expectedValue, actual.toEpochMilli());
+  }
+
+  @Test
+  void returnsTheKeyMapFieldWhichContainsTheDateTimeStringTest() {
+    Map<String, String> props = new HashMap<>();
+    props.put("field", "_key.a");
+    props.put("format.from.pattern", "yyyy-MM-dd HH:mm:ss");
+    props.put("unix.precision", "milliseconds");
+
+    SimpleConfig config =
+        new SimpleConfig(RecordFieldTimestamp.extendConfigDef(new ConfigDef()), props);
+    RecordFieldTimestamp<SinkRecord> recordFieldTimestamp =
+        RecordFieldTimestamp.create(config, UTC, Locale.getDefault());
+    long expectedValue = 1704067260000L;
+    String dateTimeValue = "2024-01-01 00:01:00";
+
+    Map<String, Object> map = new HashMap<>();
+    map.put("a", dateTimeValue);
+    map.put("b", "value");
+
+    SinkRecord input = new SinkRecord("topic", 0, null, map, null, expectedValue, 0);
+
+    Instant actual = recordFieldTimestamp.getInstant(input);
+    assertEquals(expectedValue, actual.toEpochMilli());
+  }
+}
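The Struct and Map tests above all exercise the same coercion rule: a numeric field is read as a unix timestamp in the configured precision, while a string field is parsed with format.from.pattern in the configured zone. A rough sketch of that dispatch (illustrative names and simplifications; the long and string branches are the only ones the tests cover):

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    final class FieldCoercionSketch {
      // Stands in for the configured format.from.pattern
      private static final DateTimeFormatter FORMAT =
          DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

      static Instant coerce(Object fieldValue) {
        if (fieldValue == null) {
          return null; // missing field or payload yields a null Instant, as the null-value test expects
        }
        if (fieldValue instanceof Long) {
          return Instant.ofEpochMilli((Long) fieldValue); // unix.precision=milliseconds
        }
        // Strings are parsed with the pattern; UTC matches the zone used in the tests
        return LocalDateTime.parse(fieldValue.toString(), FORMAT).toInstant(ZoneOffset.UTC);
      }

      public static void main(String[] args) {
        System.out.println(coerce(1704067260000L));        // 2024-01-01T00:01:00Z
        System.out.println(coerce("2024-01-01 00:01:00")); // 2024-01-01T00:01:00Z
      }
    }
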
diff --git a/src/test/java/io/lenses/connect/smt/header/UtilsExtractValueTest.java b/src/test/java/io/lenses/connect/smt/header/UtilsExtractValueTest.java
new file mode 100644
index 0000000..eeeb16c
--- /dev/null
+++ b/src/test/java/io/lenses/connect/smt/header/UtilsExtractValueTest.java
@@ -0,0 +1,95 @@
+package io.lenses.connect.smt.header;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.jupiter.api.Test;
+
+public class UtilsExtractValueTest {
+
+  @Test
+  public void extractValueReturnsCorrectValueFromMap() {
+    Map<String, Object> map = new HashMap<>();
+    map.put("key", "value");
+    Object result = Utils.extractValue(map, new String[] {"key"});
+    assertEquals("value", result);
+  }
+
+  @Test
+  public void extractValueReturnsNullWhenFieldDoesNotExistInMap() {
+    Map<String, Object> map = new HashMap<>();
+    Object result = Utils.extractValue(map, new String[] {"nonexistent"});
+    assertNull(result);
+  }
+
+  @Test
+  public void extractValueReturnsCorrectValueFromNestedMap() {
+    Map<String, Object> nestedMap = new HashMap<>();
+    nestedMap.put("key", "value");
+    Map<String, Object> map = new HashMap<>();
+    map.put("nested", nestedMap);
+    Object result = Utils.extractValue(map, new String[] {"nested", "key"});
+    assertEquals("value", result);
+  }
+
+  @Test
+  public void extractValueReturnsNullWhenFieldDoesNotExistInNestedMap() {
+    Map<String, Object> nestedMap = new HashMap<>();
+    nestedMap.put("key", "value");
+    Map<String, Object> map = new HashMap<>();
+    map.put("nested", nestedMap);
+    Object result = Utils.extractValue(map, new String[] {"nested", "nonexistent"});
+    assertNull(result);
+  }
+
+  @Test
+  public void extractValueFromAKafkaConnectStruct() {
+    Schema schema =
+        SchemaBuilder.struct()
+            .field("field1", Schema.STRING_SCHEMA)
+            .field("field2", Schema.INT32_SCHEMA)
+            .build();
+
+    Struct struct = new Struct(schema).put("field1", "value1").put("field2", 42);
+
+    Object result = Utils.extractValue(struct, new String[] {"field1"});
+    assertEquals("value1", result);
+  }
+
+  @Test
+  public void extractValueReturnsNullWhenFieldDoesNotExistInStruct() {
+    Schema schema =
+        SchemaBuilder.struct()
+            .field("field1", Schema.STRING_SCHEMA)
+            .field("field2", Schema.INT32_SCHEMA)
+            .build();
+
+    Struct struct = new Struct(schema).put("field1", "value1").put("field2", 42);
+
+    Object result = Utils.extractValue(struct, new String[] {"nonexistent"});
+    assertNull(result);
+  }
+
+  @Test
+  public void extractValueReturnsCorrectValueFromNestedStruct() {
+    Schema nestedSchema =
+        SchemaBuilder.struct()
+            .field("field1", Schema.STRING_SCHEMA)
+            .field("field2", Schema.INT32_SCHEMA)
+            .build();
+
+    Schema schema = SchemaBuilder.struct().field("nested", nestedSchema).build();
+
+    Struct nestedStruct = new Struct(nestedSchema).put("field1", "value1").put("field2", 42);
+
+    Struct struct = new Struct(schema).put("nested", nestedStruct);
+
+    Object result = Utils.extractValue(struct, new String[] {"nested", "field1"});
+    assertEquals("value1", result);
+  }
+}
diff --git a/src/test/java/io/lenses/connect/smt/header/UtilsIsBlankTest.java b/src/test/java/io/lenses/connect/smt/header/UtilsIsBlankTest.java
new file mode 100644
index 0000000..4c7c5c1
--- /dev/null
+++ b/src/test/java/io/lenses/connect/smt/header/UtilsIsBlankTest.java
@@ -0,0 +1,23 @@
+package io.lenses.connect.smt.header;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+public class UtilsIsBlankTest {
+  @Test
+  public void isBlankReturnsTrueWhenStringIsNull() {
+    assertTrue(Utils.isBlank(null));
+  }
+
+  @Test
+  public void isBlankReturnsTrueWhenStringIsEmpty() {
+    assertTrue(Utils.isBlank(""));
+  }
+
+  @Test
+  public void isBlankReturnsFalseWhenStringIsNotBlank() {
+    assertFalse(Utils.isBlank("not blank"));
+  }
+}
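For context on how a connector would enable the new transform, here is a hypothetical sink configuration. The transforms.* wiring is standard Connect; the SMT class and config keys are the ones added in this patch, while the alias rollingFieldTs and the created_at field are placeholders:

    transforms=rollingFieldTs
    transforms.rollingFieldTs.type=io.lenses.connect.smt.header.InsertRollingFieldTimestampHeaders
    transforms.rollingFieldTs.field=_value.created_at
    transforms.rollingFieldTs.format.from.pattern=yyyy-MM-dd HH:mm:ss
    transforms.rollingFieldTs.header.prefix.name=wallclock_
    transforms.rollingFieldTs.date.format=yyyy-MM-dd HH:mm
    transforms.rollingFieldTs.rolling.window.type=minutes
    transforms.rollingFieldTs.rolling.window.size=15

With this in place, the wallclock_* headers are available on every record for downstream use.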