diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
index 6d66fa8785..1171ad07b4 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
@@ -28,6 +28,7 @@
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -108,7 +109,10 @@ private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception {
 
     private void broadcastEvent(Event toBroadcast) {
         for (int i = 0; i < downstreamParallelism; i++) {
-            output.collect(new StreamRecord<>(new PartitioningEvent(toBroadcast, i)));
+            // Deep-copying each event is required since downstream subTasks might run in the same
+            // JVM
+            Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast);
+            output.collect(new StreamRecord<>(new PartitioningEvent(copiedEvent, i)));
         }
     }
 
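
The sketch below is illustration only, not part of the patch: it contrasts fanning out one `Event` instance by reference with fanning it out via `EventSerializer.INSTANCE.copy(...)`, which is the call the patch introduces. The class `BroadcastCopySketch` and the method names `fanOutByReference` / `fanOutByCopy` are hypothetical, and the sketch assumes `EventSerializer.INSTANCE` is publicly accessible as it is used in the patched operator.

```java
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;

import java.util.ArrayList;
import java.util.List;

/**
 * Illustration only (hypothetical class, not in the patch): broadcasting a single Event
 * instance without copying makes every emitted record alias the same object, which is
 * unsafe once downstream subtasks are chained into the same JVM and records are passed
 * by reference rather than serialized over the network.
 */
class BroadcastCopySketch {

    /** Shares one instance across all targets: any mutation is visible to every subtask. */
    static List<Event> fanOutByReference(Event event, int parallelism) {
        List<Event> fanOut = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            fanOut.add(event); // same object added N times
        }
        return fanOut;
    }

    /** Gives each target its own instance via the serializer's deep copy, as the patch does. */
    static List<Event> fanOutByCopy(Event event, int parallelism) {
        List<Event> fanOut = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            fanOut.add(EventSerializer.INSTANCE.copy(event)); // independent instance per target
        }
        return fanOut;
    }
}
```

In the operator itself the copy happens per downstream subtask inside `broadcastEvent`, so each `PartitioningEvent` wraps its own `Event` instance before being collected.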