Skip to content

Commit

Permalink
Support record field timestamp
Browse files Browse the repository at this point in the history
This PR introduces a new SMT that picks a field from the record payload and uses it as the source for the headers that drive time-based object key values in the Lenses datalake connectors.
  • Loading branch information
stheppi committed Mar 28, 2024
1 parent 9a5b46a commit 4dced00
Show file tree
Hide file tree
Showing 16 changed files with 1,655 additions and 55 deletions.
7 changes: 7 additions & 0 deletions src/main/java/io/lenses/connect/smt/header/FieldType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.lenses.connect.smt.header;

/**
 * The part of a record that a configured field path refers to.
 *
 * <p>{@code FieldTypeUtils.extractFieldTypeAndFields} decides which constant applies by looking at
 * the first segment of the path.
 */
enum FieldType {
  /** Selected when the path starts with the {@code _key} prefix. */
  KEY,

  /** Selected when the path starts with {@code _value}, or with no recognised prefix at all. */
  VALUE,

  /** Selected when the path is exactly {@code _timestamp}. */
  TIMESTAMP
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.lenses.connect.smt.header;

/**
 * Reserved leading path segments recognised when parsing a configured field path.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 */
public final class FieldTypeConstants {
  /** Leading segment that makes {@code FieldTypeUtils} classify the path as {@code KEY}. */
  public static final String KEY_FIELD = "_key";

  /** Optional leading segment for a {@code VALUE} path; it is dropped when present. */
  public static final String VALUE_FIELD = "_value";

  /** Segment that makes {@code FieldTypeUtils} classify the path as {@code TIMESTAMP}. */
  public static final String TIMESTAMP_FIELD = "_timestamp";

  private FieldTypeConstants() {
    // constants holder: no instances
  }
}
54 changes: 54 additions & 0 deletions src/main/java/io/lenses/connect/smt/header/FieldTypeUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.lenses.connect.smt.header;

import java.util.Arrays;
import org.apache.kafka.common.config.ConfigException;

/** Helpers for turning a dotted field path into a {@link FieldType} plus the remaining path. */
class FieldTypeUtils {

  /**
   * Splits {@code from} on {@code '.'} and classifies it by its first segment.
   *
   * <ul>
   *   <li>{@code _key} (any case): {@link FieldType#KEY}, prefix removed from the path.
   *   <li>{@code _timestamp} (any case): {@link FieldType#TIMESTAMP} with an empty path.
   *   <li>{@code _value} (any case): {@link FieldType#VALUE}, prefix removed from the path.
   *   <li>anything else: {@link FieldType#VALUE} with the path left untouched.
   * </ul>
   *
   * @param from the dotted field path
   * @return the detected field type together with the remaining path segments
   * @throws ConfigException if {@code _timestamp} is followed by further segments
   */
  public static FieldTypeAndFields extractFieldTypeAndFields(String from) {
    final String[] segments = from.split("\\.");

    // String.split drops trailing empty strings, so an input made only of dots yields no segments.
    if (segments.length == 0) {
      return new FieldTypeAndFields(FieldType.VALUE, segments);
    }

    final String head = segments[0];

    if (head.equalsIgnoreCase(FieldTypeConstants.KEY_FIELD)) {
      return new FieldTypeAndFields(FieldType.KEY, withoutFirst(segments));
    }

    if (head.equalsIgnoreCase(FieldTypeConstants.TIMESTAMP_FIELD)) {
      // the record timestamp is a primitive, so nothing may follow the prefix
      if (segments.length > 1) {
        throw new ConfigException(
            "When using the record timestamp field, the field path should only be '_timestamp'.");
      }
      return new FieldTypeAndFields(FieldType.TIMESTAMP, new String[0]);
    }

    // no special prefix, or an explicit _value prefix: both address the value
    final String[] valuePath =
        head.equalsIgnoreCase(FieldTypeConstants.VALUE_FIELD) ? withoutFirst(segments) : segments;
    return new FieldTypeAndFields(FieldType.VALUE, valuePath);
  }

  /** Returns a copy of {@code segments} without its first element. */
  private static String[] withoutFirst(String[] segments) {
    return Arrays.copyOfRange(segments, 1, segments.length);
  }

  /** Pairs a {@link FieldType} with the path segments left after removing any prefix. */
  static class FieldTypeAndFields {
    private final FieldType fieldType;
    private final String[] fields;

    public FieldTypeAndFields(FieldType fieldType, String[] fields) {
      this.fieldType = fieldType;
      this.fields = fields;
    }

    /** @return the field type this path was classified as */
    public FieldType getFieldType() {
      return fieldType;
    }

    /** @return the path segments; the array is the one passed to the constructor, not a copy */
    public String[] getFields() {
      return fields;
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package io.lenses.connect.smt.header;

import java.time.Instant;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
 * A transformer which takes a record field of type timestamp and inserts a header year, month, day,
 * hour, minute, second and date.
 *
 * <p>The instant is read from the record by a {@link RecordFieldTimestamp} that is built during
 * configuration.
 *
 * @param <R> the record type
 */
public class InsertFieldTimestampHeaders<R extends ConnectRecord<R>>
    extends InsertTimestampHeaders<R> {

  /** The parent's settings extended by {@link RecordFieldTimestamp#extendConfigDef}. */
  public static ConfigDef CONFIG_DEF =
      RecordFieldTimestamp.extendConfigDef(InsertTimestampHeaders.CONFIG_DEF);

  // Assigned in configureInternal; null until the transformer has been configured.
  private RecordFieldTimestamp<R> fieldTimestamp;

  public InsertFieldTimestampHeaders() {
    // Hand the parent this transformer's own definition, the same one config() returns,
    // rather than the definition of the unrelated record-timestamp transformer.
    super(CONFIG_DEF);
  }

  /**
   * Reads the instant from the given record via the configured field extractor.
   *
   * @param r the record being transformed
   * @return whatever {@code RecordFieldTimestamp.getInstant} returns for the record
   */
  @Override
  protected Instant getInstant(R r) {
    return fieldTimestamp.getInstant(r);
  }

  /** @return the configuration definition of this transformer */
  @Override
  public ConfigDef config() {
    return CONFIG_DEF;
  }

  /**
   * Runs the parent's configuration step, then builds the field extractor from the same config
   * together with the parent's {@code timeZone} and {@code locale}.
   *
   * @param config the parsed transformer configuration
   */
  @Override
  protected void configureInternal(SimpleConfig config) {
    super.configureInternal(config);
    fieldTimestamp = RecordFieldTimestamp.create(config, timeZone, locale);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package io.lenses.connect.smt.header;

import java.time.Instant;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
 * Variant of {@link InsertRollingTimestampHeaders} whose instant is read from a field of the
 * record, through a {@link RecordFieldTimestamp} built during configuration.
 *
 * @param <R> the record type
 */
public class InsertRollingFieldTimestampHeaders<R extends ConnectRecord<R>>
    extends InsertRollingTimestampHeaders<R> {

  /** The parent's settings extended by {@link RecordFieldTimestamp#extendConfigDef}. */
  public static ConfigDef CONFIG_DEF =
      RecordFieldTimestamp.extendConfigDef(InsertRollingTimestampHeaders.CONFIG_DEF);

  // Assigned in configureInternal; null until the transformer has been configured.
  private RecordFieldTimestamp<R> fieldTimestamp;

  public InsertRollingFieldTimestampHeaders() {
    // the parent's no-arg constructor is invoked implicitly
  }

  /** @return the configuration definition of this transformer */
  @Override
  public ConfigDef config() {
    return CONFIG_DEF;
  }

  /**
   * Runs the parent's configuration step, then builds the field extractor from the same config
   * together with the inherited {@code timeZone} and {@code locale}.
   *
   * @param config the parsed transformer configuration
   */
  @Override
  protected void configureInternal(SimpleConfig config) {
    super.configureInternal(config);
    this.fieldTimestamp = RecordFieldTimestamp.create(config, timeZone, locale);
  }

  /**
   * Reads the instant from the given record via the configured field extractor.
   *
   * @param r the record being transformed
   * @return whatever {@code RecordFieldTimestamp.getInstant} returns for the record
   */
  @Override
  protected Instant getInstantInternal(R r) {
    return this.fieldTimestamp.getInstant(r);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

Expand All @@ -46,10 +47,9 @@ abstract class InsertTimestampHeaders<R extends ConnectRecord<R>> implements Tra
private String minuteHeader;
private String secondHeader;
private String dateHeader;
protected Locale locale;
protected ZoneId timeZone = ZoneId.of("UTC");

private ZoneId timeZone = ZoneId.of("UTC");

private final ConfigDef configDef;
public static ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
Expand Down Expand Up @@ -153,9 +153,7 @@ interface ConfigName {
String DEFAULT_LOCALE = "en";
}

protected InsertTimestampHeaders(ConfigDef configDef) {
this.configDef = configDef;
}
protected InsertTimestampHeaders(ConfigDef configDef) {}

protected abstract Instant getInstant(R r);

Expand All @@ -167,6 +165,10 @@ public R apply(R r) {

// instant from epoch
final Instant now = getInstant(r);
if (now == null) {
throw new DataException(
"The timestamp value could not be extracted or is null. Please check the configuration and the record structure.");
}
final ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(now, timeZone);
r.headers().addString(yearHeader, yearFormat.format(zonedDateTime));
r.headers().addString(monthHeader, monthFormat.format(zonedDateTime));
Expand Down Expand Up @@ -201,7 +203,7 @@ public void configure(Map<String, ?> props) {
minuteHeader = prefixName + "minute";
secondHeader = prefixName + "second";
dateHeader = prefixName + "date";
Locale locale = new Locale(config.getString(ConfigName.LOCALE));
locale = new Locale(config.getString(ConfigName.LOCALE));
yearFormat =
createDateTimeFormatter(
config.getString(ConfigName.YEAR_FORMAT_CONFIG),
Expand Down Expand Up @@ -247,7 +249,7 @@ public void configure(Map<String, ?> props) {
configureInternal(config);
}

private static DateTimeFormatter createDateTimeFormatter(
public static DateTimeFormatter createDateTimeFormatter(
String patternConfig, String configName, Locale locale) {
try {
return DateTimeFormatter.ofPattern(patternConfig, locale);
Expand Down
Loading

0 comments on commit 4dced00

Please sign in to comment.