[FLINK-35237][cdc-common] Allow Sink to choose HashFunction in PrePartitionOperator
dingxin-tech authored and qiaozongmi committed Sep 23, 2024
1 parent 634b2d0 commit 237812c
Showing 8 changed files with 198 additions and 72 deletions.
@@ -31,4 +31,12 @@ public interface DataSink {

/** Get the {@link MetadataApplier} for applying metadata changes to external systems. */
MetadataApplier getMetadataApplier();

/**
 * Get the {@link HashFunctionProvider} for calculating the hash value when partitioning by
 * primary key.
 */
default HashFunctionProvider getHashFunctionProvider() {
return new DefaultHashFunctionProvider();
}
}
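
Because getHashFunctionProvider() is a default method, existing sinks keep the default table-ID-plus-primary-key hashing, while a connector can opt in by overriding it. A minimal sketch of such an override (illustration only, not part of this commit; the class and package names are hypothetical, and the lambda simply hashes on the table name):

package com.example.connector; // hypothetical package, for illustration only

import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.HashFunctionProvider;

// The class is left abstract so that only the method introduced by this change is shown.
public abstract class MyDataSink implements DataSink {

    @Override
    public HashFunctionProvider getHashFunctionProvider() {
        // Hash on the table name only, instead of the default table-ID-plus-primary-key hashing.
        return (tableId, schema) -> event -> tableId.getTableName().hashCode() & 0x7FFFFFFF;
    }
}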
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.sink;

import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** The default implementation of {@link HashFunctionProvider}, hashing on table ID and primary key values. */
public class DefaultHashFunctionProvider implements HashFunctionProvider {
private static final long serialVersionUID = 1L;

@Override
public HashFunction getHashFunction(TableId tableId, Schema schema) {
return new DefaultHashFunction(schema);
}

static class DefaultHashFunction implements HashFunction {
private final List<RecordData.FieldGetter> primaryKeyGetters;

public DefaultHashFunction(Schema schema) {
primaryKeyGetters = createFieldGetters(schema);
}

@Override
public Integer apply(DataChangeEvent event) {
List<Object> objectsToHash = new ArrayList<>();
// Table ID
TableId tableId = event.tableId();
Optional.ofNullable(tableId.getNamespace()).ifPresent(objectsToHash::add);
Optional.ofNullable(tableId.getSchemaName()).ifPresent(objectsToHash::add);
objectsToHash.add(tableId.getTableName());

// Primary key
RecordData data =
event.op().equals(OperationType.DELETE) ? event.before() : event.after();
for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) {
objectsToHash.add(primaryKeyGetter.getFieldOrNull(data));
}

// Calculate hash
return (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;
}

private List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
List<RecordData.FieldGetter> fieldGetters =
new ArrayList<>(schema.primaryKeys().size());
schema.primaryKeys().stream()
.mapToInt(
pk -> {
int index = schema.getColumnNames().indexOf(pk);
if (index == -1) {
throw new IllegalStateException(
String.format(
"Unable to find column \"%s\" which is defined as primary key",
pk));
}
return index;
})
.forEach(
primaryKeyPosition ->
fieldGetters.add(
RecordData.createFieldGetter(
schema.getColumns()
.get(primaryKeyPosition)
.getType(),
primaryKeyPosition)));
return fieldGetters;
}
}
}
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.sink;

import org.apache.flink.cdc.common.event.DataChangeEvent;

import java.util.function.Function;

/** Used by {@code PrePartitionOperator} when calculating the hash code of the primary key. */
public interface HashFunction extends Function<DataChangeEvent, Integer> {
@Override
Integer apply(DataChangeEvent event);
}
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.sink;

import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;

import java.io.Serializable;

/**
 * Provides a {@link HashFunction} to help {@code PrePartitionOperator} shuffle {@link
 * DataChangeEvent}s to the designated subtask. This is usually beneficial for load balancing;
 * when writing to different partitions/buckets in a {@link DataSink}, a custom implementation
 * can further improve efficiency.
 */
public interface HashFunctionProvider extends Serializable {

/**
 * Gets a hash function based on the given table ID and schema, to help {@code
 * PrePartitionOperator} shuffle {@link DataChangeEvent}s to the designated subtask.
 *
 * @param tableId table ID
 * @param schema table schema
 * @return hash function based on the given table ID and schema
 */
HashFunction getHashFunction(TableId tableId, Schema schema);
}
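
As the Javadoc above suggests, a sink can provide a custom implementation when the default hashing is not the best fit. A hypothetical sketch (not part of this commit; class and package names are made up): a provider that sends every event of a table to the same subtask, which a sink maintaining one writer per table might prefer.

package com.example.connector; // hypothetical package, for illustration only

import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.HashFunction;
import org.apache.flink.cdc.common.sink.HashFunctionProvider;

import java.util.Objects;

/** Routes all events of a table to one subtask by hashing only the table identifier. */
public class TablePerSubtaskHashFunctionProvider implements HashFunctionProvider {
    private static final long serialVersionUID = 1L;

    @Override
    public HashFunction getHashFunction(TableId tableId, Schema schema) {
        // Ignore the row content; every DataChangeEvent of this table maps to the same hash value.
        int tableHash =
                Objects.hash(
                                tableId.getNamespace(),
                                tableId.getSchemaName(),
                                tableId.getTableName())
                        & 0x7FFFFFFF;
        return event -> tableHash;
    }
}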
@@ -136,7 +136,11 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
stream =
partitioningTranslator.translate(
stream, parallelism, parallelism, schemaOperatorIDGenerator.generate());
stream,
parallelism,
parallelism,
schemaOperatorIDGenerator.generate(),
dataSink.getHashFunctionProvider());

// Build Sink Operator
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
@@ -19,6 +19,7 @@

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.sink.HashFunctionProvider;
import org.apache.flink.cdc.runtime.partitioning.EventPartitioner;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEventKeySelector;
import org.apache.flink.cdc.runtime.partitioning.PostPartitionProcessor;
@@ -39,11 +40,13 @@ public DataStream<Event> translate(
DataStream<Event> input,
int upstreamParallelism,
int downstreamParallelism,
OperatorID schemaOperatorID) {
OperatorID schemaOperatorID,
HashFunctionProvider hashFunctionProvider) {
return input.transform(
"PrePartition",
new PartitioningEventTypeInfo(),
new PrePartitionOperator(schemaOperatorID, downstreamParallelism))
new PrePartitionOperator(
schemaOperatorID, downstreamParallelism, hashFunctionProvider))
.setParallelism(upstreamParallelism)
.partitionCustom(new EventPartitioner(), new PartitioningEventKeySelector())
.map(new PostPartitionProcessor(), new EventTypeInfo())
@@ -18,15 +18,14 @@
package org.apache.flink.cdc.runtime.partitioning;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.HashFunction;
import org.apache.flink.cdc.common.sink.HashFunctionProvider;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -41,11 +40,7 @@
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;

/** Operator for processing events from {@link SchemaOperator} before {@link EventPartitioner}. */
@Internal
@@ -56,14 +51,19 @@ public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEve

private final OperatorID schemaOperatorId;
private final int downstreamParallelism;
private final HashFunctionProvider hashFunctionProvider;

private transient SchemaEvolutionClient schemaEvolutionClient;
private transient LoadingCache<TableId, HashFunction> cachedHashFunctions;

public PrePartitionOperator(OperatorID schemaOperatorId, int downstreamParallelism) {
public PrePartitionOperator(
OperatorID schemaOperatorId,
int downstreamParallelism,
HashFunctionProvider hashFunctionProvider) {
this.chainingStrategy = ChainingStrategy.ALWAYS;
this.schemaOperatorId = schemaOperatorId;
this.downstreamParallelism = downstreamParallelism;
this.hashFunctionProvider = hashFunctionProvider;
}

@Override
@@ -127,7 +127,7 @@ private Schema loadLatestSchemaFromRegistry(TableId tableId) {
}

private HashFunction recreateHashFunction(TableId tableId) {
return new HashFunction(loadLatestSchemaFromRegistry(tableId));
return hashFunctionProvider.getHashFunction(tableId, loadLatestSchemaFromRegistry(tableId));
}

private LoadingCache<TableId, HashFunction> createCache() {
@@ -141,62 +141,4 @@ public HashFunction load(TableId key) {
}
});
}

@VisibleForTesting
static class HashFunction implements Function<DataChangeEvent, Integer> {
private final List<RecordData.FieldGetter> primaryKeyGetters;

public HashFunction(Schema schema) {
primaryKeyGetters = createFieldGetters(schema);
}

@Override
public Integer apply(DataChangeEvent event) {
List<Object> objectsToHash = new ArrayList<>();
// Table ID
TableId tableId = event.tableId();
Optional.ofNullable(tableId.getNamespace()).ifPresent(objectsToHash::add);
Optional.ofNullable(tableId.getSchemaName()).ifPresent(objectsToHash::add);
objectsToHash.add(tableId.getTableName());

// Primary key
RecordData data =
event.op().equals(OperationType.DELETE) ? event.before() : event.after();
for (RecordData.FieldGetter primaryKeyGetter : primaryKeyGetters) {
objectsToHash.add(primaryKeyGetter.getFieldOrNull(data));
}

// Calculate hash
return (Objects.hash(objectsToHash.toArray()) * 31) & 0x7FFFFFFF;
}

private List<RecordData.FieldGetter> createFieldGetters(Schema schema) {
List<RecordData.FieldGetter> fieldGetters =
new ArrayList<>(schema.primaryKeys().size());
int[] primaryKeyPositions =
schema.primaryKeys().stream()
.mapToInt(
pk -> {
int i = 0;
while (!schema.getColumns().get(i).getName().equals(pk)) {
++i;
}
if (i >= schema.getColumnCount()) {
throw new IllegalStateException(
String.format(
"Unable to find column \"%s\" which is defined as primary key",
pk));
}
return i;
})
.toArray();
for (int primaryKeyPosition : primaryKeyPositions) {
fieldGetters.add(
RecordData.createFieldGetter(
schema.getColumns().get(primaryKeyPosition).getType(),
primaryKeyPosition));
}
return fieldGetters;
}
}
}
@@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DefaultHashFunctionProvider;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
@@ -127,14 +128,18 @@ void testPartitioningDataChangeEvent() throws Exception {
}

private int getPartitioningTarget(Schema schema, DataChangeEvent dataChangeEvent) {
return new PrePartitionOperator.HashFunction(schema).apply(dataChangeEvent)
return new DefaultHashFunctionProvider()
.getHashFunction(null, schema)
.apply(dataChangeEvent)
% DOWNSTREAM_PARALLELISM;
}

private EventOperatorTestHarness<PrePartitionOperator, PartitioningEvent> createTestHarness() {
PrePartitionOperator operator =
new PrePartitionOperator(
TestingSchemaRegistryGateway.SCHEMA_OPERATOR_ID, DOWNSTREAM_PARALLELISM);
TestingSchemaRegistryGateway.SCHEMA_OPERATOR_ID,
DOWNSTREAM_PARALLELISM,
new DefaultHashFunctionProvider());
return new EventOperatorTestHarness<>(operator, DOWNSTREAM_PARALLELISM);
}
}
