Skip to content

Commit

Permalink
Merge pull request #15 from lensesio/debug/smt-print-properties-speci…
Browse files Browse the repository at this point in the history
…fic-error

SMT should print properties on specific error to assist debugging
  • Loading branch information
davidsloan committed Aug 21, 2024
2 parents 61f1e45 + ded0d80 commit 45951cb
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
target/
.idea/
kafka-connect-smt.iml
45 changes: 45 additions & 0 deletions src/main/java/io/lenses/connect/smt/header/PropsFormatter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package io.lenses.connect.smt.header;

import org.apache.kafka.connect.transforms.util.SimpleConfig;

/**
* This class is responsible for formatting properties from a SimpleConfig object.
* It converts the properties into a string representation in a json-like format.
*/
public class PropsFormatter {

private final SimpleConfig simpleConfig;

/**
* Constructs a new PropsFormatter with the given SimpleConfig.
*
* @param simpleConfig the SimpleConfig object containing the properties to be formatted
*/
public PropsFormatter(SimpleConfig simpleConfig) {
this.simpleConfig = simpleConfig;
}

/**
* Formats the properties from the SimpleConfig object into a string.
* The properties are represented as key-value pairs in the format: "key: "value"".
* All properties are enclosed in curly braces.
*
* @return a string representation of the properties
*/
public String apply() {
StringBuilder sb = new StringBuilder("{");
simpleConfig.originalsStrings().forEach((k, v) -> sb.append(k).append(": \"").append(v).append("\", "));
sb.delete(sb.length() - 2, sb.length());
return sb.append("}").toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@ class RecordFieldTimestamp<R extends ConnectRecord<R>> {
private final String unixPrecision;
private final ZoneId timeZone;

private final Optional<PropsFormatter> propsFormatter;

private RecordFieldTimestamp(
FieldTypeUtils.FieldTypeAndFields fieldTypeAndFields,
Optional<DateTimeFormatter> fromPattern,
String unixPrecision,
ZoneId timeZone) {
ZoneId timeZone,
Optional<PropsFormatter> propsFormatter) {

this.fieldTypeAndFields = fieldTypeAndFields;
this.fromPattern = fromPattern;
this.unixPrecision = unixPrecision;
this.timeZone = timeZone;
this.propsFormatter = propsFormatter;
}

public FieldTypeUtils.FieldTypeAndFields getFieldTypeAndFields() {
Expand Down Expand Up @@ -85,7 +89,7 @@ public Instant getInstant(R r) {
return null;
}
if (fieldTypeAndFields.getFields().length == 0) {
return convertToTimestamp(value, unixPrecision, fromPattern, timeZone);
return convertToTimestamp(value, unixPrecision, fromPattern, timeZone, propsFormatter);
}
final Schema schema = operatingSchema(r);
Object extractedValue;
Expand All @@ -110,7 +114,7 @@ public Instant getInstant(R r) {
+ " instead.");
}

return convertToTimestamp(extractedValue, unixPrecision, fromPattern, timeZone);
return convertToTimestamp(extractedValue, unixPrecision, fromPattern, timeZone, propsFormatter);
}
}

Expand Down Expand Up @@ -155,7 +159,7 @@ public static <R extends ConnectRecord<R>> RecordFieldTimestamp<R> create(
InsertTimestampHeaders.createDateTimeFormatter(
pattern, FORMAT_FROM_CONFIG, locale));

return new RecordFieldTimestamp<>(fieldTypeAndFields, fromPattern, unixPrecision, zoneId);
return new RecordFieldTimestamp<>(fieldTypeAndFields, fromPattern, unixPrecision, zoneId, Optional.of(new PropsFormatter(config)));
}

public static ConfigDef extendConfigDef(ConfigDef from) {
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/io/lenses/connect/smt/header/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -33,7 +34,11 @@
class Utils {

static Instant convertToTimestamp(
Object value, String unixPrecision, Optional<DateTimeFormatter> fromPattern, ZoneId zoneId) {
Object value, String unixPrecision, Optional<DateTimeFormatter> fromPattern, ZoneId zoneId) {
return convertToTimestamp(value, unixPrecision, fromPattern, zoneId, Optional.empty());
}
static Instant convertToTimestamp(
Object value, String unixPrecision, Optional<DateTimeFormatter> fromPattern, ZoneId zoneId, Optional<PropsFormatter> propsFormatter) {
if (value == null) {
return Instant.now();
}
Expand Down Expand Up @@ -74,7 +79,7 @@ static Instant convertToTimestamp(
try {
return Instant.ofEpochMilli(Long.parseLong((String) value));
} catch (NumberFormatException e) {
throw new DataException("Expected a long, but found " + value);
throw new DataException("Expected a long, but found " + value + ". Props: " + propsFormatter.map(PropsFormatter::apply).orElse("(No props formatter)"));
}
});
}
Expand Down
36 changes: 36 additions & 0 deletions src/test/java/io/lenses/connect/smt/header/PropsFormatterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package io.lenses.connect.smt.header;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

class PropsFormatterTest {

@Test
void singleEntry() {
Map<String, Object> props = Map.of("something", "else");
PropsFormatter writer = new PropsFormatter(new SimpleConfig(new ConfigDef(), props));
assertEquals("{something: \"else\"}", writer.apply());
}

@Test
void multipleEntries() {
Map<String, Object> props = Map.of("first", "item", "something", "else");
PropsFormatter writer = new PropsFormatter(new SimpleConfig(new ConfigDef(), props));
assertEquals("{first: \"item\", something: \"else\"}", writer.apply());
}
}
54 changes: 54 additions & 0 deletions src/test/java/io/lenses/connect/smt/header/UtilsTimestampTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package io.lenses.connect.smt.header;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.junit.jupiter.api.Test;

import java.time.ZoneId;
import java.util.Map;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class UtilsTimestampTest {

public static final String TIMESTAMP = "2024-08-16T04:30:00.232Z";
public static final String PRECISION = "milliseconds";

@Test
void convertToTimestampShouldWritePropsOnFailure() {
PropsFormatter propsFormatter = new PropsFormatter(new SimpleConfig(new ConfigDef(), Map.of("some", "props", "for", "2" ) ));
DataException dataException = assertThrows(DataException.class, () -> Utils.convertToTimestamp(
TIMESTAMP,
PRECISION,
Optional.empty(),
ZoneId.of("UTC"),
Optional.of(propsFormatter)
));
assertEquals("Expected a long, but found 2024-08-16T04:30:00.232Z. Props: {some: \"props\", for: \"2\"}",dataException.getMessage());
}

@Test
void convertToTimestampShouldNotFailWhenNoPropsFormatter() {
DataException dataException = assertThrows(DataException.class, () -> Utils.convertToTimestamp(
TIMESTAMP,
PRECISION,
Optional.empty(),
ZoneId.of("UTC")
));
assertEquals("Expected a long, but found 2024-08-16T04:30:00.232Z. Props: (No props formatter)",dataException.getMessage());
}

}

0 comments on commit 45951cb

Please sign in to comment.