[FLINK-34876][transform] Support UDF functions in transform
yuxiqian committed Aug 1, 2024
1 parent 1388cf9 commit 2ca3e97
Showing 50 changed files with 3,039 additions and 121 deletions.
75 changes: 40 additions & 35 deletions README.md
@@ -35,41 +35,46 @@ full database synchronization, sharding table synchronization, schema evolution
2. [Download](https://github.com/apache/flink-cdc/releases) the Flink CDC tar, unzip it, and put the jars of the pipeline connectors into the Flink `lib` directory.
3. Create a **YAML** file to describe the data source and data sink; the following example synchronizes all tables under the MySQL `app_db` database to Doris:
```yaml
-source:
-  type: mysql
-  name: MySQL Source
-  hostname: 127.0.0.1
-  port: 3306
-  username: admin
-  password: pass
-  tables: adb.\.*
-  server-id: 5401-5404
-
-sink:
-  type: doris
-  name: Doris Sink
-  fenodes: 127.0.0.1:8030
-  username: root
-  password: pass
-
-transform:
-  - source-table: adb.web_order01
-    projection: \*, UPPER(product_name) as product_name
-    filter: id > 10 AND order_id > 100
-    description: project fields and filter
-  - source-table: adb.web_order02
-    projection: \*, UPPER(product_name) as product_name
-    filter: id > 20 AND order_id > 200
-    description: project fields and filter
-
-route:
-  - source-table: adb.web_order\.*
-    sink-table: adb.ods_web_orders
-    description: sync sharding tables to one destination table
-
-pipeline:
-  name: MySQL to Doris Pipeline
-  parallelism: 4
+source:
+  type: mysql
+  hostname: localhost
+  port: 3306
+  username: root
+  password: 123456
+  tables: app_db.\.*
+
+sink:
+  type: doris
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: ""
+
+transform:
+  - source-table: adb.web_order01
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 10 AND order_id > 100
+    description: project fields and filter
+  - source-table: adb.web_order02
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 20 AND order_id > 200
+    description: project fields and filter
+
+route:
+  - source-table: app_db.orders
+    sink-table: ods_db.ods_orders
+  - source-table: app_db.shipments
+    sink-table: ods_db.ods_shipments
+  - source-table: app_db.products
+    sink-table: ods_db.ods_products
+
+pipeline:
+  name: Sync MySQL Database to Doris
+  parallelism: 2
+  user-defined-function:
+    - name: addone
+      classpath: com.example.functions.AddOneFunctionClass
+    - name: format
+      classpath: com.example.functions.FormatFunctionClass
```
4. Submit the pipeline job using the `flink-cdc.sh` script.
17 changes: 11 additions & 6 deletions docs/content.zh/docs/core-concept/data-pipeline.md
@@ -79,15 +79,15 @@ We could use following yaml file to define a complicated Data Pipeline describing
   fenodes: 127.0.0.1:8030
   username: root
   password: ""
 
 transform:
   - source-table: adb.web_order01
-    projection: \*, UPPER(product_name) as product_name
-    filter: id > 10 AND order_id > 100
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 10 AND order_id > 100
     description: project fields and filter
   - source-table: adb.web_order02
-    projection: \*, UPPER(product_name) as product_name
-    filter: id > 20 AND order_id > 200
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 20 AND order_id > 200
     description: project fields and filter
 
 route:
@@ -96,11 +96,16 @@ We could use following yaml file to define a complicated Data Pipeline describing
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
-    sink-table: ods_db.ods_products
+    sink-table: ods_db.ods_products
 
 pipeline:
   name: Sync MySQL Database to Doris
   parallelism: 2
+  user-defined-function:
+    - name: addone
+      classpath: com.example.functions.AddOneFunctionClass
+    - name: format
+      classpath: com.example.functions.FormatFunctionClass
```
# Pipeline Configurations
69 changes: 69 additions & 0 deletions docs/content.zh/docs/core-concept/transform.md
@@ -266,6 +266,75 @@ transform:
description: classification mapping example
```

## User-defined Functions

User-defined functions (UDFs) can be used in transform rules.

A class can be used as a UDF if it:

* implements the `org.apache.flink.cdc.common.udf.UserDefinedFunction` interface
* has a public constructor with no parameters
* has at least one public method named `eval`

It may also optionally:

* override the `getReturnType` method to declare the CDC type it returns
* override the `open` and `close` methods to perform initialization and cleanup work

For example, this is a valid UDF class:

```java
public class AddOneFunctionClass implements UserDefinedFunction {

    public Object eval(Integer num) {
        return num + 1;
    }

    @Override
    public DataType getReturnType() {
        return DataTypes.INT();
    }

    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}
```

To ease migration from Flink SQL to Flink CDC, a Flink `ScalarFunction` can also be used as a transform UDF (see the sketch after this list), with some limitations:

* A `ScalarFunction` with constructor parameters is not supported.
* Flink-style type hints in `ScalarFunction` will be ignored.
* `open` / `close` lifecycle hooks will not be invoked.
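
As an illustration, a minimal `ScalarFunction` reused this way might look like the sketch below; the class, its name, and its method body are hypothetical examples and are not part of this commit:

```java
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Hypothetical example: a plain Flink ScalarFunction reused as a transform UDF.
 * It only needs a no-argument constructor and a public eval method; any type
 * hints or open/close hooks it declares would be ignored, as noted above.
 */
public class TruncateFunctionClass extends ScalarFunction {

    // If registered as `truncate`, this could be called as truncate(col, 4)
    // in a projection or filter expression.
    public String eval(String input, Integer length) {
        if (input == null || input.length() <= length) {
            return input;
        }
        return input.substring(0, length);
    }
}
```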

UDF classes can be registered by adding a `user-defined-function` block to the `pipeline` section:

```yaml
pipeline:
  user-defined-function:
    - name: addone
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
```

Note that the given classpath must be fully qualified, and the corresponding `jar` files must either be placed in the Flink `lib` folder or passed with the `flink-cdc.sh --jar` option.

Once registered, UDFs can be used in both `projection` and `filter` expressions, just like built-in functions:

```yaml
transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100
```
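
As a rough idea of what the `inc` and `format` functions referenced above might look like, here is a hypothetical sketch; the class names, packages, and signatures are illustrative only and are not part of this commit:

```java
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

// Hypothetical class backing inc(...): returns its argument plus one.
public class IncFunctionClass implements UserDefinedFunction {
    public Integer eval(Integer num) {
        return num + 1;
    }
}

// Hypothetical class backing format(id, 'id -> %d'): applies a printf-style pattern.
// Declared package-private here only so both sketches fit in one snippet.
class FormatFunctionClass implements UserDefinedFunction {
    public String eval(Integer id, String pattern) {
        return String.format(pattern, id);
    }
}
```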

# Known limitations
* Currently, transform rules do not work together with route rules. This will be supported in future versions.
* Computed columns cannot reference trimmed columns that are not present in the final projection results. This will be fixed in future versions.
18 changes: 17 additions & 1 deletion docs/content/docs/core-concept/data-pipeline.md
@@ -97,17 +97,33 @@ We could use following yaml file to define a complicated Data Pipeline describing
   fenodes: 127.0.0.1:8030
   username: root
   password: ""
+
+transform:
+  - source-table: adb.web_order01
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 10 AND order_id > 100
+    description: project fields and filter
+  - source-table: adb.web_order02
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 20 AND order_id > 200
+    description: project fields and filter
+
 route:
   - source-table: app_db.orders
     sink-table: ods_db.ods_orders
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
-    sink-table: ods_db.ods_products
+    sink-table: ods_db.ods_products
 
 pipeline:
   name: Sync MySQL Database to Doris
   parallelism: 2
+  user-defined-function:
+    - name: addone
+      classpath: com.example.functions.AddOneFunctionClass
+    - name: format
+      classpath: com.example.functions.FormatFunctionClass
```
# Pipeline Configurations
69 changes: 69 additions & 0 deletions docs/content/docs/core-concept/transform.md
@@ -266,6 +266,75 @@ transform:
description: classification mapping example
```

## User-defined Functions

User-defined functions (UDFs) can be used in transform rules.

A class can be used as a UDF if it:

* implements the `org.apache.flink.cdc.common.udf.UserDefinedFunction` interface
* has a public constructor with no parameters
* has at least one public method named `eval`

It may also optionally:

* override the `getReturnType` method to declare the CDC type it returns
* override the `open` and `close` methods to perform initialization and cleanup work

For example, this is a valid UDF class:

```java
public class AddOneFunctionClass implements UserDefinedFunction {

    public Object eval(Integer num) {
        return num + 1;
    }

    @Override
    public DataType getReturnType() {
        return DataTypes.INT();
    }

    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}
```

To ease migration from Flink SQL to Flink CDC, a Flink `ScalarFunction` can also be used as a transform UDF (see the sketch after this list), with some limitations:

* A `ScalarFunction` with constructor parameters is not supported.
* Flink-style type hints in `ScalarFunction` will be ignored.
* `open` / `close` lifecycle hooks will not be invoked.
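
As an illustration, a minimal `ScalarFunction` reused this way might look like the sketch below; the class, its name, and its method body are hypothetical examples and are not part of this commit:

```java
import org.apache.flink.table.functions.ScalarFunction;

/**
 * Hypothetical example: a plain Flink ScalarFunction reused as a transform UDF.
 * It only needs a no-argument constructor and a public eval method; any type
 * hints or open/close hooks it declares would be ignored, as noted above.
 */
public class TruncateFunctionClass extends ScalarFunction {

    // If registered as `truncate`, this could be called as truncate(col, 4)
    // in a projection or filter expression.
    public String eval(String input, Integer length) {
        if (input == null || input.length() <= length) {
            return input;
        }
        return input.substring(0, length);
    }
}
```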

UDF classes can be registered by adding a `user-defined-function` block to the `pipeline` section:

```yaml
pipeline:
  user-defined-function:
    - name: addone
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
```

Note that the given classpath must be fully qualified, and the corresponding `jar` files must either be placed in the Flink `lib` folder or passed with the `flink-cdc.sh --jar` option.

Once registered, UDFs can be used in both `projection` and `filter` expressions, just like built-in functions:

```yaml
transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100
```
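
As a rough idea of what the `inc` and `format` functions referenced above might look like, here is a hypothetical sketch; the class names, packages, and signatures are illustrative only and are not part of this commit:

```java
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

// Hypothetical class backing inc(...): returns its argument plus one.
public class IncFunctionClass implements UserDefinedFunction {
    public Integer eval(Integer num) {
        return num + 1;
    }
}

// Hypothetical class backing format(id, 'id -> %d'): applies a printf-style pattern.
// Declared package-private here only so both sketches fit in one snippet.
class FormatFunctionClass implements UserDefinedFunction {
    public String eval(Integer id, String pattern) {
        return String.format(pattern, id);
    }
}
```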

# Known limitations
* Currently, transform rules do not work together with route rules. This will be supported in future versions.
* Computed columns cannot reference trimmed columns that are not present in the final projection results. This will be fixed in future versions.
@@ -24,10 +24,12 @@
 import org.apache.flink.cdc.composer.definition.SinkDef;
 import org.apache.flink.cdc.composer.definition.SourceDef;
 import org.apache.flink.cdc.composer.definition.TransformDef;
+import org.apache.flink.cdc.composer.definition.UdfDef;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 
 import java.nio.file.Path;
@@ -64,6 +66,11 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     private static final String TRANSFORM_FILTER_KEY = "filter";
     private static final String TRANSFORM_DESCRIPTION_KEY = "description";
 
+    // UDF related keys
+    private static final String UDF_KEY = "user-defined-function";
+    private static final String UDF_FUNCTION_NAME_KEY = "name";
+    private static final String UDF_CLASSPATH_KEY = "classpath";
+
     public static final String TRANSFORM_PRIMARY_KEY_KEY = "primary-keys";
 
     public static final String TRANSFORM_PARTITION_KEY_KEY = "partition-keys";
@@ -107,6 +114,11 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig
         Optional.ofNullable(root.get(ROUTE_KEY))
                 .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));
 
+        // UDFs are optional
+        List<UdfDef> udfDefs = new ArrayList<>();
+        Optional.ofNullable(((ObjectNode) root.get(PIPELINE_KEY)).remove(UDF_KEY))
+                .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
+
         // Pipeline configs are optional
         Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY));
 
@@ -115,7 +127,8 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfig
         pipelineConfig.addAll(globalPipelineConfig);
         pipelineConfig.addAll(userPipelineConfig);
 
-        return new PipelineDef(sourceDef, sinkDef, routeDefs, transformDefs, pipelineConfig);
+        return new PipelineDef(
+                sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, pipelineConfig);
     }
 
     private SourceDef toSourceDef(JsonNode sourceNode) {
@@ -176,6 +189,23 @@ private RouteDef toRouteDef(JsonNode routeNode) {
         return new RouteDef(sourceTable, sinkTable, replaceSymbol, description);
     }
 
+    private UdfDef toUdfDef(JsonNode udfNode) {
+        String functionName =
+                checkNotNull(
+                                udfNode.get(UDF_FUNCTION_NAME_KEY),
+                                "Missing required field \"%s\" in UDF configuration",
+                                UDF_FUNCTION_NAME_KEY)
+                        .asText();
+        String classPath =
+                checkNotNull(
+                                udfNode.get(UDF_CLASSPATH_KEY),
+                                "Missing required field \"%s\" in UDF configuration",
+                                UDF_CLASSPATH_KEY)
+                        .asText();
+
+        return new UdfDef(functionName, classPath);
+    }
+
     private TransformDef toTransformDef(JsonNode transformNode) {
         String sourceTable =
                 checkNotNull(