From d855e00893a8e3bbc562d7a4415e8eb95407d671 Mon Sep 17 00:00:00 2001
From: kunni
Date: Tue, 9 Jul 2024 23:38:53 +0800
Subject: [PATCH] [FLINK-35791][kafka] add database and table info of canal/debezium json format for kafka sink.

---
 .../canal/CanalJsonSerializationSchema.java   | 24 ++++++--
 .../DebeziumJsonSerializationSchema.java      | 20 +++++--
 .../CanalJsonSerializationSchemaTest.java     |  8 +--
 .../DebeziumJsonSerializationSchemaTest.java  |  8 +--
 .../kafka/sink/KafkaDataSinkITCase.java       | 60 ++++++++++++++-----
 5 files changed, 89 insertions(+), 31 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index 78548e31b77..1303eaba1fd 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -94,7 +94,7 @@ public CanalJsonSerializationSchema(
     @Override
     public void open(InitializationContext context) {
         this.context = context;
-        reuseGenericRowData = new GenericRowData(3);
+        reuseGenericRowData = new GenericRowData(6);
     }
 
     @Override
@@ -132,6 +132,17 @@ public byte[] serialize(Event event) {
         }
 
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+        reuseGenericRowData.setField(
+                3, StringData.fromString(dataChangeEvent.tableId().getSchemaName()));
+        reuseGenericRowData.setField(
+                4, StringData.fromString(dataChangeEvent.tableId().getTableName()));
+        reuseGenericRowData.setField(
+                5,
+                new GenericArrayData(
+                        jsonSerializers.get(dataChangeEvent.tableId()).getSchema().primaryKeys()
+                                .stream()
+                                .map(StringData::fromString)
+                                .toArray()));
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
@@ -200,14 +211,19 @@ public byte[] serialize(Event event) {
         }
     }
 
+    /**
+     * Refer to ....
+     */
     private static RowType createJsonRowType(DataType databaseSchema) {
-        // Canal JSON contains other information, e.g. "database", "ts"
-        // but we don't need them
         return (RowType)
                 DataTypes.ROW(
                                 DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
                                 DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
-                                DataTypes.FIELD("type", DataTypes.STRING()))
+                                DataTypes.FIELD("type", DataTypes.STRING()),
+                                DataTypes.FIELD("database", DataTypes.STRING()),
+                                DataTypes.FIELD("table", DataTypes.STRING()),
+                                DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())))
                         .getLogicalType();
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index 2f305ce4258..291c0ff90a8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -92,7 +92,7 @@ public DebeziumJsonSerializationSchema(
 
     @Override
     public void open(InitializationContext context) {
-        reuseGenericRowData = new GenericRowData(3);
+        reuseGenericRowData = new GenericRowData(4);
         this.context = context;
     }
 
@@ -131,6 +131,11 @@ public byte[] serialize(Event event) {
         }
 
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+        reuseGenericRowData.setField(
+                3,
+                GenericRowData.of(
+                        StringData.fromString(dataChangeEvent.tableId().getNamespace()),
+                        StringData.fromString(dataChangeEvent.tableId().getTableName())));
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
@@ -185,14 +190,21 @@ public byte[] serialize(Event event) {
         }
     }
 
+    /**
+     * Refer to ....
+     */
     private static RowType createJsonRowType(DataType databaseSchema) {
-        // Debezium JSON contains some other information, e.g. "source", "ts_ms"
-        // but we don't need them.
         return (RowType)
                 DataTypes.ROW(
                                 DataTypes.FIELD("before", databaseSchema),
                                 DataTypes.FIELD("after", databaseSchema),
-                                DataTypes.FIELD("op", DataTypes.STRING()))
+                                DataTypes.FIELD("op", DataTypes.STRING()),
+                                DataTypes.FIELD(
+                                        "source",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("db", DataTypes.STRING()),
+                                                DataTypes.FIELD("table", DataTypes.STRING()))))
                         .getLogicalType();
     }
 }
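
For illustration only: with the serialization schema changes above, a single insert into a hypothetical table mydb.orders whose primary key is id would produce records shaped roughly as follows (the database name, table name, and column here are made up; the exact expected strings are asserted in the test diffs that follow):

  canal-json:    {"old":null,"data":[{"id":"1"}],"type":"INSERT","database":"mydb","table":"orders","pkNames":["id"]}
  debezium-json: {"before":null,"after":{"id":"1"},"op":"c","source":{"db":"mydb","table":"orders"}}
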
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
index c6335e6d69c..362354c6ecc 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchemaTest.java
@@ -83,7 +83,7 @@ public void testSerialize() throws Exception {
                         }));
         JsonNode expected =
                 mapper.readTree(
-                        "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}");
+                        "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
         JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
         Assertions.assertEquals(expected, actual);
 
@@ -97,7 +97,7 @@ public void testSerialize() throws Exception {
                         }));
         expected =
                 mapper.readTree(
-                        "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}");
+                        "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
         actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
         Assertions.assertEquals(expected, actual);
 
@@ -111,7 +111,7 @@ public void testSerialize() throws Exception {
                         }));
         expected =
                 mapper.readTree(
-                        "{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\"}");
+                        "{\"old\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
         actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
         Assertions.assertEquals(expected, actual);
 
@@ -130,7 +130,7 @@ public void testSerialize() throws Exception {
                         }));
         expected =
                 mapper.readTree(
-                        "{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\"}");
+                        "{\"old\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"data\":[{\"col1\":\"1\",\"col2\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"table1\",\"pkNames\":[\"col1\"]}");
         actual = mapper.readTree(serializationSchema.serialize(updateEvent));
         Assertions.assertEquals(expected, actual);
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index 0be02b9b382..deabc2faec0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -81,7 +81,7 @@ public void testSerialize() throws Exception {
                         }));
         JsonNode expected =
                 mapper.readTree(
-                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}");
+                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
         JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
         Assertions.assertEquals(expected, actual);
         DataChangeEvent insertEvent2 =
@@ -94,7 +94,7 @@ public void testSerialize() throws Exception {
                         }));
         expected =
                 mapper.readTree(
-                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}");
+                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
         actual = mapper.readTree(serializationSchema.serialize(insertEvent2));
         Assertions.assertEquals(expected, actual);
         DataChangeEvent deleteEvent =
@@ -107,7 +107,7 @@ public void testSerialize() throws Exception {
                         }));
         expected =
                 mapper.readTree(
-                        "{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\"}");
+                        "{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
         actual = mapper.readTree(serializationSchema.serialize(deleteEvent));
         Assertions.assertEquals(expected, actual);
         DataChangeEvent updateEvent =
@@ -125,7 +125,7 @@ public void testSerialize() throws Exception {
                         }));
         expected =
                 mapper.readTree(
-                        "{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\"}");
+                        "{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}");
         actual = mapper.readTree(serializationSchema.serialize(updateEvent));
         Assertions.assertEquals(expected, actual);
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
index 2e265659665..893df8a4dd6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java
@@ -268,15 +268,25 @@ void testDebeziumJsonFormat() throws Exception {
         List<JsonNode> expected =
                 Arrays.asList(
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"),
+                                String.format(
+                                        "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}"));
+                                String.format(
+                                        "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())));
         assertThat(deserializeValues(collectedRecords)).containsAll(expected);
         checkProducerLeak();
     }
@@ -330,15 +340,25 @@ void testCanalJsonFormat() throws Exception {
         List<JsonNode> expected =
                 Arrays.asList(
                         mapper.readTree(
-                                "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\"}"),
+                                String.format(
+                                        "{\"old\":null,\"data\":[{\"col1\":\"1\",\"col2\":\"1\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\"}"),
+                                String.format(
+                                        "{\"old\":null,\"data\":[{\"col1\":\"2\",\"col2\":\"2\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\"}"),
+                                String.format(
+                                        "{\"old\":null,\"data\":[{\"col1\":\"3\",\"col2\":\"3\"}],\"type\":\"INSERT\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\"}"),
+                                String.format(
+                                        "{\"old\":[{\"col1\":\"1\",\"newCol3\":\"1\"}],\"data\":null,\"type\":\"DELETE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\"}"));
+                                String.format(
+                                        "{\"old\":[{\"col1\":\"2\",\"newCol3\":\"\"}],\"data\":[{\"col1\":\"2\",\"newCol3\":\"x\"}],\"type\":\"UPDATE\",\"database\":\"default_schema\",\"table\":\"%s\",\"pkNames\":[\"col1\"]}",
+                                        table1.getTableName())));
         assertThat(deserializeValues(collectedRecords)).containsAll(expected);
         checkProducerLeak();
     }
@@ -416,15 +436,25 @@ void testTopicAndHeaderOption() throws Exception {
         List<JsonNode> expected =
                 Arrays.asList(
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\"}"),
+                                String.format(
+                                        "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\"}"),
+                                String.format(
+                                        "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())),
                         mapper.readTree(
-                                "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\"}"));
+                                String.format(
+                                        "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}",
+                                        table1.getTableName())));
         assertThat(deserializeValues(collectedRecords)).containsAll(expected);
         checkProducerLeak();
     }