Skip to content

Commit

Permalink
[FLINK-35700][cli] Loosen CDC pipeline options validation
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Jun 26, 2024
1 parent 3441a1e commit 72a2744
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,6 @@ public static void validatePipelineDefinition(Configuration configuration)
PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID,
PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT);

Set<String> optionKeys =
options.stream().map(ConfigOption::key).collect(Collectors.toSet());

configuration
.getKeys()
.forEach(
key -> {
if (!optionKeys.contains(key)) {
throw new ValidationException(
String.format("Unknown configuration key `%s`", key));
}
});

options.forEach(
option -> {
if (!configuration.getOptional(option).isPresent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,14 @@ void testEmptyConfigValidation() {

@Test
void testUnknownConfigValidation() {
// An empty configuration should fail
// Unknown configuration entries should be tolerated
Map<String, String> configurations = new HashMap<>();

configurations.put("parallelism", "1");
configurations.put("name", "Pipeline Job");
configurations.put("unknown", "optionValue");

Assertions.assertThrowsExactly(
ValidationException.class,
Assertions.assertDoesNotThrow(
() ->
PipelineDef.validatePipelineDefinition(
Configuration.fromMap(configurations)));
Expand Down

0 comments on commit 72a2744

Please sign in to comment.