diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 2326240b6d4..c5ded309990 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -105,6 +105,12 @@ limitations under the License.
${project.version}
test
+
+ org.apache.flink
+ flink-cdc-pipeline-udf-examples
+ ${project.version}
+ test
+
@@ -231,6 +237,16 @@ limitations under the License.
${project.build.directory}/dependencies
+
+
+ org.apache.flink
+ flink-cdc-pipeline-udf-examples
+ ${project.version}
+ udf-examples.jar
+ jar
+ ${project.build.directory}/dependencies
+
+
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
new file mode 100644
index 00000000000..5b359cb9b08
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+/** E2e tests for the {@link TransformSchemaOperator}. */
+@RunWith(Parameterized.class)
+public class UdfE2eITCase extends PipelineTestEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(TransformE2eITCase.class);
+
+ // ------------------------------------------------------------------------------------------
+ // MySQL Variables (we always use MySQL as the data source for easier verifying)
+ // ------------------------------------------------------------------------------------------
+ protected static final String MYSQL_TEST_USER = "mysqluser";
+ protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+ protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+ protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+
+ @ClassRule
+ public static final MySqlContainer MYSQL =
+ (MySqlContainer)
+ new MySqlContainer(
+ MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
+ .withConfigurationOverride("docker/mysql/my.cnf")
+ .withSetupSQL("docker/mysql/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ protected final UniqueDatabase transformRenameDatabase =
+ new UniqueDatabase(MYSQL, "transform_test", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+ @Before
+ public void before() throws Exception {
+ super.before();
+ transformRenameDatabase.createAndInitialize();
+ }
+
+ @After
+ public void after() {
+ super.after();
+ transformRenameDatabase.dropDatabase();
+ }
+
+ @Test
+ public void testUserDefinedFunctions() throws Exception {
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "udf:\n"
+ + " - name: addone\n"
+ + " classpath: org.apache.flink.cdc.udf.examples.AddOneFunctionClass\n"
+ + " - name: format\n"
+ + " classpath: org.apache.flink.cdc.udf.examples.FormatFunctionClass\n"
+ + " - name: stateful\n"
+ + " classpath: org.apache.flink.cdc.udf.examples.StatefulFunctionClass\n"
+ + " - name: typeof\n"
+ + " classpath: org.apache.flink.cdc.udf.examples.TypeOfFunctionClass\n"
+ + "transform:\n"
+ + " - source-table: %s.TABLEALPHA\n"
+ + " projection: ID, VERSION, ADDONE(ADDONE(ID)) AS INC_ID, FORMAT('<%%s>', VERSION) AS FMT_VER\n"
+ + " filter: ID > 1008\n"
+ + " - source-table: %s.TABLEBETA\n"
+ + " projection: ID, VERSION, STATEFUL AS STT, TYPEOF(ID) AS TYP\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: 1",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ transformRenameDatabase.getDatabaseName(),
+ transformRenameDatabase.getDatabaseName(),
+ transformRenameDatabase.getDatabaseName(),
+ transformRenameDatabase.getDatabaseName(),
+ transformRenameDatabase.getDatabaseName());
+ Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ Path udfJar = TestUtils.getResource("udf-examples.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar, udfJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+ waitUntilSpecificEvent(
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ 60000L);
+
+ waitUntilSpecificEvent(
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ 60000L);
+
+ List expectedEvents =
+ Arrays.asList(
+ String.format(
+ "CreateTableEvent{tableId=%s.terminus, schema=columns={`ID` INT NOT NULL,`VERSION` STRING}, primaryKeys=ID, options=()}",
+ transformRenameDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[], after=[1009, 8.1], op=INSERT, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[], after=[1010, 10], op=INSERT, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[], after=[1011, 11], op=INSERT, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[], after=[2011, 11], op=INSERT, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[], after=[2012, 12], op=INSERT, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[], after=[2014, 14], op=INSERT, meta=()}",
+ transformRenameDatabase.getDatabaseName()));
+ validateResult(expectedEvents);
+ LOG.info("Begin incremental reading stage.");
+ // generate binlogs
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(),
+ MYSQL.getDatabasePort(),
+ transformRenameDatabase.getDatabaseName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
+ stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79);");
+ stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ waitUntilSpecificEvent(
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[], after=[3007, 7], op=INSERT, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ 20000L);
+
+ waitUntilSpecificEvent(
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[1009, 8.1], after=[1009, 100], op=UPDATE, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ 20000L);
+
+ waitUntilSpecificEvent(
+ String.format(
+ "DataChangeEvent{tableId=%s.terminus, before=[2011, 11], after=[], op=DELETE, meta=()}",
+ transformRenameDatabase.getDatabaseName()),
+ 20000L);
+
+ String stdout = taskManagerConsumer.toUtf8String();
+ System.out.println(stdout);
+ }
+
+ private void validateResult(List expectedEvents) throws Exception {
+ for (String event : expectedEvents) {
+ waitUntilSpecificEvent(event, 6000L);
+ }
+ }
+
+ private void waitUntilSpecificEvent(String event, long timeout) throws Exception {
+ boolean result = false;
+ long endTimeout = System.currentTimeMillis() + timeout;
+ while (System.currentTimeMillis() < endTimeout) {
+ String stdout = taskManagerConsumer.toUtf8String();
+ if (stdout.contains(event)) {
+ result = true;
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ if (!result) {
+ throw new TimeoutException(
+ "failed to get specific event: "
+ + event
+ + " from stdout: "
+ + taskManagerConsumer.toUtf8String());
+ }
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/pom.xml
new file mode 100644
index 00000000000..5213aa85a72
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/pom.xml
@@ -0,0 +1,20 @@
+
+
+ 4.0.0
+
+ org.apache.flink
+ flink-cdc-e2e-tests
+ ${revision}
+
+
+ flink-cdc-pipeline-udf-examples
+
+
+ 8
+ 8
+ UTF-8
+
+
+
\ No newline at end of file
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/AddOneFunctionClass.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/AddOneFunctionClass.java
new file mode 100644
index 00000000000..b5f643e2f39
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/AddOneFunctionClass.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples;
+
+/** This is an example UDF class for testing purposes only. */
+public class AddOneFunctionClass {
+ public String eval(String num) {
+ return String.valueOf(Integer.parseInt(num) + 1);
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/FormatFunctionClass.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/FormatFunctionClass.java
new file mode 100644
index 00000000000..f5c68f70b14
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/FormatFunctionClass.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples;
+
+/** This is an example UDF class for testing purposes only. */
+public class FormatFunctionClass {
+ public String eval(String format, Object... args) {
+ return String.format(format, args);
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/StatefulFunctionClass.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/StatefulFunctionClass.java
new file mode 100644
index 00000000000..69098deb57b
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/StatefulFunctionClass.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples;
+
+/** This is an example UDF class for testing purposes only. */
+public class StatefulFunctionClass {
+ private static Integer counter = 0;
+
+ public String eval() {
+ // Mod 6 here since testcases might be run multiple times.
+ // In FlinkPipelineComposerITCase#testStatefulUdf,
+ // this UDF would be invoked 6 times.
+ return "#" + (counter++ % 6);
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/TypeOfFunctionClass.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/TypeOfFunctionClass.java
new file mode 100644
index 00000000000..921dbcffa14
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-udf-examples/src/main/java/org/apache/flink/cdc/udf/examples/TypeOfFunctionClass.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.udf.examples;
+
+/** This is an example UDF class for testing purposes only. */
+public class TypeOfFunctionClass {
+ public String eval(Boolean b) {
+ return "Boolean: " + b;
+ }
+
+ public String eval(Integer i) {
+ return "Integer: " + i;
+ }
+
+ public String eval(Float f) {
+ return "Float: " + f;
+ }
+
+ public String eval(Double d) {
+ return "Double: " + d;
+ }
+
+ public String eval(String s) {
+ return "String: " + s;
+ }
+}
diff --git a/flink-cdc-e2e-tests/pom.xml b/flink-cdc-e2e-tests/pom.xml
index 3ac5fe5980c..9a8dba53bed 100644
--- a/flink-cdc-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/pom.xml
@@ -31,6 +31,7 @@ limitations under the License.
flink-cdc-e2e-utils
+ flink-cdc-pipeline-udf-examples
flink-cdc-source-e2e-tests
flink-cdc-pipeline-e2e-tests