Skip to content

Commit

Permalink
fix migration failure
Browse files Browse the repository at this point in the history
This was caused by late initialization of the `transforms` blocks. `open` isn't early enough, since it isn't executed until after the `initializeState` phase — and the fields may already be accessed there when restoring from a previous state. According to the Flink docs, moving the data-field initialization into `setup` should be suitable.
  • Loading branch information
yuxiqian committed Aug 22, 2024
1 parent 22b64b2 commit f328961
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testSchemaEvolve() throws Exception {
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0, 1024144], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={gender=BIGINT}, oldTypeMapping={gender=TINYINT}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
"DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, 1026169], op=INSERT, meta=()}"));
Expand Down Expand Up @@ -184,9 +184,8 @@ public void testLenientSchemaEvolution() throws Exception {
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 1024144, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, 1026169, null, 16.0, null], op=INSERT, meta=()}"));
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` BIGINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, 1026169, null, null], op=INSERT, meta=()}"));
}

@Test
Expand All @@ -201,7 +200,7 @@ public void testFineGrainedSchemaEvolution() throws Exception {
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0, 1024144], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={gender=BIGINT}, oldTypeMapping={gender=TINYINT}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, null, 1026169], op=INSERT, meta=()}"),
Collections.singletonList(
Expand Down Expand Up @@ -331,19 +330,23 @@ private void testGenericSchemaEvolution(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stmt = conn.createStatement()) {

// Current schema: (id, name, age)
waitForIncrementalStage(dbName, mergeTable ? "merged" : "members", stmt);

// triggers AddColumnEvent
// Current schema: (id, name, age, gender)
stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");

// triggers AlterColumnTypeEvent and RenameColumnEvent
stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;");
// triggers AlterColumnTypeEvent
stmt.execute("ALTER TABLE members MODIFY COLUMN age DOUBLE");

// triggers RenameColumnEvent
stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");
// triggers AlterColumnTypeEvent and RenameColumnEvent
// Current schema: (id, name, age, biological_sex)
stmt.execute("ALTER TABLE members CHANGE COLUMN gender biological_sex BIGINT;");

// triggers DropColumnEvent
// Current schema: (id, name, age)
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -73,8 +72,8 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
private final List<Tuple2<String, String>> udfFunctions;
private List<UserDefinedFunctionDescriptor> udfDescriptors;
private Map<TableId, PreTransformProcessor> preTransformProcessorMap;
private final Map<TableId, Boolean> hasAsteriskMap;
private final Map<TableId, List<String>> referencedColumnsMap;
private Map<TableId, Boolean> hasAsteriskMap;
private Map<TableId, List<String>> referencedColumnsMap;

public static PreTransformOperator.Builder newBuilder() {
return new PreTransformOperator.Builder();
Expand Down Expand Up @@ -129,8 +128,6 @@ private PreTransformOperator(
this.schemaMetadataTransformers = new ArrayList<>();
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.udfFunctions = udfFunctions;
this.hasAsteriskMap = new HashMap<>();
this.referencedColumnsMap = new HashMap<>();
}

@Override
Expand All @@ -143,12 +140,10 @@ public void setup(
this.udfFunctions.stream()
.map(udf -> new UserDefinedFunctionDescriptor(udf.f0, udf.f1))
.collect(Collectors.toList());
}

@Override
public void open() throws Exception {
super.open();
transforms = new ArrayList<>();
// Initialize data fields in advance because they might be accessed in
// `::initializeState` function when restoring from a previous state.
this.transforms = new ArrayList<>();
for (TransformRule transformRule : transformRules) {
String tableInclusions = transformRule.getTableInclusions();
String projection = transformRule.getProjection();
Expand All @@ -169,6 +164,8 @@ public void open() throws Exception {
new SchemaMetadataTransform(primaryKeys, partitionKeys, tableOptions)));
}
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.hasAsteriskMap = new ConcurrentHashMap<>();
this.referencedColumnsMap = new ConcurrentHashMap<>();
}

@Override
Expand Down

0 comments on commit f328961

Please sign in to comment.