[FLINK-34876][transform] Support UDF functions in transform
yuxiqian committed Jul 15, 2024
1 parent a098dda commit eeda325
Showing 18 changed files with 411 additions and 46 deletions.
19 changes: 18 additions & 1 deletion docs/content.zh/docs/core-concept/data-pipeline.md
@@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

transform:
  - source-table: adb.web_order01
    projection: \*, UPPER(product_name) as product_name
    filter: id > 10 AND order_id > 100
    description: project fields and filter
  - source-table: adb.web_order02
    projection: \*, UPPER(product_name) as product_name
    filter: id > 20 AND order_id > 200
    description: project fields and filter

route:
  - source-table: app_db.orders
    sink-table: ods_db.ods_orders
  - source-table: app_db.shipments
    sink-table: ods_db.ods_shipments
  - source-table: app_db.products
    sink-table: ods_db.ods_products

udf:
  - name: substring
    classpath: com.example.functions.scalar.SubStringFunctionClass
  - name: encrypt
    classpath: com.example.functions.scalar.EncryptFunctionClass

pipeline:
  name: Sync MySQL Database to Doris
19 changes: 18 additions & 1 deletion docs/content/docs/core-concept/data-pipeline.md
@@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

transform:
  - source-table: adb.web_order01
    projection: \*, UPPER(product_name) as product_name
    filter: id > 10 AND order_id > 100
    description: project fields and filter
  - source-table: adb.web_order02
    projection: \*, UPPER(product_name) as product_name
    filter: id > 20 AND order_id > 200
    description: project fields and filter

route:
  - source-table: app_db.orders
    sink-table: ods_db.ods_orders
  - source-table: app_db.shipments
    sink-table: ods_db.ods_shipments
  - source-table: app_db.products
    sink-table: ods_db.ods_products

udf:
  - name: substring
    classpath: com.example.functions.scalar.SubStringFunctionClass
  - name: encrypt
    classpath: com.example.functions.scalar.EncryptFunctionClass

pipeline:
  name: Sync MySQL Database to Doris
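The classpath entries above point at user-provided scalar function classes; once registered under udf:, the configured name can be called inside transform projection and filter expressions. The UDF class contract itself is not visible in the hunks shown here, so the following Java sketch is purely illustrative: the class name mirrors the YAML above, while the eval method shape is an assumption, not the API defined by this commit.

package com.example.functions.scalar;

/**
 * Illustrative sketch only: the actual UDF contract introduced by this change is defined
 * elsewhere in the patch. The class name mirrors the classpath registered in the YAML above.
 */
public class SubStringFunctionClass {

    /** Hypothetical scalar method: returns the substring of {@code input} starting at {@code start}. */
    public String eval(String input, Integer start) {
        if (input == null || start == null || start >= input.length()) {
            return "";
        }
        return input.substring(Math.max(start, 0));
    }
}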
@@ -24,6 +24,7 @@
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.composer.definition.UdfDef;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -46,6 +47,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String SINK_KEY = "sink";
private static final String ROUTE_KEY = "route";
private static final String TRANSFORM_KEY = "transform";
private static final String UDF_KEY = "udf";
private static final String PIPELINE_KEY = "pipeline";

// Source / sink keys
@@ -64,6 +66,10 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String TRANSFORM_FILTER_KEY = "filter";
private static final String TRANSFORM_DESCRIPTION_KEY = "description";

// UDF keys
private static final String UDF_FUNCTION_NAME_KEY = "name";
private static final String UDF_CLASSPATH_KEY = "classpath";

public static final String TRANSFORM_PRIMARY_KEY_KEY = "primary-keys";

public static final String TRANSFORM_PARTITION_KEY_KEY = "partition-keys";
@@ -107,6 +113,11 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi
Optional.ofNullable(root.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));

// Udfs are optional
List<UdfDef> udfDefs = new ArrayList<>();
Optional.ofNullable(root.get(UDF_KEY))
.ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));

// Pipeline configs are optional
Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY));

@@ -115,7 +126,8 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi
pipelineConfig.addAll(globalPipelineConfig);
pipelineConfig.addAll(userPipelineConfig);

return new PipelineDef(sourceDef, sinkDef, routeDefs, transformDefs, pipelineConfig);
return new PipelineDef(
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, pipelineConfig);
}

private SourceDef toSourceDef(JsonNode sourceNode) {
@@ -176,6 +188,23 @@ private RouteDef toRouteDef(JsonNode routeNode) {
return new RouteDef(sourceTable, sinkTable, replaceSymbol, description);
}

private UdfDef toUdfDef(JsonNode udfNode) {
String functionName =
checkNotNull(
udfNode.get(UDF_FUNCTION_NAME_KEY),
"Missing required field \"%s\" in UDF configuration",
UDF_FUNCTION_NAME_KEY)
.asText();
String classPath =
checkNotNull(
udfNode.get(UDF_CLASSPATH_KEY),
"Missing required field \"%s\" in UDF configuration",
UDF_CLASSPATH_KEY)
.asText();

return new UdfDef(functionName, classPath);
}

private TransformDef toTransformDef(JsonNode transformNode) {
String sourceTable =
checkNotNull(
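The new toUdfDef method above turns each entry of the optional udf: list into a UdfDef. A minimal usage sketch follows; it is not taken from the commit, the YAML file name is hypothetical, and the parser's package in the first import is assumed rather than shown in the diff.

import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser; // package assumed
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.UdfDef;

import java.nio.file.Paths;

public class UdfParsingSketch {
    public static void main(String[] args) throws Exception {
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
        // "pipeline-with-udf.yaml" is a hypothetical file containing a udf: block like the one above.
        PipelineDef pipelineDef =
                parser.parse(Paths.get("pipeline-with-udf.yaml"), new Configuration());

        // Each udf: entry becomes one UdfDef carrying the function name and its classpath.
        for (UdfDef udf : pipelineDef.getUdfs()) {
            System.out.println(udf.getName() + " -> " + udf.getClassPath());
        }
    }
}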
@@ -24,6 +24,7 @@
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.composer.definition.UdfDef;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
@@ -230,6 +231,13 @@ void testRouteWithReplacementSymbol() throws Exception {
null,
null,
"add new uniq_id for each row")),
Arrays.asList(
new UdfDef(
"substring",
"com.example.functions.scalar.SubStringFunctionClass"),
new UdfDef(
"encrypt",
"com.example.functions.scalar.EncryptFunctionClass")),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
@@ -293,6 +301,13 @@ void testRouteWithReplacementSymbol() throws Exception {
null,
null,
"add new uniq_id for each row")),
Arrays.asList(
new UdfDef(
"substring",
"com.example.functions.scalar.SubStringFunctionClass"),
new UdfDef(
"encrypt",
"com.example.functions.scalar.EncryptFunctionClass")),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("name", "source-database-sync-pipe")
@@ -330,6 +345,7 @@ void testRouteWithReplacementSymbol() throws Exception {
null,
null)),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("parallelism", "4")
@@ -341,6 +357,7 @@ void testRouteWithReplacementSymbol() throws Exception {
new SinkDef("kafka", null, new Configuration()),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(Collections.singletonMap("parallelism", "1")));

private final PipelineDef fullDefWithRouteRepSym =
@@ -52,6 +52,12 @@ transform:
    filter: uniq_id > 10
    description: add new uniq_id for each row

udf:
  - name: substring
    classpath: com.example.functions.scalar.SubStringFunctionClass
  - name: encrypt
    classpath: com.example.functions.scalar.EncryptFunctionClass

pipeline:
  name: source-database-sync-pipe
  parallelism: 4
@@ -54,18 +54,21 @@ public class PipelineDef {
private final SinkDef sink;
private final List<RouteDef> routes;
private final List<TransformDef> transforms;
private final List<UdfDef> udfs;
private final Configuration config;

public PipelineDef(
SourceDef source,
SinkDef sink,
List<RouteDef> routes,
List<TransformDef> transforms,
List<UdfDef> udfs,
Configuration config) {
this.source = source;
this.sink = sink;
this.routes = routes;
this.transforms = transforms;
this.udfs = udfs;
this.config = evaluatePipelineTimeZone(config);
}

@@ -85,6 +88,10 @@ public List<TransformDef> getTransforms() {
return transforms;
}

public List<UdfDef> getUdfs() {
return udfs;
}

public Configuration getConfig() {
return config;
}
@@ -100,6 +107,8 @@ public String toString() {
+ routes
+ ", transforms="
+ transforms
+ ", udfs="
+ udfs
+ ", config="
+ config
+ '}';
@@ -118,12 +127,13 @@ public boolean equals(Object o) {
&& Objects.equals(sink, that.sink)
&& Objects.equals(routes, that.routes)
&& Objects.equals(transforms, that.transforms)
&& Objects.equals(udfs, that.udfs)
&& Objects.equals(config, that.config);
}

@Override
public int hashCode() {
return Objects.hash(source, sink, routes, transforms, config);
return Objects.hash(source, sink, routes, transforms, udfs, config);
}

// ------------------------------------------------------------------------
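Because the PipelineDef constructor gained a udfs parameter, every existing call site now passes a (possibly empty) UDF list, as the updated tests above show. A minimal construction sketch follows; it assumes SourceDef takes the same (type, name, config) constructor arguments that SinkDef does in those tests, which is not confirmed by the hunks shown here.

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.composer.definition.UdfDef;

import java.util.Collections;

public class PipelineDefSketch {
    public static void main(String[] args) {
        PipelineDef pipelineDef =
                new PipelineDef(
                        new SourceDef("mysql", null, new Configuration()), // constructor shape assumed
                        new SinkDef("kafka", null, new Configuration()),
                        Collections.emptyList(), // routes
                        Collections.emptyList(), // transforms
                        Collections.singletonList(
                                new UdfDef(
                                        "substring",
                                        "com.example.functions.scalar.SubStringFunctionClass")),
                        new Configuration());
        // toString() now includes the udfs list alongside routes, transforms, and config.
        System.out.println(pipelineDef);
    }
}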
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.composer.definition;

import java.util.Objects;

/**
 * Definition of a user-defined function (UDF).
 *
 * <p>A user-defined function definition contains:
 *
 * <ul>
 *   <li>name: Static method name of the user-defined function.
 *   <li>classpath: Fully-qualified class path of the class providing the function.
 * </ul>
 */
public class UdfDef {
    private final String name;
    private final String classPath;

    public UdfDef(String name, String classPath) {
        this.name = name;
        this.classPath = classPath;
    }

    public String getName() {
        return name;
    }

    public String getClassPath() {
        return classPath;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        UdfDef udfDef = (UdfDef) o;
        return Objects.equals(name, udfDef.name) && Objects.equals(classPath, udfDef.classPath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, classPath);
    }

    @Override
    public String toString() {
        return "UdfDef{" + "name='" + name + '\'' + ", classPath='" + classPath + '\'' + '}';
    }
}
@@ -17,6 +17,7 @@

package org.apache.flink.cdc.composer.flink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
@@ -49,6 +50,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/** Composer for translating data pipeline to a Flink DataStream job. */
@Internal
@@ -95,14 +97,21 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);

List<Tuple2<String, String>> udfFunctions =
pipelineDef.getUdfs().stream()
.map(udf -> Tuple2.of(udf.getName(), udf.getClassPath()))
.collect(Collectors.toList());

// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());

// Build TransformSchemaOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
stream = transformTranslator.translateSchema(stream, pipelineDef.getTransforms());
stream =
transformTranslator.translateSchema(
stream, pipelineDef.getTransforms(), udfFunctions);

// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
@@ -123,7 +132,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
stream,
pipelineDef.getTransforms(),
schemaOperatorIDGenerator.generate(),
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
udfFunctions);

// Build DataSink in advance as schema operator requires MetadataApplier
DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());