[minor][test] Add Flink CDC 3.1.1 version to migration test version list
This closes apache#3426.
yuxiqian authored and wuzhenhua01 committed Aug 4, 2024
1 parent cbf2b19 commit 020a0ad
Showing 8 changed files with 369 additions and 0 deletions.
@@ -52,6 +52,12 @@ limitations under the License.
            <version>${revision}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-release-3.1.1</artifactId>
            <version>${revision}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-release-snapshot</artifactId>
@@ -35,6 +35,7 @@ public enum FlinkCdcVersion {
    v3_0_0,
    v3_0_1,
    v3_1_0,
    v3_1_1,
    SNAPSHOT;

    public String getShadedClassPrefix() {
@@ -45,6 +46,8 @@ public String getShadedClassPrefix() {
return "com.ververica.cdc.v3_0_1";
case v3_1_0:
return "org.apache.flink.cdc.v3_1_0";
case v3_1_1:
return "org.apache.flink.cdc.v3_1_1";
case SNAPSHOT:
return "org.apache.flink.cdc.snapshot";
default:
@@ -58,6 +61,7 @@ public String getShadedClassPrefix() {
                    FlinkCdcVersion.v3_0_0,
                    FlinkCdcVersion.v3_0_1,
                    FlinkCdcVersion.v3_1_0,
                    FlinkCdcVersion.v3_1_1,
                    FlinkCdcVersion.SNAPSHOT);

    public static List<FlinkCdcVersion> getAllVersions() {
87 changes: 87 additions & 0 deletions flink-cdc-migration-tests/flink-cdc-release-3.1.1/pom.xml
@@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cdc-migration-tests</artifactId>
        <version>${revision}</version>
    </parent>

    <artifactId>flink-cdc-release-3.1.1</artifactId>
    <name>flink-cdc-release-3.1.1</name>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-base</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-common</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cdc-runtime</artifactId>
            <version>3.1.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <id>shade-flink-cdc</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <relocations>
                                <relocation>
                                    <pattern>org.apache.flink.cdc</pattern>
                                    <shadedPattern>org.apache.flink.cdc.v3_1_1</shadedPattern>
                                    <excludes>META-INF/*.SF,META-INF/*.DSA,META-INF/*.RSA</excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
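
The relocation above is what ties this module to the new enum entry: every class from the 3.1.1 artifacts is shaded under org.apache.flink.cdc.v3_1_1, which is exactly the string that FlinkCdcVersion.v3_1_1.getShadedClassPrefix() returns, so the different release copies can sit side by side on one test classpath. A minimal sketch of how a harness could resolve the 3.1.1 copy of one of the mock classes added below (the lookup class itself is hypothetical and not part of this commit; only the shaded prefix and the mock's name come from this diff):

/** Hypothetical lookup sketch, not part of this commit. */
public class ShadedLookupSketch {
    public static void main(String[] args) throws Exception {
        // Relocated name: shaded prefix + the mock's package-relative name.
        String shadedMock =
                "org.apache.flink.cdc.v3_1_1.migration.tests.SchemaManagerMigrationMock";
        Object mock = Class.forName(shadedMock).getDeclaredConstructor().newInstance();
        System.out.println("Loaded " + mock.getClass().getName());
    }
}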
@@ -0,0 +1,27 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cdc.migration.tests;

/** Base interface for migration test cases. */
public interface MigrationMockBase {
    int getSerializerVersion();

    byte[] serializeObject() throws Exception;

    boolean deserializeAndCheckObject(int v, byte[] b) throws Exception;
}
@@ -0,0 +1,68 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cdc.migration.tests;

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;

import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;

/** Dummy class for migration tests. Called via reflection. */
public class SchemaManagerMigrationMock implements MigrationMockBase {
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    private static final String SCHEMA_MANAGER =
            "runtime.operators.schema.coordinator.SchemaManager";

    public SchemaManager generateDummyObject() {
        SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
        schemaVersions.put(1, DUMMY_SCHEMA);
        schemaVersions.put(2, DUMMY_SCHEMA);
        schemaVersions.put(3, DUMMY_SCHEMA);
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
    }

    @Override
    public int getSerializerVersion() {
        return SchemaManager.SERIALIZER.getVersion();
    }

    @Override
    public byte[] serializeObject() throws Exception {
        return SchemaManager.SERIALIZER.serialize(generateDummyObject());
    }

    @Override
    public boolean deserializeAndCheckObject(int version, byte[] serialized) throws Exception {
        Object expected = generateDummyObject();
        Object actual = SchemaManager.SERIALIZER.deserialize(version, serialized);
        return expected.equals(actual);
    }
}
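
For context, the mock above exercises the MigrationMockBase contract end to end: build a dummy SchemaManager, serialize it through SchemaManager.SERIALIZER, then check that deserializing reproduces an equal object. A minimal driver for a same-version round trip might look like the following sketch (hypothetical code, not part of this commit; in the actual migration tests the serialized bytes presumably come from an older release's shaded copy of the mock):

import org.apache.flink.cdc.migration.tests.MigrationMockBase;
import org.apache.flink.cdc.migration.tests.SchemaManagerMigrationMock;

/** Hypothetical driver, not part of this commit: a same-version round trip through the mock. */
public class SchemaManagerRoundTripSketch {
    public static void main(String[] args) throws Exception {
        MigrationMockBase mock = new SchemaManagerMigrationMock();
        byte[] snapshot = mock.serializeObject();
        // Same-version check only; the real tests would feed in bytes written by an older release.
        boolean compatible = mock.deserializeAndCheckObject(mock.getSerializerVersion(), snapshot);
        System.out.println("Round trip compatible: " + compatible);
    }
}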
@@ -0,0 +1,116 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.cdc.migration.tests;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaDerivation;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Dummy class for migration tests. Called via reflection. */
public class SchemaRegistryMigrationMock implements MigrationMockBase {
    private static final TableId DUMMY_TABLE_ID =
            TableId.tableId("dummyNamespace", "dummySchema", "dummyTable");
    private static final Schema DUMMY_SCHEMA =
            Schema.newBuilder()
                    .physicalColumn("id", DataTypes.INT())
                    .physicalColumn("name", DataTypes.STRING())
                    .physicalColumn("age", DataTypes.DOUBLE())
                    .primaryKey("id", "name")
                    .build();

    public SchemaManager generateDummySchemaManager() {
        SortedMap<Integer, Schema> schemaVersions = new TreeMap<>();
        schemaVersions.put(1, DUMMY_SCHEMA);
        schemaVersions.put(2, DUMMY_SCHEMA);
        schemaVersions.put(3, DUMMY_SCHEMA);
        return new SchemaManager(Collections.singletonMap(DUMMY_TABLE_ID, schemaVersions));
    }

    public SchemaRegistry generateSchemaRegistry() {
        return new SchemaRegistry("Dummy Name", null, e -> {}, new ArrayList<>());
    }

    private SchemaManager getSchemaManager(SchemaRegistry schemaRegistry) throws Exception {
        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
        field.setAccessible(true);
        return (SchemaManager) field.get(schemaRegistry);
    }

    private void setSchemaManager(SchemaRegistry schemaRegistry, SchemaManager schemaManager)
            throws Exception {
        Field field = SchemaRegistry.class.getDeclaredField("schemaManager");
        field.setAccessible(true);
        field.set(schemaRegistry, schemaManager);
    }

    private SchemaDerivation getSchemaDerivation(SchemaRegistry schemaRegistry) throws Exception {
        Field field = SchemaRegistry.class.getDeclaredField("schemaDerivation");
        field.setAccessible(true);
        return (SchemaDerivation) field.get(schemaRegistry);
    }

    private List<Tuple2<Selectors, TableId>> getSchemaRoutes(SchemaRegistry schemaRegistry)
            throws Exception {
        SchemaDerivation schemaDerivation = getSchemaDerivation(schemaRegistry);
        Field field = SchemaDerivation.class.getDeclaredField("routes");
        field.setAccessible(true);
        return (List<Tuple2<Selectors, TableId>>) field.get(schemaDerivation);
    }

    @Override
    public int getSerializerVersion() {
        return -1;
    }

    @Override
    public byte[] serializeObject() throws Exception {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        SchemaRegistry registry = generateSchemaRegistry();
        setSchemaManager(registry, generateDummySchemaManager());

        registry.checkpointCoordinator(0, future);

        while (!future.isDone()) {
            Thread.sleep(1000);
        }
        return future.get();
    }

    @Override
    public boolean deserializeAndCheckObject(int v, byte[] b) throws Exception {
        SchemaRegistry expected = generateSchemaRegistry();
        setSchemaManager(expected, generateDummySchemaManager());
        SchemaRegistry actual = generateSchemaRegistry();
        actual.resetToCheckpoint(0, b);
        return getSchemaManager(expected).equals(getSchemaManager(actual))
                && getSchemaRoutes(expected).equals(getSchemaRoutes(actual));
    }
}
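
Because each release module shades its own copy of these mocks, including MigrationMockBase itself, the 3.1.1 and snapshot copies share no common Java type, so any cross-version check has to go through reflection. The sketch below illustrates that pairing under stated assumptions: the driver class and its wiring are hypothetical, while the mock API, the class name, and the shaded prefixes all come from this diff.

/** Assumed driver, for illustration only: 3.1.1 writes a snapshot, the current copy reads it. */
public class SchemaRegistryCrossVersionSketch {

    private static Object newMock(String shadedPrefix) throws Exception {
        // Each release module relocates the mock under its own package, so resolve it by name.
        return Class.forName(shadedPrefix + ".migration.tests.SchemaRegistryMigrationMock")
                .getDeclaredConstructor()
                .newInstance();
    }

    public static void main(String[] args) throws Exception {
        Object oldMock = newMock("org.apache.flink.cdc.v3_1_1");
        Object currentMock = newMock("org.apache.flink.cdc.snapshot");

        // Serialize with the 3.1.1 copy...
        byte[] state = (byte[]) oldMock.getClass().getMethod("serializeObject").invoke(oldMock);
        int version = (int) oldMock.getClass().getMethod("getSerializerVersion").invoke(oldMock);

        // ...and verify the current copy can restore and match it.
        boolean compatible =
                (boolean)
                        currentMock
                                .getClass()
                                .getMethod("deserializeAndCheckObject", int.class, byte[].class)
                                .invoke(currentMock, version, state);
        System.out.println("3.1.1 -> snapshot compatible: " + compatible);
    }
}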