From f89d9de7487e6cd0139442b02d097dc7a73da38c Mon Sep 17 00:00:00 2001
From: Leonard Xu
Date: Fri, 19 Jul 2024 17:44:43 +0800
Subject: [PATCH] [FLINK-35237][cdc-common] Improve the interfaces and
 reorganize the directory.

---
 .../{sink => function}/HashFunction.java      | 17 ++++++----
 .../HashFunctionProvider.java                 | 20 ++++++++----
 .../flink/cdc/common/sink/DataSink.java       | 10 +++---
 ...tDataChangeEventHashFunctionProvider.java} | 32 ++++++++++++-------
 .../composer/flink/FlinkPipelineComposer.java |  2 +-
 .../translator/PartitioningTranslator.java    |  5 +--
 .../partitioning/PrePartitionOperator.java    | 21 ++++++------
 .../PrePartitionOperatorTest.java             |  8 ++---
 8 files changed, 68 insertions(+), 47 deletions(-)
 rename flink-cdc-common/src/main/java/org/apache/flink/cdc/common/{sink => function}/HashFunction.java (69%)
 rename flink-cdc-common/src/main/java/org/apache/flink/cdc/common/{sink => function}/HashFunctionProvider.java (66%)
 rename flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/{DefaultHashFunctionProvider.java => DefaultDataChangeEventHashFunctionProvider.java} (73%)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunction.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/function/HashFunction.java
similarity index 69%
rename from flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunction.java
rename to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/function/HashFunction.java
index f74a084958..323a3de1af 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunction.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/function/HashFunction.java
@@ -15,14 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.common.sink;
+package org.apache.flink.cdc.common.function;
 
-import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.annotation.Internal;
 
-import java.util.function.Function;
+/**
+ * The hash function used to calculate the hash code from a given event.
+ *
+ * @param <T> the type of given event.
+ */
+@Internal
+public interface HashFunction<T> {
 
-/** use for {@code PrePartitionOperator} when calculating hash code of primary key. */
-public interface HashFunction extends Function<DataChangeEvent, Integer> {
-    @Override
-    Integer apply(DataChangeEvent event);
+    int hashcode(T event);
 }
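With the interface now generic and reduced to a single int-returning method, a custom
implementation no longer extends java.util.function.Function. A minimal sketch against the new
interface (the class name TableIdHashFunction is illustrative and not part of this patch):

    import org.apache.flink.cdc.common.event.DataChangeEvent;
    import org.apache.flink.cdc.common.function.HashFunction;

    import java.util.Objects;

    /** Illustrative only: routes all events of one table to the same subtask. */
    public class TableIdHashFunction implements HashFunction<DataChangeEvent> {

        @Override
        public int hashcode(DataChangeEvent event) {
            // Mask the sign bit so a later modulo never yields a negative index.
            return Objects.hashCode(event.tableId()) & 0x7FFFFFFF;
        }
    }
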
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunctionProvider.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/function/HashFunctionProvider.java
similarity index 66%
rename from flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunctionProvider.java
rename to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/function/HashFunctionProvider.java
index 06353882be..ea17dcbe5d 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/HashFunctionProvider.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/function/HashFunctionProvider.java
@@ -15,21 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.common.sink;
+package org.apache.flink.cdc.common.function;
 
+import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.DataSink;
+
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
 /**
- * Provide {@link HashFunction} to help {@code PrePartitionOperator} to shuffle {@link
- * DataChangeEvent} to designated subtask. This is usually beneficial for load balancing, when
- * writing to different partitions/buckets in {@link DataSink}, add custom implementation to further
- * improve efficiency.
+ * Provider of {@link HashFunction}, used by {@code PrePartitionOperator} to shuffle events to the
+ * designated subtask. This is usually beneficial for load balancing when writing to different
+ * partitions/buckets in a {@link DataSink}; add a custom implementation to further improve
+ * efficiency.
+ *
+ * @param <T> the type of given element
  */
-public interface HashFunctionProvider extends Serializable {
+@Internal
+public interface HashFunctionProvider<T> extends Serializable {
 
     /**
      * Gets a hash function based on the given table ID and schema, to help {@code
@@ -39,5 +45,5 @@ public interface HashFunctionProvider extends Serializable {
      * @param schema flink table schema
      * @return hash function based on the given table ID and schema
      */
-    HashFunction getHashFunction(TableId tableId, Schema schema);
+    HashFunction<T> getHashFunction(@Nullable TableId tableId, Schema schema);
 }
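A provider built on the reworked interface only has to hand out such a function; note that the
table ID is now @Nullable, so implementations must not dereference it unconditionally. A sketch
reusing the hypothetical TableIdHashFunction above:

    import org.apache.flink.cdc.common.event.DataChangeEvent;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.function.HashFunction;
    import org.apache.flink.cdc.common.function.HashFunctionProvider;
    import org.apache.flink.cdc.common.schema.Schema;

    import javax.annotation.Nullable;

    /** Illustrative only: always returns the table-based hash function. */
    public class TableIdHashFunctionProvider implements HashFunctionProvider<DataChangeEvent> {

        private static final long serialVersionUID = 1L;

        @Override
        public HashFunction<DataChangeEvent> getHashFunction(
                @Nullable TableId tableId, Schema schema) {
            // tableId may now be null, so this implementation ignores it entirely.
            return new TableIdHashFunction();
        }
    }
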
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
index 4efdeabbf9..f565f7b3d0 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java
@@ -18,6 +18,8 @@
 package org.apache.flink.cdc.common.sink;
 
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 
 /**
  * {@code DataSink} is used to write change data to external system and apply metadata changes to
@@ -33,10 +35,10 @@ public interface DataSink {
     MetadataApplier getMetadataApplier();
 
     /**
-     * Get the {@link HashFunctionProvider} for calculating hash value when partition by primary
-     * ley.
+     * Get the {@code HashFunctionProvider} for calculating the hash value if the data change
+     * events need to be partitioned before reaching the sink.
      */
-    default HashFunctionProvider getHashFunctionProvider() {
-        return new DefaultHashFunctionProvider();
+    default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
+        return new DefaultDataChangeEventHashFunctionProvider();
     }
 }
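A sink that wants its own partitioning strategy overrides the renamed default method. A sketch,
assuming the remaining DataSink methods (getEventSinkProvider, getMetadataApplier) are implemented
elsewhere; only the override is the point here:

    import org.apache.flink.cdc.common.event.DataChangeEvent;
    import org.apache.flink.cdc.common.function.HashFunctionProvider;
    import org.apache.flink.cdc.common.sink.DataSink;
    import org.apache.flink.cdc.common.sink.EventSinkProvider;
    import org.apache.flink.cdc.common.sink.MetadataApplier;

    /** Illustrative only: a sink that swaps in the custom provider sketched above. */
    public class MyDataSink implements DataSink {

        @Override
        public EventSinkProvider getEventSinkProvider() {
            throw new UnsupportedOperationException("omitted in this sketch");
        }

        @Override
        public MetadataApplier getMetadataApplier() {
            throw new UnsupportedOperationException("omitted in this sketch");
        }

        @Override
        public HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
            // Replaces the default primary-key based hashing.
            return new TableIdHashFunctionProvider();
        }
    }
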
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultHashFunctionProvider.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultDataChangeEventHashFunctionProvider.java
similarity index 73%
rename from flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultHashFunctionProvider.java
rename to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultDataChangeEventHashFunctionProvider.java
index 612f1491c5..d0e2d99d73 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultHashFunctionProvider.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DefaultDataChangeEventHashFunctionProvider.java
@@ -18,34 +18,43 @@
 package org.apache.flink.cdc.common.sink;
 
 import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.data.RecordData.FieldGetter;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.OperationType;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.schema.Schema;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
-/** the default implementation of hash function. */
-public class DefaultHashFunctionProvider implements HashFunctionProvider {
+/** The default {@link HashFunctionProvider} implementation for data change event. */
+public class DefaultDataChangeEventHashFunctionProvider
+        implements HashFunctionProvider<DataChangeEvent> {
+    private static final long serialVersionUID = 1L;
 
     @Override
-    public HashFunction getHashFunction(TableId tableId, Schema schema) {
-        return new DefaultHashFunction(schema);
+    public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId tableId, Schema schema) {
+        return new DefaultDataChangeEventHashFunction(schema);
     }
 
-    static class DefaultHashFunction implements HashFunction {
-        private final List<RecordData.FieldGetter> primaryKeyGetters;
+    /** The default {@link HashFunction} implementation for data change event. */
+    static class DefaultDataChangeEventHashFunction implements HashFunction<DataChangeEvent> {
+
+        private final List<FieldGetter> primaryKeyGetters;
 
-        public DefaultHashFunction(Schema schema) {
+        public DefaultDataChangeEventHashFunction(Schema schema) {
             primaryKeyGetters = createFieldGetters(schema);
         }
 
         @Override
-        public Integer apply(DataChangeEvent event) {
+        public int hashcode(DataChangeEvent event) {
             List<Object> objectsToHash = new ArrayList<>();
             // Table ID
             TableId tableId = event.tableId();
@@ -56,7 +65,7 @@ public Integer apply(DataChangeEvent event) {
             // Primary key
             RecordData data =
                     event.op().equals(OperationType.DELETE) ? event.before() : event.after();
-            for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) {
+            for (FieldGetter primaryKeyGetter : primaryKeyGetters) {
                 objectsToHash.add(primaryKeyGetter.getFieldOrNull(data));
             }
 
@@ -64,9 +73,8 @@ public Integer apply(DataChangeEvent event) {
             return (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;
         }
 
-        private List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
-            List<RecordData.FieldGetter> fieldGetters =
-                    new ArrayList<>(schema.primaryKeys().size());
+        private List<FieldGetter> createFieldGetters(Schema schema) {
+            List<FieldGetter> fieldGetters = new ArrayList<>(schema.primaryKeys().size());
             schema.primaryKeys().stream()
                     .mapToInt(
                             pk -> {

diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index c79d3d1a47..0735b90cf3 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -140,7 +140,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                         parallelism,
                         parallelism,
                         schemaOperatorIDGenerator.generate(),
-                        dataSink.getHashFunctionProvider());
+                        dataSink.getDataChangeEventHashFunctionProvider());
 
         // Build Sink Operator
         DataSinkTranslator sinkTranslator = new DataSinkTranslator();

diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/PartitioningTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/PartitioningTranslator.java
index 8bd34fb036..b4e0b34b8f 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/PartitioningTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/PartitioningTranslator.java
@@ -18,8 +18,9 @@
 package org.apache.flink.cdc.composer.flink.translator;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.sink.HashFunctionProvider;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.runtime.partitioning.EventPartitioner;
 import org.apache.flink.cdc.runtime.partitioning.PartitioningEventKeySelector;
 import org.apache.flink.cdc.runtime.partitioning.PostPartitionProcessor;
@@ -41,7 +42,7 @@ public DataStream<PartitioningEvent> translate(
             int upstreamParallelism,
             int downstreamParallelism,
             OperatorID schemaOperatorID,
-            HashFunctionProvider hashFunctionProvider) {
+            HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
         return input.transform(
                 "PrePartition",
                 new PartitioningEventTypeInfo(),
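The default implementation keeps the `(hash * 31) & 0x7FFFFFFF` step. The mask matters because
Objects.hash can return any int, including negative ones, and Java's % preserves the sign of the
dividend. A plain standalone demonstration of that arithmetic:

    import java.util.Objects;

    public class SignMaskDemo {
        public static void main(String[] args) {
            int parallelism = 4;
            int raw = Objects.hash("inventory", "products", 7L) * 31; // any int, possibly negative
            int masked = raw & 0x7FFFFFFF; // clears the sign bit, so masked is always >= 0
            // A negative raw would give a negative "subtask index" under %, while
            // masked % parallelism is always in [0, parallelism).
            System.out.println(raw % parallelism);
            System.out.println(masked % parallelism);
        }
    }
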
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
index 26df32d887..0c11005653 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java
@@ -23,9 +23,9 @@
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.sink.HashFunction;
-import org.apache.flink.cdc.common.sink.HashFunctionProvider;
 import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -47,19 +47,20 @@ public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEvent>
         implements OneInputStreamOperator<Event, PartitioningEvent> {
 
+    private static final long serialVersionUID = 1L;
     private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
 
     private final OperatorID schemaOperatorId;
     private final int downstreamParallelism;
-    private final HashFunctionProvider hashFunctionProvider;
+    private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;
 
     private transient SchemaEvolutionClient schemaEvolutionClient;
-    private transient LoadingCache<TableId, HashFunction> cachedHashFunctions;
+    private transient LoadingCache<TableId, HashFunction<DataChangeEvent>> cachedHashFunctions;
 
     public PrePartitionOperator(
             OperatorID schemaOperatorId,
             int downstreamParallelism,
-            HashFunctionProvider hashFunctionProvider) {
+            HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.schemaOperatorId = schemaOperatorId;
         this.downstreamParallelism = downstreamParallelism;
@@ -100,7 +101,7 @@ private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception {
                         dataChangeEvent,
                         cachedHashFunctions
                                 .get(dataChangeEvent.tableId())
-                                .apply(dataChangeEvent)
+                                .hashcode(dataChangeEvent)
                                 % downstreamParallelism)));
     }
 
@@ -126,17 +127,17 @@ private Schema loadLatestSchemaFromRegistry(TableId tableId) {
         return schema.get();
     }
 
-    private HashFunction recreateHashFunction(TableId tableId) {
+    private HashFunction<DataChangeEvent> recreateHashFunction(TableId tableId) {
         return hashFunctionProvider.getHashFunction(tableId, loadLatestSchemaFromRegistry(tableId));
     }
 
-    private LoadingCache<TableId, HashFunction> createCache() {
+    private LoadingCache<TableId, HashFunction<DataChangeEvent>> createCache() {
         return CacheBuilder.newBuilder()
                 .expireAfterAccess(CACHE_EXPIRE_DURATION)
                 .build(
-                        new CacheLoader<TableId, HashFunction>() {
+                        new CacheLoader<TableId, HashFunction<DataChangeEvent>>() {
                             @Override
-                            public HashFunction load(TableId key) {
+                            public HashFunction<DataChangeEvent> load(TableId key) {
                                 return recreateHashFunction(key);
                             }
                         });
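Putting the pieces together, the per-event routing decision reduces to a provider lookup plus a
modulo, which is also what the test below recomputes independently. A sketch using only the APIs
introduced in this patch (the class and method names are illustrative):

    import org.apache.flink.cdc.common.event.DataChangeEvent;
    import org.apache.flink.cdc.common.function.HashFunction;
    import org.apache.flink.cdc.common.function.HashFunctionProvider;
    import org.apache.flink.cdc.common.schema.Schema;

    /** Illustrative only: the routing rule the operator applies to each data change event. */
    public final class PartitionRouting {

        private PartitionRouting() {}

        static int targetSubtask(
                HashFunctionProvider<DataChangeEvent> provider,
                Schema schema,
                DataChangeEvent event,
                int downstreamParallelism) {
            HashFunction<DataChangeEvent> function =
                    provider.getHashFunction(event.tableId(), schema);
            // Same table and same primary key -> same hash -> same downstream subtask.
            return function.hashcode(event) % downstreamParallelism;
        }
    }
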
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java
index 64660c6626..38c80914d3 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperatorTest.java
@@ -23,7 +23,7 @@
 import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.sink.DefaultHashFunctionProvider;
+import org.apache.flink.cdc.common.sink.DefaultDataChangeEventHashFunctionProvider;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
@@ -128,9 +128,9 @@ void testPartitioningDataChangeEvent() throws Exception {
     }
 
     private int getPartitioningTarget(Schema schema, DataChangeEvent dataChangeEvent) {
-        return new DefaultHashFunctionProvider()
+        return new DefaultDataChangeEventHashFunctionProvider()
                         .getHashFunction(null, schema)
-                        .apply(dataChangeEvent)
+                        .hashcode(dataChangeEvent)
                 % DOWNSTREAM_PARALLELISM;
     }
 
@@ -139,7 +139,7 @@ private EventOperatorTestHarness<PrePartitionOperator, PartitioningEvent> create
                 new PrePartitionOperator(
                         TestingSchemaRegistryGateway.SCHEMA_OPERATOR_ID,
                         DOWNSTREAM_PARALLELISM,
-                        new DefaultHashFunctionProvider());
+                        new DefaultDataChangeEventHashFunctionProvider());
         return new EventOperatorTestHarness<>(operator, DOWNSTREAM_PARALLELISM);
     }
 }
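For downstream code that consumed the old API, the migration is mechanical: the DataSink accessor
is renamed, the provider gains a type parameter, and apply(event) becomes the int-returning
hashcode(event). A hypothetical call site, for illustration:

    import org.apache.flink.cdc.common.event.DataChangeEvent;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.common.sink.DataSink;

    /** Illustrative only: before/after shape of a call site affected by this patch. */
    public final class MigrationExample {

        private MigrationExample() {}

        static int hashWithNewApi(
                DataSink sink, TableId tableId, Schema schema, DataChangeEvent event) {
            // Old: sink.getHashFunctionProvider().getHashFunction(tableId, schema).apply(event)
            // New: renamed accessor, generic provider, and int-returning hashcode(...)
            return sink.getDataChangeEventHashFunctionProvider()
                    .getHashFunction(tableId, schema)
                    .hashcode(event);
        }
    }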