Skip to content

Commit

Permalink
[FLINK-35791][kafka] address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
lvyanquan committed Aug 7, 2024
1 parent 4cd26f8 commit 263529c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testSerialize() throws Exception {
}));
JsonNode expected =
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
Assertions.assertEquals(expected, actual);
DataChangeEvent insertEvent2 =
Expand All @@ -94,7 +94,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
Assertions.assertEquals(expected, actual);
DataChangeEvent deleteEvent =
Expand All @@ -107,7 +107,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
"{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
Assertions.assertEquals(expected, actual);
DataChangeEvent updateEvent =
Expand All @@ -125,7 +125,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
"{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
Assertions.assertEquals(expected, actual);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,23 +269,23 @@ void testDebeziumJsonFormat() throws Exception {
Arrays.asList(
mapper.readTree(
String.format(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())));
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
Expand Down Expand Up @@ -437,23 +437,23 @@ void testTopicAndHeaderOption() throws Exception {
Arrays.asList(
mapper.readTree(
String.format(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
String.format(
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}",
table1.getTableName())));
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
Expand Down

0 comments on commit 263529c

Please sign in to comment.