From d1d8a24bda21303af2c998a9baee2bf1d71cba73 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Wed, 28 Aug 2024 01:22:58 +0800 Subject: [PATCH] [e2e][tests] Improve the stability of pipeline e2e tests --- .../mysql/source/utils/TableDiscoveryUtils.java | 3 ++- .../flink/cdc/pipeline/tests/RouteE2eITCase.java | 10 +++++----- .../pipeline/tests/utils/PipelineTestEnvironment.java | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java index dedeab2dd5..83b1b404f4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java @@ -88,7 +88,8 @@ public static List listTables( "\t including table '{}' for further processing", tableId); } else { - LOG.info("\t '{}' is filtered out of table capturing", tableId); + LOG.debug( + "\t '{}' is filtered out of table capturing", tableId); } } }); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index c8072cf7d3..2fe279e626 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -24,7 +24,6 @@ import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -719,6 +718,11 @@ public void testMergeTableRouteWithTransform() throws Exception { @Test public void testReplacementSymbol() throws Exception { + String defaultFlinkProperties = getFlinkProperties(flinkVersion); + overrideFlinkProperties( + defaultFlinkProperties.replace( + "execution.checkpointing.interval: 300", + "execution.checkpointing.interval: 10000")); String pipelineJob = String.format( "source:\n" @@ -839,8 +843,4 @@ private void waitUntilSpecificEvent(String event) throws Exception { + taskManagerConsumer.toUtf8String()); } } - - private void assertNotExists(String event) { - Assert.assertFalse(taskManagerConsumer.toUtf8String().contains(event)); - } } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java index 048e23bd7c..51551e8b0e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java @@ -251,7 +251,7 @@ private static Version parseVersion(String version) { versionParts.get(0), versionParts.get(1), versionParts.get(2), null, null, null); } - private static String getFlinkProperties(String flinkVersion) { + protected static String getFlinkProperties(String flinkVersion) { // this is needed for oracle-cdc tests. // see https://stackoverflow.com/a/47062742/4915129 String javaOptsConfig;