From d3473de4db92229eb09e1e4c698a2db5448c8805 Mon Sep 17 00:00:00 2001 From: Hongshun Wang <125648852+loserwang1024@users.noreply.github.com> Date: Tue, 20 Aug 2024 00:53:02 +0800 Subject: [PATCH] [FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volatile for thread safe consideration This closes #3556. --- .../flink/cdc/composer/flink/FlinkPipelineComposer.java | 3 ++- .../composer/flink/translator/DataSourceTranslator.java | 7 ++++--- .../schema/coordinator/SchemaRegistryRequestHandler.java | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index bddd5fc00f..b3f17b2e52 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -97,7 +97,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) { // Build Source Operator DataSourceTranslator sourceTranslator = new DataSourceTranslator(); DataStream stream = - sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig()); + sourceTranslator.translate( + pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism); // Build PreTransformOperator for processing Schema Event TransformTranslator transformTranslator = new TransformTranslator(); diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java index 7b631c6f1a..90bbea73ee 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java @@ -23,7 +23,6 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.factories.DataSourceFactory; import org.apache.flink.cdc.common.factories.FactoryHelper; -import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.common.source.EventSourceProvider; import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider; @@ -41,12 +40,14 @@ public class DataSourceTranslator { public DataStreamSource translate( - SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) { + SourceDef sourceDef, + StreamExecutionEnvironment env, + Configuration pipelineConfig, + int sourceParallelism) { // Create data source DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig); // Get source provider - final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM); EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider(); if (eventSourceProvider instanceof FlinkSourceProvider) { // Source diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 77360169e2..da88753e58 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable { private final Set flushedSinkWriters; /** Status of the execution of current schema change request. */ - private boolean isSchemaChangeApplying; + private volatile boolean isSchemaChangeApplying; /** Executor service to execute schema change. */ private final ExecutorService schemaChangeThreadPool;