
Commit

[FLINK-35237][cdc-common] Improve the interfaces and reorganize the directory.
leonardBang authored and qiaozongmi committed Sep 23, 2024
1 parent 237812c commit f89d9de
Showing 8 changed files with 68 additions and 47 deletions.
@@ -15,14 +15,17 @@
* limitations under the License.
*/

package org.apache.flink.cdc.common.sink;
package org.apache.flink.cdc.common.function;

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.annotation.Internal;

import java.util.function.Function;
/**
* The hash function used to calculate the hash code from a given event.
*
* @param <T> the type of given event.
*/
@Internal
public interface HashFunction<T> {

/** use for {@code PrePartitionOperator} when calculating hash code of primary key. */
public interface HashFunction extends Function<DataChangeEvent, Integer> {
@Override
Integer apply(DataChangeEvent event);
int hashcode(T event);
}
@@ -15,21 +15,27 @@
* limitations under the License.
*/

package org.apache.flink.cdc.common.sink;
package org.apache.flink.cdc.common.function;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DataSink;

import javax.annotation.Nullable;

import java.io.Serializable;

/**
* Provide {@link HashFunction} to help {@code PrePartitionOperator} to shuffle {@link
* DataChangeEvent} to designated subtask. This is usually beneficial for load balancing, when
* writing to different partitions/buckets in {@link DataSink}, add custom implementation to further
* improve efficiency.
* Provider that provides {@link HashFunction} to help {@code PrePartitionOperator} to shuffle event
* to designated subtask. This is usually beneficial for load balancing, when writing to different
* partitions/buckets in {@link DataSink}, add custom implementation to further improve efficiency.
*
* @param <T> the type of given element
*/
public interface HashFunctionProvider extends Serializable {
@Internal
public interface HashFunctionProvider<T> extends Serializable {

/**
* Gets a hash function based on the given table ID and schema, to help {@code
@@ -39,5 +45,5 @@ public interface HashFunctionProvider extends Serializable {
* @param schema flink table schema
* @return hash function based on the given table ID and schema
*/
HashFunction getHashFunction(TableId tableId, Schema schema);
HashFunction<T> getHashFunction(@Nullable TableId tableId, Schema schema);
}
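
For illustration only (not part of this commit): because HashFunction<T> now declares a single abstract method, a connector could plug in its own provider with a lambda. The class name and the table-only hashing strategy below are hypothetical.

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;

import javax.annotation.Nullable;

/** Hypothetical provider that routes all changes of a table to the same subtask. */
public class TableIdHashFunctionProvider implements HashFunctionProvider<DataChangeEvent> {

    private static final long serialVersionUID = 1L;

    @Override
    public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId tableId, Schema schema) {
        // HashFunction<T> has a single method, so a lambda suffices; mask the sign bit
        // so the modulo applied later in PrePartitionOperator stays non-negative.
        return event -> event.tableId().hashCode() & 0x7FFFFFFF;
    }
}
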
@@ -18,6 +18,8 @@
package org.apache.flink.cdc.common.sink;

import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.function.HashFunctionProvider;

/**
* {@code DataSink} is used to write change data to external system and apply metadata changes to
@@ -33,10 +35,10 @@ public interface DataSink {
MetadataApplier getMetadataApplier();

/**
* Get the {@link HashFunctionProvider} for calculating hash value when partition by primary
* ley.
* Get the {@code HashFunctionProvider<DataChangeEvent>} for calculating hash value if you need
* to partition by data change event before Sink.
*/
default HashFunctionProvider getHashFunctionProvider() {
return new DefaultHashFunctionProvider();
default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
return new DefaultDataChangeEventHashFunctionProvider();
}
}
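
A minimal caller-side sketch (not part of the commit) of how the renamed default method is consumed; the helper class below is hypothetical, and the real wiring is done by the composer and PrePartitionOperator changes further down in this diff.

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DataSink;

/** Hypothetical helper that resolves the target partition through the new API. */
public final class PartitionIndexExample {

    private PartitionIndexExample() {}

    static int partitionIndexFor(
            DataSink dataSink, Schema schema, DataChangeEvent event, int downstreamParallelism) {
        HashFunctionProvider<DataChangeEvent> provider =
                dataSink.getDataChangeEventHashFunctionProvider();
        // The table ID comes from the event itself; implementations may also accept null here.
        HashFunction<DataChangeEvent> hashFunction =
                provider.getHashFunction(event.tableId(), schema);
        return hashFunction.hashcode(event) % downstreamParallelism;
    }
}
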
@@ -18,34 +18,43 @@
package org.apache.flink.cdc.common.sink;

import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.RecordData.FieldGetter;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** the default implementation of hash function. */
public class DefaultHashFunctionProvider implements HashFunctionProvider {
/** The default {@link HashFunctionProvider} implementation for data change event. */
public class DefaultDataChangeEventHashFunctionProvider
implements HashFunctionProvider<DataChangeEvent> {

private static final long serialVersionUID = 1L;

@Override
public HashFunction getHashFunction(TableId tableId, Schema schema) {
return new DefaultHashFunction(schema);
public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId tableId, Schema schema) {
return new DefaultDataChangeEventHashFunction(schema);
}

static class DefaultHashFunction implements HashFunction {
private final List<RecordData.FieldGetter> primaryKeyGetters;
/** The default {@link HashFunction} implementation for data change event. */
static class DefaultDataChangeEventHashFunction implements HashFunction<DataChangeEvent> {

private final List<FieldGetter> primaryKeyGetters;

public DefaultHashFunction(Schema schema) {
public DefaultDataChangeEventHashFunction(Schema schema) {
primaryKeyGetters = createFieldGetters(schema);
}

@Override
public Integer apply(DataChangeEvent event) {
public int hashcode(DataChangeEvent event) {
List<Object> objectsToHash = new ArrayList<>();
// Table ID
TableId tableId = event.tableId();
@@ -56,17 +65,16 @@ public Integer apply(DataChangeEvent event) {
// Primary key
RecordData data =
event.op().equals(OperationType.DELETE) ? event.before() : event.after();
for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) {
for (FieldGetter primaryKeyGetter : primaryKeyGetters) {
objectsToHash.add(primaryKeyGetter.getFieldOrNull(data));
}

// Calculate hash
return (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;
}

private List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
List<RecordData.FieldGetter> fieldGetters =
new ArrayList<>(schema.primaryKeys().size());
private List<FieldGetter> createFieldGetters(Schema schema) {
List<FieldGetter> fieldGetters = new ArrayList<>(schema.primaryKeys().size());
schema.primaryKeys().stream()
.mapToInt(
pk -> {
@@ -140,7 +140,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
parallelism,
parallelism,
schemaOperatorIDGenerator.generate(),
dataSink.getHashFunctionProvider());
dataSink.getDataChangeEventHashFunctionProvider());

// Build Sink Operator
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
@@ -18,8 +18,9 @@
package org.apache.flink.cdc.composer.flink.translator;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.sink.HashFunctionProvider;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.runtime.partitioning.EventPartitioner;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEventKeySelector;
import org.apache.flink.cdc.runtime.partitioning.PostPartitionProcessor;
@@ -41,7 +42,7 @@ public DataStream<Event> translate(
int upstreamParallelism,
int downstreamParallelism,
OperatorID schemaOperatorID,
HashFunctionProvider hashFunctionProvider) {
HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
return input.transform(
"PrePartition",
new PartitioningEventTypeInfo(),
@@ -23,9 +23,9 @@
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.HashFunction;
import org.apache.flink.cdc.common.sink.HashFunctionProvider;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -47,19 +47,20 @@
public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEvent>
implements OneInputStreamOperator<Event, PartitioningEvent> {

private static final long serialVersionUID = 1L;
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);

private final OperatorID schemaOperatorId;
private final int downstreamParallelism;
private final HashFunctionProvider hashFunctionProvider;
private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;

private transient SchemaEvolutionClient schemaEvolutionClient;
private transient LoadingCache<TableId, HashFunction> cachedHashFunctions;
private transient LoadingCache<TableId, HashFunction<DataChangeEvent>> cachedHashFunctions;

public PrePartitionOperator(
OperatorID schemaOperatorId,
int downstreamParallelism,
HashFunctionProvider hashFunctionProvider) {
HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.schemaOperatorId = schemaOperatorId;
this.downstreamParallelism = downstreamParallelism;
@@ -100,7 +101,7 @@ private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception {
dataChangeEvent,
cachedHashFunctions
.get(dataChangeEvent.tableId())
.apply(dataChangeEvent)
.hashcode(dataChangeEvent)
% downstreamParallelism)));
}

@@ -126,17 +127,17 @@ private Schema loadLatestSchemaFromRegistry(TableId tableId) {
return schema.get();
}

private HashFunction recreateHashFunction(TableId tableId) {
private HashFunction<DataChangeEvent> recreateHashFunction(TableId tableId) {
return hashFunctionProvider.getHashFunction(tableId, loadLatestSchemaFromRegistry(tableId));
}

private LoadingCache<TableId, HashFunction> createCache() {
private LoadingCache<TableId, HashFunction<DataChangeEvent>> createCache() {
return CacheBuilder.newBuilder()
.expireAfterAccess(CACHE_EXPIRE_DURATION)
.build(
new CacheLoader<TableId, HashFunction>() {
new CacheLoader<TableId, HashFunction<DataChangeEvent>>() {
@Override
public HashFunction load(TableId key) {
public HashFunction<DataChangeEvent> load(TableId key) {
return recreateHashFunction(key);
}
});
@@ -23,7 +23,7 @@
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DefaultHashFunctionProvider;
import org.apache.flink.cdc.common.sink.DefaultDataChangeEventHashFunctionProvider;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
@@ -128,9 +128,9 @@ void testPartitioningDataChangeEvent() throws Exception {
}

private int getPartitioningTarget(Schema schema, DataChangeEvent dataChangeEvent) {
return new DefaultHashFunctionProvider()
return new DefaultDataChangeEventHashFunctionProvider()
.getHashFunction(null, schema)
.apply(dataChangeEvent)
.hashcode(dataChangeEvent)
% DOWNSTREAM_PARALLELISM;
}

@@ -139,7 +139,7 @@ private EventOperatorTestHarness<PrePartitionOperator, PartitioningEvent> create
new PrePartitionOperator(
TestingSchemaRegistryGateway.SCHEMA_OPERATOR_ID,
DOWNSTREAM_PARALLELISM,
new DefaultHashFunctionProvider());
new DefaultDataChangeEventHashFunctionProvider());
return new EventOperatorTestHarness<>(operator, DOWNSTREAM_PARALLELISM);
}
}
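
As a closing note on behavior (an observation drawn from the default implementation above, not a statement in the commit itself): because DefaultDataChangeEventHashFunctionProvider hashes the table ID together with the primary-key fields, all changes to the same key are routed to the same subtask. The small helper below is hypothetical and mirrors the test's getPartitioningTarget.

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DefaultDataChangeEventHashFunctionProvider;

/** Hypothetical helper: resolves the subtask the default provider would pick for an event. */
public final class DefaultPartitioningExample {

    private DefaultPartitioningExample() {}

    static int targetSubtask(Schema schema, DataChangeEvent event, int downstreamParallelism) {
        // Passing null as the table ID matches the test usage above; the schema supplies the
        // primary-key field getters used for hashing.
        return new DefaultDataChangeEventHashFunctionProvider()
                        .getHashFunction(null, schema)
                        .hashcode(event)
                % downstreamParallelism;
    }
}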
