diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml b/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
index 4be8f9c1292..12c468a4b47 100644
--- a/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
+++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/pom.xml
@@ -52,6 +52,12 @@ limitations under the License.
${revision}
compile
+
+ org.apache.flink
+ flink-cdc-release-3.1.1
+ ${revision}
+ compile
+
org.apache.flink
flink-cdc-release-snapshot
diff --git a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
index 2172f011e5e..e1020442b76 100644
--- a/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
+++ b/flink-cdc-migration-tests/flink-cdc-migration-testcases/src/test/java/org/apache/flink/cdc/migration/tests/MigrationTestBase.java
@@ -35,6 +35,7 @@ public enum FlinkCdcVersion {
v3_0_0,
v3_0_1,
v3_1_0,
+ v3_1_1,
SNAPSHOT;
public String getShadedClassPrefix() {
@@ -45,6 +46,8 @@ public String getShadedClassPrefix() {
return "com.ververica.cdc.v3_0_1";
case v3_1_0:
return "org.apache.flink.cdc.v3_1_0";
+ case v3_1_1:
+ return "org.apache.flink.cdc.v3_1_1";
case SNAPSHOT:
return "org.apache.flink.cdc.snapshot";
default:
@@ -58,6 +61,7 @@ public String getShadedClassPrefix() {
FlinkCdcVersion.v3_0_0,
FlinkCdcVersion.v3_0_1,
FlinkCdcVersion.v3_1_0,
+ FlinkCdcVersion.v3_1_1,
FlinkCdcVersion.SNAPSHOT);
public static List getAllVersions() {
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml
new file mode 100644
index 00000000000..751b0f9d877
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml
@@ -0,0 +1,87 @@
+
+
+
+ 4.0.0
+
+ org.apache.flink
+ flink-cdc-migration-tests
+ ${revision}
+
+
+ flink-cdc-release-3.1.1
+ flink-cdc-release-3.1.1
+
+
+
+ org.apache.flink
+ flink-cdc-base
+ 3.1.1
+
+
+ org.apache.flink
+ flink-cdc-common
+ 3.1.1
+
+
+ org.apache.flink
+ flink-cdc-runtime
+ 3.1.1
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+ shade-flink-cdc
+ package
+
+ shade
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+ org.apache.flink.cdc
+ org.apache.flink.cdc.v3_1_1
+ META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
new file mode 100644
index 00000000000..3f52615dbde
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/MigrationMockBase.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+/** Base classes for migration test cases. */
+public interface MigrationMockBase {
+ int getSerializerVersion();
+
+ byte[] serializeObject() throws Exception;
+
+ boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
new file mode 100644
index 00000000000..c4f0788dd0d
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaManagerMigrationMock.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
+
+import java.util.Collections;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaManagerMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ private static final String SCHEMA_MANAGER =
+ "runtime.operators.schema.coordinator.SchemaManager";
+
+ public SchemaManager generateDummyObject() {
+ SortedMap schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return SchemaManager.SERIALIZER.getVersion();
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ return SchemaManager.SERIALIZER.serialize(generateDummyObject());
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
+ Object expected = generateDummyObject();
+ Object actual = SchemaManager.SERIALIZER.deserialize(version, serialized);
+ return expected.equals(actual);
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
new file mode 100644
index 00000000000..93269abece8
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.schema.Selectors;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
+import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class SchemaRegistryMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public SchemaManager generateDummySchemaManager() {
+ SortedMap schemaVersions = new TreeMap<>();
+ schemaVersions.put(1, DUMMY_SCHEMA);
+ schemaVersions.put(2, DUMMY_SCHEMA);
+ schemaVersions.put(3, DUMMY_SCHEMA);
+ return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
+ }
+
+ public SchemaRegistry generateSchemaRegistry() {
+ return new SchemaRegistry("Dummy Name", null, e -> {}, new ArrayList<>());
+ }
+
+ private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ return (SchemaManager) field.get(schemaRegistry);
+ }
+
+ private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
+ throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
+ field.setAccessible(true);
+ field.set(schemaRegistry, schemaManager);
+ }
+
+ private SchemaDerivation getSchemaDerivation(SchemaRegistry schemaRegistry) throws Exception {
+ Field field = SchemaRegistry.class.getDeclaredField("schemaDerivation");
+ field.setAccessible(true);
+ return (SchemaDerivation) field.get(schemaRegistry);
+ }
+
+ private List> getSchemaRoutes(SchemaRegistry schemaRegistry)
+ throws Exception {
+ SchemaDerivation schemaDerivation = getSchemaDerivation(schemaRegistry);
+ Field field = SchemaDerivation.class.getDeclaredField("routes");
+ field.setAccessible(true);
+ return (List>) field.get(schemaDerivation);
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return -1;
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ CompletableFuture future = new CompletableFuture<>();
+ SchemaRegistry registry = generateSchemaRegistry();
+ setSchemaManager(registry, generateDummySchemaManager());
+
+ registry.checkpointCoordinator(0, future);
+
+ while (!future.isDone()) {
+ Thread.sleep(1000);
+ }
+ return future.get();
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
+ SchemaRegistry expected = generateSchemaRegistry();
+ setSchemaManager(expected, generateDummySchemaManager());
+ SchemaRegistry actual = generateSchemaRegistry();
+ actual.resetToCheckpoint(0, b);
+ return getSchemaManager(expected).equals(getSchemaManager(actual))
+ && getSchemaRoutes(expected).equals(getSchemaRoutes(actual));
+ }
+}
diff --git a/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
new file mode 100644
index 00000000000..6a14a2be2a5
--- /dev/null
+++ b/flink-cdc-migration-tests/flink-cdc-release-3.1.1/src/main/java/org/apache/flink/cdc/migration/tests/TableChangeInfoMigrationMock.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.migration.tests;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo;
+
+/** Dummy classes for migration test. Called via reflection. */
+public class TableChangeInfoMigrationMock implements MigrationMockBase {
+ private static final TableId DUMMY_TABLE_ID =
+ TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
+ private static final Schema DUMMY_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.DOUBLE())
+ .primaryKey("id", "name")
+ .build();
+
+ public TableChangeInfo generateDummyObject() {
+ return TableChangeInfo.of(DUMMY_TABLE_ID, DUMMY_SCHEMA, DUMMY_SCHEMA);
+ }
+
+ @Override
+ public int getSerializerVersion() {
+ return TableChangeInfo.SERIALIZER.getVersion();
+ }
+
+ @Override
+ public byte[] serializeObject() throws Exception {
+ return TableChangeInfo.SERIALIZER.serialize(generateDummyObject());
+ }
+
+ @Override
+ public boolean deserializeAndCheckObject(int version, byte[] bytes) throws Exception {
+ TableChangeInfo expected = generateDummyObject();
+ TableChangeInfo actual = TableChangeInfo.SERIALIZER.deserialize(version, bytes);
+
+ return expected.getTableId().equals(actual.getTableId())
+ && expected.getOriginalSchema().equals(actual.getOriginalSchema())
+ && expected.getTransformedSchema().equals(actual.getTransformedSchema());
+ }
+}
diff --git a/flink-cdc-migration-tests/pom.xml b/flink-cdc-migration-tests/pom.xml
index da36ef50281..269c5c0db77 100644
--- a/flink-cdc-migration-tests/pom.xml
+++ b/flink-cdc-migration-tests/pom.xml
@@ -33,6 +33,7 @@ limitations under the License.
flink-cdc-release-3.0.0
flink-cdc-release-3.0.1
flink-cdc-release-3.1.0
+ flink-cdc-release-3.1.1
flink-cdc-release-snapshot
flink-cdc-migration-testcases