From 263529c66e8f2bf93ad55ebf2d07d48b2d112517 Mon Sep 17 00:00:00 2001 From: kunni Date: Mon, 29 Jul 2024 16:36:17 +0800 Subject: [PATCH] [FLINK-35791][kafka] address comments. --- .../DebeziumJsonSerializationSchemaTest.java | 8 ++++---- .../kafka/sink/KafkaDataSinkITCase.java | 20 +++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java index deabc2faec..f2b36e4efe 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java @@ -81,7 +81,7 @@ public void testSerialize() throws Exception { })); JsonNode expected = mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}"); + "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}"); JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1)); Assertions.assertEquals(expected, actual); DataChangeEvent insertEvent2 = @@ -94,7 +94,7 @@ public void testSerialize() throws Exception { })); expected = mapper.readTree( - "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}"); + "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}"); actual = mapper.readTree(serializationSchema.serialize(insertEvent2)); Assertions.assertEquals(expected, actual); DataChangeEvent deleteEvent = @@ -107,7 +107,7 @@ public void testSerialize() throws Exception { })); expected = mapper.readTree( - "{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}"); + "{\"before\":{\"col1\":\"2\",\"col2\":\"2\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}"); actual = mapper.readTree(serializationSchema.serialize(deleteEvent)); Assertions.assertEquals(expected, actual); DataChangeEvent updateEvent = @@ -125,7 +125,7 @@ public void testSerialize() throws Exception { })); expected = mapper.readTree( - "{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"table1\"}}"); + "{\"before\":{\"col1\":\"1\",\"col2\":\"1\"},\"after\":{\"col1\":\"1\",\"col2\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}"); actual = mapper.readTree(serializationSchema.serialize(updateEvent)); Assertions.assertEquals(expected, actual); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java index 893df8a4dd..1ef10603b7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java @@ -269,23 +269,23 @@ void testDebeziumJsonFormat() throws Exception { Arrays.asList( mapper.readTree( String.format( - "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName())), mapper.readTree( String.format( - "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName())), mapper.readTree( String.format( - "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName())), mapper.readTree( String.format( - "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName())), mapper.readTree( String.format( - "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName()))); assertThat(deserializeValues(collectedRecords)).containsAll(expected); checkProducerLeak(); @@ -437,23 +437,23 @@ void testTopicAndHeaderOption() throws Exception { Arrays.asList( mapper.readTree( String.format( - "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":null,\"after\":{\"col1\":\"1\",\"col2\":\"1\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName())), mapper.readTree( String.format( - "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":null,\"after\":{\"col1\":\"2\",\"col2\":\"2\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName())), mapper.readTree( String.format( - "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":null,\"after\":{\"col1\":\"3\",\"col2\":\"3\"},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName())), mapper.readTree( String.format( - "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":{\"col1\":\"1\",\"newCol3\":\"1\"},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName())), mapper.readTree( String.format( - "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_namespace\",\"table\":\"%s\"}}", + "{\"before\":{\"col1\":\"2\",\"newCol3\":\"\"},\"after\":{\"col1\":\"2\",\"newCol3\":\"x\"},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}", table1.getTableName()))); assertThat(deserializeValues(collectedRecords)).containsAll(expected); checkProducerLeak();