[FLINK-34876][transform] Support UDF functions in transform #3465

Merged 1 commit on Aug 9, 2024

yuxiqian
Contributor

@yuxiqian yuxiqian commented Jul 10, 2024

This closes FLINK-34876. It introduces customized logic into transform expressions, with the following key features:

  • supports UDF classes implemented in Java and Scala
  • explicit CDC pipeline type hints
  • open & close lifecycle hooks
  • compatible with simple Flink ScalarFunction

One may add a UDF block to an existing pipeline definition and call the registered functions in transform expressions:

transform:
  - projection: addone(addone(addone(addone(1))))
    # ...

pipeline:
  user-defined-function:
    - name: addone                                   # function name used in expressions
      classpath: org.example.udf.AddOneFunctionClass # fully-qualified class name
    - name: format
      classpath: org.example.udf.FormatFunctionClass

Here, the class named by the classpath value must either implement CDC's org.apache.flink.cdc.common.udf.UserDefinedFunction interface or extend Flink's org.apache.flink.table.functions.ScalarFunction abstract class.
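
The YAML above refers to org.example.udf.AddOneFunctionClass without showing it. A minimal sketch of what such a class might look like follows. To stay compilable on its own, it declares a local stand-in for the UserDefinedFunction interface (leaving out getReturnType, which needs CDC's DataType) and nests everything in a hypothetical AddOneSketch wrapper. In a real project the class would be a public top-level class in package org.example.udf implementing the CDC interface. The Integer parameter type is an assumption.

```java
final class AddOneSketch {

    // Stand-in for org.apache.flink.cdc.common.udf.UserDefinedFunction,
    // declared locally so the sketch compiles without Flink CDC.
    interface UserDefinedFunction {
        default void open() throws Exception {}

        default void close() throws Exception {}
    }

    // Hypothetical class behind the `addone` name registered in YAML.
    public static class AddOneFunctionClass implements UserDefinedFunction {

        // The implicit nullary constructor and this non-void eval method
        // are the two things a UDF needs.
        public Integer eval(Integer num) {
            return num + 1;
        }
    }
}
```

With this definition, the nested expression addone(addone(addone(addone(1)))) from the projection would evaluate to 5.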

UserDefinedFunction is declared as follows:

public interface UserDefinedFunction {

    default DataType getReturnType() { return null; }

    default void open() throws Exception {}

    default void close() throws Exception {}
}

A valid UserDefinedFunction implementation must:

  • Have a nullary constructor
  • Have at least one eval method that doesn't return void
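
The two requirements above can be checked with plain Java reflection. The sketch below is only an illustration, not the validation code in this PR. The names UdfValidatorSketch and looksLikeValidUdf are hypothetical, and it assumes that both the constructor and the eval method must be public.

```java
import java.lang.reflect.Method;

final class UdfValidatorSketch {

    // Returns true if the class has a public nullary constructor and at
    // least one public method named "eval" that does not return void.
    static boolean looksLikeValidUdf(Class<?> clazz) {
        try {
            clazz.getConstructor(); // throws if there is no public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("eval") && m.getReturnType() != void.class) {
                return true;
            }
        }
        return false;
    }

    // Hypothetical candidates used to exercise the check.
    public static class Good {
        public String eval() {
            return "ok";
        }
    }

    public static class VoidEval {
        public void eval() {}
    }

    public static class NeedsArg {
        public NeedsArg(int x) {}

        public String eval() {
            return "ok";
        }
    }
}
```

Here Good passes, VoidEval fails because its only eval returns void, and NeedsArg fails because it has no nullary constructor.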

A UserDefinedFunction implementation may:

  • Override getReturnType method to declare its return type.
    • It will be inferred from eval signature if not provided.
public class TypeHintFunctionClass implements UserDefinedFunction {

    @Override
    public DataType getReturnType() {
        return DataTypes.STRING();
    }

    public Object eval() {
        // Return type could not be inferred from function signature
        return "Forty-two";
    }
}
  • Override open and close lifecycle hooks to initialize / clean-up necessary resources.
    • props defined in YAML will be passed in UserDefinedFunctionContext as an argument.
    • UDF's lifecycle is identical to TransformDataOperator (or PostTransformOperator after FLINK-35272).
    • UserDefinedFunction might be constructed temporarily elsewhere, so make sure resources are handled in open and close instead of constructors.
public class LifecycleFunctionClass implements UserDefinedFunction {

    private Integer counter;

    public String eval() {
        return "#" + (counter++); // Post-increment from 0: returns #0, #1, #2, ...
    }

    @Override
    public void open() {
        counter = 0;
        System.out.println("[ LifeFun ] opened.");
    }

    @Override
    public void close() {
        System.out.println("[ LifeFun ] closed. Called " + counter + " times.");
    }
}
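
To make the lifecycle contract concrete, the following sketch drives a counting UDF through open, several eval calls, and close, in the order an operator would presumably call them. It is a simplified stand-alone illustration. LifecycleSketch, CountingFunction, and runOnce are hypothetical names, and the local UserDefinedFunction interface is a stand-in for the CDC one.

```java
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {

    // Stand-in for the CDC UserDefinedFunction interface.
    interface UserDefinedFunction {
        default void open() throws Exception {}

        default void close() throws Exception {}
    }

    public static class CountingFunction implements UserDefinedFunction {

        private Integer counter; // stays null until open() runs

        public String eval() {
            return "#" + (counter++); // post-increment: yields the old value
        }

        @Override
        public void open() {
            counter = 0; // acquire state here, not in the constructor
        }

        @Override
        public void close() {
            counter = null; // release state
        }
    }

    // Hypothetical driver mimicking an operator: open, eval per record, close.
    static List<String> runOnce(int records) {
        CountingFunction udf = new CountingFunction();
        List<String> out = new ArrayList<>();
        udf.open();
        try {
            for (int i = 0; i < records; i++) {
                out.add(udf.eval());
            }
        } finally {
            udf.close(); // always runs, even if eval throws
        }
        return out;
    }
}
```

Calling eval before open would fail with a NullPointerException in this sketch, which is why state belongs in open rather than in the constructor.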

eval method overloading is supported, and runtime invocations will be dispatched to the correct one:

public class TypeOfFunctionClass implements UserDefinedFunction {

    public String eval(Boolean b) {
        return "Boolean: " + b;
    }

    public String eval(Integer i) {
        return "Integer: " + i;
    }

    public String eval(Float f) {
        return "Float: " + f;
    }

    public String eval(Double d) {
        return "Double: " + d;
    }

    public String eval(String s) {
        return "String: " + s;
    }
}
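
How the runtime selects an overload is not described in this thread. As a rough mental model only, the sketch below picks an eval overload by matching the runtime types of the arguments through reflection. EvalDispatchSketch and invokeEval are hypothetical names, and the real implementation may resolve overloads quite differently.

```java
import java.lang.reflect.Method;

final class EvalDispatchSketch {

    // Trimmed-down version of the class above, with three overloads.
    public static class TypeOfFunction {
        public String eval(Boolean b) {
            return "Boolean: " + b;
        }

        public String eval(Integer i) {
            return "Integer: " + i;
        }

        public String eval(String s) {
            return "String: " + s;
        }
    }

    // Invokes the first public eval overload whose parameter types accept
    // the given arguments. A null argument matches any parameter type, so
    // the choice is ambiguous in that case.
    static Object invokeEval(Object udf, Object... args) {
        for (Method m : udf.getClass().getMethods()) {
            if (!m.getName().equals("eval") || m.getParameterCount() != args.length) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            boolean matches = true;
            for (int i = 0; i < args.length; i++) {
                if (args[i] != null && !params[i].isInstance(args[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                try {
                    return m.invoke(udf, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException("no matching eval overload");
    }
}
```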

Flink ScalarFunction is supported with the following limitations:

  • Flink-style type hints (TypeInformation) will be ignored.
  • A ScalarFunction with arguments is not supported.
  • Declared open and close methods will be ignored.

github-actions bot added the docs, composer, runtime, and cli labels on Jul 10, 2024
@yuxiqian
Contributor Author

yuxiqian commented Jul 10, 2024

I'm having some difficulty finding Calcite-related materials. Maybe @aiwenmo could provide some expert advice?

@yuxiqian
Contributor Author

yuxiqian commented Aug 1, 2024

Adjusted based on previous discussions, cleaned up changes, and added UDF-related docs.

yuxiqian force-pushed the FLINK-34876 branch 4 times, most recently from f9368db to cb6fa92, on August 8, 2024 07:14
@yuxiqian
Contributor Author

yuxiqian commented Aug 8, 2024

Rebased to latest master, cc @ruanhang1993

Minor concern: with the additional E2E test cases, the E2E suite now takes nearly 1h 30min to finish, which is worrying since the current timeout limit is configured to 90 minutes. Maybe something like #3514 is necessary?

docs/content.zh/docs/core-concept/transform.md (outdated, resolved)
flink-cdc-pipeline-udf-examples/pom.xml (outdated, resolved)
Comment on lines +29 to +31

    default DataType getReturnType() {
        return null;
    }

Contributor

Suggested change:

-   default DataType getReturnType() {
-       return null;
-   }
+   default Optional<DataType> getReturnType() {
+       return Optional.empty();
+   }

Contributor Author

I have some doubts about this. This is meant to be an interface protocol, and any reasonable override of this method should not return an empty value.

@yuxiqian
Contributor Author

yuxiqian commented Aug 8, 2024

It seems CI is failing due to an expired link in the Doris docs. Rebased and fixed.

# Conflicts:
#	flink-cdc-composer/pom.xml
#	flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
#	flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
#	flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
#	flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
#	flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
#	flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
#	flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
#	flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
#	flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@yuxiqian
Contributor Author

yuxiqian commented Aug 9, 2024

Rebased with master and resolved conflicts. cc @leonardBang @ruanhang1993

Contributor

@aiwenmo aiwenmo left a comment

LGTM

@ruanhang1993 ruanhang1993 merged commit e2bb917 into apache:master Aug 9, 2024
24 checks passed
qiaozongmi pushed a commit to qiaozongmi/flink-cdc that referenced this pull request Sep 23, 2024
@mamineturki

@yuxiqian Hi, I have been trying to run an example with this new feature, but I always get java.lang.ClassNotFoundException: org.example.udf.AddOneFunctionClass.

I tried loading the UDF using the class path from the PR description, as well as the package name org.apache.flink.cdc.udf.examples.scala.AddOneFunctionClass, with no luck.

I imported the function by downloading the pre-packaged jar with wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-udf-examples/3.2.0/flink-cdc-pipeline-udf-examples-3.2.0.jar

Please advise on what I am missing. Thanks.

@yuxiqian
Contributor Author

Hi @mamineturki, flink-cdc-pipeline-udf-examples is for testing purposes only. If you'd like to test it, using org.apache.flink.cdc.udf.examples.scala.AddOneFunctionClass should work. It seems the class wasn't successfully loaded by Flink's classloaders in your case.

Have you tried putting the .jar file into Flink's /lib path (not Flink CDC's /lib!) and restarting the Flink cluster? If that doesn't work, try passing it with the --jar XXX.jar argument when submitting the pipeline job with bin/flink-cdc.sh.

@mamineturki

Thanks for the speedy response, @yuxiqian. Passing the jar using --jar worked for me.

@mamineturki

Hi @yuxiqian, I have another question, please.
I was able to write custom UDFs that take one, two, or more arguments. What I would like to test next is a UDF that generates multiple columns as output (for example, splitting a full name into first and last names, or one-hot encoding a column).
Is this currently supported? Thanks.

@yuxiqian
Contributor Author

yuxiqian commented Oct 5, 2024

Hi @mamineturki, unfortunately that's not possible for now, as a YAML UDF is basically equivalent to Flink's ScalarFunction.

(For TableFunction or AggregateFunction support, feel free to create another feature request ticket on JIRA.)
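
Since a scalar UDF returns one value per call, one possible workaround for the full-name case is to register one UDF per output column and call each in the projection. This is an assumption rather than something confirmed in this thread. NameSplitSketch, FirstNameFunction, and LastNameFunction are hypothetical names, and the local interface is a stand-in for CDC's UserDefinedFunction.

```java
final class NameSplitSketch {

    // Stand-in for the CDC UserDefinedFunction interface.
    interface UserDefinedFunction {
        default void open() throws Exception {}

        default void close() throws Exception {}
    }

    // Returns everything before the first space, or the whole name if none.
    public static class FirstNameFunction implements UserDefinedFunction {
        public String eval(String fullName) {
            if (fullName == null) {
                return null;
            }
            String trimmed = fullName.trim();
            int idx = trimmed.indexOf(' ');
            return idx < 0 ? trimmed : trimmed.substring(0, idx);
        }
    }

    // Returns everything after the last space, or "" if there is no space.
    public static class LastNameFunction implements UserDefinedFunction {
        public String eval(String fullName) {
            if (fullName == null) {
                return null;
            }
            String trimmed = fullName.trim();
            int idx = trimmed.lastIndexOf(' ');
            return idx < 0 ? "" : trimmed.substring(idx + 1);
        }
    }
}
```

Each class would then be registered under its own name in the user-defined-function block and applied to the same input column once per output column. The cost is that the input is parsed once per output column.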
