[FLINK-35757][mysql] support dynamically capture new tables #3467

Open · wants to merge 14 commits into master
==================================================
@@ -51,9 +51,12 @@ public EventSourceProvider getEventSourceProvider() {
                 new MySqlSource<>(
                         configFactory,
                         deserializer,
-                        (sourceReaderMetrics, sourceConfig) ->
+                        (sourceReaderContext, sourceReaderMetrics, sourceConfig) ->
                                 new MySqlPipelineRecordEmitter(
-                                        deserializer, sourceReaderMetrics, sourceConfig));
+                                        sourceReaderContext,
+                                        deserializer,
+                                        sourceReaderMetrics,
+                                        sourceConfig));
 
         return FlinkSourceProvider.of(source);
     }
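Threading the `SourceReaderContext` down to the emitter is the enabling piece for dynamic table capture: the context is the reader's only channel back to the enumerator. A sketch of that pattern, assuming the emitter reports tables it has not seen before; `NewTableDiscoveredEvent` is invented for illustration, only `sendSourceEventToCoordinator` is real Flink API:

    import org.apache.flink.api.connector.source.SourceEvent;
    import org.apache.flink.api.connector.source.SourceReaderContext;

    final class NewTableReporting {
        private NewTableReporting() {}

        /** Invented event type for this sketch; the PR may define its own. */
        static final class NewTableDiscoveredEvent implements SourceEvent {
            final String tableIdentifier;

            NewTableDiscoveredEvent(String tableIdentifier) {
                this.tableIdentifier = tableIdentifier;
            }
        }

        /** With the context in hand, an emitter can tell the enumerator about a new table. */
        static void reportNewTable(SourceReaderContext context, String tableIdentifier) {
            context.sendSourceEventToCoordinator(new NewTableDiscoveredEvent(tableIdentifier));
        }
    }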
==================================================
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.connectors.mysql.source.reader;
 
 import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.schema.Schema;
@@ -71,10 +72,12 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
     private final List<CreateTableEvent> createTableEventCache;
 
     public MySqlPipelineRecordEmitter(
+            SourceReaderContext context,
             DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
             MySqlSourceReaderMetrics sourceReaderMetrics,
             MySqlSourceConfig sourceConfig) {
         super(
+                context,
                 debeziumDeserializationSchema,
                 sourceReaderMetrics,
                 sourceConfig.isIncludeSchemaChanges());
==================================================
@@ -120,6 +120,10 @@ public static Builder newBuilder() {
     @Override
     public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
         Envelope.Operation op = Envelope.operationFor(record);
+        // Filter out records without an operation (e.g. DDL statements) to avoid an NPE.
+        if (op == null) {
+            return;
+        }
         Struct value = (Struct) record.value();
         Schema valueSchema = record.valueSchema();
         if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
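`Envelope.operationFor(record)` returns `null` for records whose value carries no Debezium data-change envelope, notably schema-change (DDL) records, which start flowing through this path once new tables are captured at runtime; without the early return, the envelope-field handling further down would dereference fields that do not exist. The same guard as a standalone helper:

    import io.debezium.data.Envelope;
    import org.apache.kafka.connect.source.SourceRecord;

    final class DataChangeFilter {
        private DataChangeFilter() {}

        /** True only for records that carry a Debezium data-change envelope. */
        static boolean isDataChangeRecord(SourceRecord record) {
            // Schema-change (DDL) records have no "op" field, so operationFor(...)
            // returns null for them and they must be skipped.
            return Envelope.operationFor(record) != null;
        }
    }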
==================================================
@@ -264,15 +264,8 @@ private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position)
             return true;
         }
 
-        // We still need to capture new sharding tables when the user disables scanning
-        // newly added tables; history records for all newly added tables (sharding and
-        // normal alike) are captured after a savepoint restore when the option is enabled.
-        if (!statefulTaskContext.getSourceConfig().isScanNewlyAddedTableEnabled()) {
-            // the newly added sharding table has no history records
-            return !maxSplitHighWatermarkMap.containsKey(tableId)
-                    && capturedTableFilter.isIncluded(tableId);
-        }
-        return false;
+        return !maxSplitHighWatermarkMap.containsKey(tableId)
+                && capturedTableFilter.isIncluded(tableId);
     }
 
     private void configureFilter() {
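With the option check gone, the rule is uniform: an included table with no recorded high watermark has not entered the pure binlog phase, regardless of `scan.newly-added-table.enabled`, so its history can still be captured when it appears mid-binlog. The predicate in isolation, with `isIncluded` modeled as a plain `Predicate` (a sketch, not the PR's code):

    import io.debezium.relational.TableId;

    import java.util.Map;
    import java.util.function.Predicate;

    final class PureBinlogPhaseCheck {
        private PureBinlogPhaseCheck() {}

        /**
         * An included table with no snapshot high watermark has not entered the
         * pure binlog phase; returning true here keeps its history capturable.
         */
        static <W> boolean needsHistoryCapture(
                Map<TableId, W> maxSplitHighWatermarkMap,
                Predicate<TableId> capturedTableFilter,
                TableId tableId) {
            return !maxSplitHighWatermarkMap.containsKey(tableId)
                    && capturedTableFilter.test(tableId);
        }
    }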
==================================================
@@ -127,8 +127,9 @@ public static <T> MySqlSourceBuilder<T> builder() {
         this(
                 configFactory,
                 deserializationSchema,
-                (sourceReaderMetrics, sourceConfig) ->
+                (sourceReaderContext, sourceReaderMetrics, sourceConfig) ->
                         new MySqlRecordEmitter<>(
+                                sourceReaderContext,
                                 deserializationSchema,
                                 sourceReaderMetrics,
                                 sourceConfig.isIncludeSchemaChanges()));
@@ -185,7 +186,7 @@ public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext) {
         return new MySqlSourceReader<>(
                 elementsQueue,
                 splitReaderSupplier,
-                recordEmitterSupplier.get(sourceReaderMetrics, sourceConfig),
+                recordEmitterSupplier.get(readerContext, sourceReaderMetrics, sourceConfig),
                 readerContext.getConfiguration(),
                 mySqlSourceReaderContext,
                 sourceConfig);
@@ -272,6 +273,8 @@ public void setSnapshotHooks(SnapshotPhaseHooks snapshotHooks) {
     interface RecordEmitterSupplier<T> extends Serializable {
 
         RecordEmitter<SourceRecords, T, MySqlSplitState> get(
-                MySqlSourceReaderMetrics metrics, MySqlSourceConfig sourceConfig);
+                SourceReaderContext sourceReaderContext,
+                MySqlSourceReaderMetrics metrics,
+                MySqlSourceConfig sourceConfig);
     }
 }
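Because `RecordEmitterSupplier.get` gained a parameter, every supplier, lambda or class, must now accept the reader context. A minimal sketch of the new shape; it assumes same-package access (the interface is declared without a visibility modifier) and mirrors the constructor call from the builder hunk above:

    // Fragment assumed to live in org.apache.flink.cdc.connectors.mysql.source,
    // with types imported as in this file.
    static <T> MySqlSource.RecordEmitterSupplier<T> recordEmitterSupplier(
            DebeziumDeserializationSchema<T> deserializer) {
        return (sourceReaderContext, metrics, config) ->
                new MySqlRecordEmitter<>(
                        sourceReaderContext,
                        deserializer,
                        metrics,
                        config.isIncludeSchemaChanges());
    }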
==================================================
@@ -26,6 +26,8 @@
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.util.CollectionUtil;
 
+import io.debezium.relational.TableId;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -77,7 +79,8 @@ public boolean waitingForFinishedSplits() {
     }
 
     @Override
-    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
+    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos(
+            boolean isScanNewlyAddedTableEnabled) {
         return Collections.emptyList();
     }
 
@@ -120,6 +123,9 @@ public void startAssignNewlyAddedTables() {}
     @Override
     public void onBinlogSplitUpdated() {}
 
+    @Override
+    public void addAlreadyProcessedTables(TableId tableId) {}
+
     @Override
     public void close() {}
 
==================================================
@@ -135,8 +135,9 @@ public boolean waitingForFinishedSplits() {
     }
 
     @Override
-    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
-        return snapshotSplitAssigner.getFinishedSplitInfos();
+    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos(
+            boolean isScanNewlyAddedTableEnabled) {
+        return snapshotSplitAssigner.getFinishedSplitInfos(isScanNewlyAddedTableEnabled);
     }
 
     @Override
@@ -158,6 +159,11 @@ public void addSplits(Collection<MySqlSplit> splits) {
         snapshotSplitAssigner.addSplits(snapshotSplits);
     }
 
+    @Override
+    public void addAlreadyProcessedTables(TableId tableId) {
+        snapshotSplitAssigner.addAlreadyProcessedTables(tableId);
+    }
+
     @Override
     public PendingSplitsState snapshotState(long checkpointId) {
         return new HybridPendingSplitsState(
==================================================
@@ -71,6 +71,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
 
     private final List<TableId> alreadyProcessedTables;
     private final List<MySqlSchemalessSnapshotSplit> remainingSplits;
+    private final List<MySqlSchemalessSnapshotSplit> newAddedTableRemainingSplits;
     private final Map<String, MySqlSchemalessSnapshotSplit> assignedSplits;
     private final Map<TableId, TableChanges.TableChange> tableSchemas;
     private final Map<String, BinlogOffset> splitFinishedOffsets;
@@ -146,6 +147,7 @@ private MySqlSnapshotSplitAssigner(
         this.currentParallelism = currentParallelism;
         this.alreadyProcessedTables = alreadyProcessedTables;
         this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
+        this.newAddedTableRemainingSplits = new CopyOnWriteArrayList<>();
         // When the job restores from a savepoint, sort existing and newly added tables
         // so the enumerator only sends the newly added tables' BinlogSplitMetaEvent
         this.assignedSplits =
@@ -319,6 +321,7 @@ private void splitTable(TableId nextTable) {
                         .collect(Collectors.toList());
         chunkNum += splits.size();
         remainingSplits.addAll(schemaLessSnapshotSplits);
+        newAddedTableRemainingSplits.addAll(schemaLessSnapshotSplits);
         if (!chunkSplitter.hasNextChunk()) {
             remainingTables.remove(nextTable);
         }
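`newAddedTableRemainingSplits` shadows additions to `remainingSplits`, giving the assigner a side record of chunks freshly produced by `splitTable(...)` that `getFinishedSplitInfos` can later prefer. A toy sketch of that dual-list bookkeeping, detached from the assigner:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    /** Toy sketch of the dual-list bookkeeping used by the assigner above. */
    final class SplitBookkeeping<S> {
        private final List<S> remainingSplits = new CopyOnWriteArrayList<>();
        // Side record of freshly produced chunks, consulted by
        // getFinishedSplitInfos(true) to build incremental binlog metadata.
        private final List<S> newAddedTableRemainingSplits = new CopyOnWriteArrayList<>();

        void onTableSplit(List<S> chunks) {
            remainingSplits.addAll(chunks);
            newAddedTableRemainingSplits.addAll(chunks);
        }
    }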
@@ -369,15 +372,23 @@ public boolean waitingForFinishedSplits() {
     }
 
     @Override
-    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
+    public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos(
+            boolean isScanNewlyAddedTableEnabled) {
         if (waitingForFinishedSplits()) {
             LOG.error(
                     "The assigner is not ready to offer finished split information, this should not be called");
             throw new FlinkRuntimeException(
                     "The assigner is not ready to offer finished split information, this should not be called");
         }
+        Collection<MySqlSchemalessSnapshotSplit> snapshotSplit;
+        if (isScanNewlyAddedTableEnabled && !newAddedTableRemainingSplits.isEmpty()) {
+            snapshotSplit = newAddedTableRemainingSplits;
+        } else {
+            snapshotSplit = assignedSplits.values();
+        }
+
         final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
-                new ArrayList<>(assignedSplits.values());
+                new ArrayList<>(snapshotSplit);
         List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
         for (MySqlSchemalessSnapshotSplit split : assignedSnapshotSplit) {
             BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
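The selection reads: prefer the newly added tables' chunks when the option is on and such chunks exist, otherwise fall back to every assigned split, so binlog-split metadata for new tables stays incremental instead of re-shipping everything. The same choice as a standalone generic helper (a sketch):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;

    final class FinishedSplitSelection {
        private FinishedSplitSelection() {}

        /** Mirrors the branch above: fresh chunks win when the option is enabled. */
        static <S> List<S> selectSplits(
                boolean isScanNewlyAddedTableEnabled,
                List<S> newAddedTableRemainingSplits,
                Map<String, S> assignedSplits) {
            Collection<S> source =
                    isScanNewlyAddedTableEnabled && !newAddedTableRemainingSplits.isEmpty()
                            ? newAddedTableRemainingSplits
                            : assignedSplits.values();
            return new ArrayList<>(source);
        }
    }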
@@ -607,4 +618,9 @@ private static MySqlChunkSplitter createChunkSplitter(
         }
         return new MySqlChunkSplitter(mySqlSchema, sourceConfig);
     }
+
+    @Override
+    public void addAlreadyProcessedTables(TableId tableId) {
+        alreadyProcessedTables.add(tableId);
+    }
 }
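`addAlreadyProcessedTables` gives the enumerator a way to mark a runtime-discovered table as handled so the assigner does not re-chunk it on a later discovery pass. A hedged usage sketch; the surrounding discovery flow is assumed, only the call itself comes from this diff:

    import io.debezium.relational.TableId;

    import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlSplitAssigner;

    final class NewTableRegistration {
        private NewTableRegistration() {}

        /** Record a newly discovered table once its snapshot splits are created. */
        static void markProcessed(MySqlSplitAssigner assigner, String database, String table) {
            // Debezium TableIds for MySQL are (catalog, schema = null, table).
            assigner.addAlreadyProcessedTables(new TableId(database, null, table));
        }
    }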
==================================================
@@ -24,6 +24,8 @@
 import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 
+import io.debezium.relational.TableId;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -59,7 +61,7 @@ public interface MySqlSplitAssigner {
      * Gets the finished splits' information. This is useful metadata to generate a binlog split
      * that considers finished snapshot splits.
      */
-    List<FinishedSnapshotSplitInfo> getFinishedSplitInfos();
+    List<FinishedSnapshotSplitInfo> getFinishedSplitInfos(boolean isScanNewlyAddedTableEnabled);
 
     /**
      * Callback to handle the finished splits with finished binlog offset. This is useful for
@@ -116,6 +118,8 @@ public interface MySqlSplitAssigner {
      */
     void onBinlogSplitUpdated();
 
+    void addAlreadyProcessedTables(TableId tableId);
+
     /**
      * Called to close the assigner, in case it holds on to any resources, like threads or network
      * connections.