Skip to content

Commit

Permalink
[FLINK-36148][pipeline-connector][mysql] Add custom parser for buildi…
Browse files Browse the repository at this point in the history
…ng CreateTableEvent of table creation ddl.
  • Loading branch information
lvyanquan committed Aug 24, 2024
1 parent 060d203 commit 2699c15
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;

import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
Expand All @@ -32,6 +34,7 @@
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import org.slf4j.Logger;
Expand All @@ -43,6 +46,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;

Expand All @@ -58,8 +62,8 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private final LinkedList<SchemaChangeEvent> changes;
private org.apache.flink.cdc.common.event.TableId currentTable;
private List<ColumnEditor> columnEditors;

private CustomColumnDefinitionParserListener columnDefinitionListener;
private TableEditor tableEditor;

private int parsingColumnIndex = STARTING_INDEX;

Expand All @@ -72,6 +76,106 @@ public CustomAlterTableParserListener(
this.changes = changes;
}

@Override
public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
if (parser.databaseTables().forTable(tableId) == null) {
tableEditor = parser.databaseTables().editOrCreateTable(tableId);
super.enterColumnCreateTable(ctx);
}
}

@Override
public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
parser.runIfNotNull(
() -> {
// Make sure that the table's character set has been set ...
if (!tableEditor.hasDefaultCharsetName()) {
tableEditor.setDefaultCharsetName(
parser.charsetForTable(tableEditor.tableId()));
}
listeners.remove(columnDefinitionListener);
columnDefinitionListener = null;
// remove column definition parser listener
final String defaultCharsetName = tableEditor.create().defaultCharsetName();
tableEditor.setColumns(
tableEditor.columns().stream()
.map(
column -> {
final ColumnEditor columnEditor = column.edit();
if (columnEditor.charsetNameOfTable() == null) {
columnEditor.charsetNameOfTable(
defaultCharsetName);
}
return columnEditor;
})
.map(ColumnEditor::create)
.collect(Collectors.toList()));
parser.databaseTables().overwriteTable(tableEditor.create());
parser.signalCreateTable(tableEditor.tableId(), ctx);
},
tableEditor);
Schema.Builder builder = Schema.newBuilder();
tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
if (tableEditor.hasPrimaryKey()) {
builder.primaryKey(tableEditor.primaryKeyColumnNames());
}
changes.add(new CreateTableEvent(toCdcTableId(tableEditor.tableId()), builder.build()));
super.exitColumnCreateTable(ctx);
}

@Override
public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
parser.runIfNotNull(
() -> {
String columnName = parser.parseName(ctx.uid());
ColumnEditor columnEditor = Column.editor().name(columnName);
if (columnDefinitionListener == null) {
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
} else {
columnDefinitionListener.setColumnEditor(columnEditor);
}
},
tableEditor);
super.enterColumnDeclaration(ctx);
}

@Override
public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
parser.runIfNotNull(
() -> {
tableEditor.addColumn(columnDefinitionListener.getColumn());
},
tableEditor,
columnDefinitionListener);
super.exitColumnDeclaration(ctx);
}

@Override
public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext ctx) {
parser.runIfNotNull(
() -> {
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
},
tableEditor);
super.enterPrimaryKeyTableConstraint(ctx);
}

@Override
public void enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) {
parser.runIfNotNull(
() -> {
if (!tableEditor.hasPrimaryKey()) {
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
}
},
tableEditor);
super.enterUniqueKeyTableConstraint(ctx);
}

@Override
public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
this.currentTable = toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,22 @@ public void testSchemaChangeEvents() throws Exception {
expected.add(
new DropTableEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers")));

// Test create table DDL
statement.execute(
String.format(
"CREATE TABLE `%s`.`newlyAddedTable1`(id int, id2 int, primary key(id));",
inventoryDatabase.getDatabaseName()));

expected.add(
new CreateTableEvent(
TableId.tableId(
inventoryDatabase.getDatabaseName(), "newlyAddedTable1"),
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("id2", DataTypes.INT())
.primaryKey("id")
.build()));
}
List<Event> actual = fetchResults(events, expected.size());
assertEqualsInAnyOrder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
Expand Down Expand Up @@ -182,9 +182,16 @@ public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
addCollector(env, source, resultBuffer, serializer, accumulatorName);
env.executeAsync("AddNewlyTablesWhenReadingBinlog");
initialAddressTables(getConnection(), Collections.singletonList("address_beijing"));
List<Event> actual = fetchResults(iterator, 4);
assertThat(((ChangeEvent) actual.get(0)).tableId())
.isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_beijing"));
initialAddressTables(getConnection(), Collections.singletonList("address_shanghai"));
List<Event> actual = fetchResults(iterator, 8);
List<String> tableNames =
actual.stream()
.filter((event) -> event instanceof CreateTableEvent)
.map((event) -> ((SchemaChangeEvent) event).tableId().getTableName())
.collect(Collectors.toList());
assertThat(tableNames.size()).isEqualTo(2);
assertThat(tableNames.get(0)).isEqualTo("address_beijing");
assertThat(tableNames.get(1)).isEqualTo("address_shanghai");
}

@Test
Expand Down

0 comments on commit 2699c15

Please sign in to comment.