Keep backwards serialization compatibility & polish CI
yuxiqian committed May 27, 2024
1 parent b96beb2 commit 6959c47
Showing 11 changed files with 57 additions and 34 deletions.
----------------------------------------
@@ -98,10 +98,6 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {

@Override
public void applySchemaChange(SchemaChangeEvent event) {
- if (!enabledSchemaEvolutionTypes.contains(event.getType())) {
- LOG.info("Sink ignores schema change event {}", event);
- return;
- }
try {
// send schema change op to doris
if (event instanceof CreateTableEvent) {
----------------------------------------
@@ -103,7 +103,7 @@ public MetadataApplier getMetadataApplier(Set<SchemaChangeEventType> enabledEventTypes
return new MetadataApplier() {
@Override
public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
- return true;
+ return enabledEventTypes.contains(schemaChangeEventType);
}

@Override
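The per-sink enabledSchemaEvolutionTypes checks removed in this commit are superseded by the acceptsSchemaEvolutionType hook above, which now reports whether a given event type was enabled for the sink. A minimal sketch of how a framework-side caller would presumably gate events through that hook (illustrative only; the method name applySchemaChangeIfAccepted and the LOG field are assumptions, not part of this diff):

    // Hypothetical gate on the framework side: consult the applier first, so individual
    // sinks no longer have to re-check enabledSchemaEvolutionTypes themselves.
    void applySchemaChangeIfAccepted(MetadataApplier applier, SchemaChangeEvent event) {
        if (applier.acceptsSchemaEvolutionType(event.getType())) {
            applier.applySchemaChange(event);
        } else {
            LOG.info("Sink ignores schema change event {}", event);
        }
    }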
----------------------------------------
@@ -111,10 +111,6 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {

@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
- if (!enabledSchemaEvolutionTypes.contains(schemaChangeEvent.getType())) {
- LOG.info("Sink ignores schema change event {}", schemaChangeEvent);
- return;
- }
if (catalog == null) {
catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
}
----------------------------------------
@@ -92,10 +92,6 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {

@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
- if (!enabledSchemaEvolutionTypes.contains(schemaChangeEvent.getType())) {
- LOG.info("Sink ignores schema change event {}", schemaChangeEvent);
- return;
- }
if (!isOpened) {
isOpened = true;
catalog.open();
----------------------------------------
@@ -100,11 +100,7 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {

@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
- if (enabledSchemaEvolutionTypes.contains(schemaChangeEvent.getType())) {
- applySchemaChangeEvent(schemaChangeEvent);
- } else {
- LOG.info("Sink ignores schema change event {}", schemaChangeEvent);
- }
+ applySchemaChangeEvent(schemaChangeEvent);
}
}

@@ -143,13 +139,11 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
new ValuesTable(
tableId, ((CreateTableEvent) schemaChangeEvent).getSchema()));
}
- } else if (enabledSchemaEvolutionTypes.contains(schemaChangeEvent.getType())) {
+ } else {
throw new RuntimeException(
String.format(
"Rejected schema change event %s since error.on.schema.change is enabled.",
schemaChangeEvent));
- } else {
- LOG.info("Sink ignores schema change event {}", schemaChangeEvent);
}
}
}
----------------------------------------
@@ -513,7 +513,7 @@ public void testSchemaException() throws Exception {

waitUntilSpecificEvent(
String.format(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"java.lang.RuntimeException: Refused to apply schema change event AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} in EXCEPTION mode.",
schemaEvolveDatabase.getDatabaseName()),
taskManagerConsumer,
6000L);
----------------------------------------
@@ -21,6 +21,7 @@
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
+ import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
@@ -56,6 +57,7 @@ public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
private static final int INITIAL_SCHEMA_VERSION = 0;
private static final int VERSIONS_TO_KEEP = 3;
+ private final SchemaChangeBehavior behavior;

// Serializer for checkpointing
public static final Serializer SERIALIZER = new Serializer();
@@ -69,13 +71,26 @@ public class SchemaManager {
public SchemaManager() {
evolvedSchemas = new HashMap<>();
upstreamSchemas = new HashMap<>();
+ behavior = SchemaChangeBehavior.EVOLVE;
}

+ public SchemaManager(SchemaChangeBehavior behavior) {
+ evolvedSchemas = new HashMap<>();
+ upstreamSchemas = new HashMap<>();
+ this.behavior = behavior;
+ }
+
public SchemaManager(
Map<TableId, SortedMap<Integer, Schema>> upstreamSchemas,
- Map<TableId, SortedMap<Integer, Schema>> evolvedSchemas) {
+ Map<TableId, SortedMap<Integer, Schema>> evolvedSchemas,
+ SchemaChangeBehavior behavior) {
this.evolvedSchemas = evolvedSchemas;
this.upstreamSchemas = upstreamSchemas;
+ this.behavior = behavior;
}

+ public SchemaChangeBehavior getBehavior() {
+ return behavior;
+ }
+
public final boolean schemaExists(
@@ -234,7 +249,7 @@ private void registerNewSchema(
/** Serializer for {@link SchemaManager}. */
public static class Serializer implements SimpleVersionedSerializer<SchemaManager> {

- public static final int CURRENT_VERSION = 0;
+ public static final int CURRENT_VERSION = 1;

@Override
public int getVersion() {
@@ -247,6 +262,7 @@ public byte[] serialize(SchemaManager schemaManager) throws IOException {
DataOutputStream out = new DataOutputStream(baos)) {
serializeSchemaMap(schemaManager.evolvedSchemas, out);
serializeSchemaMap(schemaManager.upstreamSchemas, out);
+ out.writeUTF(schemaManager.getBehavior().name());
return baos.toByteArray();
}
}
@@ -255,9 +271,28 @@ public byte[] serialize(SchemaManager schemaManager) throws IOException {
public SchemaManager deserialize(int version, byte[] serialized) throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
- Map<TableId, SortedMap<Integer, Schema>> evolvedSchemas = deserializeSchemaMap(in);
- Map<TableId, SortedMap<Integer, Schema>> upstreamSchemas = deserializeSchemaMap(in);
- return new SchemaManager(upstreamSchemas, evolvedSchemas);
+ switch (version) {
+ case 0:
+ {
+ Map<TableId, SortedMap<Integer, Schema>> schemas =
+ deserializeSchemaMap(in);
+ // In legacy mode, upstream schema and evolved schema never differs
+ return new SchemaManager(schemas, schemas, SchemaChangeBehavior.EVOLVE);
+ }
+
+ case 1:
+ {
+ Map<TableId, SortedMap<Integer, Schema>> evolvedSchemas =
+ deserializeSchemaMap(in);
+ Map<TableId, SortedMap<Integer, Schema>> upstreamSchemas =
+ deserializeSchemaMap(in);
+ SchemaChangeBehavior behavior =
+ SchemaChangeBehavior.valueOf(in.readUTF());
+ return new SchemaManager(upstreamSchemas, evolvedSchemas, behavior);
+ }
+ default:
+ throw new RuntimeException("Unknown serialize version: " + version);
+ }
}
}
}
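With CURRENT_VERSION bumped to 1, new snapshots carry the SchemaChangeBehavior, while version-0 snapshots (which never stored one) still deserialize, reuse the single schema map for both upstream and evolved schemas, and fall back to EVOLVE. A round trip under the new version might look like the sketch below (the EXCEPTION behavior value and the assertion are illustrative; the constructor and SERIALIZER calls mirror the diff):

    SchemaManager manager = new SchemaManager(SchemaChangeBehavior.EXCEPTION);
    byte[] bytes = SchemaManager.SERIALIZER.serialize(manager);

    // Version-1 payloads restore the persisted behavior on recovery.
    SchemaManager restored =
            SchemaManager.SERIALIZER.deserialize(
                    SchemaManager.Serializer.CURRENT_VERSION, bytes);
    assert restored.getBehavior() == SchemaChangeBehavior.EXCEPTION;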
----------------------------------------
@@ -126,7 +126,7 @@ public SchemaRegistry(
this.failedReasons = new HashMap<>();
this.metadataApplier = metadataApplier;
this.routes = routes;
- this.schemaManager = new SchemaManager();
+ this.schemaManager = new SchemaManager(schemaChangeBehavior);
this.schemaDerivation = new SchemaDerivation(schemaManager, routes, new HashMap<>());
this.requestHandler =
new SchemaRegistryRequestHandler(
@@ -173,7 +173,6 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
// Serialize SchemaManager
int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
out.writeInt(schemaManagerSerializerVersion);
- out.writeUTF(schemaChangeBehavior.name());
byte[] serializedSchemaManager = SchemaManager.SERIALIZER.serialize(schemaManager);
out.writeInt(serializedSchemaManager.length);
out.write(serializedSchemaManager);
@@ -230,13 +229,15 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
try (ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
DataInputStream in = new DataInputStream(bais)) {
int schemaManagerSerializerVersion = in.readInt();
- SchemaChangeBehavior schemaChangeBehavior = SchemaChangeBehavior.valueOf(in.readUTF());
int length = in.readInt();
byte[] serializedSchemaManager = new byte[length];
in.readFully(serializedSchemaManager);
schemaManager =
SchemaManager.SERIALIZER.deserialize(
schemaManagerSerializerVersion, serializedSchemaManager);

+ schemaChangeBehavior = schemaManager.getBehavior();

Map<TableId, Set<TableId>> derivationMapping =
SchemaDerivation.deserializerDerivationMapping(in);
schemaDerivation = new SchemaDerivation(schemaManager, routes, derivationMapping);
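Taken together with the SchemaManager changes above, the coordinator checkpoint no longer writes the behavior as a separate UTF field; it now travels inside the serialized SchemaManager blob and is recovered through getBehavior() on restore. A condensed sketch of the resulting write/read round trip (stream setup and error handling elided; names follow the diff):

    // Write path (checkpointCoordinator): version, then the self-describing SchemaManager blob.
    out.writeInt(SchemaManager.SERIALIZER.getVersion());
    byte[] blob = SchemaManager.SERIALIZER.serialize(schemaManager);
    out.writeInt(blob.length);
    out.write(blob);

    // Read path (resetToCheckpoint): the behavior comes back out of the deserialized
    // SchemaManager, even for version-0 blobs, which default to SchemaChangeBehavior.EVOLVE.
    int version = in.readInt();
    byte[] restored = new byte[in.readInt()];
    in.readFully(restored);
    schemaManager = SchemaManager.SERIALIZER.deserialize(version, restored);
    schemaChangeBehavior = schemaManager.getBehavior();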
----------------------------------------
@@ -132,7 +132,7 @@ private void applySchemaChange(
} else {
try {
metadataApplier.applySchemaChange(changeEvent);
LOG.debug("Apply schema change {} to table {}.", changeEvent, tableId);
LOG.debug("Applied schema change {} to table {}.", changeEvent, tableId);
finishedSchemaChanges.add(changeEvent);
} catch (Throwable t) {
LOG.error(
----------------------------------------
@@ -210,7 +210,8 @@ private SchemaChangeEvent cacheSchema(SchemaChangeEvent event) throws Exception
private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws Exception {
TableInfo tableInfo = tableInfoMap.get(tableId);
if (tableInfo == null) {
- Optional<Schema> schemaOptional = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
+ Optional<Schema> schemaOptional =
+ schemaEvolutionClient.getLatestUpstreamSchema(tableId);
if (schemaOptional.isPresent()) {
tableInfo = TableInfo.of(tableId, schemaOptional.get());
} else {
----------------------------------------
@@ -292,7 +292,9 @@ void testSerde() throws Exception {
schemaManager.applyUpstreamSchemaChange(
new CreateTableEvent(PRODUCTS, PRODUCTS_SCHEMA));
byte[] serialized = SchemaManager.SERIALIZER.serialize(schemaManager);
- SchemaManager deserialized = SchemaManager.SERIALIZER.deserialize(0, serialized);
+ SchemaManager deserialized =
+ SchemaManager.SERIALIZER.deserialize(
+ SchemaManager.Serializer.CURRENT_VERSION, serialized);
assertThat(deserialized).isEqualTo(schemaManager);
}
{
@@ -301,7 +303,9 @@ void testSerde() throws Exception {
new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA));
schemaManager.applyEvolvedSchemaChange(new CreateTableEvent(PRODUCTS, PRODUCTS_SCHEMA));
byte[] serialized = SchemaManager.SERIALIZER.serialize(schemaManager);
- SchemaManager deserialized = SchemaManager.SERIALIZER.deserialize(0, serialized);
+ SchemaManager deserialized =
+ SchemaManager.SERIALIZER.deserialize(
+ SchemaManager.Serializer.CURRENT_VERSION, serialized);
assertThat(deserialized).isEqualTo(schemaManager);
}
}
