From a5ce2395a2ea0752c987d764edc0b39f902f82ce Mon Sep 17 00:00:00 2001
From: Nitin Kashyap
Date: Mon, 27 Nov 2023 13:20:53 +0530
Subject: [PATCH] [feature](datalake) Add BucketShuffleJoin support for Hive table data generated by Spark. (27783)

1. Original planner updated to consider bucket shuffle join for bucketed Hive tables.
2. Nereids planner updated for bucket shuffle join on Hive tables.
3. Added Spark-style hash calculation in BE for one-sided shuffle.
4. Added shuffle hash selection based on the left (non-shuffling) side.
---
 be/src/util/hash_util.hpp | 8 +
 be/src/vec/columns/column.h | 27 +++
 be/src/vec/columns/column_array.cpp | 54 ++++++
 be/src/vec/columns/column_array.h | 6 +
 be/src/vec/columns/column_const.cpp | 18 ++
 be/src/vec/columns/column_const.h | 9 +
 be/src/vec/columns/column_decimal.cpp | 50 ++++++
 be/src/vec/columns/column_decimal.h | 14 +-
 be/src/vec/columns/column_map.cpp | 54 ++++++
 be/src/vec/columns/column_map.h | 6 +
 be/src/vec/columns/column_nullable.cpp | 38 +++++
 be/src/vec/columns/column_nullable.h | 5 +
 be/src/vec/columns/column_string.cpp | 23 +++
 be/src/vec/columns/column_string.h | 23 +++
 be/src/vec/columns/column_struct.cpp | 15 ++
 be/src/vec/columns/column_struct.h | 6 +
 be/src/vec/columns/column_vector.cpp | 35 ++++
 be/src/vec/columns/column_vector.h | 23 +++
 be/src/vec/runtime/partitioner.cpp | 22 +++
 be/src/vec/runtime/partitioner.h | 14 ++
 be/src/vec/sink/vdata_stream_sender.cpp | 14 +-
 be/src/vec/sink/vdata_stream_sender.h | 8 +
 .../scripts/create_preinstalled_table.hql | 47 +++++-
 .../catalog/HiveExternalDistributionInfo.java | 95 +++++++++++
 .../datasource/FederationBackendPolicy.java | 10 ++
 .../doris/datasource/FileQueryScanNode.java | 27 +++
 .../datasource/hive/HMSExternalTable.java | 132 ++++++++++++++-
 .../doris/datasource/hive/HiveBucketUtil.java | 5 +-
 .../datasource/hive/source/HiveScanNode.java | 40 +++++
 .../translator/PhysicalPlanTranslator.java | 29 +++-
 .../ChildOutputPropertyDeriver.java | 4 +-
 .../ChildrenPropertiesRegulator.java | 2 +-
 .../properties/DistributionSpecHash.java | 50 ++++--
 .../EnforceMissingPropertiesHelper.java | 4 +-
 .../LogicalFileScanToPhysicalFileScan.java | 50 +++++-
 .../apache/doris/planner/DataPartition.java | 23 ++-
 .../doris/planner/DistributedPlanner.java | 93 ++++++++++-
 .../apache/doris/planner/OlapScanNode.java | 3 +-
 .../java/org/apache/doris/qe/Coordinator.java | 55 +++++-
 .../apache/doris/planner/QueryPlanTest.java | 14 +-
 .../org/apache/doris/qe/CoordinatorTest.java | 7 +-
 gensrc/thrift/Ddl.thrift | 6 +-
 gensrc/thrift/Partitions.thrift | 7 +
 .../hive/test_hive_spark_clustered_table.out | 158 ++++++++++++++++++
 .../test_hive_spark_clustered_table.groovy | 67 ++++++++
 .../join/bucket_shuffle_join.groovy | 2 +-
 46 files changed, 1342 insertions(+), 60 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java
 create mode 100644 regression-test/data/external_table_p0/hive/test_hive_spark_clustered_table.out
 create mode 100644 regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy

diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp
index 402797a8e35ce3..afae30fec99cf4 100644
--- a/be/src/util/hash_util.hpp
+++ b/be/src/util/hash_util.hpp
@@ -133,6 +133,9 @@ class HashUtil {
     // refer to https://github.com/apache/commons-codec/blob/master/src/main/java/org/apache/commons/codec/digest/MurmurHash3.java
     static const uint32_t MURMUR3_32_SEED = 104729;
+    // refer
https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L615 + static const uint32_t SPARK_MURMUR_32_SEED = 42; + // modify from https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp static uint32_t murmur_hash3_32(const void* key, int32_t len, uint32_t seed) { uint32_t out = 0; @@ -140,6 +143,11 @@ class HashUtil { return out; } + static uint32_t murmur_hash3_32_null(uint32_t seed) { + static const int INT_VALUE = 0; + return murmur_hash3_32((const unsigned char*)(&INT_VALUE), 4, seed); + } + static const int MURMUR_R = 47; // Murmur2 hash implementation returning 64-bit hashes. diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index bf869961544817..45dbfeac64205e 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -58,6 +58,18 @@ class SipHash; } \ } +#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(SEED) \ + if (null_data == nullptr) { \ + for (size_t i = 0; i < s; i++) { \ + hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \ + } \ + } else { \ + for (size_t i = 0; i < s; i++) { \ + if (null_data[i] == 0) \ + hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \ + } \ + } + namespace doris::vectorized { class Arena; @@ -379,6 +391,21 @@ class IColumn : public COW { LOG(FATAL) << get_name() << " update_crc_with_value not supported"; } + /// Update state of murmur3 hash function (spark files) with value of n elements to avoid the virtual + /// function call null_data to mark whether need to do hash compute, null_data == nullptr + /// means all element need to do hash function, else only *null_data != 0 need to do hash func + virtual void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const { + LOG(FATAL) << get_name() << "update_murmurs_with_value not supported"; + } + + // use range for one hash value to avoid virtual function call in loop + virtual void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + LOG(FATAL) << get_name() << " update_murmur_with_value not supported"; + } + /** Removes elements that don't match the filter. * Is used in WHERE and HAVING operations. 
* If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column; diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 86d31c9223baf0..84dd75f7e1f9c1 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -389,6 +389,60 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp } } +// for every array row calculate murmurHash +void ColumnArray::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + auto& offsets_column = get_offsets(); + if (hash == 0) { + hash = HashUtil::SPARK_MURMUR_32_SEED; + } + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + size_t elem_size = offsets_column[i] - offsets_column[i - 1]; + if (elem_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&elem_size), + sizeof(elem_size), hash); + } else { + get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i], + hash, nullptr); + } + } + } + } else { + for (size_t i = start; i < end; ++i) { + size_t elem_size = offsets_column[i] - offsets_column[i - 1]; + if (elem_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&elem_size), + sizeof(elem_size), hash); + } else { + get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i], hash, + nullptr); + } + } + } +} + +void ColumnArray::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } +} + void ColumnArray::insert(const Field& x) { if (x.is_null()) { get_data().insert(Null()); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 17408bfa63311e..50aa4322ec2f1e 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -141,6 +141,8 @@ class ColumnArray final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data = nullptr) const override; @@ -149,6 +151,10 @@ class ColumnArray final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert(const Field& x) override; void insert_from(const IColumn& src_, size_t n) override; diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index e06e53b42897b9..95bffcb277e2ec 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -126,6 +126,24 @@ void ColumnConst::update_hashes_with_value(uint64_t* __restrict hashes, } } +void 
ColumnConst::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + DCHECK(rows == size()); + auto real_data = data->get_data_at(0); + if (real_data.data == nullptr) { + for (int i = 0; i < rows; ++i) { + hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + } + } else { + for (int i = 0; i < rows; ++i) { + hashes[i] = HashUtil::murmur_hash3_32(real_data.data, real_data.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } +} + MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector& selector) const { if (s != selector.size()) { LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of column ({})", diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 746cb00fd5dd38..b5b1993c8ca07b 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -168,6 +168,11 @@ class ColumnConst final : public COWHelper { get_data_column_ptr()->update_crc_with_value(start, end, hash, nullptr); } + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override { + get_data_column_ptr()->update_murmur_with_value(start, end, hash, nullptr); + } + void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, const uint8_t* null_map) const override { data->serialize_vec_with_null_map(keys, num_rows, null_map); @@ -185,6 +190,10 @@ class ColumnConst final : public COWHelper { void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; size_t filter(const Filter& filter) override; diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 9a05a8d68dfd3c..77796c6ab5bcf5 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -177,6 +177,56 @@ void ColumnDecimal::update_crcs_with_value(uint32_t* __restrict hashes, Primi } } +template +void ColumnDecimal::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + if (null_data == nullptr) { + for (size_t i = start; i < end; i++) { + if constexpr (!IsDecimalV2) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + decimalv2_do_murmur(i, hash); + } + } + } else { + for (size_t i = start; i < end; i++) { + if (null_data[i] == 0) { + if constexpr (!IsDecimalV2) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + decimalv2_do_murmur(i, hash); + } + } + } + } +} + +template +void ColumnDecimal::update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if constexpr (!IsDecimalV2) { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } else { + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + decimalv2_do_murmur(i, hashes[i]); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + decimalv2_do_murmur(i, 
hashes[i]); + } + } + } + } +} + template void ColumnDecimal::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash, const uint8_t* __restrict null_data) const { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 920f6ad8438fa5..eca1bd8708ef09 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -181,12 +181,16 @@ class ColumnDecimal final : public COWHelper diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index c1c668ef07cd8b..fdd89d466c1b97 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -332,6 +332,40 @@ void ColumnMap::update_crc_with_value(size_t start, size_t end, uint32_t& hash, } } +void ColumnMap::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + auto& offsets = get_offsets(); + if (hash == 0) { + hash = HashUtil::SPARK_MURMUR_32_SEED; + } + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + size_t kv_size = offsets[i] - offsets[i - 1]; + if (kv_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&kv_size), + sizeof(kv_size), hash); + } else { + get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr); + get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash, + nullptr); + } + } + } + } else { + for (size_t i = start; i < end; ++i) { + size_t kv_size = offsets[i] - offsets[i - 1]; + if (kv_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&kv_size), + sizeof(kv_size), hash); + } else { + get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr); + get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr); + } + } + } +} + void ColumnMap::update_hashes_with_value(uint64_t* hashes, const uint8_t* null_data) const { size_t s = size(); if (null_data) { @@ -367,6 +401,26 @@ void ColumnMap::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType } } +void ColumnMap::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } +} + void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) { if (length == 0) { return; diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 0e6ad3c3d91cf2..b03249038071cc 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -181,6 +181,8 @@ class ColumnMap final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data = nullptr) const override; @@ -189,6 +191,10 @@ class ColumnMap final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(int32_t* 
__restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + /******************** keys and values ***************/ const ColumnPtr& get_keys_ptr() const { return keys_column; } ColumnPtr& get_keys_ptr() { return keys_column; } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index e20b87af826aba..0cdf3a6536eb76 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -85,6 +85,23 @@ void ColumnNullable::update_crc_with_value(size_t start, size_t end, uint32_t& h } } +void ColumnNullable::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + if (!has_null()) { + nested_column->update_murmur_with_value(start, end, hash, nullptr); + } else { + const auto* __restrict real_null_data = + assert_cast(*null_map).get_data().data(); + hash = HashUtil::SPARK_MURMUR_32_SEED; + for (int i = start; i < end; ++i) { + if (real_null_data[i] != 0) { + hash = HashUtil::murmur_hash3_32_null(hash); + } + } + nested_column->update_murmur_with_value(start, end, hash, real_null_data); + } +} + void ColumnNullable::update_hash_with_value(size_t n, SipHash& hash) const { if (is_null_at(n)) { hash.update(0); @@ -113,6 +130,27 @@ void ColumnNullable::update_crcs_with_value(uint32_t* __restrict hashes, doris:: } } +void ColumnNullable::update_murmurs_with_value(int32_t* __restrict hashes, + doris::PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + auto s = rows; + DCHECK(s == size()); + const auto* __restrict real_null_data = + assert_cast(*null_map).get_data().data(); + if (!has_null()) { + nested_column->update_murmurs_with_value(hashes, type, rows, offset, nullptr); + } else { + for (int i = 0; i < s; ++i) { + if (real_null_data[i] != 0) { + hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + } + } + nested_column->update_murmurs_with_value(hashes, type, rows, offset, real_null_data); + } +} + void ColumnNullable::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { DCHECK(null_data == nullptr); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index de01907650e691..77ced88c012d61 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -217,6 +217,8 @@ class ColumnNullable final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hash_with_value(size_t n, SipHash& hash) const override; void update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows, @@ -224,6 +226,9 @@ class ColumnNullable final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return 
scatter_impl(num_columns, selector); diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 76db0e58c44580..d921b9015ec8f0 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -180,6 +180,29 @@ void ColumnString::update_crcs_with_value(uint32_t* __restrict hashes, doris::Pr } } +void ColumnString::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + auto data_ref = get_data_at(i); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + auto data_ref = get_data_at(i); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } +} + ColumnPtr ColumnString::filter(const Filter& filt, ssize_t result_size_hint) const { if (offsets.size() == 0) { return ColumnString::create(); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 9bf43c9c627815..d7b7461560e9c0 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -442,6 +442,25 @@ class ColumnString final : public COWHelper { } } + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override { + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + auto data_ref = get_data_at(i); + hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } else { + for (size_t i = start; i < end; ++i) { + auto data_ref = get_data_at(i); + hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } + void update_hash_with_value(size_t n, SipHash& hash) const override { size_t string_size = size_at(n); size_t offset = offset_at(n); @@ -455,6 +474,10 @@ class ColumnString final : public COWHelper { uint32_t offset, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override { auto s = size(); diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index 290452a1c8b663..6b456290c06854 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -220,6 +220,13 @@ void ColumnStruct::update_crc_with_value(size_t start, size_t end, uint32_t& has } } +void ColumnStruct::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + for (const auto& column : columns) { + column->update_murmur_with_value(start, end, hash, nullptr); + } +} + void ColumnStruct::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { for (const auto& column : columns) { @@ -235,6 +242,14 @@ void ColumnStruct::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTy } } +void ColumnStruct::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* 
__restrict null_data) const { + for (const auto& column : columns) { + column->update_murmurs_with_value(hash, type, rows, offset, null_data); + } +} + void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) { const auto& src_concrete = assert_cast(src); diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index d91d38006485db..a81eac001abaa9 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -112,6 +112,8 @@ class ColumnStruct final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data = nullptr) const override; @@ -120,6 +122,10 @@ class ColumnStruct final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) override; diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 97fadcb4071052..e968ee6a9c4acb 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -196,6 +196,41 @@ void ColumnVector::update_crcs_with_value(uint32_t* __restrict hashes, Primit } } +template +void ColumnVector::update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if constexpr (!std::is_same_v) { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } else { + if (type == TYPE_DATE || type == TYPE_DATETIME) { + char buf[64]; + auto date_convert_do_crc = [&](size_t i) { + const VecDateTimeValue& date_val = (const VecDateTimeValue&)data[i]; + auto len = date_val.to_buffer(buf); + hashes[i] = HashUtil::murmur_hash3_32(buf, len, HashUtil::SPARK_MURMUR_32_SEED); + }; + + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + date_convert_do_crc(i); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + date_convert_do_crc(i); + } + } + } + } else { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } + } +} + template struct ColumnVector::less { const Self& parent; diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index ff9197df357a3c..3465530b87fe65 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -319,12 +319,35 @@ class ColumnVector final : public COWHelper> } } } + + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override { + if (hash == 0) { + hash = HashUtil::SPARK_MURMUR_32_SEED; + } + if (null_data) { + for (size_t i = start; i < end; i++) { + if (null_data[i] == 0) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hash); + } + } + } else { + for (size_t i = start; i < end; i++) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), 
hash); + } + } + } + void update_hash_with_value(size_t n, SipHash& hash) const override; void update_crcs_with_value(uint32_t* __restrict hashes, PrimitiveType type, uint32_t rows, uint32_t offset, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index db40610723cdb6..81085eda85f446 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -71,6 +71,13 @@ void XXHashPartitioner::_do_hash(const ColumnPtr& column, uint64_t* column->update_hashes_with_value(result); } +template +void Murmur32HashPartitioner::_do_hash(const ColumnPtr& column, + int32_t* __restrict result, int idx) const { + column->update_murmurs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type, + column->size()); +} + template Status XXHashPartitioner::clone(RuntimeState* state, std::unique_ptr& partitioner) { @@ -97,6 +104,19 @@ Status Crc32HashPartitioner::clone(RuntimeState* state, return Status::OK(); } +template +Status Murmur32HashPartitioner::clone(RuntimeState* state, + std::unique_ptr& partitioner) { + auto* new_partitioner = new Murmur32HashPartitioner(Base::_partition_count); + partitioner.reset(new_partitioner); + new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size()); + for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone( + state, new_partitioner->_partition_expr_ctxs[i])); + } + return Status::OK(); +} + template class Partitioner; template class XXHashPartitioner; template class Partitioner; @@ -104,5 +124,7 @@ template class XXHashPartitioner; template class Partitioner; template class Crc32HashPartitioner; template class Crc32HashPartitioner; +template class Murmur32HashPartitioner; +template class Murmur32HashPartitioner; } // namespace doris::vectorized diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index 66ed8809d7ce7c..92d2698c1f4c80 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -112,5 +112,19 @@ class Crc32HashPartitioner final : public Partitioner { void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override; }; +template +class Murmur32HashPartitioner final : public Partitioner { +public: + using Base = Partitioner; + Murmur32HashPartitioner(int partition_count) + : Partitioner(partition_count) {} + ~Murmur32HashPartitioner() override = default; + + Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; + +private: + void _do_hash(const ColumnPtr& column, int32_t* __restrict result, int idx) const override; +}; + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index b6e4d0b962b843..1036097689ca07 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -328,6 +328,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(sink.output_partition.type), + _hash_type(sink.output_partition.hash_type), _dest_node_id(sink.dest_node_id), 
_transfer_large_data_by_brpc(config::transfer_large_data_by_brpc), _serializer(this) { @@ -338,6 +339,9 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || sink.output_partition.type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); + DCHECK(sink.output_partition.hash_type == THashType::CRC32 || + sink.output_partition.hash_type == THashType::XXHASH64 || + sink.output_partition.hash_type == THashType::SPARK_MURMUR32); std::map fragment_id_to_channel_index; _enable_pipeline_exec = state->enable_pipeline_exec(); @@ -383,6 +387,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(TPartitionType::UNPARTITIONED), + _hash_type(THashType::XXHASH64), _dest_node_id(dest_node_id), _serializer(this) { _cur_pb_block = &_pb_block1; @@ -415,8 +420,13 @@ Status VDataStreamSender::init(const TDataSink& tsink) { RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { _partition_count = _channel_shared_ptrs.size(); - _partitioner.reset( - new Crc32HashPartitioner(_channel_shared_ptrs.size())); + if (_hash_type == THashType::CRC32) { + _partitioner.reset( + new Crc32HashPartitioner(_channel_shared_ptrs.size())); + } else { + _partitioner.reset(new Murmur32HashPartitioner( + _channel_shared_ptrs.size())); + } RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 88a948ed05cc0e..8222eb49dcce02 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -107,6 +107,13 @@ struct ShuffleChannelIds { } }; +struct ShufflePModChannelIds { + template + HashValueType operator()(HashValueType l, int32_t r) { + return (l % r + r) % r; + } +}; + class VDataStreamSender : public DataSink { public: friend class pipeline::ExchangeSinkOperator; @@ -185,6 +192,7 @@ class VDataStreamSender : public DataSink { int _current_channel_idx; // index of current channel to send to if _random == true TPartitionType::type _part_type; + THashType::type _hash_type; // serialized batches for broadcasting; we need two so we can write // one while the other one is still being sent diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql index 00d2a699410b92..349efc069bf583 100644 --- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql +++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql @@ -1897,7 +1897,51 @@ OUTPUTFORMAT LOCATION '/user/doris/preinstalled_data/parquet_table/parquet_decimal90_table'; -msck repair table parquet_decimal90_table; +CREATE TABLE `parquet_test2`( + `user_id` int, + `key` varchar(20)) +PARTITIONED BY ( + `part` varchar(10)) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
+LOCATION + '/user/doris/preinstalled_data/parquet_table/parquet_test2' +TBLPROPERTIES ( + 'spark.sql.statistics.colStats.user_id.nullCount'='0', + 'spark.sql.statistics.colStats.key.distinctCount'='4', + 'spark.sql.statistics.colStats.user_id.distinctCount'='4', + 'spark.sql.statistics.colStats.part.version'='2', + 'transient_lastDdlTime'='1697557072', + 'spark.sql.sources.schema.partCol.0'='part', + 'spark.sql.statistics.colStats.user_id.maxLen'='4', + 'spark.sql.create.version'='3.3.2', + 'spark.sql.statistics.colStats.part.avgLen'='2', + 'spark.sql.sources.schema.numBuckets'='3', + 'spark.sql.statistics.colStats.part.maxLen'='2', + 'spark.sql.sources.schema'='{"type":"struct","fields":[{"name":"user_id","type":"integer","nullable":true,"metadata":{}},{"name":"key","type":"string","nullable":true,"metadata":{"__CHAR_VARCHAR_TYPE_STRING":"varchar(20)"}},{"name":"part","type":"string","nullable":true,"metadata":{"__CHAR_VARCHAR_TYPE_STRING":"varchar(10)"}}]}', + 'spark.sql.partitionProvider'='catalog', + 'spark.sql.sources.schema.numBucketCols'='1', + 'spark.sql.statistics.colStats.user_id.max'='31', + 'spark.sql.sources.schema.bucketCol.0'='user_id', + 'spark.sql.statistics.colStats.user_id.version'='2', + 'spark.sql.statistics.numRows'='4', + 'spark.sql.statistics.colStats.part.nullCount'='0', + 'spark.sql.statistics.colStats.part.distinctCount'='1', + 'spark.sql.statistics.colStats.user_id.min'='1', + 'spark.sql.statistics.colStats.key.version'='2', + 'spark.sql.statistics.colStats.key.avgLen'='3', + 'spark.sql.sources.schema.numPartCols'='1', + 'spark.sql.statistics.colStats.user_id.avgLen'='4', + 'spark.sql.statistics.colStats.key.maxLen'='3', + 'spark.sql.statistics.colStats.key.nullCount'='0', + 'spark.sql.sources.provider'='PARQUET', + 'spark.sql.statistics.totalSize'='2873'); + +msck repair table parquet_test2; CREATE TABLE `fixed_length_byte_array_decimal_table`( `decimal_col1` decimal(7,2), @@ -1920,7 +1964,6 @@ msck repair table fixed_length_byte_array_decimal_table; show tables; - create database stats_test; use stats_test; create table stats_test1 (id INT, value STRING) STORED AS ORC; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java new file mode 100644 index 00000000000000..5b15874401908a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.catalog; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Objects; + +/* + * Hive Hash Distribution Info + */ +public class HiveExternalDistributionInfo extends HashDistributionInfo { + @SerializedName(value = "bucketingVersion") + private final int bucketingVersion; + + public HiveExternalDistributionInfo() { + bucketingVersion = 2; + } + + public HiveExternalDistributionInfo(int bucketNum, List distributionColumns, int bucketingVersion) { + super(bucketNum, distributionColumns); + this.bucketingVersion = bucketingVersion; + } + + public HiveExternalDistributionInfo(int bucketNum, boolean autoBucket, + List distributionColumns, int bucketingVersion) { + super(bucketNum, autoBucket, distributionColumns); + this.bucketingVersion = bucketingVersion; + } + + public int getBucketingVersion() { + return bucketingVersion; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + HiveExternalDistributionInfo that = (HiveExternalDistributionInfo) o; + return bucketNum == that.bucketNum + && sameDistributionColumns(that) + && bucketingVersion == that.bucketingVersion; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), bucketingVersion); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("type: ").append(type).append("; "); + + builder.append("distribution columns: ["); + for (Column column : getDistributionColumns()) { + builder.append(column.getName()).append(","); + } + builder.append("]; "); + + if (autoBucket) { + builder.append("bucket num: auto;"); + } else { + builder.append("bucket num: ").append(bucketNum).append(";"); + } + + builder.append("bucketingVersion: ").append(bucketingVersion).append(";"); + + return builder.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 73a49bb24a8c54..73b1ce9c8371f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -198,6 +198,8 @@ public void init(BeSelectionPolicy policy) throws UserException { } catch (ExecutionException e) { throw new UserException("failed to get consistent hash", e); } + /*consistentBucket = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(), + new BackendHash(), backends, Config.virtual_node_number);*/ } public Backend getNextBe() { @@ -250,6 +252,7 @@ public Multimap computeScanRangeAssignment(List splits) t Optional chosenNode = candidateNodes.stream() .min(Comparator.comparingLong(ownerNode -> assignedWeightPerBackend.get(ownerNode))); + //ToDo(Nitin): group assignment based on the bucketId if (chosenNode.isPresent()) { Backend selectedBackend = chosenNode.get(); assignment.put(selectedBackend, split); @@ -507,4 +510,11 @@ public void funnel(Split split, PrimitiveSink primitiveSink) { primitiveSink.putLong(split.getLength()); } } + + private static class BucketHash implements Funnel { + @Override + public void funnel(Integer bucketId, PrimitiveSink primitiveSink) { + primitiveSink.putLong(bucketId); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 7afb04831ce029..a9d8797612256d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -36,10 +36,13 @@ import org.apache.doris.datasource.hive.AcidInfo; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HiveBucketUtil; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hive.source.HiveSplit; import org.apache.doris.datasource.iceberg.source.IcebergSplit; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; @@ -55,6 +58,7 @@ import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFileScanSlotInfo; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.THdfsParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRange; @@ -67,6 +71,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -91,6 +96,8 @@ public abstract class FileQueryScanNode extends FileScanNode { protected Map destSlotDescByName; protected TFileScanRangeParams params; + public ArrayListMultimap bucketSeq2locations = ArrayListMultimap.create(); + @Getter protected TableSample tableSample; @@ -343,6 +350,15 @@ public void createScanRangeLocations() throws UserException { ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID) : fileSplit.getPartitionValues(); + boolean isBucketedHiveTable = false; + int bucketNum = 0; + TableIf targetTable = getTargetTable(); + if (targetTable instanceof HMSExternalTable) { + isBucketedHiveTable = ((HMSExternalTable) targetTable).isBucketedTable(); + if (isBucketedHiveTable) { + bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt(); + } + } TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, locationType); TFileCompressType fileCompressType = getFileCompressType(fileSplit); @@ -381,6 +397,9 @@ public void createScanRangeLocations() throws UserException { fileSplit.getStart(), fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); } + if (isBucketedHiveTable) { + bucketSeq2locations.put(bucketNum, curLocations); + } scanRangeLocations.add(curLocations); this.totalFileSize += fileSplit.getLength(); } @@ -532,4 +551,12 @@ protected TFileAttributes getFileAttributes() throws UserException { protected abstract TableIf getTargetTable() throws UserException; protected abstract Map getLocationProperties() throws UserException; + + public DataPartition constructInputPartitionByDistributionInfo() { + return DataPartition.RANDOM; + } + + public THashType getHashType() { + return THashType.CRC32; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 47b684c264b83d..a770f688aa435f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -18,11 +18,15 @@ package org.apache.doris.datasource.hive; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -45,6 +49,7 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -59,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.logging.log4j.LogManager; @@ -68,6 +74,8 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -93,6 +101,22 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI private static 
final String TBL_PROP_TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime"; private static final String NUM_ROWS = "numRows"; + private static final String SPARK_BUCKET = "spark.sql.sources.schema.bucketCol."; + private static final String SPARK_NUM_BUCKET = "spark.sql.sources.schema.numBuckets"; + private static final String BUCKETING_VERSION = "bucketing_version"; + + private static final Set SUPPORTED_BUCKET_PROPERTIES; + + static { + SUPPORTED_BUCKET_PROPERTIES = Sets.newHashSet(); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "0"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "1"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "2"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "3"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "4"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_NUM_BUCKET); + SUPPORTED_BUCKET_PROPERTIES.add(BUCKETING_VERSION); + } private static final String SPARK_COL_STATS = "spark.sql.statistics.colStats."; private static final String SPARK_STATS_MAX = ".max"; @@ -140,8 +164,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI MAP_SPARK_STATS_TO_DORIS.put(StatsType.HISTOGRAM, SPARK_STATS_HISTOGRAM); } - private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; - private List partitionColumns; + protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; + protected List partitionColumns; + private List bucketColumns; + private boolean isSparkTable; private DLAType dlaType = DLAType.UNKNOWN; @@ -152,6 +178,8 @@ public enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG } + private DistributionInfo distributionInfo; + /** * Create hive metastore external table. * @@ -228,6 +256,14 @@ public boolean isHoodieCowTable() { return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName); } + public boolean isSparkTable() { + return isSparkTable; + } + + public boolean isBucketedTable() { + return bucketColumns != null && !bucketColumns.isEmpty() && isSparkTable; + } + /** * Some data lakes (such as Hudi) will synchronize their partition information to HMS, * then we can quickly obtain the partition information of the table from HMS. @@ -464,9 +500,71 @@ public List initSchema() { columns = getHiveSchema(); } initPartitionColumns(columns); + initBucketingColumns(columns); return columns; } + private void initBucketingColumns(List columns) { + List bucketCols = new ArrayList<>(5); + int numBuckets = getBucketColumns(bucketCols); + if (bucketCols.isEmpty() || !isSparkTable) { + bucketColumns = ImmutableList.of(); + distributionInfo = new RandomDistributionInfo(1, true); + return; + } + + int bucketingVersion = Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION, "2")); + ImmutableList.Builder bucketColBuilder = ImmutableList.builder(); + for (String colName : bucketCols) { + // do not use "getColumn()", which will cause dead loop + for (Column column : columns) { + if (colName.equalsIgnoreCase(column.getName())) { + // For partition/bucket column, if it is string type, change it to varchar(65535) + // to be same as doris managed table. + // This is to avoid some unexpected behavior such as different partition pruning result + // between doris managed table and external table. 
+ if (column.getType().getPrimitiveType() == PrimitiveType.STRING) { + column.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH)); + } + bucketColBuilder.add(column); + break; + } + } + } + + bucketColumns = bucketColBuilder.build(); + distributionInfo = new HiveExternalDistributionInfo(numBuckets, bucketColumns, bucketingVersion); + LOG.debug("get {} bucket columns for table: {}", bucketColumns.size(), name); + } + + private int getBucketColumns(List bucketCols) { + StorageDescriptor descriptor = remoteTable.getSd(); + int numBuckets = -1; + if (descriptor.isSetBucketCols() && !descriptor.getBucketCols().isEmpty()) { + /* Hive Bucketed Table */ + bucketCols.addAll(descriptor.getBucketCols()); + numBuckets = descriptor.getNumBuckets(); + } else if (remoteTable.isSetParameters() + && !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) { + Map parameters = remoteTable.getParameters(); + for (Map.Entry param : parameters.entrySet()) { + if (param.getKey().startsWith(SPARK_BUCKET)) { + int index = Integer.valueOf(param.getKey() + .substring(param.getKey().lastIndexOf(".") + 1)); + bucketCols.add(index, param.getValue()); + } else if (param.getKey().equals(SPARK_NUM_BUCKET)) { + numBuckets = Integer.valueOf(param.getValue()); + } + } + + if (numBuckets > 0) { + isSparkTable = true; + } + } + + return numBuckets; + } + private List getIcebergSchema() { return IcebergUtils.getSchema(catalog, dbName, name); } @@ -575,6 +673,19 @@ public Optional getColumnStatistic(String colName) { return Optional.empty(); } + public DistributionInfo getDefaultDistributionInfo() { + makeSureInitialized(); + if (distributionInfo != null) { + return distributionInfo; + } + + return new RandomDistributionInfo(1, true); + } + + public Map getTableParameters() { + return remoteTable.getParameters(); + } + private Optional getHiveColumnStats(String colName) { List tableStats = getHiveTableColumnStats(Lists.newArrayList(colName)); if (tableStats == null || tableStats.isEmpty()) { @@ -751,14 +862,23 @@ public long getDataSize(boolean singleReplica) { @Override public boolean isDistributionColumn(String columnName) { - return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()).contains(columnName.toLowerCase()); + Set distributeColumns = getDistributionColumnNames() + .stream().map(String::toLowerCase).collect(Collectors.toSet()); + return distributeColumns.contains(columnName.toLowerCase()); } @Override public Set getDistributionColumnNames() { - return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()); + Set distributionColumnNames = Sets.newHashSet(); + if (distributionInfo instanceof RandomDistributionInfo) { + return distributionColumnNames; + } + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List distColumn = hashDistributionInfo.getDistributionColumns(); + for (Column column : distColumn) { + distributionColumnNames.add(column.getName().toLowerCase()); + } + return distributionColumnNames; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java index 7435a3d58dc911..ce0d9cfba98bf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java @@ -96,6 +96,9 @@ private static 
PrimitiveTypeInfo convertToHiveColType(PrimitiveType dorisType) t Pattern.compile("bucket_(\\d+)(_\\d+)?$"); private static final Iterable BUCKET_PATTERNS = ImmutableList.of( + // spark/parquet pattern + // format: f"part-[paritionId]-[tid]-[txnId]-[jobId]-[taskAttemptId]-[fileCount].c000.snappy.parquet" + Pattern.compile("part-\\d{5}-\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}_(\\d{5})(?:[-_.].*)?"), // legacy Presto naming pattern (current version matches Hive) Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?"), // Hive naming pattern per `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()` @@ -398,7 +401,7 @@ private static int hashCodeV2(Object o, ObjectInspector objIns, ByteBuffer byteB throw new DdlException("Unknown type: " + objIns.getTypeName()); } - private static OptionalInt getBucketNumberFromPath(String name) { + public static OptionalInt getBucketNumberFromPath(String name) { for (Pattern pattern : BUCKET_PATTERNS) { Matcher matcher = pattern.matcher(name); if (matcher.matches()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index c901ee654ae82d..4264b8c50f4e85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -17,10 +17,15 @@ package org.apache.doris.datasource.hive.source; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableIf; @@ -42,6 +47,7 @@ import org.apache.doris.datasource.hive.HiveTransaction; import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; +import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; @@ -52,6 +58,7 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THashType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -415,4 +422,37 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User } return compressType; } + + @Override + public DataPartition constructInputPartitionByDistributionInfo() { + if (hmsTable.isBucketedTable()) { + DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo(); + if (!(distributionInfo instanceof HashDistributionInfo)) { + return DataPartition.RANDOM; + } + List distributeColumns = ((HiveExternalDistributionInfo) distributionInfo).getDistributionColumns(); + List dataDistributeExprs = Lists.newArrayList(); + for (Column column : distributeColumns) { + SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); + dataDistributeExprs.add(slotRef); + } + return 
DataPartition.hashPartitioned(dataDistributeExprs, THashType.SPARK_MURMUR32); + } + + return DataPartition.RANDOM; + } + + public HMSExternalTable getHiveTable() { + return hmsTable; + } + + @Override + public THashType getHashType() { + if (hmsTable.isBucketedTable() + && hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) { + return THashType.SPARK_MURMUR32; + } + + return THashType.CRC32; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index c64965080fbe3c..a43903f9481721 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -48,6 +48,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.es.source.EsScanNode; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.source.HiveScanNode; @@ -184,6 +185,7 @@ import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.tablefunction.TableValuedFunctionIf; import org.apache.doris.thrift.TFetchOption; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TResultSinkType; @@ -465,7 +467,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); // TODO(cmy): determine the needCheckColumnPriv param - ScanNode scanNode; + FileQueryScanNode scanNode; + DataPartition dataPartition = DataPartition.RANDOM; if (table instanceof HMSExternalTable) { switch (((HMSExternalTable) table).getDlaType()) { case HUDI: @@ -515,8 +518,14 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla ) ); Utils.execWithUncheckedException(scanNode::finalizeForNereids); + if (fileScan.getDistributionSpec() instanceof DistributionSpecHash) { + DistributionSpecHash distributionSpecHash = (DistributionSpecHash) fileScan.getDistributionSpec(); + List partitionExprs = distributionSpecHash.getOrderedShuffledColumns().stream() + .map(context::findSlotRef).collect(Collectors.toList()); + dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, + partitionExprs, scanNode.getHashType()); + } // Create PlanFragment - DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); context.addPlanFragment(planFragment); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan); @@ -2419,7 +2428,7 @@ private void addPlanRoot(PlanFragment fragment, PlanNode planNode, AbstractPlan } private DataPartition toDataPartition(DistributionSpec distributionSpec, - List childOutputIds, PlanTranslatorContext context) { + List childOutputIds, PlanTranslatorContext context) { if (distributionSpec instanceof DistributionSpecAny || distributionSpec instanceof DistributionSpecStorageAny || distributionSpec instanceof DistributionSpecExecutionAny) { @@ -2446,8 +2455,20 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec, } } TPartitionType partitionType; + THashType hashType = 
THashType.XXHASH64; switch (distributionSpecHash.getShuffleType()) { case STORAGE_BUCKETED: + switch (distributionSpecHash.getShuffleFunction()) { + case STORAGE_BUCKET_SPARK_MURMUR32: + hashType = THashType.SPARK_MURMUR32; + break; + case STORAGE_BUCKET_CRC32: + hashType = THashType.CRC32; + break; + case STORAGE_BUCKET_XXHASH64: + default: + break; + } partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED; break; case EXECUTION_BUCKETED: @@ -2458,7 +2479,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec, throw new RuntimeException("Do not support shuffle type: " + distributionSpecHash.getShuffleType()); } - return new DataPartition(partitionType, partitionExprs); + return new DataPartition(partitionType, partitionExprs, hashType); } else if (distributionSpec instanceof DistributionSpecTabletIdShuffle) { return DataPartition.TABLET_ID; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 3756c1bcfe3518..88866cbda323cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -141,7 +141,7 @@ public PhysicalProperties visitPhysicalEsScan(PhysicalEsScan esScan, PlanContext @Override public PhysicalProperties visitPhysicalFileScan(PhysicalFileScan fileScan, PlanContext context) { - return PhysicalProperties.STORAGE_ANY; + return new PhysicalProperties(fileScan.getDistributionSpec()); } /** @@ -285,7 +285,7 @@ public PhysicalProperties visitPhysicalHashJoin( // retain left shuffle type, since coordinator use left most node to schedule fragment // forbid colocate join, since right table already shuffle return new PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin( - leftHashSpec.getShuffleType())); + leftHashSpec.getShuffleType(), leftHashSpec.getShuffleFunction())); } case FULL_OUTER_JOIN: return PhysicalProperties.createAnyFromHash(leftHashSpec); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index a70d72c565c707..a4c86552fcc6da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -551,7 +551,7 @@ private PhysicalProperties calAnotherSideRequired(ShuffleType shuffleType, notShuffleSideRequired, shuffleSideRequired); return new PhysicalProperties(new DistributionSpecHash(shuffleSideIds, shuffleType, shuffleSideOutput.getTableId(), shuffleSideOutput.getSelectedIndexId(), - shuffleSideOutput.getPartitionIds())); + shuffleSideOutput.getPartitionIds(), notShuffleSideOutput.getShuffleFunction())); } private void updateChildEnforceAndCost(int index, PhysicalProperties targetProperties) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java index 6ab8e054f8aaef..5bf1a7f52472bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java @@ -54,6 +54,8 @@ public class 
DistributionSpecHash extends DistributionSpec { private final Set partitionIds; private final long selectedIndexId; + private final StorageBucketHashType storageBucketHashType; + /** * Use for no need set table related attributes. */ @@ -70,10 +72,19 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu } /** - * Normal constructor. + * Use when no need set shuffle hash function */ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, long tableId, long selectedIndexId, Set partitionIds) { + this(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds, + StorageBucketHashType.STORAGE_BUCKET_CRC32); + } + + /** + * Normal constructor. + */ + public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, + long tableId, long selectedIndexId, Set partitionIds, StorageBucketHashType storageBucketHashType) { this.orderedShuffledColumns = ImmutableList.copyOf( Objects.requireNonNull(orderedShuffledColumns, "orderedShuffledColumns should not null")); this.shuffleType = Objects.requireNonNull(shuffleType, "shuffleType should not null"); @@ -92,6 +103,7 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu } this.equivalenceExprIds = equivalenceExprIdsBuilder.build(); this.exprIdToEquivalenceSet = exprIdToEquivalenceSetBuilder.buildKeepingLast(); + this.storageBucketHashType = storageBucketHashType; } /** @@ -101,7 +113,7 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu long tableId, Set partitionIds, List> equivalenceExprIds, Map exprIdToEquivalenceSet) { this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds, - equivalenceExprIds, exprIdToEquivalenceSet); + equivalenceExprIds, exprIdToEquivalenceSet, StorageBucketHashType.STORAGE_BUCKET_XXHASH64); } /** @@ -109,7 +121,7 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu */ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, long tableId, long selectedIndexId, Set partitionIds, List> equivalenceExprIds, - Map exprIdToEquivalenceSet) { + Map exprIdToEquivalenceSet, StorageBucketHashType storageBucketHashType) { this.orderedShuffledColumns = ImmutableList.copyOf(Objects.requireNonNull(orderedShuffledColumns, "orderedShuffledColumns should not null")); this.shuffleType = Objects.requireNonNull(shuffleType, "shuffleType should not null"); @@ -121,6 +133,7 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu Objects.requireNonNull(equivalenceExprIds, "equivalenceExprIds should not null")); this.exprIdToEquivalenceSet = ImmutableMap.copyOf( Objects.requireNonNull(exprIdToEquivalenceSet, "exprIdToEquivalenceSet should not null")); + this.storageBucketHashType = storageBucketHashType; } static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHash right, ShuffleType shuffleType) { @@ -140,7 +153,7 @@ static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHas exprIdToEquivalenceSet.putAll(right.getExprIdToEquivalenceSet()); return new DistributionSpecHash(orderedShuffledColumns, shuffleType, left.getTableId(), left.getSelectedIndexId(), left.getPartitionIds(), equivalenceExprIds.build(), - exprIdToEquivalenceSet.buildKeepingLast()); + exprIdToEquivalenceSet.buildKeepingLast(), left.getShuffleFunction()); } static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHash right) { @@ -175,6 +188,10 @@ public Map getExprIdToEquivalenceSet() { return exprIdToEquivalenceSet; } 
+ public StorageBucketHashType getShuffleFunction() { + return storageBucketHashType; + } + public Set getEquivalenceExprIdsOf(ExprId exprId) { if (exprIdToEquivalenceSet.containsKey(exprId)) { return equivalenceExprIds.get(exprIdToEquivalenceSet.get(exprId)); @@ -227,14 +244,15 @@ private boolean equalsSatisfy(List required) { return true; } - public DistributionSpecHash withShuffleType(ShuffleType shuffleType) { + public DistributionSpecHash withShuffleType(ShuffleType shuffleType, StorageBucketHashType storageBucketHashType) { return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds, - equivalenceExprIds, exprIdToEquivalenceSet); + equivalenceExprIds, exprIdToEquivalenceSet, storageBucketHashType); } - public DistributionSpecHash withShuffleTypeAndForbidColocateJoin(ShuffleType shuffleType) { + public DistributionSpecHash withShuffleTypeAndForbidColocateJoin(ShuffleType shuffleType, + StorageBucketHashType storageBucketHashType) { return new DistributionSpecHash(orderedShuffledColumns, shuffleType, -1, -1, partitionIds, - equivalenceExprIds, exprIdToEquivalenceSet); + equivalenceExprIds, exprIdToEquivalenceSet, storageBucketHashType); } /** @@ -272,7 +290,7 @@ public DistributionSpec project(Map projections, } } return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds, - equivalenceExprIds, exprIdToEquivalenceSet); + equivalenceExprIds, exprIdToEquivalenceSet, storageBucketHashType); } @Override @@ -281,12 +299,13 @@ public boolean equals(Object o) { return false; } DistributionSpecHash that = (DistributionSpecHash) o; - return shuffleType == that.shuffleType && orderedShuffledColumns.equals(that.orderedShuffledColumns); + return shuffleType == that.shuffleType && storageBucketHashType == that.storageBucketHashType + && orderedShuffledColumns.equals(that.orderedShuffledColumns); } @Override public int hashCode() { - return Objects.hash(shuffleType, orderedShuffledColumns); + return Objects.hash(shuffleType, storageBucketHashType, orderedShuffledColumns); } @Override @@ -315,4 +334,13 @@ public enum ShuffleType { STORAGE_BUCKETED, } + /** + * Enums for concrete shuffle functions. 
+ */ + public enum StorageBucketHashType { + STORAGE_BUCKET_CRC32, + STORAGE_BUCKET_XXHASH64, + STORAGE_BUCKET_SPARK_MURMUR32 + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java index b08db2aeba2b15..c7c40fe1e98189 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java @@ -26,6 +26,7 @@ import org.apache.doris.nereids.metrics.event.EnforcerEvent; import org.apache.doris.nereids.minidump.NereidsTracer; import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; +import org.apache.doris.nereids.properties.DistributionSpecHash.StorageBucketHashType; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.qe.ConnectContext; @@ -117,7 +118,8 @@ private PhysicalProperties enforceDistribution(PhysicalProperties oldOutputPrope DistributionSpec requiredDistributionSpec = required.getDistributionSpec(); if (requiredDistributionSpec instanceof DistributionSpecHash) { DistributionSpecHash requiredDistributionSpecHash = (DistributionSpecHash) requiredDistributionSpec; - outputDistributionSpec = requiredDistributionSpecHash.withShuffleType(ShuffleType.EXECUTION_BUCKETED); + outputDistributionSpec = requiredDistributionSpecHash.withShuffleType(ShuffleType.EXECUTION_BUCKETED, + StorageBucketHashType.STORAGE_BUCKET_XXHASH64); } else { outputDistributionSpec = requiredDistributionSpec; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java index d86e1d1667e18a..682b5eb2659b19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java @@ -17,11 +17,27 @@ package org.apache.doris.nereids.rules.implementation; -import org.apache.doris.nereids.properties.DistributionSpecAny; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.nereids.properties.DistributionSpec; +import org.apache.doris.nereids.properties.DistributionSpecHash; +import org.apache.doris.nereids.properties.DistributionSpecHash.StorageBucketHashType; +import org.apache.doris.nereids.properties.DistributionSpecStorageAny; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; import java.util.Optional; /** @@ -35,7 +51,7 @@ public Rule build() { fileScan.getRelationId(), fileScan.getTable(), fileScan.getQualifier(), - DistributionSpecAny.INSTANCE, + 
convertDistribution(fileScan), Optional.empty(), fileScan.getLogicalProperties(), fileScan.getConjuncts(), @@ -43,4 +59,34 @@ public Rule build() { fileScan.getTableSample()) ).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE); } + + private DistributionSpec convertDistribution(LogicalFileScan fileScan) { + TableIf table = fileScan.getTable(); + if (!(table instanceof HMSExternalTable)) { + return DistributionSpecStorageAny.INSTANCE; + } + + HMSExternalTable hmsExternalTable = (HMSExternalTable) table; + DistributionInfo distributionInfo = hmsExternalTable.getDefaultDistributionInfo(); + if (distributionInfo instanceof HashDistributionInfo) { + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List output = fileScan.getOutput(); + List hashColumns = Lists.newArrayList(); + for (Slot slot : output) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + if (((SlotReference) slot).getColumn().get().equals(column)) { + hashColumns.add(slot.getExprId()); + } + } + } + StorageBucketHashType function = StorageBucketHashType.STORAGE_BUCKET_CRC32; + if (hmsExternalTable.isBucketedTable()) { + function = StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32; + } + return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL, + fileScan.getTable().getId(), -1, Collections.emptySet(), function); + } + + return DistributionSpecStorageAny.INSTANCE; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 9c5c375a35c06d..339de61cf9c9fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.thrift.TDataPartition; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Joiner; @@ -50,10 +51,16 @@ public class DataPartition { public static final DataPartition TABLET_ID = new DataPartition(TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED); private final TPartitionType type; + private final THashType hashType; + // for hash partition: exprs used to compute hash value private ImmutableList partitionExprs; public DataPartition(TPartitionType type, List exprs) { + this(type, exprs, THashType.CRC32); + } + + public DataPartition(TPartitionType type, List exprs, THashType hashType) { Preconditions.checkNotNull(exprs); Preconditions.checkState(!exprs.isEmpty()); Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED @@ -61,6 +68,7 @@ public DataPartition(TPartitionType type, List exprs) { || type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED); this.type = type; this.partitionExprs = ImmutableList.copyOf(exprs); + this.hashType = hashType; } public DataPartition(TPartitionType type) { @@ -69,10 +77,15 @@ public DataPartition(TPartitionType type) { || type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED); this.type = type; this.partitionExprs = ImmutableList.of(); + this.hashType = THashType.CRC32; + } + + public static DataPartition hashPartitioned(List exprs, THashType hashType) { + return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, hashType); } public static DataPartition hashPartitioned(List exprs) { - return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs); + return new 
DataPartition(TPartitionType.HASH_PARTITIONED, exprs, THashType.CRC32); } public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws AnalysisException { @@ -96,17 +109,25 @@ public List getPartitionExprs() { return partitionExprs; } + public THashType getHashType() { + return hashType; + } + public TDataPartition toThrift() { TDataPartition result = new TDataPartition(type); if (partitionExprs != null) { result.setPartitionExprs(Expr.treesToThrift(partitionExprs)); } + result.setHashType(hashType); return result; } public String getExplainString(TExplainLevel explainLevel) { StringBuilder str = new StringBuilder(); str.append(type.toString()); + if (type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) { + str.append("(").append(hashType.toString()).append(")"); + } if (explainLevel == TExplainLevel.BRIEF) { return str.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index e1a8d36424eebe..4cc9608088cb81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -33,17 +33,22 @@ import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hive.common.util.Ref; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -285,6 +290,10 @@ private PlanFragment createScanFragment(PlanNode node) throws UserException { OlapScanNode olapScanNode = (OlapScanNode) node; return new PlanFragment(ctx.getNextFragmentId(), node, olapScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); + } else if (node instanceof HiveScanNode) { + HiveScanNode hiveScanNode = (HiveScanNode) node; + return new PlanFragment(ctx.getNextFragmentId(), node, + hiveScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); } else { // other scan nodes are random partitioned: es, broker return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); @@ -327,10 +336,12 @@ private PlanFragment createHashJoinFragment( // bucket shuffle join is better than broadcast and shuffle join // it can reduce the network cost of join, so doris chose it first List rhsPartitionExprs = Lists.newArrayList(); - if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs)) { + Ref hashType = Ref.from(THashType.CRC32); + if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs, hashType)) { node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); DataPartition rhsJoinPartition = - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionExprs); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, + 
rhsPartitionExprs, hashType.value); ExchangeNode rhsExchange = new ExchangeNode(ctx.getNextNodeId(), rightChildFragment.getPlanRoot(), false); rhsExchange.setNumInstances(rightChildFragment.getPlanRoot().getNumInstances()); @@ -600,7 +611,7 @@ private boolean dataDistributionMatchEqPredicate(List eqJoinPre } private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, - List rhsHashExprs) { + List rhsHashExprs, Ref hashType) { if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { return false; } @@ -616,7 +627,9 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr PlanNode leftRoot = leftChildFragment.getPlanRoot(); // 1.leftRoot be OlapScanNode if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); + } else if (leftRoot instanceof HiveScanNode) { + return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); } // 2.leftRoot be hashjoin node @@ -625,17 +638,83 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr leftRoot = leftRoot.getChild(0); } if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); + } else if (leftRoot instanceof HiveScanNode) { + return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); } } return false; } + private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode, + List rhsJoinExprs, Ref hashType) { + HMSExternalTable leftTable = leftScanNode.getHiveTable(); + + DistributionInfo leftDistribution = leftTable.getDefaultDistributionInfo(); + if (leftDistribution == null || !(leftDistribution instanceof HiveExternalDistributionInfo)) { + return false; + } + + HiveExternalDistributionInfo hiveDistributionInfo = (HiveExternalDistributionInfo) leftDistribution; + + List leftDistributeColumns = hiveDistributionInfo.getDistributionColumns(); + List leftDistributeColumnNames = leftDistributeColumns.stream() + .map(col -> leftTable.getName() + "." + col.getName().toLowerCase()).collect(Collectors.toList()); + + List leftJoinColumnNames = new ArrayList<>(); + List rightExprs = new ArrayList<>(); + List eqJoinConjuncts = node.getEqJoinConjuncts(); + + for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) { + Expr lhsJoinExpr = eqJoinPredicate.getChild(0); + Expr rhsJoinExpr = eqJoinPredicate.getChild(1); + if (lhsJoinExpr.unwrapSlotRef() == null || rhsJoinExpr.unwrapSlotRef() == null) { + continue; + } + + SlotRef leftSlot = node.getChild(0).findSrcSlotRef(lhsJoinExpr.unwrapSlotRef()); + if (leftSlot.getTable() instanceof HMSExternalTable + && leftScanNode.desc.getSlots().contains(leftSlot.getDesc())) { + // table name in SlotRef is not the real name. `select * from test as t` + // table name in SlotRef is `t`, but here we need `test`. + leftJoinColumnNames.add(leftSlot.getTable().getName() + "."
+ + leftSlot.getColumnName().toLowerCase()); + rightExprs.add(rhsJoinExpr); + } + } + + //2 the join columns should contain all left table distribute columns to enable bucket shuffle join + for (int i = 0; i < leftDistributeColumnNames.size(); i++) { + String distributeColumnName = leftDistributeColumnNames.get(i); + boolean findRhsExprs = false; + // check the join column name is the same as the distribute column name and + // check the rhs join expr type is the same as the distribute column type + for (int j = 0; j < leftJoinColumnNames.size(); j++) { + if (leftJoinColumnNames.get(j).equals(distributeColumnName)) { + // varchar and string type don't need to check the length property + if ((rightExprs.get(j).getType().isVarcharOrStringType() + && leftDistributeColumns.get(i).getType().isVarcharOrStringType()) + || (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType()))) { + rhsJoinExprs.add(rightExprs.get(j)); + findRhsExprs = true; + break; + } + } + } + + if (!findRhsExprs) { + return false; + } + } + + hashType.value = leftScanNode.getHashType(); + return true; + } + //the join expr must contian left table distribute column - private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot, + private boolean canBucketShuffleJoin(HashJoinNode node, OlapScanNode leftScanNode, List rhsJoinExprs) { - OlapScanNode leftScanNode = ((OlapScanNode) leftRoot); OlapTable leftTable = leftScanNode.getOlapTable(); //1 the left table has more than one partition or left table is not a stable colocate table diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 274725f3f22671..7561a7cef4dae5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -74,6 +74,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TOlapScanNode; import org.apache.doris.thrift.TOlapTableIndex; @@ -1683,7 +1684,7 @@ public DataPartition constructInputPartitionByDistributionInfo() throws UserExce SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); dataDistributeExprs.add(slotRef); } - return DataPartition.hashPartitioned(dataDistributeExprs); + return DataPartition.hashPartitioned(dataDistributeExprs, THashType.CRC32); } else { return DataPartition.RANDOM; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 17e62d40f1f9f6..bfaec73287d167 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.common.Config; +import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.Status; @@ -36,6 +37,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalScanNode; import org.apache.doris.datasource.FileQueryScanNode; +import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.load.loadv2.LoadJob; import
org.apache.doris.metric.MetricRepo; import org.apache.doris.nereids.NereidsPlanner; @@ -2289,8 +2291,13 @@ private void computeScanRangeAssignment() throws Exception { computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost); } if (fragmentContainsBucketShuffleJoin) { - bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, - idToBackend, addressToBackendID, replicaNumPerHost); + if (scanNode instanceof OlapScanNode) { + bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, + idToBackend, addressToBackendID, replicaNumPerHost); + } else if (scanNode instanceof HiveScanNode) { + bucketShuffleJoinController.computeScanRangeAssignmentByBucket((HiveScanNode) scanNode, + idToBackend, addressToBackendID, replicaNumPerHost); + } } if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) { computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost, @@ -2967,6 +2974,50 @@ private void computeScanRangeAssignmentByBucket( } } + private void computeScanRangeAssignmentByBucket( + final HiveScanNode scanNode, ImmutableMap idToBackend, + Map addressToBackendID, + Map replicaNumPerHost) throws Exception { + if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { + int bucketNum = 0; + if (scanNode.getHiveTable().isBucketedTable()) { + bucketNum = scanNode.getHiveTable().getDefaultDistributionInfo().getBucketNum(); + } else { + throw new NotImplementedException("bucket shuffle for non-bucketed table not supported"); + } + fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), bucketNum); + fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); + fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); + fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>()); + } + Map bucketSeqToAddress + = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); + BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId()); + + for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) { + //fill scanRangeParamsList + List locations = scanNode.bucketSeq2locations.get(bucketSeq); + if (!bucketSeqToAddress.containsKey(bucketSeq)) { + getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), + bucketSeq, idToBackend, addressToBackendID, replicaNumPerHost); + } + + for (TScanRangeLocations location : locations) { + Map> scanRanges = + findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<>()); + + List scanRangeParamsList = + findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<>()); + + // add scan range + TScanRangeParams scanRangeParams = new TScanRangeParams(); + scanRangeParams.scan_range = location.scan_range; + scanRangeParamsList.add(scanRangeParams); + updateScanRangeNumByScanRange(scanRangeParams); + } + } + } + private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) { assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap, diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index c39724efb4bdf6..3a5c38bdecd541 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1125,7 +1125,7 @@ public void testBucketShuffleJoin() throws Exception { + " and t1.k1 = t2.k2"; String explainString = getSQLPlanOrErrorMsg(queryStr); Assert.assertTrue(explainString.contains("BUCKET_SHFFULE")); - Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); + Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t1`.`k1`, `t1`.`k1`")); // not bucket shuffle join do not support different type queryStr = "explain select /*+ SET_VAR(enable_nereids_planner=false) */ * from test.jointest t1, test.bucket_shuffle1 t2 where cast (t1.k1 as tinyint)" @@ -1156,24 +1156,24 @@ public void testBucketShuffleJoin() throws Exception { + " on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3" + " on t2.k1 = t3.k1 and t2.k2 = t3.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`")); + // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t1`.`k1`, `t1`.`k1`")); + // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t3`.`k1`, `t3`.`k2`")); // support recurse of bucket shuffle because t4 join t2 and join column name is same as t2 distribute column name queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and" + " t1.k1 = t2.k2 join test.colocate1 t3 on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and" + " t4.k1 = t2.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`")); + //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t1`.`k1`, `t1`.`k1`")); + //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t4`.`k1`, `t4`.`k1`")); // some column name in join expr t3 join t4 and t1 distribute column name, so should not be bucket shuffle join queryStr = "explain select /*+ SET_VAR(enable_nereids_planner=false) */ * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 =" + " t2.k2 join test.colocate1 t3 on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t3.k1 and" + " t4.k2 = t3.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`")); + Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t1`.`k1`, `t1`.`k1`")); + Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t4`.`k1`, `t4`.`k1`")); // here only a bucket shuffle + broadcast jost join queryStr = "explain SELECT /*+ SET_VAR(enable_nereids_planner=false) */ * FROM test.bucket_shuffle1 T LEFT JOIN test.bucket_shuffle1 T1 ON T1.k2 = T.k1 and T.k2 = T1.k3 LEFT JOIN" diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index b47bdbb78547e6..71ebc730fc995e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -41,6 +41,7 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TScanRangeLocation; @@ -175,7 +176,7 @@ public void testIsBucketShuffleJoin() { new ArrayList<>()); hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode, - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32))); // hash join node is not bucket shuffle join Assert.assertEquals(false, @@ -183,13 +184,13 @@ public void testIsBucketShuffleJoin() { // the fragment id is different from hash join node hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-2), hashJoinNode, - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32))); hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); Assert.assertEquals(false, Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode)); hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode, - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32))); Assert.assertEquals(true, Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode)); diff --git a/gensrc/thrift/Ddl.thrift b/gensrc/thrift/Ddl.thrift index 9696230af909ed..f733637bc7791a 100644 --- a/gensrc/thrift/Ddl.thrift +++ b/gensrc/thrift/Ddl.thrift @@ -76,10 +76,6 @@ enum TAggType { // 4: optional string default_value //} -enum THashType { - CRC32 -} - // random partition info struct TRandomPartitionDesc { } @@ -93,7 +89,7 @@ struct THashPartitionDesc { 2: required i32 hash_buckets // type to compute hash value. if not set, use CRC32 - 3: optional THashType hash_type + 3: optional Partitions.THashType hash_type } // value used to represents one column value in one range value diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift index 0a7e70c0a4f2a2..bff6f2867e4d34 100644 --- a/gensrc/thrift/Partitions.thrift +++ b/gensrc/thrift/Partitions.thrift @@ -21,6 +21,12 @@ namespace java org.apache.doris.thrift include "Exprs.thrift" include "Types.thrift" +enum THashType { + CRC32, + XXHASH64, + SPARK_MURMUR32 +} + enum TPartitionType { UNPARTITIONED, @@ -90,6 +96,7 @@ struct TDataPartition { 1: required TPartitionType type 2: optional list partition_exprs 3: optional list partition_infos + 4: optional THashType hash_type } diff --git a/regression-test/data/external_table_p0/hive/test_hive_spark_clustered_table.out b/regression-test/data/external_table_p0/hive/test_hive_spark_clustered_table.out new file mode 100644 index 00000000000000..23f03f24aea961 --- /dev/null +++ b/regression-test/data/external_table_p0/hive/test_hive_spark_clustered_table.out @@ -0,0 +1,158 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !q01 -- +1 U1 IN 1 U1 IN +11 U11 IN 11 U11 IN +21 U21 IN 21 U21 IN +31 U31 IN 31 U31 IN + +-- !q02 -- +PLAN FRAGMENT 0 + OUTPUT EXPRS: + user_id[#12] + key[#13] + part[#14] + user_id[#15] + key[#16] + part[#17] + PARTITION: UNPARTITIONED + + HAS_COLO_PLAN_NODE: false + + VRESULT SINK + MYSQL_PROTOCAL + + 4:VEXCHANGE + offset: 0 + distribute expr lists: user_id[#12] + +PLAN FRAGMENT 1 + + PARTITION: HASH_PARTITIONED: user_id[#3] + + HAS_COLO_PLAN_NODE: false + + STREAM DATA SINK + EXCHANGE ID: 04 + UNPARTITIONED + + 3:VHASH JOIN(165) + | join op: INNER JOIN(BUCKET_SHUFFLE)[] + | equal join conjunct: (user_id[#3] = user_id[#0]) + | cardinality=143 + | vec output tuple id: 3 + | vIntermediate tuple ids: 2 + | hash output slot ids: 0 1 2 3 4 5 + | distribute expr lists: user_id[#3] + | distribute expr lists: user_id[#0] + | + |----1:VEXCHANGE + | offset: 0 + | distribute expr lists: user_id[#0] + | + 2:VHIVE_SCAN_NODE(158) + table: parquet_test2 + inputSplitNum=4, totalFileSize=2873, scanRanges=4 + partition=1/1 + cardinality=143, numNodes=1 + pushdown agg=NONE + +PLAN FRAGMENT 2 + + PARTITION: HASH_PARTITIONED: user_id[#0] + + HAS_COLO_PLAN_NODE: false + + STREAM DATA SINK + EXCHANGE ID: 01 + BUCKET_SHFFULE_HASH_PARTITIONED(SPARK_MURMUR32): user_id[#0] + + 0:VHIVE_SCAN_NODE(159) + table: parquet_test2 + inputSplitNum=4, totalFileSize=2873, scanRanges=4 + partition=1/1 + cardinality=143, numNodes=1 + pushdown agg=NONE + +-- !q03 -- +1 U1 IN 1 U1 IN +11 U11 IN 11 U11 IN +21 U21 IN 21 U21 IN +31 U31 IN 31 U31 IN + +-- !q01 -- +1 U1 IN 1 U1 IN +11 U11 IN 11 U11 IN +21 U21 IN 21 U21 IN +31 U31 IN 31 U31 IN + +-- !q02 -- +PLAN FRAGMENT 0 + OUTPUT EXPRS: + + + + + + + PARTITION: UNPARTITIONED + + HAS_COLO_PLAN_NODE: false + + VRESULT SINK + MYSQL_PROTOCAL + + 4:VEXCHANGE + offset: 0 + +PLAN FRAGMENT 1 + + PARTITION: HASH_PARTITIONED: `hive_test_parquet`.`default`.`parquet_test2`.`user_id` + + HAS_COLO_PLAN_NODE: false + + STREAM DATA SINK + EXCHANGE ID: 04 + UNPARTITIONED + + 2:VHASH JOIN + | join op: INNER JOIN(BUCKET_SHUFFLE)[Only olap table support colocate plan] + | equal join conjunct: (`t1`.`user_id` = `t2`.`user_id`) + | cardinality=-1 + | vec output tuple id: 2 + | vIntermediate tuple ids: 3 4 + | output slot ids: 6 7 8 9 10 11 + | hash output slot ids: 0 1 2 3 4 5 + | + |----3:VEXCHANGE + | offset: 0 + | + 0:VHIVE_SCAN_NODE + table: parquet_test2 + inputSplitNum=4, totalFileSize=2873, scanRanges=4 + partition=1/1 + numNodes=1 + pushdown agg=NONE + +PLAN FRAGMENT 2 + + PARTITION: HASH_PARTITIONED: `hive_test_parquet`.`default`.`parquet_test2`.`user_id` + + HAS_COLO_PLAN_NODE: false + + STREAM DATA SINK + EXCHANGE ID: 03 + BUCKET_SHFFULE_HASH_PARTITIONED(SPARK_MURMUR32): `t2`.`user_id` + + 1:VHIVE_SCAN_NODE + table: parquet_test2 + inputSplitNum=4, totalFileSize=2873, scanRanges=4 + partition=1/1 + numNodes=1 + pushdown agg=NONE + +-- !q03 -- +1 U1 IN 1 U1 IN +11 U11 IN 11 U11 IN +21 U21 IN 21 U21 IN +31 U31 IN 31 U31 IN + diff --git a/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy b/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy new file mode 100644 index 00000000000000..bf7f5c1794a96f --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_spark_clustered_table", "p0,external,hive,external_docker,external_docker_hive") { + def q01 = { + qt_q01 """ select * from parquet_test2 t1, parquet_test2 t2 WHERE t1.user_id = t2.user_id ORDER BY 1,2 ;""" + + qt_q02 """explain select * from parquet_test2 t1, parquet_test2 t2 WHERE t1.user_id = t2.user_id ;""" + + qt_q03 """select * from parquet_test2 t1, `internal`.`regression_test`.doris_dist_test t2 WHERE t1.user_id = t2.user_id ORDER BY 1,2 ;""" + + explain { + sql("""select * from parquet_test2 t1, `internal`.`regression_test`.doris_dist_test t2 WHERE t1.user_id = t2.user_id;""") + contains "join op: INNER JOIN(BUCKET_SHUFFLE)" + contains "BUCKET_SHFFULE_HASH_PARTITIONED(SPARK_MURMUR32)" + } + } + + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + String hms_port = context.config.otherConfigs.get("hms_port") + String catalog_name = "hive_test_parquet" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + );""" + + sql """use `regression_test`""" + sql """drop table if exists doris_dist_test;""" + sql """create table doris_dist_test properties("replication_num"="1") + as select * from `${catalog_name}`.`default`.parquet_test2; """ + + sql """use `${catalog_name}`.`default`""" + + sql """set enable_fallback_to_original_planner=false;""" + + q01() + + sql """set enable_nereids_planner=false;""" + + q01() + + sql """use `internal`.`regression_test`""" + sql """drop table if exists doris_dist_test; """ + sql """drop catalog if exists ${catalog_name}; """ + } finally { + } + } +} diff --git a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy index e5334010d7b0ca..43af8f8d9d024b 100644 --- a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy +++ b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy @@ -75,7 +75,7 @@ suite("bucket-shuffle-join") { explain { sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.c;") contains "BUCKET_SHUFFLE" - contains "BUCKET_SHFFULE_HASH_PARTITIONED: c" + contains "BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): c" } }
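
Note on how the Spark bucketing metadata is discovered: for tables written by Spark, the bucket columns and bucket count are not in the Hive StorageDescriptor but in the table parameters, which is what the new getBucketColumns() path in HMSExternalTable reads through the SPARK_BUCKET and SPARK_NUM_BUCKET constants. Their exact values are not visible in the hunks above, so the sketch below assumes the property names Spark itself writes into the metastore (spark.sql.sources.schema.*); it is a simplified illustration, not the patch's exact code.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch only: rebuild Spark bucketing metadata from Hive table
    // parameters, assuming the spark.sql.sources.schema.* property names.
    public final class SparkBucketProps {
        private static final String SPARK_BUCKET = "spark.sql.sources.schema.bucketCol.";
        private static final String SPARK_NUM_BUCKET = "spark.sql.sources.schema.numBuckets";

        // Returns the bucket count and fills bucketCols in positional order; -1 if not bucketed.
        static int readSparkBucketInfo(Map<String, String> params, List<String> bucketCols) {
            int numBuckets = -1;
            for (Map.Entry<String, String> param : params.entrySet()) {
                if (param.getKey().startsWith(SPARK_BUCKET)) {
                    // the suffix after the last '.' is the position of the bucket column
                    int index = Integer.parseInt(
                            param.getKey().substring(param.getKey().lastIndexOf('.') + 1));
                    while (bucketCols.size() <= index) {
                        bucketCols.add(null); // pad so we can set by position (the patch inserts by index)
                    }
                    bucketCols.set(index, param.getValue());
                } else if (param.getKey().equals(SPARK_NUM_BUCKET)) {
                    numBuckets = Integer.parseInt(param.getValue());
                }
            }
            return numBuckets;
        }

        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("spark.sql.sources.schema.numBuckets", "4");
            params.put("spark.sql.sources.schema.bucketCol.0", "user_id");
            List<String> cols = new ArrayList<>();
            System.out.println(readSparkBucketInfo(params, cols) + " " + cols); // prints: 4 [user_id]
        }
    }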
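
The new entry added to HiveBucketUtil.BUCKET_PATTERNS recovers the bucket id from Spark-written file names, where the five digits captured after the underscore that follows the task UUID encode the bucket. A small self-contained sketch of that extraction follows; the sample file name is invented for illustration.

    import java.util.OptionalInt;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Illustrative sketch only: extract the bucket id with the same regex the
    // patch adds to HiveBucketUtil.BUCKET_PATTERNS.
    public final class SparkBucketFileName {
        // the 5-digit group captured after '_' is the bucket id
        private static final Pattern SPARK_BUCKET_PATTERN = Pattern.compile(
                "part-\\d{5}-\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}_(\\d{5})(?:[-_.].*)?");

        static OptionalInt bucketIdFromFileName(String name) {
            Matcher matcher = SPARK_BUCKET_PATTERN.matcher(name);
            return matcher.matches()
                    ? OptionalInt.of(Integer.parseInt(matcher.group(1)))
                    : OptionalInt.empty();
        }

        public static void main(String[] args) {
            // invented sample name in the Spark layout; prints 1
            String file = "part-00000-5adf5dd2-9b43-4c11-a546-6e2f1a2f4c3e_00001.c000.snappy.parquet";
            bucketIdFromFileName(file).ifPresent(System.out::println);
        }
    }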
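
On the SPARK_MURMUR32 hash that the shuffle side has to reproduce: Spark bucketing assigns a row to a bucket by hashing the bucket columns with Murmur3 (the 32-bit x86 variant, seed 42) and taking a non-negative modulo of the bucket count. The self-contained sketch below shows that computation for a single int key; the class and method names are illustrative only, and other column types follow Spark's own byte-level encodings, which are not covered here.

    // Illustrative sketch only: Spark-compatible bucket id for a single int key.
    public final class SparkBucketIdSketch {
        private static final int SPARK_SEED = 42;

        // MurmurHash3 x86 32-bit of one 4-byte (int) block, the variant Spark uses.
        static int murmur3HashInt(int input, int seed) {
            int k1 = input;
            k1 *= 0xcc9e2d51;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= 0x1b873593;

            int h1 = seed;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;

            // finalization for an input length of 4 bytes
            h1 ^= 4;
            h1 ^= h1 >>> 16;
            h1 *= 0x85ebca6b;
            h1 ^= h1 >>> 13;
            h1 *= 0xc2b2ae35;
            h1 ^= h1 >>> 16;
            return h1;
        }

        // Spark picks the bucket as a non-negative modulo (pmod) of the hash.
        static int bucketId(int key, int numBuckets) {
            int mod = murmur3HashInt(key, SPARK_SEED) % numBuckets;
            return mod < 0 ? mod + numBuckets : mod;
        }

        public static void main(String[] args) {
            // e.g. the bucket that user_id = 11 falls into for a 4-bucket table
            System.out.println(bucketId(11, 4));
        }
    }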