diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 1036097689ca07..9ac8fc00774d7d 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -423,9 +423,11 @@ Status VDataStreamSender::init(const TDataSink& tsink) { if (_hash_type == THashType::CRC32) { _partitioner.reset( new Crc32HashPartitioner(_channel_shared_ptrs.size())); - } else { + } else if (_hash_type == THashType::SPARK_MURMUR32) { _partitioner.reset(new Murmur32HashPartitioner( _channel_shared_ptrs.size())); + } else { + return Status::InternalError("Invalid hash type for bucket shuffle: {}", _hash_type); } RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::RANGE_PARTITIONED) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java index 5b15874401908a..d30d0f2e36cbfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java @@ -29,26 +29,11 @@ public class HiveExternalDistributionInfo extends HashDistributionInfo { @SerializedName(value = "bucketingVersion") private final int bucketingVersion; - public HiveExternalDistributionInfo() { - bucketingVersion = 2; - } - public HiveExternalDistributionInfo(int bucketNum, List distributionColumns, int bucketingVersion) { super(bucketNum, distributionColumns); this.bucketingVersion = bucketingVersion; } - public HiveExternalDistributionInfo(int bucketNum, boolean autoBucket, - List distributionColumns, int bucketingVersion) { - super(bucketNum, autoBucket, distributionColumns); - this.bucketingVersion = bucketingVersion; - } - - public int getBucketingVersion() { - return bucketingVersion; - } - - @Override public boolean equals(Object o) { if 
(this == o) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 73b1ce9c8371f5..2540bb0377feca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -198,8 +198,6 @@ public void init(BeSelectionPolicy policy) throws UserException { } catch (ExecutionException e) { throw new UserException("failed to get consistent hash", e); } - /*consistentBucket = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(), - new BackendHash(), backends, Config.virtual_node_number);*/ } public Backend getNextBe() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index a9d8797612256d..ccf3fdf807e504 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -350,12 +350,12 @@ public void createScanRangeLocations() throws UserException { ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID) : fileSplit.getPartitionValues(); - boolean isBucketedHiveTable = false; + boolean isSparkBucketedHiveTable = false; int bucketNum = 0; TableIf targetTable = getTargetTable(); if (targetTable instanceof HMSExternalTable) { - isBucketedHiveTable = ((HMSExternalTable) targetTable).isBucketedTable(); - if (isBucketedHiveTable) { + isSparkBucketedHiveTable = ((HMSExternalTable) targetTable).isSparkBucketedTable(); + if (isSparkBucketedHiveTable) { bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt(); } } @@ -397,7 +397,7 @@ public void createScanRangeLocations() throws UserException { fileSplit.getStart(), fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); } - if (isBucketedHiveTable) { + if (isSparkBucketedHiveTable) { bucketSeq2locations.put(bucketNum, curLocations); } scanRangeLocations.add(curLocations); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index a770f688aa435f..f48ceebe5dfd71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -31,6 +31,7 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.hive.HiveBucketUtil.HiveBucketType; import org.apache.doris.datasource.hudi.HudiUtils; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; @@ -167,7 +168,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; protected List partitionColumns; private List bucketColumns; - 
private boolean isSparkTable; + private HiveBucketType hiveBucketType = HiveBucketType.NONE; private DLAType dlaType = DLAType.UNKNOWN; @@ -256,12 +257,8 @@ public boolean isHoodieCowTable() { return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName); } - public boolean isSparkTable() { - return isSparkTable; - } - - public boolean isBucketedTable() { - return bucketColumns != null && !bucketColumns.isEmpty() && isSparkTable; + public boolean isSparkBucketedTable() { + return bucketColumns != null && !bucketColumns.isEmpty() && hiveBucketType == HiveBucketType.SPARK; } /** @@ -507,7 +504,7 @@ public List initSchema() { private void initBucketingColumns(List columns) { List bucketCols = new ArrayList<>(5); int numBuckets = getBucketColumns(bucketCols); - if (bucketCols.isEmpty() || !isSparkTable) { + if (bucketCols.isEmpty() || hiveBucketType != HiveBucketType.SPARK) { bucketColumns = ImmutableList.of(); distributionInfo = new RandomDistributionInfo(1, true); return; @@ -544,6 +541,7 @@ private int getBucketColumns(List bucketCols) { /* Hive Bucketed Table */ bucketCols.addAll(descriptor.getBucketCols()); numBuckets = descriptor.getNumBuckets(); + hiveBucketType = HiveBucketType.HIVE; } else if (remoteTable.isSetParameters() && !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) { Map parameters = remoteTable.getParameters(); @@ -558,7 +556,7 @@ private int getBucketColumns(List bucketCols) { } if (numBuckets > 0) { - isSparkTable = true; + hiveBucketType = HiveBucketType.SPARK; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java index ce0d9cfba98bf7..fc0bed8d5e1c25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java @@ -65,6 +65,12 @@ public class HiveBucketUtil { 
private static final Logger LOG = LogManager.getLogger(HiveBucketUtil.class); + public enum HiveBucketType { + NONE, + HIVE, + SPARK + } + private static final Set SUPPORTED_TYPES_FOR_BUCKET_FILTER = ImmutableSet.of( PrimitiveType.BOOLEAN, PrimitiveType.TINYINT, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 4264b8c50f4e85..570d2e61c70bc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -425,7 +425,7 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User @Override public DataPartition constructInputPartitionByDistributionInfo() { - if (hmsTable.isBucketedTable()) { + if (hmsTable.isSparkBucketedTable()) { DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo(); if (!(distributionInfo instanceof HashDistributionInfo)) { return DataPartition.RANDOM; @@ -448,7 +448,7 @@ public HMSExternalTable getHiveTable() { @Override public THashType getHashType() { - if (hmsTable.isBucketedTable() + if (hmsTable.isSparkBucketedTable() && hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) { return THashType.SPARK_MURMUR32; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index a43903f9481721..3d4d152a6370d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -2458,17 +2458,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec, THashType hashType = THashType.XXHASH64; switch 
(distributionSpecHash.getShuffleType()) { case STORAGE_BUCKETED: - switch (distributionSpecHash.getShuffleFunction()) { - case STORAGE_BUCKET_SPARK_MURMUR32: - hashType = THashType.SPARK_MURMUR32; - break; - case STORAGE_BUCKET_CRC32: - hashType = THashType.CRC32; - break; - case STORAGE_BUCKET_XXHASH64: - default: - break; - } + hashType = distributionSpecHash.getShuffleFunction().toThrift(); partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED; break; case EXECUTION_BUCKETED: diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java index 5bf1a7f52472bc..3bd1ab1ae52964 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java @@ -20,6 +20,7 @@ import org.apache.doris.nereids.annotation.Developing; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.thrift.THashType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -338,9 +339,24 @@ public enum ShuffleType { * Enums for concrete shuffle functions. 
*/ public enum StorageBucketHashType { + // CRC32 is for Doris internal storage bucket hash function STORAGE_BUCKET_CRC32, + // XXHASH64 is the default hash function for Doris computation layer STORAGE_BUCKET_XXHASH64, - STORAGE_BUCKET_SPARK_MURMUR32 + // SPARK_MURMUR32 is the hash function for Spark bucketed hive table's storage and computation + STORAGE_BUCKET_SPARK_MURMUR32; + + public THashType toThrift() { + switch (this) { + case STORAGE_BUCKET_CRC32: + return THashType.CRC32; + case STORAGE_BUCKET_SPARK_MURMUR32: + return THashType.SPARK_MURMUR32; + case STORAGE_BUCKET_XXHASH64: + default: + return THashType.XXHASH64; + } + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java index 682b5eb2659b19..387cf0245a922e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java @@ -80,7 +80,7 @@ private DistributionSpec convertDistribution(LogicalFileScan fileScan) { } } StorageBucketHashType function = StorageBucketHashType.STORAGE_BUCKET_CRC32; - if (hmsExternalTable.isBucketedTable()) { + if (hmsExternalTable.isSparkBucketedTable()) { function = StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32; } return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 4cc9608088cb81..3ba8efa48a29a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -625,11 +625,8 @@ private boolean 
canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr } PlanNode leftRoot = leftChildFragment.getPlanRoot(); - // 1.leftRoot be OlapScanNode - if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); - } else if (leftRoot instanceof HiveScanNode) { - return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); + if (leftRoot instanceof ScanNode) { + return canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType); } // 2.leftRoot be hashjoin node @@ -637,17 +634,26 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr while (leftRoot instanceof HashJoinNode) { leftRoot = leftRoot.getChild(0); } - if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); - } else if (leftRoot instanceof HiveScanNode) { - return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); + if (leftRoot instanceof ScanNode) { + return canBucketShuffleJoin(node, (ScanNode) leftRoot, rhsHashExprs, hashType); } } return false; } - private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode, + private boolean canBucketShuffleJoin(HashJoinNode node, ScanNode leftScanNode, + List rhsJoinExprs, Ref hashType) { + if (leftScanNode instanceof OlapScanNode) { + return canBucketShuffleJoinForOlap(node, (OlapScanNode) leftScanNode, rhsJoinExprs); + } else if (leftScanNode instanceof HiveScanNode) { + return canBucketShuffleJoinForHive(node, (HiveScanNode) leftScanNode, rhsJoinExprs, hashType); + } else { + return false; + } + } + + private boolean canBucketShuffleJoinForHive(HashJoinNode node, HiveScanNode leftScanNode, List rhsJoinExprs, Ref hashType) { HMSExternalTable leftTable = leftScanNode.getHiveTable(); @@ -713,7 +719,7 @@ private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNod } //the join expr must contian left table distribute column - 
private boolean canBucketShuffleJoin(HashJoinNode node, OlapScanNode leftScanNode, + private boolean canBucketShuffleJoinForOlap(HashJoinNode node, OlapScanNode leftScanNode, List rhsJoinExprs) { OlapTable leftTable = leftScanNode.getOlapTable(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index bfaec73287d167..cc93950ca86e3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2291,13 +2291,8 @@ private void computeScanRangeAssignment() throws Exception { computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost); } if (fragmentContainsBucketShuffleJoin) { - if (scanNode instanceof OlapScanNode) { - bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, - idToBackend, addressToBackendID, replicaNumPerHost); - } else if (scanNode instanceof HiveScanNode) { - bucketShuffleJoinController.computeScanRangeAssignmentByBucket((HiveScanNode) scanNode, - idToBackend, addressToBackendID, replicaNumPerHost); - } + bucketShuffleJoinController.computeScanRangeAssignmentByBucket(scanNode, + idToBackend, addressToBackendID, replicaNumPerHost); } if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) { computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost, @@ -2922,8 +2917,21 @@ private void getExecHostPortForFragmentIDAndBucketSeq(TScanRangeLocations seqLoc this.fragmentIdToSeqToAddressMap.get(fragmentId).put(bucketSeq, execHostPort); } - // to ensure the same bucketSeq tablet to the same execHostPort private void computeScanRangeAssignmentByBucket( + final ScanNode scanNode, ImmutableMap idToBackend, + Map addressToBackendID, + Map replicaNumPerHost) throws Exception { + if (scanNode instanceof OlapScanNode) { + 
computeScanRangeAssignmentByBucketForOlap((OlapScanNode) scanNode, idToBackend, addressToBackendID, + replicaNumPerHost); + } else if (scanNode instanceof HiveScanNode) { + computeScanRangeAssignmentByBucketForHive((HiveScanNode) scanNode, idToBackend, addressToBackendID, + replicaNumPerHost); + } + } + + // to ensure the same bucketSeq tablet to the same execHostPort + private void computeScanRangeAssignmentByBucketForOlap( final OlapScanNode scanNode, ImmutableMap idToBackend, Map addressToBackendID, Map replicaNumPerHost) throws Exception { @@ -2974,13 +2982,13 @@ private void computeScanRangeAssignmentByBucket( } } - private void computeScanRangeAssignmentByBucket( + private void computeScanRangeAssignmentByBucketForHive( final HiveScanNode scanNode, ImmutableMap idToBackend, Map addressToBackendID, Map replicaNumPerHost) throws Exception { if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { int bucketNum = 0; - if (scanNode.getHiveTable().isBucketedTable()) { + if (scanNode.getHiveTable().isSparkBucketedTable()) { bucketNum = scanNode.getHiveTable().getDefaultDistributionInfo().getBucketNum(); } else { throw new NotImplementedException("bucket shuffle for non-bucketed table not supported");