From 2633019496f69a053a1a5ddefb31f587df286f5d Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Wed, 26 Jun 2024 12:07:00 +0800
Subject: [PATCH] Add lenient schema evolve behavior & tests

---
 .../cdc/common/pipeline/PipelineOptions.java  |   6 +-
 .../common/pipeline/SchemaChangeBehavior.java |   1 +
 .../pipeline/tests/SchemaEvolveE2eITCase.java |  76 ++
 .../operators/schema/SchemaOperator.java      |  12 +-
 .../SchemaRegistryRequestHandler.java         | 124 ++-
 .../schema/metrics/SchemaOperatorMetrics.java |   7 +-
 .../operators/schema/SchemaEvolveTest.java    | 720 ++++++++++++++++++
 7 files changed, 937 insertions(+), 9 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
index 5c32a0290f1..48a4fbb13a3 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
@@ -55,9 +55,13 @@ public class PipelineOptions {
                                 .linebreak()
                                 .add(
                                         ListElement.list(
+                                                text("IGNORE: Drop all schema change events."),
+                                                text(
+                                                        "LENIENT: Apply schema changes to downstream tolerantly, and keep executing if applying fails."),
+                                                text(
+                                                        "TRY_EVOLVE: Apply schema changes to downstream, but keep executing if applying fails."),
                                                 text(
                                                         "EVOLVE: Apply schema changes to downstream. This requires sink to support handling schema changes."),
-                                                text("IGNORE: Drop all schema change events."),
                                                 text(
                                                         "EXCEPTION: Throw an exception to terminate the sync pipeline.")))
                                 .build());
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java
index 3634caba44e..ad9db84a369 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/SchemaChangeBehavior.java
@@ -23,6 +23,7 @@
 @PublicEvolving
 public enum SchemaChangeBehavior {
     IGNORE,
+    LENIENT,
     TRY_EVOLVE,
     EVOLVE,
     EXCEPTION
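
For orientation, the behaviors above differ along two axes: whether schema change events are applied to the sink at all, and whether a failed application stops the pipeline. The sketch below is an editorial summary of the documented semantics, not code from this patch; the LENIENT-specific event rewriting it mentions is implemented by SchemaRegistryRequestHandler#lenientizeSchemaChangeEvent further down.

    // Editorial sketch: documented semantics of each SchemaChangeBehavior.
    import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;

    class BehaviorSemanticsSketch {
        static String describe(SchemaChangeBehavior behavior) {
            switch (behavior) {
                case IGNORE:
                    return "drop all schema change events; sink schema is never altered";
                case LENIENT:
                    return "rewrite events additively (no renames/drops reach the sink), apply, keep running on failure";
                case TRY_EVOLVE:
                    return "apply events as-is, keep running on failure";
                case EVOLVE:
                    return "apply events as-is; requires the sink to support them";
                case EXCEPTION:
                    return "throw and terminate the sync pipeline on any schema change";
                default:
                    throw new IllegalArgumentException("Unknown behavior: " + behavior);
            }
        }
    }
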
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 89db109a0df..989cb86b13c 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -653,6 +653,82 @@ public void testFineGrainedSchemaEvolution() throws Exception {
                 jobManagerConsumer);
     }
 
+    @Test
+    public void testLenientSchemaEvolution() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.members\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  schema.change.behavior: lenient\n"
+                                + "  parallelism: 1",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        schemaEvolveDatabase.getDatabaseName());
+        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+        validateSnapshotData("members");
+
+        LOG.info("Starting schema evolution");
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s",
+                        MYSQL.getHost(),
+                        MYSQL.getDatabasePort(),
+                        schemaEvolveDatabase.getDatabaseName());
+
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+                Statement stmt = conn.createStatement()) {
+
+            waitForIncrementalStage("members", stmt);
+
+            // triggers AddColumnEvent
+            stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
+            stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");
+
+            // triggers AlterColumnTypeEvent and RenameColumnEvent
+            stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;");
+
+            // triggers RenameColumnEvent
+            stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");
+
+            // triggers DropColumnEvent
+            stmt.execute("ALTER TABLE members DROP COLUMN biological_sex;");
+            stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
+        }
+
+        List<String> expected =
+                Stream.of(
+                                "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
+                                "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
+                                "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
+                                "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
+                                "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}")
+                        .map(s -> String.format(s, schemaEvolveDatabase.getDatabaseName()))
+                        .collect(Collectors.toList());
+
+        validateResult(expected, taskManagerConsumer);
+    }
+
     private void validateResult(List<String> expectedEvents, ToStringConsumer consumer)
             throws Exception {
         for (String event : expectedEvents) {
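
Note that the expected list above contains no RenameColumnEvent or DropColumnEvent: in lenient mode a rename surfaces downstream as an AddColumnEvent for the new name (the old column is kept and padded with nulls), and dropping the already-nullable biological_sex column produces no downstream event at all. The following is an editorial restatement of that mapping, inferred from the test, not code from the patch:

    // Upstream MySQL DDL -> events a LENIENT pipeline forwards downstream
    // (inferred from the expected list in testLenientSchemaEvolution above).
    import java.util.Arrays;
    import java.util.List;

    class LenientRewriteSummary {
        public static void main(String[] args) {
            List<String> rewrites =
                    Arrays.asList(
                            "ADD COLUMN gender TINYINT AFTER age    -> AddColumnEvent(gender)",
                            "CHANGE COLUMN age precise_age DOUBLE   -> AlterColumnTypeEvent(age=DOUBLE) + AddColumnEvent(precise_age)",
                            "RENAME COLUMN gender TO biological_sex -> AddColumnEvent(biological_sex)",
                            "DROP COLUMN biological_sex             -> (nothing: column is already nullable)");
            rewrites.forEach(System.out::println);
        }
    }
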
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 26930db6439..803186e330d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -362,8 +362,13 @@ private RecordData regenerateRecordData(
             RecordData.FieldGetter fieldGetter =
                     RecordData.createFieldGetter(
                             originalSchema.getColumn(columnName).get().getType(), columnIndex);
-            // Check type compatibility
-            if (originalSchema.getColumn(columnName).get().getType().equals(column.getType())) {
+            // Check type compatibility, ignoring nullability
+            if (originalSchema
+                    .getColumn(columnName)
+                    .get()
+                    .getType()
+                    .nullable()
+                    .equals(column.getType().nullable())) {
                 fieldGetters.add(fieldGetter);
             } else {
                 fieldGetters.add(
@@ -438,6 +443,7 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
                                 schemaEvolveResponse.getPrintableFailedSchemaChangeEvents()));
             }
         } else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
+                || schemaChangeBehavior == SchemaChangeBehavior.LENIENT
                 || schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
             if (schemaEvolveResponse.hasException()) {
                 schemaEvolveResponse
@@ -445,7 +451,7 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
                         .forEach(
                                 e ->
                                         LOG.warn(
-                                                "Failed to apply event {}, but keeps running in TRY_EVOLVE mode. Caused by: {}",
+                                                "Failed to apply event {}, but the pipeline keeps running in tolerant mode. Caused by: {}",
                                                 e.f0,
                                                 e.f1));
             }
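
The regenerateRecordData change above compares field types only after relaxing both sides with DataType#nullable(), so an existing field getter is still reused when nothing but the NOT NULL constraint differs — which is exactly the kind of difference lenient rewrites produce. A quick editorial illustration (not part of the patch):

    import org.apache.flink.cdc.common.types.DataType;
    import org.apache.flink.cdc.common.types.DataTypes;

    class NullabilityCompare {
        public static void main(String[] args) {
            DataType notNullInt = DataTypes.INT().notNull();
            DataType plainInt = DataTypes.INT();
            // Differ as-is, because nullability is part of DataType equality...
            System.out.println(notNullInt.equals(plainInt)); // false
            // ...but compare equal once both sides are relaxed to nullable.
            System.out.println(notNullInt.nullable().equals(plainInt.nullable())); // true
        }
    }
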
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 9d1e17f1782..2bfd411f441 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -19,13 +19,20 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.AddColumnEvent;
+import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DropColumnEvent;
+import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.schema.Column;
+import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse;
 import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
 import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
@@ -41,14 +48,19 @@
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST;
 import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
@@ -170,7 +182,7 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
         }
         schemaManager.applyUpstreamSchemaChange(request.getSchemaChangeEvent());
         List<SchemaChangeEvent> derivedSchemaChangeEvents =
-                schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
+                calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
         CompletableFuture<CoordinationResponse> response =
                 CompletableFuture.completedFuture(
                         wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
@@ -261,7 +273,7 @@ private void startNextSchemaChangeRequest() {
         } else {
             schemaManager.applyUpstreamSchemaChange(request.getSchemaChangeEvent());
             List<SchemaChangeEvent> derivedSchemaChangeEvents =
-                    schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
+                    calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
             pendingSchemaChange
                     .getResponseFuture()
                     .complete(wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
@@ -301,6 +313,114 @@ public void close() throws IOException {
         }
     }
 
+    private List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents(SchemaChangeEvent event) {
+        if (SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
+            return lenientizeSchemaChangeEvent(event).stream()
+                    .flatMap(evt -> schemaDerivation.applySchemaChange(evt).stream())
+                    .collect(Collectors.toList());
+        } else {
+            return schemaDerivation.applySchemaChange(event);
+        }
+    }
+
+    private List<SchemaChangeEvent> lenientizeSchemaChangeEvent(SchemaChangeEvent event) {
+        if (event instanceof CreateTableEvent) {
+            return Collections.singletonList(event);
+        }
+        TableId tableId = event.tableId();
+        Schema evolvedSchema =
+                schemaManager
+                        .getLatestEvolvedSchema(tableId)
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "Evolved schema does not exist, not ready for schema change event "
+                                                        + event));
+        switch (event.getType()) {
+            case ADD_COLUMN:
+                {
+                    AddColumnEvent addColumnEvent = (AddColumnEvent) event;
+                    return Collections.singletonList(
+                            new AddColumnEvent(
+                                    tableId,
+                                    addColumnEvent.getAddedColumns().stream()
+                                            .map(
+                                                    col ->
+                                                            new AddColumnEvent.ColumnWithPosition(
+                                                                    Column.physicalColumn(
+                                                                            col.getAddColumn().getName(),
+                                                                            col.getAddColumn().getType().nullable(),
+                                                                            col.getAddColumn().getComment())))
+                                            .collect(Collectors.toList())));
+                }
+            case DROP_COLUMN:
+                {
+                    DropColumnEvent dropColumnEvent = (DropColumnEvent) event;
+                    Map<String, DataType> convertNullableColumns =
+                            dropColumnEvent.getDroppedColumnNames().stream()
+                                    .map(evolvedSchema::getColumn)
+                                    .flatMap(e -> e.map(Stream::of).orElse(Stream.empty()))
+                                    .filter(col -> !col.getType().isNullable())
+                                    .collect(
+                                            Collectors.toMap(
+                                                    Column::getName,
+                                                    column -> column.getType().nullable()));
+
+                    if (convertNullableColumns.isEmpty()) {
+                        return Collections.emptyList();
+                    } else {
+                        return Collections.singletonList(
+                                new AlterColumnTypeEvent(tableId, convertNullableColumns));
+                    }
+                }
+            case RENAME_COLUMN:
+                {
+                    RenameColumnEvent renameColumnEvent = (RenameColumnEvent) event;
+                    List<AddColumnEvent.ColumnWithPosition> appendColumns = new ArrayList<>();
+                    Map<String, DataType> convertNullableColumns = new HashMap<>();
+                    renameColumnEvent
+                            .getNameMapping()
+                            .forEach(
+                                    (key, value) -> {
+                                        Column column =
+                                                evolvedSchema
+                                                        .getColumn(key)
+                                                        .orElseThrow(
+                                                                () ->
+                                                                        new IllegalArgumentException(
+                                                                                "Nonexistent column "
+                                                                                        + key
+                                                                                        + " in evolved schema."));
+                                        if (!column.getType().isNullable()) {
+                                            // It's a not-nullable column; cast it to
+                                            // nullable first
+                                            convertNullableColumns.put(
+                                                    key, column.getType().nullable());
+                                        }
+                                        appendColumns.add(
+                                                new AddColumnEvent.ColumnWithPosition(
+                                                        Column.physicalColumn(
+                                                                value,
+                                                                column.getType().nullable(),
+                                                                column.getComment())));
+                                    });
+
+                    List<SchemaChangeEvent> events = new ArrayList<>();
+                    events.add(new AddColumnEvent(tableId, appendColumns));
+                    if (!convertNullableColumns.isEmpty()) {
+                        events.add(new AlterColumnTypeEvent(tableId, convertNullableColumns));
+                    }
+                    return events;
+                }
+            default:
+                return Collections.singletonList(event);
+        }
+    }
+
     private static class PendingSchemaChange {
         private final SchemaChangeRequest changeRequest;
         private List<SchemaChangeEvent> derivedSchemaChangeEvents;
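
The RENAME_COLUMN branch above is the heart of lenient mode: instead of renaming in place, it appends a nullable copy of the column under the new name, and relaxes the old column to nullable when needed so it can be padded with nulls from then on. As a worked example (an editorial sketch mirroring the unit test below; the table id here is hypothetical), renaming the NOT NULL column name to namae yields:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.cdc.common.event.AddColumnEvent;
    import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
    import org.apache.flink.cdc.common.event.RenameColumnEvent;
    import org.apache.flink.cdc.common.event.SchemaChangeEvent;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Column;
    import org.apache.flink.cdc.common.types.DataTypes;

    class LenientRenameExample {
        public static void main(String[] args) {
            TableId tableId = TableId.tableId("mydb", "customers"); // hypothetical
            // Upstream event: RENAME COLUMN name TO namae (name is STRING NOT NULL).
            SchemaChangeEvent upstream =
                    new RenameColumnEvent(tableId, Collections.singletonMap("name", "namae"));
            // What lenientizeSchemaChangeEvent forwards instead: add the new column as
            // nullable, then relax the old NOT NULL column so nulls can fill it.
            List<SchemaChangeEvent> downstream =
                    Arrays.asList(
                            new AddColumnEvent(
                                    tableId,
                                    Collections.singletonList(
                                            new AddColumnEvent.ColumnWithPosition(
                                                    Column.physicalColumn(
                                                            "namae", DataTypes.STRING(), null)))),
                            new AlterColumnTypeEvent(
                                    tableId, Collections.singletonMap("name", DataTypes.STRING())));
            System.out.println(upstream + " -> " + downstream);
        }
    }
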
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/metrics/SchemaOperatorMetrics.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/metrics/SchemaOperatorMetrics.java
index f650b6cacdb..4ba8e30c719 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/metrics/SchemaOperatorMetrics.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/metrics/SchemaOperatorMetrics.java
@@ -35,9 +35,10 @@ public class SchemaOperatorMetrics {
             new HashMap<SchemaChangeBehavior, Integer>() {
                 {
                     put(SchemaChangeBehavior.IGNORE, 0);
-                    put(SchemaChangeBehavior.TRY_EVOLVE, 1);
-                    put(SchemaChangeBehavior.EVOLVE, 2);
-                    put(SchemaChangeBehavior.EXCEPTION, 3);
+                    put(SchemaChangeBehavior.LENIENT, 1);
+                    put(SchemaChangeBehavior.TRY_EVOLVE, 2);
+                    put(SchemaChangeBehavior.EVOLVE, 3);
+                    put(SchemaChangeBehavior.EXCEPTION, 4);
                 }
             };
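
This map gives each behavior a stable integer code so the configured behavior can be reported through a numeric gauge; inserting LENIENT at 1 shifts the codes of TRY_EVOLVE, EVOLVE and EXCEPTION, so any dashboard keyed on the old values needs updating. A sketch of how such a gauge might be registered follows (editorial; the field and metric names are placeholders, since the hunk does not show them):

    import java.util.Map;
    import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;

    class BehaviorGaugeSketch {
        static void register(
                MetricGroup group,
                Map<SchemaChangeBehavior, Integer> codes, // e.g. the map above
                SchemaChangeBehavior behavior) {
            // Reports the configured behavior as its integer code, e.g. LENIENT -> 1.
            group.gauge("schemaChangeBehavior", (Gauge<Integer>) () -> codes.get(behavior));
        }
    }
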
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
index 4d2a7ab2d25..8663e502f62 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
@@ -1665,6 +1665,726 @@ tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)),
         harness.close();
     }
 
+    /** Tests lenient schema change behavior. */
+    @Test
+    public void testLenientSchemaEvolves() throws Exception {
+        TableId tableId = CUSTOMERS_TABLE_ID;
+        Schema schemaV1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", INT)
+                        .physicalColumn("name", STRING.notNull())
+                        .physicalColumn("age", SMALLINT)
+                        .primaryKey("id")
+                        .build();
+
+        SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
+
+        SchemaOperator schemaOperator =
+                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+        EventOperatorTestHarness<SchemaOperator, Event> harness =
+                new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
+        harness.open();
+
+        // Test CreateTableEvent
+        {
+            List<Event> createAndInsertDataEvents =
+                    Arrays.asList(
+                            new CreateTableEvent(tableId, schemaV1),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(INT, 1, STRING, "Alice", SMALLINT, (short) 17)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(INT, 3, STRING, "Carol", SMALLINT, (short) 19)));
+
+            processEvent(schemaOperator, createAndInsertDataEvents);
+
+            Assertions.assertThat(
+                            harness.getOutputRecords().stream()
+                                    .map(StreamRecord::getValue)
+                                    .collect(Collectors.toList()))
+                    .isEqualTo(
+                            ListUtils.union(
+                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    createAndInsertDataEvents));
+
+            Assertions.assertThat(harness.getLatestUpstreamSchema(tableId)).isEqualTo(schemaV1);
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1);
+
+            harness.clearOutputRecords();
+        }
+
+        // Test AddColumnEvent
+        {
+            List<Event> addColumnEvents =
+                    Arrays.asList(
+                            new AddColumnEvent(
+                                    tableId,
+                                    Arrays.asList(
+                                            new AddColumnEvent.ColumnWithPosition(
+                                                    Column.physicalColumn(
+                                                            "score", INT, "Score data")),
+                                            new AddColumnEvent.ColumnWithPosition(
+                                                    Column.physicalColumn(
+                                                            "height", DOUBLE, "Height data")))),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 4, STRING, "Derrida", SMALLINT, (short) 20,
+                                            INT, 100, DOUBLE, 173.25)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 5, STRING, "Eve", SMALLINT, (short) 21,
+                                            INT, 97, DOUBLE, 160.)));
+            processEvent(schemaOperator, addColumnEvents);
+
+            Assertions.assertThat(
+                            harness.getOutputRecords().stream()
+                                    .map(StreamRecord::getValue)
+                                    .collect(Collectors.toList()))
+                    .isEqualTo(
+                            ListUtils.union(
+                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    addColumnEvents));
+
+            Schema schemaV2 =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("name", STRING.notNull())
+                            .physicalColumn("age", SMALLINT)
+                            .physicalColumn("score", INT, "Score data")
+                            .physicalColumn("height", DOUBLE, "Height data")
+                            .primaryKey("id")
+                            .build();
+            Assertions.assertThat(harness.getLatestUpstreamSchema(tableId)).isEqualTo(schemaV2);
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV2);
+
+            harness.clearOutputRecords();
+        }
+
+        // Test RenameColumnEvent
+        {
+            List<Event> renameColumnEvents =
+                    Arrays.asList(
+                            new RenameColumnEvent(
+                                    tableId, ImmutableMap.of("name", "namae", "age", "toshi")),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 6, STRING, "Fiona", SMALLINT, (short) 22,
+                                            INT, 100, DOUBLE, 173.25)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 7, STRING, "Gloria", SMALLINT, (short) 23,
+                                            INT, 97, DOUBLE, 160.)));
+
+            processEvent(schemaOperator, renameColumnEvents);
+
+            List<Event> lenientRenameColumnEvents =
+                    Arrays.asList(
+                            new AddColumnEvent(
+                                    tableId,
+                                    Arrays.asList(
+                                            new AddColumnEvent.ColumnWithPosition(
+                                                    Column.physicalColumn("namae", STRING, null)),
+                                            new AddColumnEvent.ColumnWithPosition(
+                                                    Column.physicalColumn(
+                                                            "toshi", SMALLINT, null)))),
+                            new AlterColumnTypeEvent(
+                                    tableId, Collections.singletonMap("name", STRING)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 6, STRING, null, SMALLINT, null, INT, 100,
+                                            DOUBLE, 173.25, STRING, "Fiona", SMALLINT, (short) 22)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 7, STRING, null, SMALLINT, null, INT, 97,
+                                            DOUBLE, 160., STRING, "Gloria", SMALLINT, (short) 23)));
+
+            Assertions.assertThat(
+                            harness.getOutputRecords().stream()
+                                    .map(StreamRecord::getValue)
+                                    .collect(Collectors.toList()))
+                    .isEqualTo(
+                            ListUtils.union(
+                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    lenientRenameColumnEvents));
+
+            Schema schemaV3 =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("namae", STRING.notNull())
+                            .physicalColumn("toshi", SMALLINT)
+                            .physicalColumn("score", INT, "Score data")
+                            .physicalColumn("height", DOUBLE, "Height data")
+                            .primaryKey("id")
+                            .build();
+
+            Schema schemaV3E =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("name", STRING)
+                            .physicalColumn("age", SMALLINT)
+                            .physicalColumn("score", INT, "Score data")
+                            .physicalColumn("height", DOUBLE, "Height data")
+                            .physicalColumn("namae", STRING)
+                            .physicalColumn("toshi", SMALLINT)
+                            .primaryKey("id")
+                            .build();
+
+            Assertions.assertThat(harness.getLatestUpstreamSchema(tableId)).isEqualTo(schemaV3);
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV3E);
+
+            harness.clearOutputRecords();
+        }
+
+        // Test AlterColumnTypeEvent
+        {
+            List<Event> alterColumnTypeEvents =
+                    Arrays.asList(
+                            new AlterColumnTypeEvent(
+                                    tableId, ImmutableMap.of("score", BIGINT, "toshi", FLOAT)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 8,
STRING, "Helen", FLOAT, 22f, BIGINT, 100L, + DOUBLE, 173.25)), + DataChangeEvent.insertEvent( + tableId, + buildRecord( + INT, 9, STRING, "Iva", FLOAT, 23f, BIGINT, 97L, DOUBLE, + 160.))); + + processEvent(schemaOperator, alterColumnTypeEvents); + + List lenientAlterColumnTypeEvents = + Arrays.asList( + new AlterColumnTypeEvent( + tableId, ImmutableMap.of("score", BIGINT, "toshi", FLOAT)), + DataChangeEvent.insertEvent( + tableId, + buildRecord( + INT, 8, STRING, null, SMALLINT, null, BIGINT, 100L, + DOUBLE, 173.25, STRING, "Helen", FLOAT, 22f)), + DataChangeEvent.insertEvent( + tableId, + buildRecord( + INT, 9, STRING, null, SMALLINT, null, BIGINT, 97L, + DOUBLE, 160., STRING, "Iva", FLOAT, 23f))); + + Assertions.assertThat( + harness.getOutputRecords().stream() + .map(StreamRecord::getValue) + .collect(Collectors.toList())) + .isEqualTo( + ListUtils.union( + Collections.singletonList(new FlushEvent(tableId)), + lenientAlterColumnTypeEvents)); + + Schema schemaV4 = + Schema.newBuilder() + .physicalColumn("id", INT) + .physicalColumn("namae", STRING.notNull()) + .physicalColumn("toshi", FLOAT) + .physicalColumn("score", BIGINT, "Score data") + .physicalColumn("height", DOUBLE, "Height data") + .primaryKey("id") + .build(); + + Schema schemaV4E = + Schema.newBuilder() + .physicalColumn("id", INT) + .physicalColumn("name", STRING) + .physicalColumn("age", SMALLINT) + .physicalColumn("score", BIGINT, "Score data") + .physicalColumn("height", DOUBLE, "Height data") + .physicalColumn("namae", STRING) + .physicalColumn("toshi", FLOAT) + .primaryKey("id") + .build(); + Assertions.assertThat(harness.getLatestUpstreamSchema(tableId)).isEqualTo(schemaV4); + Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV4E); + + harness.clearOutputRecords(); + } + + // Test DropColumnEvent + { + List dropColumnEvents = + Arrays.asList( + new DropColumnEvent(tableId, Arrays.asList("score", "height")), + DataChangeEvent.insertEvent( + tableId, buildRecord(INT, 12, STRING, "Jane", FLOAT, 11f)), + DataChangeEvent.insertEvent( + tableId, buildRecord(INT, 13, STRING, "Kryo", FLOAT, 23f))); + + processEvent(schemaOperator, dropColumnEvents); + + List lenientDropColumnEvents = + Arrays.asList( + DataChangeEvent.insertEvent( + tableId, + buildRecord( + INT, 12, STRING, null, SMALLINT, null, BIGINT, null, + DOUBLE, null, STRING, "Jane", FLOAT, 11f)), + DataChangeEvent.insertEvent( + tableId, + buildRecord( + INT, 13, STRING, null, SMALLINT, null, BIGINT, null, + DOUBLE, null, STRING, "Kryo", FLOAT, 23f))); + + Assertions.assertThat( + harness.getOutputRecords().stream() + .map(StreamRecord::getValue) + .collect(Collectors.toList())) + .isEqualTo(lenientDropColumnEvents); + + Schema schemaV5 = + Schema.newBuilder() + .physicalColumn("id", INT) + .physicalColumn("namae", STRING.notNull()) + .physicalColumn("toshi", FLOAT) + .primaryKey("id") + .build(); + + Schema schemaV5E = + Schema.newBuilder() + .physicalColumn("id", INT) + .physicalColumn("name", STRING) + .physicalColumn("age", SMALLINT) + .physicalColumn("score", BIGINT, "Score data") + .physicalColumn("height", DOUBLE, "Height data") + .physicalColumn("namae", STRING) + .physicalColumn("toshi", FLOAT) + .primaryKey("id") + .build(); + Assertions.assertThat(harness.getLatestUpstreamSchema(tableId)).isEqualTo(schemaV5); + Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV5E); + + harness.clearOutputRecords(); + } + harness.close(); + } + + @Test + public void testLenientEvolveTweaks() throws Exception { + 
+        TableId tableId = CUSTOMERS_TABLE_ID;
+        Schema schemaV1 =
+                Schema.newBuilder()
+                        .physicalColumn("id", INT)
+                        .physicalColumn("iina", INT.notNull())
+                        .physicalColumn("name", STRING.notNull())
+                        .physicalColumn("age", SMALLINT)
+                        .primaryKey("id")
+                        .build();
+
+        SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT;
+
+        SchemaOperator schemaOperator =
+                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
+        EventOperatorTestHarness<SchemaOperator, Event> harness =
+                new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
+        harness.open();
+
+        // Test CreateTableEvent
+        {
+            List<Event> createAndInsertDataEvents =
+                    Arrays.asList(
+                            new CreateTableEvent(tableId, schemaV1),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 1, INT, 0, STRING, "Alice", SMALLINT, (short) 17)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 2, INT, 0, STRING, "Bob", SMALLINT, (short) 18)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 3, INT, 0, STRING, "Carol", SMALLINT, (short) 19)));
+
+            processEvent(schemaOperator, createAndInsertDataEvents);
+
+            Assertions.assertThat(
+                            harness.getOutputRecords().stream()
+                                    .map(StreamRecord::getValue)
+                                    .collect(Collectors.toList()))
+                    .isEqualTo(
+                            ListUtils.union(
+                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    createAndInsertDataEvents));
+
+            Assertions.assertThat(harness.getLatestUpstreamSchema(tableId)).isEqualTo(schemaV1);
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1);
+
+            harness.clearOutputRecords();
+        }
+
+        // Test dropping a non-null column
+        {
+            List<Event> dropColumnEvents =
+                    Arrays.asList(
+                            new DropColumnEvent(tableId, Collections.singletonList("name")),
+                            DataChangeEvent.insertEvent(
+                                    tableId, buildRecord(INT, 12, INT, 0, SMALLINT, (short) 11)),
+                            DataChangeEvent.insertEvent(
+                                    tableId, buildRecord(INT, 13, INT, 0, SMALLINT, (short) 23)));
+
+            processEvent(schemaOperator, dropColumnEvents);
+
+            List<Event> lenientDropColumnEvents =
+                    Arrays.asList(
+                            new AlterColumnTypeEvent(
+                                    tableId, Collections.singletonMap("name", STRING)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 12, INT, 0, STRING, null, SMALLINT, (short) 11)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 13, INT, 0, STRING, null, SMALLINT, (short) 23)));
+
+            Assertions.assertThat(
+                            harness.getOutputRecords().stream()
+                                    .map(StreamRecord::getValue)
+                                    .collect(Collectors.toList()))
+                    .isEqualTo(
+                            ListUtils.union(
+                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    lenientDropColumnEvents));
+
+            Schema schemaV2 =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("iina", INT.notNull())
+                            .physicalColumn("age", SMALLINT)
+                            .primaryKey("id")
+                            .build();
+
+            Schema schemaV2E =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("iina", INT.notNull())
+                            .physicalColumn("name", STRING)
+                            .physicalColumn("age", SMALLINT)
+                            .primaryKey("id")
+                            .build();
+
+            Assertions.assertThat(harness.getLatestUpstreamSchema(tableId)).isEqualTo(schemaV2);
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV2E);
+
+            harness.clearOutputRecords();
+        }
+
+        // Test inserting a non-null column, and a column somewhere in the middle
+        {
+            List<Event> addColumnEvents =
+                    Arrays.asList(
+                            new AddColumnEvent(
+                                    tableId,
+                                    Arrays.asList(
+                                            new AddColumnEvent.ColumnWithPosition(
+                                                    Column.physicalColumn("nickname", STRING),
+                                                    AddColumnEvent.ColumnPosition.AFTER,
+                                                    "id"),
+                                            new AddColumnEvent.ColumnWithPosition(
+                                                    Column.physicalColumn(
+                                                            "extra", STRING.notNull())))),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 12, STRING, "Alice", INT, 0,
+                                            SMALLINT, (short) 11, STRING, "ailisi")),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 13, STRING, "Bob", INT, 0,
+                                            SMALLINT, (short) 23, STRING, "baobo")));
+
+            processEvent(schemaOperator, addColumnEvents);
+
+            List<Event> lenientAddColumnEvents =
+                    Arrays.asList(
+                            new AddColumnEvent(
+                                    tableId,
+                                    Arrays.asList(
+                                            new AddColumnEvent.ColumnWithPosition(
+                                                    Column.physicalColumn("nickname", STRING)),
+                                            new AddColumnEvent.ColumnWithPosition(
+                                                    Column.physicalColumn("extra", STRING)))),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 12, INT, 0, STRING, null, SMALLINT, (short) 11,
+                                            STRING, "Alice", STRING, "ailisi")),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 13, INT, 0, STRING, null, SMALLINT, (short) 23,
+                                            STRING, "Bob", STRING, "baobo")));
+
+            Assertions.assertThat(
+                            harness.getOutputRecords().stream()
+                                    .map(StreamRecord::getValue)
+                                    .collect(Collectors.toList()))
+                    .isEqualTo(
+                            ListUtils.union(
+                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    lenientAddColumnEvents));
+
+            Schema schemaV3 =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("nickname", STRING)
+                            .physicalColumn("iina", INT.notNull())
+                            .physicalColumn("age", SMALLINT)
+                            .physicalColumn("extra", STRING.notNull())
+                            .primaryKey("id")
+                            .build();
+
+            Schema schemaV3E =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("iina", INT.notNull())
+                            .physicalColumn("name", STRING)
+                            .physicalColumn("age", SMALLINT)
+                            .physicalColumn("nickname", STRING)
+                            .physicalColumn("extra", STRING)
+                            .primaryKey("id")
+                            .build();
+
+            Assertions.assertThat(harness.getLatestUpstreamSchema(tableId)).isEqualTo(schemaV3);
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV3E);
+
+            harness.clearOutputRecords();
+        }
+
+        // Test renaming a non-null column
+        {
+            List<Event> renameColumnEvents =
+                    Arrays.asList(
+                            new RenameColumnEvent(
+                                    tableId, Collections.singletonMap("iina", "yina")),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 41, STRING, "Carol", INT, 0,
+                                            SMALLINT, (short) 11, STRING, "kaluo")),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 42, STRING, "Dorothy", INT, 0,
+                                            SMALLINT, (short) 11, STRING, "duoluoxi")));
+
+            processEvent(schemaOperator, renameColumnEvents);
+
+            List<Event> lenientRenameColumnEvents =
+                    Arrays.asList(
+                            new AddColumnEvent(
+                                    tableId,
+                                    Collections.singletonList(
+                                            new AddColumnEvent.ColumnWithPosition(
+                                                    Column.physicalColumn("yina", INT)))),
+                            new AlterColumnTypeEvent(
+                                    tableId, Collections.singletonMap("iina", INT)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 41, INT, null, STRING, null, SMALLINT, (short) 11,
+                                            STRING, "Carol", STRING, "kaluo", INT, 0)),
+                            DataChangeEvent.insertEvent(
+                                    tableId,
+                                    buildRecord(
+                                            INT, 42, INT, null, STRING, null, SMALLINT, (short) 11,
+                                            STRING, "Dorothy", STRING, "duoluoxi", INT, 0)));
+
+            Assertions.assertThat(
+                            harness.getOutputRecords().stream()
+                                    .map(StreamRecord::getValue)
+                                    .collect(Collectors.toList()))
+                    .isEqualTo(
+                            ListUtils.union(
+                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    lenientRenameColumnEvents));
+
+            Schema schemaV4 =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("nickname", STRING)
+                            .physicalColumn("yina", INT.notNull())
+                            .physicalColumn("age", SMALLINT)
+                            .physicalColumn("extra", STRING.notNull())
+                            .primaryKey("id")
+                            .build();
+
+            Schema schemaV4E =
+                    Schema.newBuilder()
+                            .physicalColumn("id", INT)
+                            .physicalColumn("iina", INT)
+                            .physicalColumn("name", STRING)
+                            .physicalColumn("age", SMALLINT)
+                            .physicalColumn("nickname", STRING)
+                            .physicalColumn("extra", STRING)
+                            .physicalColumn("yina", INT)
+                            .primaryKey("id")
+                            .build();
+
+            Assertions.assertThat(harness.getLatestUpstreamSchema(tableId)).isEqualTo(schemaV4);
+            Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV4E);
+
+            harness.clearOutputRecords();
+        }
+        harness.close();
+    }
+
     private RecordData buildRecord(final Object... args) {
         List<DataType> dataTypes = new ArrayList<>();
         List<Object> objects = new ArrayList<>();