diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java index fa04fa50a5..ca9781eb18 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java @@ -109,8 +109,14 @@ public static TableChangeInfo of( /** Serializer for {@link TableChangeInfo}. */ public static class Serializer implements SimpleVersionedSerializer { + /** The latest version before change of state compatibility. */ + public static final int VERSION_BEFORE_STATE_COMPATIBILITY = 1; + public static final int CURRENT_VERSION = 2; + /** Used to distinguish with the state which CURRENT_VERSION was not written. */ + public static final TableId MAGIC_TABLE_ID = TableId.tableId("__magic_table__"); + @Override public int getVersion() { return CURRENT_VERSION; @@ -122,6 +128,8 @@ public byte[] serialize(TableChangeInfo tableChangeInfo) throws IOException { SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE; try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(baos)) { + tableIdSerializer.serialize(MAGIC_TABLE_ID, new DataOutputViewStreamWrapper(out)); + out.writeInt(CURRENT_VERSION); tableIdSerializer.serialize( tableChangeInfo.getTableId(), new DataOutputViewStreamWrapper(out)); schemaSerializer.serialize( @@ -139,6 +147,12 @@ public TableChangeInfo deserialize(int version, byte[] serialized) throws IOExce try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { TableId tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); + if (tableId.equals(MAGIC_TABLE_ID)) { + version = in.readInt(); + tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in)); + } else { + version = VERSION_BEFORE_STATE_COMPATIBILITY; + } Schema originalSchema = schemaSerializer.deserialize(version, new DataInputViewStreamWrapper(in)); Schema transformedSchema =