diff --git a/docs/content.zh/docs/core-concept/data-pipeline.md b/docs/content.zh/docs/core-concept/data-pipeline.md index b9745e8823..029d3bf43c 100644 --- a/docs/content.zh/docs/core-concept/data-pipeline.md +++ b/docs/content.zh/docs/core-concept/data-pipeline.md @@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin fenodes: 127.0.0.1:8030 username: root password: "" + + transform: + - source-table: adb.web_order01 + projection: \*, UPPER(product_name) as product_name + filter: id > 10 AND order_id > 100 + description: project fields and filter + - source-table: adb.web_order02 + projection: \*, UPPER(product_name) as product_name + filter: id > 20 AND order_id > 200 + description: project fields and filter + route: - source-table: app_db.orders sink-table: ods_db.ods_orders - source-table: app_db.shipments sink-table: ods_db.ods_shipments - source-table: app_db.products - sink-table: ods_db.ods_products + sink-table: ods_db.ods_products + + udf: + - name: substring + classpath: com.example.functions.scalar.SubStringFunctionClass + - name: encrypt + classpath: com.example.functions.scalar.EncryptFunctionClass pipeline: name: Sync MySQL Database to Doris diff --git a/docs/content/docs/core-concept/data-pipeline.md b/docs/content/docs/core-concept/data-pipeline.md index b9745e8823..029d3bf43c 100644 --- a/docs/content/docs/core-concept/data-pipeline.md +++ b/docs/content/docs/core-concept/data-pipeline.md @@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin fenodes: 127.0.0.1:8030 username: root password: "" + + transform: + - source-table: adb.web_order01 + projection: \*, UPPER(product_name) as product_name + filter: id > 10 AND order_id > 100 + description: project fields and filter + - source-table: adb.web_order02 + projection: \*, UPPER(product_name) as product_name + filter: id > 20 AND order_id > 200 + description: project fields and filter + route: - source-table: app_db.orders sink-table: ods_db.ods_orders - source-table: app_db.shipments sink-table: ods_db.ods_shipments - source-table: app_db.products - sink-table: ods_db.ods_products + sink-table: ods_db.ods_products + + udf: + - name: substring + classpath: com.example.functions.scalar.SubStringFunctionClass + - name: encrypt + classpath: com.example.functions.scalar.EncryptFunctionClass pipeline: name: Sync MySQL Database to Doris diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java index 7ad07af1a4..ce8ed5f686 100644 --- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java +++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.definition.SourceDef; import org.apache.flink.cdc.composer.definition.TransformDef; +import org.apache.flink.cdc.composer.definition.UdfDef; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -46,6 +47,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { private static final String SINK_KEY = "sink"; private static final String ROUTE_KEY = "route"; private static final String TRANSFORM_KEY = "transform"; + private 
static final String UDF_KEY = "udf"; private static final String PIPELINE_KEY = "pipeline"; // Source / sink keys @@ -64,6 +66,10 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser { private static final String TRANSFORM_FILTER_KEY = "filter"; private static final String TRANSFORM_DESCRIPTION_KEY = "description"; + // UDF keys + private static final String UDF_FUNCTION_NAME_KEY = "name"; + private static final String UDF_CLASSPATH_KEY = "classpath"; + public static final String TRANSFORM_PRIMARY_KEY_KEY = "primary-keys"; public static final String TRANSFORM_PARTITION_KEY_KEY = "partition-keys"; @@ -107,6 +113,11 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi Optional.ofNullable(root.get(ROUTE_KEY)) .ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route)))); + // Udfs are optional + List udfDefs = new ArrayList<>(); + Optional.ofNullable(root.get(UDF_KEY)) + .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf)))); + // Pipeline configs are optional Configuration userPipelineConfig = toPipelineConfig(root.get(PIPELINE_KEY)); @@ -115,7 +126,8 @@ public PipelineDef parse(Path pipelineDefPath, Configuration globalPipelineConfi pipelineConfig.addAll(globalPipelineConfig); pipelineConfig.addAll(userPipelineConfig); - return new PipelineDef(sourceDef, sinkDef, routeDefs, transformDefs, pipelineConfig); + return new PipelineDef( + sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, pipelineConfig); } private SourceDef toSourceDef(JsonNode sourceNode) { @@ -176,6 +188,23 @@ private RouteDef toRouteDef(JsonNode routeNode) { return new RouteDef(sourceTable, sinkTable, replaceSymbol, description); } + private UdfDef toUdfDef(JsonNode udfNode) { + String functionName = + checkNotNull( + udfNode.get(UDF_FUNCTION_NAME_KEY), + "Missing required field \"%s\" in UDF configuration", + UDF_FUNCTION_NAME_KEY) + .asText(); + String classPath = + checkNotNull( + udfNode.get(UDF_CLASSPATH_KEY), + "Missing required field \"%s\" in UDF configuration", + UDF_CLASSPATH_KEY) + .asText(); + + return new UdfDef(functionName, classPath); + } + private TransformDef toTransformDef(JsonNode transformNode) { String sourceTable = checkNotNull( diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index 75dc5bd628..01ee50aff7 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.definition.SourceDef; import org.apache.flink.cdc.composer.definition.TransformDef; +import org.apache.flink.cdc.composer.definition.UdfDef; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.guava31.com.google.common.io.Resources; @@ -230,6 +231,13 @@ void testRouteWithReplacementSymbol() throws Exception { null, null, "add new uniq_id for each row")), + Arrays.asList( + new UdfDef( + "substring", + "com.example.functions.scalar.SubStringFunctionClass"), + new UdfDef( + "encrypt", + "com.example.functions.scalar.EncryptFunctionClass")), Configuration.fromMap( ImmutableMap.builder() .put("name", "source-database-sync-pipe") @@ -293,6 +301,13 @@ void 
testRouteWithReplacementSymbol() throws Exception { null, null, "add new uniq_id for each row")), + Arrays.asList( + new UdfDef( + "substring", + "com.example.functions.scalar.SubStringFunctionClass"), + new UdfDef( + "encrypt", + "com.example.functions.scalar.EncryptFunctionClass")), Configuration.fromMap( ImmutableMap.builder() .put("name", "source-database-sync-pipe") @@ -330,6 +345,7 @@ void testRouteWithReplacementSymbol() throws Exception { null, null)), Collections.emptyList(), + Collections.emptyList(), Configuration.fromMap( ImmutableMap.builder() .put("parallelism", "4") @@ -341,6 +357,7 @@ void testRouteWithReplacementSymbol() throws Exception { new SinkDef("kafka", null, new Configuration()), Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), Configuration.fromMap(Collections.singletonMap("parallelism", "1"))); private final PipelineDef fullDefWithRouteRepSym = diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml index b92e237d16..8407b7ffdc 100644 --- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml @@ -52,6 +52,12 @@ transform: filter: uniq_id > 10 description: add new uniq_id for each row +udf: + - name: substring + classpath: com.example.functions.scalar.SubStringFunctionClass + - name: encrypt + classpath: com.example.functions.scalar.EncryptFunctionClass + pipeline: name: source-database-sync-pipe parallelism: 4 diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java index 49e6a4a0c6..6353c4e746 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java @@ -54,6 +54,7 @@ public class PipelineDef { private final SinkDef sink; private final List routes; private final List transforms; + private final List udfs; private final Configuration config; public PipelineDef( @@ -61,11 +62,13 @@ public PipelineDef( SinkDef sink, List routes, List transforms, + List udfs, Configuration config) { this.source = source; this.sink = sink; this.routes = routes; this.transforms = transforms; + this.udfs = udfs; this.config = evaluatePipelineTimeZone(config); } @@ -85,6 +88,10 @@ public List getTransforms() { return transforms; } + public List getUdfs() { + return udfs; + } + public Configuration getConfig() { return config; } @@ -100,6 +107,8 @@ public String toString() { + routes + ", transforms=" + transforms + + ", udfs=" + + udfs + ", config=" + config + '}'; @@ -118,12 +127,13 @@ public boolean equals(Object o) { && Objects.equals(sink, that.sink) && Objects.equals(routes, that.routes) && Objects.equals(transforms, that.transforms) + && Objects.equals(udfs, that.udfs) && Objects.equals(config, that.config); } @Override public int hashCode() { - return Objects.hash(source, sink, routes, transforms, config); + return Objects.hash(source, sink, routes, transforms, udfs, config); } // ------------------------------------------------------------------------ diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java new file mode 100644 index 
0000000000..a1f21792fb --- /dev/null +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.composer.definition; + +import java.util.Objects; + +/** + * Definition of a user-defined function. + * + * <p>A UDF definition contains: + * + * <ul> + *   <li>name: Static method name of the user-defined function. + *   <li>classpath: Fully-qualified name of the class containing the given function. + * </ul>
+ */ +public class UdfDef { + private final String name; + private final String classPath; + + public UdfDef(String name, String classPath) { + this.name = name; + this.classPath = classPath; + } + + public String getName() { + return name; + } + + public String getClassPath() { + return classPath; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + UdfDef udfDef = (UdfDef) o; + return Objects.equals(name, udfDef.name) && Objects.equals(classPath, udfDef.classPath); + } + + @Override + public int hashCode() { + return Objects.hash(name, classPath); + } + + @Override + public String toString() { + return "UdfDef{" + "name='" + name + '\'' + ", classPath='" + classPath + '\'' + '}'; + } +} diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index 0735b90cf3..2eecbbbf93 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.composer.flink; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.Event; @@ -49,6 +50,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** Composer for translating data pipeline to a Flink DataStream job. */ @Internal @@ -95,6 +97,11 @@ public PipelineExecution compose(PipelineDef pipelineDef) { int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM); env.getConfig().setParallelism(parallelism); + List> udfFunctions = + pipelineDef.getUdfs().stream() + .map(udf -> Tuple2.of(udf.getName(), udf.getClassPath())) + .collect(Collectors.toList()); + // Build Source Operator DataSourceTranslator sourceTranslator = new DataSourceTranslator(); DataStream stream = @@ -102,7 +109,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) { // Build TransformSchemaOperator for processing Schema Event TransformTranslator transformTranslator = new TransformTranslator(); - stream = transformTranslator.translateSchema(stream, pipelineDef.getTransforms()); + stream = + transformTranslator.translateSchema( + stream, pipelineDef.getTransforms(), udfFunctions); // Schema operator SchemaOperatorTranslator schemaOperatorTranslator = @@ -123,7 +132,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) { stream, pipelineDef.getTransforms(), schemaOperatorIDGenerator.generate(), - pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE)); + pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE), + udfFunctions); // Build DataSink in advance as schema operator requires MetadataApplier DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig()); diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java index 53400f628f..c834cb70e2 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java +++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.composer.flink.translator; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.composer.definition.TransformDef; import org.apache.flink.cdc.runtime.operators.transform.TransformDataOperator; @@ -34,7 +35,9 @@ public class TransformTranslator { public DataStream translateSchema( - DataStream input, List transforms) { + DataStream input, + List transforms, + List> udfFunctions) { if (transforms.isEmpty()) { return input; } @@ -51,6 +54,7 @@ public DataStream translateSchema( transform.getTableOptions()); } } + transformSchemaFunctionBuilder.addUdfFunctions(udfFunctions); return input.transform( "Transform:Schema", new EventTypeInfo(), transformSchemaFunctionBuilder.build()); } @@ -59,7 +63,8 @@ public DataStream translateData( DataStream input, List transforms, OperatorID schemaOperatorID, - String timezone) { + String timezone, + List> udfFunctions) { if (transforms.isEmpty()) { return input; } @@ -76,6 +81,7 @@ public DataStream translateData( } transformDataFunctionBuilder.addSchemaOperatorID(schemaOperatorID); transformDataFunctionBuilder.addTimezone(timezone); + transformDataFunctionBuilder.addUdfFunctions(udfFunctions); return input.transform( "Transform:Data", new EventTypeInfo(), transformDataFunctionBuilder.build()); } diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 26c9c91875..4a9d96cb20 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -36,6 +36,7 @@ import org.apache.flink.cdc.composer.definition.SinkDef; import org.apache.flink.cdc.composer.definition.SourceDef; import org.apache.flink.cdc.composer.definition.TransformDef; +import org.apache.flink.cdc.composer.definition.UdfDef; import org.apache.flink.cdc.connectors.values.ValuesDatabase; import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory; import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink; @@ -139,6 +140,7 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception sinkDef, Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), pipelineConfig); // Execute the pipeline @@ -195,6 +197,7 @@ void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Except sinkDef, Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), pipelineConfig); // Execute the pipeline @@ -261,6 +264,7 @@ void testMultiSplitsSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception sinkDef, Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), pipelineConfig); // Execute the pipeline @@ -315,6 +319,7 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception { sinkDef, Collections.emptyList(), new ArrayList<>(Arrays.asList(transformDef)), + Collections.emptyList(), pipelineConfig); // Execute the pipeline @@ -382,6 +387,7 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception { sinkDef, Collections.emptyList(), new ArrayList<>(Arrays.asList(transformDef1, transformDef2)), + Collections.emptyList(), 
pipelineConfig); // Execute the pipeline @@ -432,7 +438,12 @@ void testOneToOneRouting() throws Exception { pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); PipelineDef pipelineDef = new PipelineDef( - sourceDef, sinkDef, routeDef, Collections.emptyList(), pipelineConfig); + sourceDef, + sinkDef, + routeDef, + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); // Execute the pipeline PipelineExecution execution = composer.compose(pipelineDef); @@ -624,7 +635,12 @@ void testMergingWithRoute() throws Exception { pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); PipelineDef pipelineDef = new PipelineDef( - sourceDef, sinkDef, routeDef, Collections.emptyList(), pipelineConfig); + sourceDef, + sinkDef, + routeDef, + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); // Execute the pipeline PipelineExecution execution = composer.compose(pipelineDef); @@ -692,6 +708,7 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception "__$__", null)), Collections.emptyList(), + Collections.emptyList(), pipelineConfig); // Execute the pipeline @@ -716,4 +733,67 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}", "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}"); } + + @ParameterizedTest + @EnumSource + void testTransformWithUdf(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + // Setup transform + TransformDef transformDef = + new TransformDef( + "default_namespace.default_schema.table1", + "*,format('from %s to %s is %s', col1, 'z', 'lie') AS fmt", + null, + "col1", + null, + "key1=value1", + ""); + + UdfDef udfDef = new UdfDef("format", "java.lang.String"); + + // Setup pipeline + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + Collections.singletonList(transformDef), + Collections.singletonList(udfDef), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check the order and content of all received events + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(outputEvents) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, from 1 to z is lie], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, from 2 to z is lie], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3, from 3 to z is lie], op=INSERT, meta=()}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", + "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", + "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, from 1 to z is lie], after=[], op=DELETE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , from 2 to z is lie], after=[2, x, from 2 to z is lie], op=UPDATE, meta=()}"); + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 6c5202342d..10ed551113 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.runtime.operators.transform; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.schema.Column; @@ -43,18 +44,26 @@ public class ProjectionColumnProcessor { private ProjectionColumn projectionColumn; private String timezone; private TransformExpressionKey transformExpressionKey; + private List> udfFunctions; public ProjectionColumnProcessor( - TableInfo tableInfo, ProjectionColumn projectionColumn, String timezone) { + TableInfo tableInfo, + ProjectionColumn projectionColumn, + String timezone, + List> udfFunctions) { this.tableInfo = tableInfo; this.projectionColumn = projectionColumn; this.timezone = timezone; + this.udfFunctions = udfFunctions; this.transformExpressionKey = generateTransformExpressionKey(); } public static ProjectionColumnProcessor of( - TableInfo tableInfo, ProjectionColumn projectionColumn, String timezone) { - return new ProjectionColumnProcessor(tableInfo, projectionColumn, timezone); + TableInfo tableInfo, + ProjectionColumn projectionColumn, + String timezone, + List> udfFunctions) { + return new ProjectionColumnProcessor(tableInfo, projectionColumn, timezone, udfFunctions); } public Object evaluate(BinaryRecordData after, long epochTime) { @@ -145,7 +154,7 @@ private TransformExpressionKey generateTransformExpressionKey() { paramTypes.add(Long.class); return TransformExpressionKey.of( - JaninoCompiler.loadSystemFunction(scriptExpression), + JaninoCompiler.loadScalarFunctions(udfFunctions, scriptExpression), argumentNames, paramTypes, DataTypeConverter.convertOriginalClass(projectionColumn.getDataType())); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java index 20479c8732..45c3a959ad 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java +++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java @@ -71,6 +71,8 @@ public class TransformDataOperator extends AbstractStreamOperator /** keep the relationship of TableId and table information. */ private final Map tableInfoMap; + private final List> udfFunctions; + private transient Map, TransformProjectionProcessor> transformProjectionProcessorMap; private transient Map, TransformFilterProcessor> @@ -85,6 +87,7 @@ public static class Builder { private final List> transformRules = new ArrayList<>(); private OperatorID schemaOperatorID; private String timezone; + private List> udfFunctions = new ArrayList<>(); public TransformDataOperator.Builder addTransform( String tableInclusions, @Nullable String projection, @Nullable String filter) { @@ -106,21 +109,30 @@ public TransformDataOperator.Builder addTimezone(String timezone) { return this; } + public TransformDataOperator.Builder addUdfFunctions( + List> udfFunctions) { + this.udfFunctions.addAll(udfFunctions); + return this; + } + public TransformDataOperator build() { - return new TransformDataOperator(transformRules, schemaOperatorID, timezone); + return new TransformDataOperator( + transformRules, schemaOperatorID, timezone, udfFunctions); } } private TransformDataOperator( List> transformRules, OperatorID schemaOperatorID, - String timezone) { + String timezone, + List> udfFunctions) { this.transformRules = transformRules; this.schemaOperatorID = schemaOperatorID; this.timezone = timezone; this.tableInfoMap = new ConcurrentHashMap<>(); this.transformFilterProcessorMap = new ConcurrentHashMap<>(); this.transformProjectionProcessorMap = new ConcurrentHashMap<>(); + this.udfFunctions = udfFunctions; } @Override @@ -234,7 +246,7 @@ private void transformSchema(TableId tableId, Schema schema) throws Exception { Tuple2.of(tableId, transformProjection))) { transformProjectionProcessorMap.put( Tuple2.of(tableId, transformProjection), - TransformProjectionProcessor.of(transformProjection)); + TransformProjectionProcessor.of(transformProjection, udfFunctions)); } TransformProjectionProcessor transformProjectionProcessor = transformProjectionProcessorMap.get( @@ -272,7 +284,8 @@ private Optional processDataChangeEvent(DataChangeEvent dataCha TransformProjectionProcessor.of( getTableInfoFromSchemaEvolutionClient(tableId), transformProjection, - timezone)); + timezone, + udfFunctions)); } TransformProjectionProcessor transformProjectionProcessor = transformProjectionProcessorMap.get( @@ -294,7 +307,8 @@ private Optional processDataChangeEvent(DataChangeEvent dataCha TransformFilterProcessor.of( getTableInfoFromSchemaEvolutionClient(tableId), transformFilter, - timezone)); + timezone, + udfFunctions)); } TransformFilterProcessor transformFilterProcessor = transformFilterProcessorMap.get(Tuple2.of(tableId, transformFilter)); @@ -319,7 +333,8 @@ private Optional processDataChangeEvent(DataChangeEvent dataCha TransformProjectionProcessor.of( getTableInfoFromSchemaEvolutionClient(tableId), transformProjection, - timezone)); + timezone, + udfFunctions)); } TransformProjectionProcessor transformProjectionProcessor = transformProjectionProcessorMap.get( diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 7eaefc3c84..4c12b10064 100644 --- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.runtime.operators.transform; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.schema.Column; @@ -39,18 +40,26 @@ public class TransformFilterProcessor { private TransformFilter transformFilter; private String timezone; private TransformExpressionKey transformExpressionKey; + private List> udfFunctions; public TransformFilterProcessor( - TableInfo tableInfo, TransformFilter transformFilter, String timezone) { + TableInfo tableInfo, + TransformFilter transformFilter, + String timezone, + List> udfFunctions) { this.tableInfo = tableInfo; this.transformFilter = transformFilter; this.timezone = timezone; + this.udfFunctions = udfFunctions; transformExpressionKey = generateTransformExpressionKey(); } public static TransformFilterProcessor of( - TableInfo tableInfo, TransformFilter transformFilter, String timezone) { - return new TransformFilterProcessor(tableInfo, transformFilter, timezone); + TableInfo tableInfo, + TransformFilter transformFilter, + String timezone, + List> udfFunctions) { + return new TransformFilterProcessor(tableInfo, transformFilter, timezone, udfFunctions); } public boolean process(BinaryRecordData after, long epochTime) { @@ -140,7 +149,7 @@ private TransformExpressionKey generateTransformExpressionKey() { paramTypes.add(Long.class); return TransformExpressionKey.of( - JaninoCompiler.loadSystemFunction(scriptExpression), + JaninoCompiler.loadScalarFunctions(udfFunctions, scriptExpression), argumentNames, paramTypes, Boolean.class); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java index 7049bbdfda..d52a5b2ad4 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.runtime.operators.transform; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.event.CreateTableEvent; @@ -52,17 +53,20 @@ public class TransformProjectionProcessor { private TransformProjection transformProjection; private String timezone; private Map projectionColumnProcessorMap; + private List> udfFunctions; public TransformProjectionProcessor( TableInfo tableInfo, TableChangeInfo tableChangeInfo, TransformProjection transformProjection, - String timezone) { + String timezone, + List> udfFunctions) { this.tableInfo = tableInfo; this.tableChangeInfo = tableChangeInfo; this.transformProjection = transformProjection; this.timezone = timezone; this.projectionColumnProcessorMap = new ConcurrentHashMap<>(); + this.udfFunctions = udfFunctions; } public boolean hasTableChangeInfo() { @@ -74,24 +78,34 @@ public boolean hasTableInfo() { } public static TransformProjectionProcessor of( - TableInfo tableInfo, 
TransformProjection transformProjection, String timezone) { - return new TransformProjectionProcessor(tableInfo, null, transformProjection, timezone); + TableInfo tableInfo, + TransformProjection transformProjection, + String timezone, + List> udfFunctions) { + return new TransformProjectionProcessor( + tableInfo, null, transformProjection, timezone, udfFunctions); } public static TransformProjectionProcessor of( - TableChangeInfo tableChangeInfo, TransformProjection transformProjection) { - return new TransformProjectionProcessor(null, tableChangeInfo, transformProjection, null); + TableChangeInfo tableChangeInfo, + TransformProjection transformProjection, + List> udfFunctions) { + return new TransformProjectionProcessor( + null, tableChangeInfo, transformProjection, null, udfFunctions); } - public static TransformProjectionProcessor of(TransformProjection transformProjection) { - return new TransformProjectionProcessor(null, null, transformProjection, null); + public static TransformProjectionProcessor of( + TransformProjection transformProjection, List> udfFunctions) { + return new TransformProjectionProcessor( + null, null, transformProjection, null, udfFunctions); } public CreateTableEvent processCreateTableEvent(CreateTableEvent createTableEvent) { List projectionColumns = TransformParser.generateProjectionColumns( transformProjection.getProjection(), - createTableEvent.getSchema().getColumns()); + createTableEvent.getSchema().getColumns(), + udfFunctions); transformProjection.setProjectionColumns(projectionColumns); List allColumnList = transformProjection.getAllColumnList(); // add the column of projection into Schema @@ -102,7 +116,7 @@ public CreateTableEvent processCreateTableEvent(CreateTableEvent createTableEven public void processSchemaChangeEvent(Schema schema) { List projectionColumns = TransformParser.generateProjectionColumns( - transformProjection.getProjection(), schema.getColumns()); + transformProjection.getProjection(), schema.getColumns(), udfFunctions); transformProjection.setProjectionColumns(projectionColumns); } @@ -144,7 +158,7 @@ public BinaryRecordData processData(BinaryRecordData after, long epochTime) { projectionColumnProcessorMap.put( projectionColumn.getColumnName(), ProjectionColumnProcessor.of( - tableInfo, projectionColumn, timezone)); + tableInfo, projectionColumn, timezone, udfFunctions)); } ProjectionColumnProcessor projectionColumnProcessor = projectionColumnProcessorMap.get(projectionColumn.getColumnName()); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java index 230fc1a6ff..f06c617efc 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java @@ -58,6 +58,7 @@ public class TransformSchemaOperator extends AbstractStreamOperator private transient Map processorMap; private final List> schemaMetadataTransformers; private transient ListState state; + private final List> udfFunctions; public static TransformSchemaOperator.Builder newBuilder() { return new TransformSchemaOperator.Builder(); @@ -68,6 +69,8 @@ public static class Builder { private final List> transformRules = new ArrayList<>(); + private final List> udfFunctions = new ArrayList<>(); + public TransformSchemaOperator.Builder 
addTransform( String tableInclusions, @Nullable String projection, @@ -79,18 +82,26 @@ public TransformSchemaOperator.Builder addTransform( return this; } + public TransformSchemaOperator.Builder addUdfFunctions( + List> udfFunctions) { + this.udfFunctions.addAll(udfFunctions); + return this; + } + public TransformSchemaOperator build() { - return new TransformSchemaOperator(transformRules); + return new TransformSchemaOperator(transformRules, udfFunctions); } } private TransformSchemaOperator( - List> transformRules) { + List> transformRules, + List> udfFunctions) { this.transformRules = transformRules; this.tableChangeInfoMap = new ConcurrentHashMap<>(); this.processorMap = new ConcurrentHashMap<>(); this.schemaMetadataTransformers = new ArrayList<>(); this.chainingStrategy = ChainingStrategy.ALWAYS; + this.udfFunctions = udfFunctions; } @Override @@ -216,7 +227,8 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE if (transformProjection.isValid()) { if (!processorMap.containsKey(tableId)) { processorMap.put( - tableId, TransformProjectionProcessor.of(transformProjection)); + tableId, + TransformProjectionProcessor.of(transformProjection, udfFunctions)); } TransformProjectionProcessor transformProjectionProcessor = processorMap.get(tableId); @@ -271,7 +283,9 @@ private DataChangeEvent processProjection( TableChangeInfo tableChangeInfo = tableChangeInfoMap.get(tableId); if (!processorMap.containsKey(tableId) || !processorMap.get(tableId).hasTableChangeInfo()) { processorMap.put( - tableId, TransformProjectionProcessor.of(tableChangeInfo, transformProjection)); + tableId, + TransformProjectionProcessor.of( + tableChangeInfo, transformProjection, udfFunctions)); } TransformProjectionProcessor transformProjectionProcessor = processorMap.get(tableId); BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before(); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 5af9755edf..df2557d4bb 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.calcite.sql.SqlBasicCall; @@ -37,6 +38,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** * Use Janino compiler to compiler the statement of flink cdc pipeline transform into the executable @@ -57,8 +59,12 @@ public class JaninoCompiler { public static final String DEFAULT_EPOCH_TIME = "__epoch_time__"; public static final String DEFAULT_TIME_ZONE = "__time_zone__"; - public static String loadSystemFunction(String expression) { + public static String loadScalarFunctions( + List> udfFunctions, String expression) { return "import static org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;" + + udfFunctions.stream() + .map(udf -> "import static " + udf.f1 + "." 
+ udf.f0 + ";") + .collect(Collectors.joining()) + expression; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 23cf4b376d..b93684b537 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.parser; import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; @@ -40,7 +41,12 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.schema.ScalarFunction; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlFunction; +import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; @@ -50,7 +56,10 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.InferTypes; +import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.util.ListSqlOperatorTable; import org.apache.calcite.sql.util.SqlOperatorTables; import org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.calcite.sql.validate.SqlValidator; @@ -87,15 +96,34 @@ private static SqlParser getCalciteParser(String sql) { .withLex(Lex.JAVA)); } - private static RelNode sqlToRel(List columns, SqlNode sqlNode) { + private static RelNode sqlToRel( + List columns, SqlNode sqlNode, List> udfs) { List columnsWithMetadata = copyFillMetadataColumn(sqlNode.toString(), columns); CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); + SchemaPlus schema = rootSchema.plus(); Map operand = new HashMap<>(); operand.put("tableName", DEFAULT_TABLE); operand.put("columns", columnsWithMetadata); rootSchema.add( DEFAULT_SCHEMA, - TransformSchemaFactory.INSTANCE.create(rootSchema.plus(), DEFAULT_SCHEMA, operand)); + TransformSchemaFactory.INSTANCE.create(schema, DEFAULT_SCHEMA, operand)); + List udfFunctions = new ArrayList<>(); + for (Tuple2 udf : udfs) { + try { + ScalarFunction function = ScalarFunctionImpl.create(Class.forName(udf.f1), udf.f0); + schema.add(udf.f0.toUpperCase(), function); + udfFunctions.add( + new SqlFunction( + udf.f0, + SqlKind.OTHER_FUNCTION, + o -> function.getReturnType(o.getTypeFactory()), + InferTypes.RETURN_TYPE, + OperandTypes.VARIADIC, + SqlFunctionCategory.USER_DEFINED_FUNCTION)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to resolve UDF at classpath " + udf.f1); + } + } SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); CalciteCatalogReader calciteCatalogReader = new CalciteCatalogReader( @@ -105,9 +133,13 @@ private static RelNode sqlToRel(List columns, SqlNode sqlNode) { new CalciteConnectionConfigImpl(new Properties())); 
TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance(); SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance(); + ListSqlOperatorTable udfOperatorTable = new ListSqlOperatorTable(); + udfFunctions.forEach(udfOperatorTable::add); + SqlValidator validator = SqlValidatorUtil.newValidator( - SqlOperatorTables.chain(sqlStdOperatorTable, transformSqlOperatorTable), + SqlOperatorTables.chain( + sqlStdOperatorTable, transformSqlOperatorTable, udfOperatorTable), calciteCatalogReader, factory, SqlValidator.Config.DEFAULT.withIdentifierExpansion(true)); @@ -143,7 +175,9 @@ public static SqlSelect parseSelect(String statement) { // Parse all columns public static List generateProjectionColumns( - String projectionExpression, List columns) { + String projectionExpression, + List columns, + List> udfFunctions) { if (StringUtils.isNullOrWhitespaceOnly(projectionExpression)) { return new ArrayList<>(); } @@ -151,7 +185,7 @@ public static List generateProjectionColumns( if (sqlSelect.getSelectList().isEmpty()) { return new ArrayList<>(); } - RelNode relNode = sqlToRel(columns, sqlSelect); + RelNode relNode = sqlToRel(columns, sqlSelect, udfFunctions); Map relDataTypeMap = relNode.getRowType().getFieldList().stream() .collect( diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java index db86783d2b..436915fe42 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java @@ -35,6 +35,7 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.TimeZone; @@ -118,7 +119,7 @@ public void testJaninoTimestampFunction() throws InvocationTargetException { List params = Arrays.asList(epochTime); ExpressionEvaluator expressionEvaluator = JaninoCompiler.compileExpression( - JaninoCompiler.loadSystemFunction(expression), + JaninoCompiler.loadScalarFunctions(Collections.emptyList(), expression), columnNames, paramTypes, TimestampData.class); @@ -134,7 +135,7 @@ public void testBuildInFunction() throws InvocationTargetException { List params = new ArrayList<>(); ExpressionEvaluator expressionEvaluator = JaninoCompiler.compileExpression( - JaninoCompiler.loadSystemFunction(expression), + JaninoCompiler.loadScalarFunctions(Collections.emptyList(), expression), columnNames, paramTypes, Double.class);
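
Note on UDF authoring (reviewer sketch, not part of the patch): JaninoCompiler.loadScalarFunctions prefixes every generated expression with "import static <classpath>.<name>;", and TransformParser.sqlToRel registers the same class through Calcite's ScalarFunctionImpl.create(Class.forName(classpath), name). Both paths therefore expect `classpath` to point at a public class exposing a public static method named exactly `name`. A minimal sketch of such a class, reusing the illustrative com.example names from the docs hunks above:

package com.example.functions.scalar;

/**
 * Minimal scalar UDF sketch. Registered in the pipeline YAML as:
 *
 *   udf:
 *     - name: substring
 *       classpath: com.example.functions.scalar.SubStringFunctionClass
 *
 * JaninoCompiler then emits
 * "import static com.example.functions.scalar.SubStringFunctionClass.substring;",
 * so the method must be public, static, and named exactly like the configured name.
 */
public class SubStringFunctionClass {

    /** Returns the substring of {@code s} starting at 0-based offset {@code from}. */
    public static String substring(String s, int from) {
        if (s == null || from >= s.length()) {
            return "";
        }
        return s.substring(Math.max(from, 0));
    }
}

The ITCase exercises the same contract with an existing JDK method (UdfDef("format", "java.lang.String"), i.e. String.format), which confirms that any class with a matching public static method works; no marker interface is required in this version.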