Merge pull request #1 from morningman/feature-hiveBucketShuffle
[opt] refine some method name
Nitin-Kashyap authored Mar 1, 2024
2 parents a5ce239 + 3f651ad commit 83b98e5
Showing 12 changed files with 76 additions and 67 deletions.
4 changes: 3 additions & 1 deletion be/src/vec/sink/vdata_stream_sender.cpp
@@ -423,9 +423,11 @@ Status VDataStreamSender::init(const TDataSink& tsink) {
if (_hash_type == THashType::CRC32) {
_partitioner.reset(
new Crc32HashPartitioner<ShuffleChannelIds>(_channel_shared_ptrs.size()));
} else {
} else if (_hash_type == THashType::SPARK_MURMUR32) {
_partitioner.reset(new Murmur32HashPartitioner<ShufflePModChannelIds>(
_channel_shared_ptrs.size()));
} else {
return Status::InternalError("Invalid hash type for bucket shuffle: {}", _hash_type);
}
RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs));
} else if (_part_type == TPartitionType::RANGE_PARTITIONED) {
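For context: the Spark-compatible path pairs Murmur32HashPartitioner with ShufflePModChannelIds because Murmur3 hashes are signed, so the channel index has to be taken with a non-negative modulo ("pmod") rather than a plain remainder. A minimal, hypothetical Java sketch of that selection rule (names are illustrative, not the Doris implementation):

    public final class PmodExample {
        // pmod maps any signed hash, including negative values, into [0, numChannels)
        static int pmod(int hash, int numChannels) {
            int m = hash % numChannels;
            return m < 0 ? m + numChannels : m;
        }

        public static void main(String[] args) {
            int murmurHash = -7;                      // e.g. a signed Murmur3 hash
            System.out.println(pmod(murmurHash, 4));  // 1, whereas -7 % 4 == -3
        }
    }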
@@ -29,26 +29,11 @@ public class HiveExternalDistributionInfo extends HashDistributionInfo {
@SerializedName(value = "bucketingVersion")
private final int bucketingVersion;

public HiveExternalDistributionInfo() {
bucketingVersion = 2;
}

public HiveExternalDistributionInfo(int bucketNum, List<Column> distributionColumns, int bucketingVersion) {
super(bucketNum, distributionColumns);
this.bucketingVersion = bucketingVersion;
}

public HiveExternalDistributionInfo(int bucketNum, boolean autoBucket,
List<Column> distributionColumns, int bucketingVersion) {
super(bucketNum, autoBucket, distributionColumns);
this.bucketingVersion = bucketingVersion;
}

public int getBucketingVersion() {
return bucketingVersion;
}


@Override
public boolean equals(Object o) {
if (this == o) {
@@ -198,8 +198,6 @@ public void init(BeSelectionPolicy policy) throws UserException {
} catch (ExecutionException e) {
throw new UserException("failed to get consistent hash", e);
}
/*consistentBucket = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(),
new BackendHash(), backends, Config.virtual_node_number);*/
}

public Backend getNextBe() {
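The init() above builds the consistent-hash structure over candidate backends; the deleted comment shows the earlier inline construction (Guava murmur3_128 with BackendHash/BucketHash funnels and Config.virtual_node_number). As a rough illustration of the idea only, a self-contained sketch of a consistent-hash ring using plain String keys and Java's hashCode, not the Doris classes:

    import java.util.List;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Illustrative consistent-hash ring: each backend occupies several virtual-node
    // positions; a split is routed to the first backend at or after its hash position,
    // wrapping around the ring. Adding or removing a backend only remaps nearby keys.
    public final class ConsistentRingSketch {
        private final TreeMap<Integer, String> ring = new TreeMap<>();

        ConsistentRingSketch(List<String> backends, int virtualNodes) {
            for (String be : backends) {
                for (int i = 0; i < virtualNodes; i++) {
                    ring.put((be + "#" + i).hashCode(), be);
                }
            }
        }

        String route(String splitPath) {
            SortedMap<Integer, String> tail = ring.tailMap(splitPath.hashCode());
            return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
        }

        public static void main(String[] args) {
            ConsistentRingSketch ring = new ConsistentRingSketch(List.of("be-1", "be-2", "be-3"), 32);
            System.out.println(ring.route("hdfs://warehouse/orders/part-00000"));
        }
    }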
@@ -350,12 +350,12 @@ public void createScanRangeLocations() throws UserException {
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
false, isACID) : fileSplit.getPartitionValues();

boolean isBucketedHiveTable = false;
boolean isSparkBucketedHiveTable = false;
int bucketNum = 0;
TableIf targetTable = getTargetTable();
if (targetTable instanceof HMSExternalTable) {
isBucketedHiveTable = ((HMSExternalTable) targetTable).isBucketedTable();
if (isBucketedHiveTable) {
isSparkBucketedHiveTable = ((HMSExternalTable) targetTable).isSparkBucketedTable();
if (isSparkBucketedHiveTable) {
bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt();
}
}
@@ -397,7 +397,7 @@ public void createScanRangeLocations() throws UserException {
fileSplit.getStart(), fileSplit.getLength(),
Joiner.on("|").join(fileSplit.getHosts()));
}
if (isBucketedHiveTable) {
if (isSparkBucketedHiveTable) {
bucketSeq2locations.put(bucketNum, curLocations);
}
scanRangeLocations.add(curLocations);
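createScanRangeLocations() records the bucket number parsed from each split's file name so the coordinator can later map splits of the same bucket to the same backend. The actual parsing lives in HiveBucketUtil.getBucketNumberFromPath; purely as a hedged sketch, Spark typically embeds the bucket id in the data file name (e.g. part-00000-<uuid>_00003.c000.snappy.parquet), which could be pulled out roughly like this (pattern and helper are illustrative assumptions, not the Doris code):

    import java.util.OptionalInt;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Illustrative only: extract the "_NNNNN" bucket id that Spark commonly places
    // right before the ".c000" suffix of a bucketed data file name.
    public final class BucketFromPathSketch {
        private static final Pattern SPARK_BUCKET = Pattern.compile("_(\\d+)\\.c\\d+");

        static OptionalInt bucketNumber(String fileName) {
            Matcher m = SPARK_BUCKET.matcher(fileName);
            return m.find() ? OptionalInt.of(Integer.parseInt(m.group(1))) : OptionalInt.empty();
        }

        public static void main(String[] args) {
            // prints OptionalInt[3]
            System.out.println(bucketNumber("part-00000-3f1c_00003.c000.snappy.parquet"));
        }
    }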
@@ -31,6 +31,7 @@
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HiveBucketUtil.HiveBucketType;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
@@ -167,7 +168,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
protected List<Column> partitionColumns;
private List<Column> bucketColumns;
private boolean isSparkTable;
private HiveBucketType hiveBucketType = HiveBucketType.NONE;

private DLAType dlaType = DLAType.UNKNOWN;

@@ -256,12 +257,8 @@ public boolean isHoodieCowTable() {
return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName);
}

public boolean isSparkTable() {
return isSparkTable;
}

public boolean isBucketedTable() {
return bucketColumns != null && !bucketColumns.isEmpty() && isSparkTable;
public boolean isSparkBucketedTable() {
return bucketColumns != null && !bucketColumns.isEmpty() && hiveBucketType == HiveBucketType.SPARK;
}

/**
@@ -507,7 +504,7 @@ public List<Column> initSchema() {
private void initBucketingColumns(List<Column> columns) {
List<String> bucketCols = new ArrayList<>(5);
int numBuckets = getBucketColumns(bucketCols);
if (bucketCols.isEmpty() || !isSparkTable) {
if (bucketCols.isEmpty() || hiveBucketType != HiveBucketType.SPARK) {
bucketColumns = ImmutableList.of();
distributionInfo = new RandomDistributionInfo(1, true);
return;
@@ -544,6 +541,7 @@ private int getBucketColumns(List<String> bucketCols) {
/* Hive Bucketed Table */
bucketCols.addAll(descriptor.getBucketCols());
numBuckets = descriptor.getNumBuckets();
hiveBucketType = HiveBucketType.HIVE;
} else if (remoteTable.isSetParameters()
&& !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) {
Map<String, String> parameters = remoteTable.getParameters();
@@ -558,7 +556,7 @@ private int getBucketColumns(List<String> bucketCols) {
}

if (numBuckets > 0) {
isSparkTable = true;
hiveBucketType = HiveBucketType.SPARK;
}
}

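getBucketColumns() distinguishes native Hive bucketing (declared in the storage descriptor) from Spark bucketing (declared in table parameters checked against SUPPORTED_BUCKET_PROPERTIES). As a hedged illustration of the Spark side, Spark commonly records the layout in HMS table parameters such as "spark.sql.sources.schema.numBuckets" and "spark.sql.sources.schema.bucketCol.N"; the exact keys below are assumptions for illustration, not the list Doris uses:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hedged sketch: read Spark bucketing metadata from Hive Metastore table parameters.
    public final class SparkBucketPropsSketch {
        static int readSparkBucketing(Map<String, String> params, List<String> bucketColsOut) {
            String numBuckets = params.get("spark.sql.sources.schema.numBuckets");
            if (numBuckets == null) {
                return 0; // not a Spark-bucketed table
            }
            for (int i = 0; params.containsKey("spark.sql.sources.schema.bucketCol." + i); i++) {
                bucketColsOut.add(params.get("spark.sql.sources.schema.bucketCol." + i));
            }
            return Integer.parseInt(numBuckets);
        }

        public static void main(String[] args) {
            Map<String, String> p = Map.of(
                    "spark.sql.sources.schema.numBuckets", "8",
                    "spark.sql.sources.schema.bucketCol.0", "user_id");
            List<String> cols = new ArrayList<>();
            System.out.println(readSparkBucketing(p, cols) + " " + cols); // 8 [user_id]
        }
    }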
@@ -65,6 +65,12 @@
public class HiveBucketUtil {
private static final Logger LOG = LogManager.getLogger(HiveBucketUtil.class);

public enum HiveBucketType {
NONE,
HIVE,
SPARK
}

private static final Set<PrimitiveType> SUPPORTED_TYPES_FOR_BUCKET_FILTER = ImmutableSet.of(
PrimitiveType.BOOLEAN,
PrimitiveType.TINYINT,
@@ -425,7 +425,7 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User

@Override
public DataPartition constructInputPartitionByDistributionInfo() {
if (hmsTable.isBucketedTable()) {
if (hmsTable.isSparkBucketedTable()) {
DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo();
if (!(distributionInfo instanceof HashDistributionInfo)) {
return DataPartition.RANDOM;
@@ -448,7 +448,7 @@ public HMSExternalTable getHiveTable() {

@Override
public THashType getHashType() {
if (hmsTable.isBucketedTable()
if (hmsTable.isSparkBucketedTable()
&& hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) {
return THashType.SPARK_MURMUR32;
}
@@ -2458,17 +2458,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec,
THashType hashType = THashType.XXHASH64;
switch (distributionSpecHash.getShuffleType()) {
case STORAGE_BUCKETED:
switch (distributionSpecHash.getShuffleFunction()) {
case STORAGE_BUCKET_SPARK_MURMUR32:
hashType = THashType.SPARK_MURMUR32;
break;
case STORAGE_BUCKET_CRC32:
hashType = THashType.CRC32;
break;
case STORAGE_BUCKET_XXHASH64:
default:
break;
}
hashType = distributionSpecHash.getShuffleFunction().toThrift();
partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
break;
case EXECUTION_BUCKETED:
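The toThrift() helper this call relies on is introduced in the DistributionSpecHash hunk below; moving the enum-to-Thrift mapping onto StorageBucketHashType keeps toDataPartition() down to choosing the partition type instead of repeating a per-function switch at every call site.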
@@ -20,6 +20,7 @@
import org.apache.doris.nereids.annotation.Developing;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.thrift.THashType;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -338,9 +339,24 @@ public enum ShuffleType {
* Enums for concrete shuffle functions.
*/
public enum StorageBucketHashType {
// CRC32 is the hash function used by Doris's internal storage buckets
STORAGE_BUCKET_CRC32,
// XXHASH64 is the default hash function for the Doris computation layer
STORAGE_BUCKET_XXHASH64,
STORAGE_BUCKET_SPARK_MURMUR32
// SPARK_MURMUR32 is the hash function used by Spark-bucketed Hive tables for both storage and computation
STORAGE_BUCKET_SPARK_MURMUR32;

public THashType toThrift() {
switch (this) {
case STORAGE_BUCKET_CRC32:
return THashType.CRC32;
case STORAGE_BUCKET_SPARK_MURMUR32:
return THashType.SPARK_MURMUR32;
case STORAGE_BUCKET_XXHASH64:
default:
return THashType.XXHASH64;
}
}
}

}
@@ -80,7 +80,7 @@ private DistributionSpec convertDistribution(LogicalFileScan fileScan) {
}
}
StorageBucketHashType function = StorageBucketHashType.STORAGE_BUCKET_CRC32;
if (hmsExternalTable.isBucketedTable()) {
if (hmsExternalTable.isSparkBucketedTable()) {
function = StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32;
}
return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL,
@@ -625,29 +625,35 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr
}

PlanNode leftRoot = leftChildFragment.getPlanRoot();
// 1. leftRoot is an OlapScanNode
if (leftRoot instanceof OlapScanNode) {
return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs);
} else if (leftRoot instanceof HiveScanNode) {
return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType);
if (leftRoot instanceof ScanNode) {
return canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType);
}

// 2. leftRoot is a hash join node
if (leftRoot instanceof HashJoinNode) {
while (leftRoot instanceof HashJoinNode) {
leftRoot = leftRoot.getChild(0);
}
if (leftRoot instanceof OlapScanNode) {
return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs);
} else if (leftRoot instanceof HiveScanNode) {
return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType);
if (leftRoot instanceof ScanNode) {
return canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType);
}
}

return false;
}

private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode,
private boolean canBucketShuffleJoin(HashJoinNode node, ScanNode leftScanNode,
List<Expr> rhsJoinExprs, Ref<THashType> hashType) {
if (leftScanNode instanceof OlapScanNode) {
return canBucketShuffleJoinForOlap(node, (OlapScanNode) leftScanNode, rhsJoinExprs);
} else if (leftScanNode instanceof HiveScanNode) {
return canBucketShuffleJoinForHive(node, (HiveScanNode) leftScanNode, rhsJoinExprs, hashType);
} else {
return false;
}
}

private boolean canBucketShuffleJoinForHive(HashJoinNode node, HiveScanNode leftScanNode,
List<Expr> rhsJoinExprs, Ref<THashType> hashType) {
HMSExternalTable leftTable = leftScanNode.getHiveTable();

@@ -713,7 +719,7 @@ private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNod
}

// the join expr must contain the left table's distribution column
private boolean canBucketShuffleJoin(HashJoinNode node, OlapScanNode leftScanNode,
private boolean canBucketShuffleJoinForOlap(HashJoinNode node, OlapScanNode leftScanNode,
List<Expr> rhsJoinExprs) {
OlapTable leftTable = leftScanNode.getOlapTable();

28 changes: 18 additions & 10 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2291,13 +2291,8 @@ private void computeScanRangeAssignment() throws Exception {
computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost);
}
if (fragmentContainsBucketShuffleJoin) {
if (scanNode instanceof OlapScanNode) {
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode,
idToBackend, addressToBackendID, replicaNumPerHost);
} else if (scanNode instanceof HiveScanNode) {
bucketShuffleJoinController.computeScanRangeAssignmentByBucket((HiveScanNode) scanNode,
idToBackend, addressToBackendID, replicaNumPerHost);
}
bucketShuffleJoinController.computeScanRangeAssignmentByBucket(scanNode,
idToBackend, addressToBackendID, replicaNumPerHost);
}
if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) {
computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost,
@@ -2922,8 +2917,21 @@ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLoc
this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort);
}

// to ensure that tablets with the same bucketSeq are assigned to the same execHostPort
private void computeScanRangeAssignmentByBucket(
final ScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
if (scanNode instanceof OlapScanNode) {
computeScanRangeAssignmentByBucketForOlap((OlapScanNode) scanNode, idToBackend, addressToBackendID,
replicaNumPerHost);
} else if (scanNode instanceof HiveScanNode) {
computeScanRangeAssignmentByBucketForHive((HiveScanNode) scanNode, idToBackend, addressToBackendID,
replicaNumPerHost);
}
}

// to ensure that tablets with the same bucketSeq are assigned to the same execHostPort
private void computeScanRangeAssignmentByBucketForOlap(
final OlapScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
@@ -2974,13 +2982,13 @@ private void computeScanRangeAssignmentByBucket(
}
}

private void computeScanRangeAssignmentByBucket(
private void computeScanRangeAssignmentByBucketForHive(
final HiveScanNode scanNode, ImmutableMap<Long, Backend> idToBackend,
Map<TNetworkAddress, Long> addressToBackendID,
Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
int bucketNum = 0;
if (scanNode.getHiveTable().isBucketedTable()) {
if (scanNode.getHiveTable().isSparkBucketedTable()) {
bucketNum = scanNode.getHiveTable().getDefaultDistributionInfo().getBucketNum();
} else {
throw new NotImplementedException("bucket shuffle for non-bucketed table not supported");