From afa70ae8b465c6fc4fa78519d6569fba30f1c88c Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Wed, 17 Jul 2024 19:16:33 +0800 Subject: [PATCH] Fix test cases --- .../coordinator/SchemaDerivationTest.java | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java index 7499a90eced..9a2d1cfb4f9 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java @@ -147,8 +147,9 @@ void testOneToOneMapping() { @Test void testMergingTablesWithExactSameSchema() { + SchemaManager schemaManager = new SchemaManager(); SchemaDerivation schemaDerivation = - new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>()); + new SchemaDerivation(schemaManager, ROUTES, new HashMap<>()); // Create table 1 List derivedChangesAfterCreateTable = @@ -158,6 +159,8 @@ void testMergingTablesWithExactSameSchema() { .asCreateTableEvent() .hasTableId(MERGED_TABLE) .hasSchema(SCHEMA); + derivedChangesAfterCreateTable.forEach(schemaManager::applyEvolvedSchemaChange); + // Create table 2 assertThat(schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_2, SCHEMA))) .isEmpty(); @@ -177,6 +180,8 @@ void testMergingTablesWithExactSameSchema() { .asAddColumnEvent() .hasTableId(MERGED_TABLE) .containsAddedColumns(newCol1, newCol2); + derivedChangesAfterAddColumn.forEach(schemaManager::applyEvolvedSchemaChange); + // Add column for table 2 assertThat(schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_2, newColumns))) .isEmpty(); @@ -190,6 +195,8 @@ void testMergingTablesWithExactSameSchema() { .asAlterColumnTypeEvent() .hasTableId(MERGED_TABLE) .containsTypeMapping(typeMapping); + derivedChangesAfterAlterColumnType.forEach(schemaManager::applyEvolvedSchemaChange); + // Alter column type for table 2 assertThat( schemaDerivation.applySchemaChange( @@ -215,6 +222,8 @@ void testMergingTablesWithExactSameSchema() { .containsAddedColumns( new AddColumnEvent.ColumnWithPosition( new PhysicalColumn("last_name", DataTypes.STRING(), null))); + derivedChangesAfterRenameColumn.forEach(schemaManager::applyEvolvedSchemaChange); + // Rename column for table 2 assertThat( schemaDerivation.applySchemaChange( @@ -235,6 +244,8 @@ void testMergingTableWithDifferentSchemas() { .asCreateTableEvent() .hasTableId(MERGED_TABLE) .hasSchema(SCHEMA); + derivedChangesAfterCreateTable.forEach(schemaManager::applyEvolvedSchemaChange); + // Create table 2 List derivedChangesAfterCreateTable2 = schemaDerivation.applySchemaChange( @@ -250,6 +261,7 @@ void testMergingTableWithDifferentSchemas() { "gender", DataTypes.STRING(), null)))), new AlterColumnTypeEvent( MERGED_TABLE, ImmutableMap.of("age", DataTypes.BIGINT()))); + derivedChangesAfterCreateTable2.forEach(schemaManager::applyEvolvedSchemaChange); // Add column for table 1 AddColumnEvent.ColumnWithPosition newCol1 = @@ -266,6 +278,8 @@ void testMergingTableWithDifferentSchemas() { .asAddColumnEvent() .hasTableId(MERGED_TABLE) .containsAddedColumns(newCol1, newCol2); + derivedChangesAfterAddColumn.forEach(schemaManager::applyEvolvedSchemaChange); + // Add column for table 2 List derivedChangesAfterAddColumnForTable2 = schemaDerivation.applySchemaChange( @@ -284,6 +298,7 @@ void testMergingTableWithDifferentSchemas() { .containsTypeMapping( ImmutableMap.of( "new_col1", DataTypes.STRING(), "new_col2", DataTypes.STRING())); + derivedChangesAfterAddColumnForTable2.forEach(schemaManager::applyEvolvedSchemaChange); // Alter column type for table 1 ImmutableMap typeMapping = ImmutableMap.of("age", DataTypes.BIGINT()); @@ -316,6 +331,8 @@ void testMergingTableWithDifferentSchemas() { .containsAddedColumns( new AddColumnEvent.ColumnWithPosition( new PhysicalColumn("last_name", DataTypes.STRING(), null))); + derivedChangesAfterRenameColumn.forEach(schemaManager::applyEvolvedSchemaChange); + // Rename column for table 2 List derivedChangesAfterRenameColumnForTable2 = schemaDerivation.applySchemaChange( @@ -327,8 +344,9 @@ void testMergingTableWithDifferentSchemas() { .containsAddedColumns( new AddColumnEvent.ColumnWithPosition( new PhysicalColumn("first_name", DataTypes.STRING(), null))); + derivedChangesAfterRenameColumnForTable2.forEach(schemaManager::applyEvolvedSchemaChange); - assertThat(schemaManager.getLatestUpstreamSchema(MERGED_TABLE)) + assertThat(schemaManager.getLatestEvolvedSchema(MERGED_TABLE)) .contains( Schema.newBuilder() .column(Column.physicalColumn("id", DataTypes.BIGINT())) @@ -344,8 +362,9 @@ void testMergingTableWithDifferentSchemas() { @Test void testIncompatibleTypes() { + SchemaManager schemaManager = new SchemaManager(); SchemaDerivation schemaDerivation = - new SchemaDerivation(new SchemaManager(), ROUTES, new HashMap<>()); + new SchemaDerivation(schemaManager, ROUTES, new HashMap<>()); // Create table 1 List derivedChangesAfterCreateTable = schemaDerivation.applySchemaChange(new CreateTableEvent(TABLE_1, SCHEMA)); @@ -354,6 +373,7 @@ void testIncompatibleTypes() { .asCreateTableEvent() .hasTableId(MERGED_TABLE) .hasSchema(SCHEMA); + derivedChangesAfterCreateTable.forEach(schemaManager::applyEvolvedSchemaChange); // Create table 2 assertThatThrownBy(