Add lenient schema evolve behavior & tests
yuxiqian committed Jul 12, 2024
1 parent 804704f commit 2633019
Showing 7 changed files with 937 additions and 9 deletions.
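
The new LENIENT behavior applies schema changes to the downstream system tolerantly and keeps the pipeline running even if applying them fails. Rather than forwarding destructive changes verbatim, it rewrites them into additive, nullable-only operations: added columns are forced to be nullable, a renamed column is appended under its new name while the old column is kept, and a dropped column is retained with its type relaxed to nullable.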
@@ -55,9 +55,13 @@ public class PipelineOptions {
.linebreak()
.add(
ListElement.list(
text("IGNORE: Drop all schema change events."),
text(
"LENIENT: Apply schema changes to downstream tolerantly, and keeps executing if applying fails."),
text(
"TRY_EVOLVE: Apply schema changes to downstream, but keeps executing if applying fails."),
text(
"EVOLVE: Apply schema changes to downstream. This requires sink to support handling schema changes."),
text("IGNORE: Drop all schema change events."),
text(
"EXCEPTION: Throw an exception to terminate the sync pipeline.")))
.build());
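
For reference, a pipeline opts into the new behavior with a single key (a minimal sketch mirroring the YAML used by the end-to-end test below):

    pipeline:
      schema.change.behavior: lenient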
@@ -23,6 +23,7 @@
@PublicEvolving
public enum SchemaChangeBehavior {
IGNORE,
LENIENT,
TRY_EVOLVE,
EVOLVE,
EXCEPTION
@@ -653,6 +653,82 @@ public void testFineGrainedSchemaEvolution() throws Exception {
jobManagerConsumer);
}

@Test
public void testLenientSchemaEvolution() throws Exception {
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.members\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: lenient\n"
+ " parallelism: 1",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
schemaEvolveDatabase.getDatabaseName());
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
validateSnapshotData("members");

LOG.info("Starting schema evolution");
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s",
MYSQL.getHost(),
MYSQL.getDatabasePort(),
schemaEvolveDatabase.getDatabaseName());

try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stmt = conn.createStatement()) {

waitForIncrementalStage("members", stmt);

// triggers AddColumnEvent
stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");

// triggers AlterColumnTypeEvent and RenameColumnEvent
stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;");

// triggers RenameColumnEvent
stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");

// triggers DropColumnEvent
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");
}
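        // Under LENIENT, the rename above surfaces downstream as an AddColumnEvent,
        // the DROP COLUMN emits no event at all, and subsequent rows are padded
        // with nulls for the columns they no longer provide.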

List<String> expected =
Stream.of(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}")
.map(s -> String.format(s, schemaEvolveDatabase.getDatabaseName()))
.collect(Collectors.toList());

validateResult(expected, taskManagerConsumer);
}

private void validateResult(List<String> expectedEvents, ToStringConsumer consumer)
throws Exception {
for (String event : expectedEvents) {
@@ -362,8 +362,13 @@ private RecordData regenerateRecordData(
RecordData.FieldGetter fieldGetter =
RecordData.createFieldGetter(
originalSchema.getColumn(columnName).get().getType(), columnIndex);
-            // Check type compatibility
-            if (originalSchema.getColumn(columnName).get().getType().equals(column.getType())) {
// Check type compatibility, ignoring nullability
if (originalSchema
.getColumn(columnName)
.get()
.getType()
.nullable()
.equals(column.getType().nullable())) {
fieldGetters.add(fieldGetter);
} else {
fieldGetters.add(
@@ -438,14 +443,15 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
schemaEvolveResponse.getPrintableFailedSchemaChangeEvents()));
}
} else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
|| schemaChangeBehavior == SchemaChangeBehavior.LENIENT
|| schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
if (schemaEvolveResponse.hasException()) {
schemaEvolveResponse
.getFailedSchemaChangeEvents()
.forEach(
e ->
LOG.warn(
"Failed to apply event {}, but keeps running in TRY_EVOLVE mode. Caused by: {}",
"Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
e.f0,
e.f1));
}
@@ -19,13 +19,20 @@

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
@@ -41,14 +48,19 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST;
import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
@@ -170,7 +182,7 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
}
schemaManager.applyUpstreamSchemaChange(request.getSchemaChangeEvent());
List<SchemaChangeEvent> derivedSchemaChangeEvents =
-                    schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
CompletableFuture<CoordinationResponse> response =
CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
@@ -261,7 +273,7 @@ private void startNextSchemaChangeRequest() {
} else {
schemaManager.applyUpstreamSchemaChange(request.getSchemaChangeEvent());
List<SchemaChangeEvent> derivedSchemaChangeEvents =
-                    schemaDerivation.applySchemaChange(request.getSchemaChangeEvent());
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
pendingSchemaChange
.getResponseFuture()
.complete(wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
@@ -301,6 +313,114 @@ public void close() throws IOException {
}
}

private List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents(SchemaChangeEvent event) {
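        // Only LENIENT rewrites events before derivation; every other behavior
        // derives downstream events from the original event directly.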
if (SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
return lenientizeSchemaChangeEvent(event).stream()
.flatMap(evt -> schemaDerivation.applySchemaChange(evt).stream())
.collect(Collectors.toList());
} else {
return schemaDerivation.applySchemaChange(event);
}
}

private List<SchemaChangeEvent> lenientizeSchemaChangeEvent(SchemaChangeEvent event) {
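        // Rewrite destructive changes into additive, nullable-only ones: forced
        // nullable adds, renames as appends, and drops as nullability relaxation.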
if (event instanceof CreateTableEvent) {
return Collections.singletonList(event);
}
TableId tableId = event.tableId();
Schema evolvedSchema =
schemaManager
.getLatestEvolvedSchema(tableId)
.orElseThrow(
() ->
new IllegalStateException(
"Evolved schema does not exist, not ready for schema change event "
+ event));
switch (event.getType()) {
case ADD_COLUMN:
{
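                    // Force every added column to be nullable, since rows emitted
                    // before this change carry no value for it.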
AddColumnEvent addColumnEvent = (AddColumnEvent) event;
return Collections.singletonList(
new AddColumnEvent(
tableId,
addColumnEvent.getAddedColumns().stream()
.map(
col ->
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
col.getAddColumn()
.getName(),
col.getAddColumn()
.getType()
.nullable(),
col.getAddColumn()
.getComment())))
.collect(Collectors.toList())));
}
case DROP_COLUMN:
{
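                    // Never drop a downstream column; instead, relax dropped NOT NULL
                    // columns to nullable so future rows can leave them unset.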
DropColumnEvent dropColumnEvent = (DropColumnEvent) event;
Map<String, DataType> convertNullableColumns =
dropColumnEvent.getDroppedColumnNames().stream()
.map(evolvedSchema::getColumn)
.flatMap(e -> e.map(Stream::of).orElse(Stream.empty()))
.filter(col -> !col.getType().isNullable())
.collect(
Collectors.toMap(
Column::getName,
column -> column.getType().nullable()));

if (convertNullableColumns.isEmpty()) {
return Collections.emptyList();
} else {
return Collections.singletonList(
new AlterColumnTypeEvent(tableId, convertNullableColumns));
}
}
case RENAME_COLUMN:
{
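                    // Keep the old column and append the new name as a nullable
                    // column, relaxing the old one to nullable if it was NOT NULL.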
RenameColumnEvent renameColumnEvent = (RenameColumnEvent) event;
List<AddColumnEvent.ColumnWithPosition> appendColumns = new ArrayList<>();
Map<String, DataType> convertNullableColumns = new HashMap<>();
renameColumnEvent
.getNameMapping()
.forEach(
(key, value) -> {
Column column =
evolvedSchema
.getColumn(key)
.orElseThrow(
() ->
new IllegalArgumentException(
"Non-existed column "
+ key
+ " in evolved schema."));
if (!column.getType().isNullable()) {
// It's a not-nullable column, we need to cast it to
// nullable first
convertNullableColumns.put(
key, column.getType().nullable());
}
appendColumns.add(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
value,
column.getType().nullable(),
column.getComment())));
});

List<SchemaChangeEvent> events = new ArrayList<>();
events.add(new AddColumnEvent(tableId, appendColumns));
if (!convertNullableColumns.isEmpty()) {
events.add(new AlterColumnTypeEvent(tableId, convertNullableColumns));
}
return events;
}
default:
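                // Other event types (e.g. AlterColumnTypeEvent) are forwarded as-is.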
return Collections.singletonList(event);
}
}

private static class PendingSchemaChange {
private final SchemaChangeRequest changeRequest;
private List<SchemaChangeEvent> derivedSchemaChangeEvents;
@@ -35,9 +35,10 @@ public class SchemaOperatorMetrics {
new HashMap<SchemaChangeBehavior, Integer>() {
{
put(SchemaChangeBehavior.IGNORE, 0);
-                        put(SchemaChangeBehavior.TRY_EVOLVE, 1);
-                        put(SchemaChangeBehavior.EVOLVE, 2);
-                        put(SchemaChangeBehavior.EXCEPTION, 3);
put(SchemaChangeBehavior.LENIENT, 1);
put(SchemaChangeBehavior.TRY_EVOLVE, 2);
put(SchemaChangeBehavior.EVOLVE, 3);
put(SchemaChangeBehavior.EXCEPTION, 4);
}
};
