Skip to content

Commit

Permalink
fix: postpone udf descriptor creation to runtime operator#open() & mi…
Browse files Browse the repository at this point in the history
…nor: code reformat

Previously, the UDF descriptor was created in Composer#compose, which is incorrect because class-loading behavior may differ between the CLI and the runtime operator.
  • Loading branch information
yuxiqian committed Aug 5, 2024
1 parent 36c99b7 commit 767b456
Show file tree
Hide file tree
Showing 18 changed files with 64 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package org.apache.flink.cdc.composer.flink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
Expand Down Expand Up @@ -97,12 +97,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);

List<UserDefinedFunctionDescriptor> udfFunctions =
List<Tuple2<String, String>> udfFunctions =
pipelineDef.getUdfs().stream()
.map(
udf ->
new UserDefinedFunctionDescriptor(
udf.getName(), udf.getClassPath()))
.map(udf -> Tuple2.of(udf.getName(), udf.getClassPath()))
.collect(Collectors.toList());

// Build Source Operator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.flink.cdc.composer.flink.translator;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.composer.definition.TransformDef;
import org.apache.flink.cdc.runtime.operators.transform.TransformDataOperator;
import org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator;
Expand All @@ -37,7 +37,7 @@ public class TransformTranslator {
public DataStream<Event> translateSchema(
DataStream<Event> input,
List<TransformDef> transforms,
List<UserDefinedFunctionDescriptor> udfFunctions) {
List<Tuple2<String, String>> udfFunctions) {
if (transforms.isEmpty()) {
return input;
}
Expand All @@ -64,7 +64,7 @@ public DataStream<Event> translateData(
List<TransformDef> transforms,
OperatorID schemaOperatorID,
String timezone,
List<UserDefinedFunctionDescriptor> udfFunctions) {
List<Tuple2<String, String>> udfFunctions) {
if (transforms.isEmpty()) {
return input;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ import scala.annotation.varargs

/** This is an example UDF class for testing purposes only. */
class FormatFunctionClass extends UserDefinedFunction {
@varargs def eval(format: String, args: Object*): String = String.format(format, args:_*)
@varargs def eval(format: String, args: Object*): String = String.format(format, args: _*)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.flink.cdc.common.udf.UserDefinedFunction

/** This is an example UDF class for testing purposes only. */
class LifecycleFunctionClass extends UserDefinedFunction {
private var counter: Integer = null
private var counter: Integer = 0

def eval: String = {
"#" + {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.flink.cdc.common.udf.UserDefinedFunction

/** This is an example UDF class for testing purposes only. */
class TypeHintFunctionClass extends UserDefinedFunction {
override def getReturnType: DataType = DataTypes.STRING()
override def getReturnType: DataType = DataTypes.STRING

def eval: Object = {
// Return type could not be inferred from function signature
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ import scala.annotation.varargs

/** This is an example UDF class for testing purposes only. */
class FormatFunctionClass extends ScalarFunction {
@varargs def eval(format: String, args: Object*): String = String.format(format, args:_*)
@varargs def eval(format: String, args: Object*): String = String.format(format, args: _*)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
Expand Down Expand Up @@ -73,7 +72,8 @@ public class TransformDataOperator extends AbstractStreamOperator<Event>
/** keep the relationship of TableId and table information. */
private final Map<TableId, TableInfo> tableInfoMap;

private final List<UserDefinedFunctionDescriptor> udfFunctions;
private final List<Tuple2<String, String>> udfFunctions;
private List<UserDefinedFunctionDescriptor> udfDescriptors;
private transient Map<String, Object> udfFunctionInstances;

private transient Map<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor>
Expand All @@ -90,7 +90,7 @@ public static class Builder {
private final List<Tuple3<String, String, String>> transformRules = new ArrayList<>();
private OperatorID schemaOperatorID;
private String timezone;
private List<UserDefinedFunctionDescriptor> udfFunctions = new ArrayList<>();
private List<Tuple2<String, String>> udfFunctions = new ArrayList<>();

public TransformDataOperator.Builder addTransform(
String tableInclusions, @Nullable String projection, @Nullable String filter) {
Expand All @@ -113,7 +113,7 @@ public TransformDataOperator.Builder addTimezone(String timezone) {
}

public TransformDataOperator.Builder addUdfFunctions(
List<UserDefinedFunctionDescriptor> udfFunctions) {
List<Tuple2<String, String>> udfFunctions) {
this.udfFunctions.addAll(udfFunctions);
return this;
}
Expand All @@ -128,7 +128,7 @@ private TransformDataOperator(
List<Tuple3<String, String, String>> transformRules,
OperatorID schemaOperatorID,
String timezone,
List<UserDefinedFunctionDescriptor> udfFunctions) {
List<Tuple2<String, String>> udfFunctions) {
this.transformRules = transformRules;
this.schemaOperatorID = schemaOperatorID;
this.timezone = timezone;
Expand All @@ -149,6 +149,13 @@ public void setup(
new SchemaEvolutionClient(
containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
schemaOperatorID);
udfDescriptors =
udfFunctions.stream()
.map(
udf -> {
return new UserDefinedFunctionDescriptor(udf.f0, udf.f1);
})
.collect(Collectors.toList());
}

@Override
Expand All @@ -169,15 +176,15 @@ public void open() throws Exception {
return new Tuple4<>(
selectors,
TransformProjection.of(projection),
TransformFilter.of(filterExpression, udfFunctions),
TransformFilter.of(filterExpression, udfDescriptors),
containFilteredComputedColumn(
projection, filterExpression));
})
.collect(Collectors.toList());
this.transformFilterProcessorMap = new ConcurrentHashMap<>();
this.transformProjectionProcessorMap = new ConcurrentHashMap<>();
this.udfFunctionInstances = new ConcurrentHashMap<>();
udfFunctions.forEach(
udfDescriptors.forEach(
udf -> {
try {
Class<?> clazz = Class.forName(udf.getClassPath());
Expand Down Expand Up @@ -269,7 +276,7 @@ private void transformSchema(TableId tableId, Schema schema) throws Exception {
Tuple2.of(tableId, transformProjection),
TransformProjectionProcessor.of(
transformProjection,
udfFunctions,
udfDescriptors,
getUdfFunctionInstances()));
}
TransformProjectionProcessor transformProjectionProcessor =
Expand All @@ -283,7 +290,7 @@ private void transformSchema(TableId tableId, Schema schema) throws Exception {
}

private List<Object> getUdfFunctionInstances() {
return udfFunctions.stream()
return udfDescriptors.stream()
.map(e -> udfFunctionInstances.get(e.getName()))
.collect(Collectors.toList());
}
Expand Down Expand Up @@ -315,7 +322,7 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
getTableInfoFromSchemaEvolutionClient(tableId),
transformProjection,
timezone,
udfFunctions,
udfDescriptors,
getUdfFunctionInstances()));
}
TransformProjectionProcessor transformProjectionProcessor =
Expand All @@ -339,7 +346,7 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
getTableInfoFromSchemaEvolutionClient(tableId),
transformFilter,
timezone,
udfFunctions,
udfDescriptors,
getUdfFunctionInstances()));
}
TransformFilterProcessor transformFilterProcessor =
Expand All @@ -366,7 +373,7 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
getTableInfoFromSchemaEvolutionClient(tableId),
transformProjection,
timezone,
udfFunctions,
udfDescriptors,
getUdfFunctionInstances()));
}
TransformProjectionProcessor transformProjectionProcessor =
Expand Down Expand Up @@ -462,7 +469,7 @@ private void clearOperator() {
}

private void initializeUdf() {
udfFunctions.forEach(
udfDescriptors.forEach(
udf -> {
try {
if (udf.isCdcPipelineUdf()) {
Expand All @@ -483,7 +490,7 @@ private void initializeUdf() {
}

private void destroyUdf() {
udfFunctions.forEach(
udfDescriptors.forEach(
udf -> {
try {
if (udf.isCdcPipelineUdf()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.cdc.runtime.operators.transform;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.flink.cdc.runtime.operators.transform;

import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.parser.TransformParser;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.parser.TransformParser;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

import javax.annotation.Nullable;

Expand All @@ -60,7 +62,8 @@ public class TransformSchemaOperator extends AbstractStreamOperator<Event>
private transient Map<TableId, TransformProjectionProcessor> processorMap;
private final List<Tuple2<Selectors, SchemaMetadataTransform>> schemaMetadataTransformers;
private transient ListState<byte[]> state;
private final List<UserDefinedFunctionDescriptor> udfFunctions;
private final List<Tuple2<String, String>> udfFunctions;
private List<UserDefinedFunctionDescriptor> udfDescriptors;

public static TransformSchemaOperator.Builder newBuilder() {
return new TransformSchemaOperator.Builder();
Expand All @@ -71,7 +74,7 @@ public static class Builder {
private final List<Tuple5<String, String, String, String, String>> transformRules =
new ArrayList<>();

private final List<UserDefinedFunctionDescriptor> udfFunctions = new ArrayList<>();
private final List<Tuple2<String, String>> udfFunctions = new ArrayList<>();

public TransformSchemaOperator.Builder addTransform(
String tableInclusions,
Expand All @@ -85,7 +88,7 @@ public TransformSchemaOperator.Builder addTransform(
}

public TransformSchemaOperator.Builder addUdfFunctions(
List<UserDefinedFunctionDescriptor> udfFunctions) {
List<Tuple2<String, String>> udfFunctions) {
this.udfFunctions.addAll(udfFunctions);
return this;
}
Expand All @@ -97,7 +100,7 @@ public TransformSchemaOperator build() {

private TransformSchemaOperator(
List<Tuple5<String, String, String, String, String>> transformRules,
List<UserDefinedFunctionDescriptor> udfFunctions) {
List<Tuple2<String, String>> udfFunctions) {
this.transformRules = transformRules;
this.tableChangeInfoMap = new ConcurrentHashMap<>();
this.processorMap = new ConcurrentHashMap<>();
Expand All @@ -106,6 +109,18 @@ private TransformSchemaOperator(
this.udfFunctions = udfFunctions;
}

/**
 * Registers this operator with its containing task and then materializes one
 * {@link UserDefinedFunctionDescriptor} per registered UDF entry.
 *
 * <p>Each entry in {@code udfFunctions} is a (name, classpath) pair; the descriptors are
 * built here, on the runtime side, rather than when the pipeline is composed.
 *
 * @param containingTask the task that hosts this operator
 * @param config the stream configuration handed to the operator
 * @param output the collector that receives emitted records
 */
@Override
public void setup(
        StreamTask<?, ?> containingTask,
        StreamConfig config,
        Output<StreamRecord<Event>> output) {
    super.setup(containingTask, config, output);

    List<UserDefinedFunctionDescriptor> descriptors = new ArrayList<>();
    for (Tuple2<String, String> nameAndClassPath : this.udfFunctions) {
        // f0 carries the UDF name, f1 its class path.
        descriptors.add(
                new UserDefinedFunctionDescriptor(nameAndClassPath.f0, nameAndClassPath.f1));
    }
    this.udfDescriptors = descriptors;
}

@Override
public void open() throws Exception {
super.open();
Expand Down Expand Up @@ -232,7 +247,7 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE
tableId,
TransformProjectionProcessor.of(
transformProjection,
udfFunctions,
udfDescriptors,
Collections.emptyList()));
}
TransformProjectionProcessor transformProjectionProcessor =
Expand Down Expand Up @@ -292,7 +307,7 @@ private DataChangeEvent processProjection(
TransformProjectionProcessor.of(
tableChangeInfo,
transformProjection,
udfFunctions,
udfDescriptors,
Collections.emptyList()));
}
TransformProjectionProcessor transformProjectionProcessor = processorMap.get(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.flink.cdc.common.udf;
package org.apache.flink.cdc.runtime.operators.transform;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;

import org.apache.calcite.sql.SqlBasicCall;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
import org.apache.flink.cdc.runtime.parser.metadata.TransformSchemaFactory;
import org.apache.flink.cdc.runtime.parser.metadata.TransformSqlOperatorTable;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
Expand Down
Loading

0 comments on commit 767b456

Please sign in to comment.