diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index bddd5fc00f..b3f17b2e52 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -97,7 +97,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) { // Build Source Operator DataSourceTranslator sourceTranslator = new DataSourceTranslator(); DataStream stream = - sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig()); + sourceTranslator.translate( + pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism); // Build PreTransformOperator for processing Schema Event TransformTranslator transformTranslator = new TransformTranslator(); diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java index 7b631c6f1a..90bbea73ee 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java @@ -23,7 +23,6 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.factories.DataSourceFactory; import org.apache.flink.cdc.common.factories.FactoryHelper; -import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.common.source.EventSourceProvider; import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider; @@ -41,12 +40,14 @@ public class DataSourceTranslator { public DataStreamSource translate( - SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) { + SourceDef sourceDef, + StreamExecutionEnvironment env, + Configuration pipelineConfig, + int sourceParallelism) { // Create data source DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig); // Get source provider - final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM); EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider(); if (eventSourceProvider instanceof FlinkSourceProvider) { // Source diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 77360169e2..da88753e58 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable { private final Set flushedSinkWriters; /** Status of the execution of current schema change request. */ - private boolean isSchemaChangeApplying; + private volatile boolean isSchemaChangeApplying; /** Executor service to execute schema change. */ private final ExecutorService schemaChangeThreadPool;