Skip to content

Commit

Permalink
[FLINK-35791][kafka] add database and table info of canal/debezium json format for kafka sink.
Browse files
Browse the repository at this point in the history
  • Loading branch information
lvyanquan committed Jul 9, 2024
1 parent ca1470d commit d855e00
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public CanalJsonSerializationSchema(
/**
 * Stores the initialization context and allocates the row object that {@code serialize} fills
 * in for every data change event.
 *
 * @param context initialization context, kept in {@code this.context}
 */
@Override
public void open(InitializationContext context) {
this.context = context;
// NOTE(review): two consecutive allocations appear here; the second one replaces the first.
// This looks like diff residue (old line followed by new line) - confirm only one remains.
reuseGenericRowData = new GenericRowData(3);
// Six slots: serialize writes positions 3, 4 and 5 in addition to the first three.
reuseGenericRowData = new GenericRowData(6);
}

@Override
Expand Down Expand Up @@ -132,6 +132,17 @@ public byte[] serialize(Event event) {
}

DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
reuseGenericRowData.setField(
3, StringData.fromString(dataChangeEvent.tableId().getSchemaName()));
reuseGenericRowData.setField(
4, StringData.fromString(dataChangeEvent.tableId().getTableName()));
reuseGenericRowData.setField(
5,
new GenericArrayData(
jsonSerializers.get(dataChangeEvent.tableId()).getSchema().primaryKeys()
.stream()
.map(StringData::fromString)
.toArray()));
try {
switch (dataChangeEvent.op()) {
case INSERT:
Expand Down Expand Up @@ -200,14 +211,19 @@ public byte[] serialize(Event event) {
}
}

/**
 * Builds the row type of the Canal JSON envelope: "old", "data", "type", "database", "table"
 * and "pkNames".
 *
 * <p>Refer to <a
 * href="https://github.com/alibaba/canal/blob/9373429015c0f25318b703833a1d7913676f2aa3/protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java#L74">...</a>.
 *
 * @param databaseSchema data type of the elements of the "old" and "data" arrays
 * @return the logical type of the envelope row, cast to {@link RowType}
 */
private static RowType createJsonRowType(DataType databaseSchema) {
// Canal JSON contains other information, e.g. "ts", which is not part of this row type.
// NOTE(review): the "type" field is listed twice below (once closing the ROW, once followed
// by more fields); this looks like diff residue - confirm only the second form remains.
return (RowType)
DataTypes.ROW(
DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
DataTypes.FIELD("type", DataTypes.STRING()))
DataTypes.FIELD("type", DataTypes.STRING()),
DataTypes.FIELD("database", DataTypes.STRING()),
DataTypes.FIELD("table", DataTypes.STRING()),
DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())))
.getLogicalType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public DebeziumJsonSerializationSchema(

/**
 * Allocates the row object that {@code serialize} fills in for every data change event and
 * stores the initialization context.
 *
 * @param context initialization context, kept in {@code this.context}
 */
@Override
public void open(InitializationContext context) {
// NOTE(review): two consecutive allocations appear here; the second one replaces the first.
// This looks like diff residue (old line followed by new line) - confirm only one remains.
reuseGenericRowData = new GenericRowData(3);
// Four slots: serialize writes the nested source row at position 3.
reuseGenericRowData = new GenericRowData(4);
this.context = context;
}

Expand Down Expand Up @@ -131,6 +131,11 @@ public byte[] serialize(Event event) {
}

DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
reuseGenericRowData.setField(
3,
GenericRowData.of(
StringData.fromString(dataChangeEvent.tableId().getNamespace()),
StringData.fromString(dataChangeEvent.tableId().getTableName())));
try {
switch (dataChangeEvent.op()) {
case INSERT:
Expand Down Expand Up @@ -185,14 +190,21 @@ public byte[] serialize(Event event) {
}
}

/**
 * Builds the row type of the Debezium JSON envelope: "before", "after", "op" and a nested
 * "source" row holding "db" and "table".
 *
 * <p>Refer to <a
 * href="https://debezium.io/documentation/reference/1.9/connectors/mysql.html">...</a>.
 *
 * @param databaseSchema data type of the "before" and "after" fields
 * @return the logical type of the envelope row, cast to {@link RowType}
 */
private static RowType createJsonRowType(DataType databaseSchema) {
// Debezium JSON contains some other information, e.g. "ts_ms", which is not part of this
// row type; of "source", only "db" and "table" are declared.
// NOTE(review): the "op" field is listed twice below (once closing the ROW, once followed
// by "source"); this looks like diff residue - confirm only the second form remains.
return (RowType)
DataTypes.ROW(
DataTypes.FIELD("before", databaseSchema),
DataTypes.FIELD("after", databaseSchema),
DataTypes.FIELD("op", DataTypes.STRING()))
DataTypes.FIELD("op", DataTypes.STRING()),
DataTypes.FIELD(
"source",
DataTypes.ROW(
DataTypes.FIELD("db", DataTypes.STRING()),
DataTypes.FIELD("table", DataTypes.STRING()))))
.getLogicalType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testSerialize() throws Exception {
}));
JsonNode expected =
mapper.readTree(
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}");
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
Assertions.assertEquals(expected, actual);

Expand All @@ -97,7 +97,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}");
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
Assertions.assertEquals(expected, actual);

Expand All @@ -111,7 +111,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\"}");
"{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
Assertions.assertEquals(expected, actual);

Expand All @@ -130,7 +130,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\"}");
"{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
Assertions.assertEquals(expected, actual);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testSerialize() throws Exception {
}));
JsonNode expected =
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}");
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
Assertions.assertEquals(expected, actual);
DataChangeEvent insertEvent2 =
Expand All @@ -94,7 +94,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}");
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
Assertions.assertEquals(expected, actual);
DataChangeEvent deleteEvent =
Expand All @@ -107,7 +107,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\"}");
"{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
Assertions.assertEquals(expected, actual);
DataChangeEvent updateEvent =
Expand All @@ -125,7 +125,7 @@ public void testSerialize() throws Exception {
}));
expected =
mapper.readTree(
"{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\"}");
"{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
actual = mapper.readTree(serializationSchema.serialize(updateEvent));
Assertions.assertEquals(expected, actual);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,25 @@ void testDebeziumJsonFormat() throws Exception {
List<JsonNode> expected =
Arrays.asList(
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"),
String.format(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"),
String.format(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"),
String.format(
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"),
String.format(
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}"));
String.format(
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())));
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
}
Expand Down Expand Up @@ -330,15 +340,25 @@ void testCanalJsonFormat() throws Exception {
List<JsonNode> expected =
Arrays.asList(
mapper.readTree(
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}"),
String.format(
"{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
table1.getTableName())),
mapper.readTree(
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}"),
String.format(
"{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
table1.getTableName())),
mapper.readTree(
"{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\"}"),
String.format(
"{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
table1.getTableName())),
mapper.readTree(
"{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\"}"),
String.format(
"{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
table1.getTableName())),
mapper.readTree(
"{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\"}"));
String.format(
"{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
table1.getTableName())));
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
}
Expand Down Expand Up @@ -416,15 +436,25 @@ void testTopicAndHeaderOption() throws Exception {
List<JsonNode> expected =
Arrays.asList(
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"),
String.format(
"{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"),
String.format(
"{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"),
String.format(
"{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"),
String.format(
"{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())),
mapper.readTree(
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}"));
String.format(
"{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
table1.getTableName())));
assertThat(deserializeValues(collectedRecords)).containsAll(expected);
checkProducerLeak();
}
Expand Down

0 comments on commit d855e00

Please sign in to comment.