diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java index 27d2bb83ecc..adcaec4c1c9 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java @@ -150,19 +150,6 @@ public static void validatePipelineDefinition(Configuration configuration) PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID, PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT); - Set optionKeys = - options.stream().map(ConfigOption::key).collect(Collectors.toSet()); - - configuration - .getKeys() - .forEach( - key -> { - if (!optionKeys.contains(key)) { - throw new ValidationException( - String.format("Unknown configuration key `%s`", key)); - } - }); - options.forEach( option -> { if (!configuration.getOptional(option).isPresent() diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/definition/PipelineValidationTest.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/definition/PipelineValidationTest.java index f4a07d0311a..920d77ffc7f 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/definition/PipelineValidationTest.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/definition/PipelineValidationTest.java @@ -69,15 +69,14 @@ void testEmptyConfigValidation() { @Test void testUnknownConfigValidation() { - // An empty configuration should fail + // Unknown configuration entries should be tolerated Map configurations = new HashMap<>(); configurations.put("parallelism", "1"); configurations.put("name", "Pipeline Job"); configurations.put("unknown", "optionValue"); - Assertions.assertThrowsExactly( - ValidationException.class, + Assertions.assertDoesNotThrow( () -> PipelineDef.validatePipelineDefinition( Configuration.fromMap(configurations)));