Skip to content

Commit

Permalink
fix: test case since changed schema role has been changed
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Aug 22, 2024
1 parent d6f3a9a commit 8d0708d
Showing 1 changed file with 51 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,14 @@ public void testSchemaEvolve() throws Exception {
false,
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0, 1024144], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={gender=BIGINT}, oldTypeMapping={gender=TINYINT}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
"DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, 1026169], op=INSERT, meta=()}"));
"DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20], op=INSERT, meta=()}",
"DropTableEvent{tableId=%s.members}"));
}

@Test
Expand All @@ -111,9 +113,9 @@ public void testSchemaEvolveWithIncompatibleChanges() throws Exception {
true,
false,
false,
Collections.singletonList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\""),
Collections.singletonList(
Collections.emptyList(),
Arrays.asList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}

Expand All @@ -124,11 +126,10 @@ public void testSchemaEvolveWithException() throws Exception {
false,
true,
false,
Collections.singletonList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}"),
Collections.emptyList(),
Arrays.asList(
"Failed to apply schema change event AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}.",
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}

Expand All @@ -141,11 +142,12 @@ public void testSchemaTryEvolveWithException() throws Exception {
false,
Arrays.asList(
// Add column never succeeded, so age column will not appear.
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 1024144], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, 1026169], op=INSERT, meta=()}"),
"DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, null, 1026169, age < 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, null, 1028196, age < 20], op=INSERT, meta=()}"),
Arrays.asList(
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members.",
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}"));
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}"));
}

@Test
Expand All @@ -157,8 +159,9 @@ public void testSchemaIgnore() throws Exception {
false,
false,
Arrays.asList(
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 1024144], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, 1026169], op=INSERT, meta=()}"));
"DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, null, 1026169, age < 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, null, 1028196, age < 20], op=INSERT, meta=()}"));
}

@Test
Expand All @@ -182,10 +185,12 @@ public void testLenientSchemaEvolution() throws Exception {
false,
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 1024144, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 1024144, age < 20, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` BIGINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, 1026169, null, null], op=INSERT, meta=()}"));
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, 1026169, age < 20, null, null], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, 1028196, age < 20, null, null], op=INSERT, meta=()}"));
}

@Test
Expand All @@ -198,13 +203,15 @@ public void testFineGrainedSchemaEvolution() throws Exception {
true,
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0, 1024144], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012 -> Eve, 1012, Eve, 17, 0, 1024144, age < 20], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={gender=BIGINT}, oldTypeMapping={gender=TINYINT}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0, null, 1026169], op=INSERT, meta=()}"),
Collections.singletonList(
"Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members."));
"DataChangeEvent{tableId=%s.members, before=[], after=[1013 -> Fiona, 1013, Fiona, 16.0, null, 1026169, age < 20], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014 -> Gem, 1014, Gem, 17.0, null, 1028196, age < 20], op=INSERT, meta=()}"),
Arrays.asList(
"Ignored schema change DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]} to table %s.members.",
"Ignored schema change DropTableEvent{tableId=%s.members} to table %s.members."));
}

@Test
Expand Down Expand Up @@ -288,10 +295,10 @@ private void testGenericSchemaEvolution(
+ (triggerError ? " error.on.schema.change: true\n" : "\n")
+ "transform:\n"
+ " - source-table: %s.\\.*\n"
+ " projection: \\*, id * id AS id_square\n"
+ " projection: CAST(id AS VARCHAR) || ' -> ' || name AS uid, *, id * id AS id_square, 'age < 20' as tag\n"
+ " filter: age < 20\n"
+ " - source-table: %s.\\.*\n"
+ " projection: \\*, 0 - id * id AS id_square\n"
+ " projection: CAST(id AS VARCHAR) || ' -> ' || name AS uid, *, 0 - id * id AS id_square, 'age >= 20' as tag\n"
+ " filter: age >= 20\n"
+ (mergeTable
? String.format(
Expand Down Expand Up @@ -330,25 +337,28 @@ private void testGenericSchemaEvolution(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stmt = conn.createStatement()) {

// Current schema: (id, name, age)
waitForIncrementalStage(dbName, mergeTable ? "merged" : "members", stmt);

// triggers AddColumnEvent
// Current schema: (id, name, age, gender)
stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");

// triggers AlterColumnTypeEvent
stmt.execute("ALTER TABLE members MODIFY COLUMN age DOUBLE");
stmt.execute("ALTER TABLE members MODIFY COLUMN age DOUBLE;");

// triggers AlterColumnTypeEvent and RenameColumnEvent
// Current schema: (id, name, age, biological_sex)
stmt.execute("ALTER TABLE members CHANGE COLUMN gender biological_sex BIGINT;");
// triggers RenameColumnEvent
stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");

// triggers DropColumnEvent
// Current schema: (id, name, age)
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");

// triggers TruncateTableEvent
stmt.execute("TRUNCATE TABLE members;");
stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");

// triggers DropTableEvent
stmt.execute("DROP TABLE members;");
}

List<String> expectedTmEvents =
Expand All @@ -360,7 +370,7 @@ private void testGenericSchemaEvolution(

List<String> expectedJmEvents =
expectedJobManagerEvents.stream()
.map(s -> String.format(s, dbName, dbName))
.map(s -> String.format(s, dbName, dbName, dbName))
.collect(Collectors.toList());

validateResult(expectedJmEvents, jobManagerConsumer);
Expand All @@ -369,11 +379,11 @@ private void testGenericSchemaEvolution(
private void validateSnapshotData(String dbName, String tableName) throws Exception {
List<String> expected =
Stream.of(
"CreateTableEvent{tableId=%s.members, schema=columns={`id` INT NOT NULL,`name` VARCHAR(17),`age` INT,`id_square` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=%s.%s, before=[], after=[1008, Alice, 21, -1016064], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.%s, before=[], after=[1009, Bob, 20, -1018081], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.%s, before=[], after=[1010, Carol, 19, 1020100], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.%s, before=[], after=[1011, Derrida, 18, 1022121], op=INSERT, meta=()}")
"CreateTableEvent{tableId=%s.%s, schema=columns={`uid` STRING,`id` INT NOT NULL,`name` VARCHAR(17),`age` INT,`id_square` INT,`tag` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=%s.%s, before=[], after=[1009 -> Bob, 1009, Bob, 20, -1018081, age >= 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.%s, before=[], after=[1008 -> Alice, 1008, Alice, 21, -1016064, age >= 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.%s, before=[], after=[1011 -> Derrida, 1011, Derrida, 18, 1022121, age < 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.%s, before=[], after=[1010 -> Carol, 1010, Carol, 19, 1020100, age < 20], op=INSERT, meta=()}")
.map(s -> String.format(s, dbName, tableName))
.collect(Collectors.toList());

Expand All @@ -387,7 +397,7 @@ private void waitForIncrementalStage(String dbName, String tableName, Statement
// Ensure we change schema after incremental stage
waitUntilSpecificEvent(
String.format(
"DataChangeEvent{tableId=%s.%s, before=[], after=[0, __fence__, 0, 0], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.%s, before=[], after=[0 -> __fence__, 0, __fence__, 0, 0, age < 20], op=INSERT, meta=()}",
dbName, tableName),
taskManagerConsumer);
}
Expand All @@ -405,7 +415,7 @@ private void waitUntilSpecificEvent(String event, ToStringConsumer consumer) thr
System.currentTimeMillis() + SchemaEvolvingTransformE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = consumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
Expand Down

0 comments on commit 8d0708d

Please sign in to comment.