diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp index 402797a8e35ce3..afae30fec99cf4 100644 --- a/be/src/util/hash_util.hpp +++ b/be/src/util/hash_util.hpp @@ -133,6 +133,9 @@ class HashUtil { // refer to https://github.com/apache/commons-codec/blob/master/src/main/java/org/apache/commons/codec/digest/MurmurHash3.java static const uint32_t MURMUR3_32_SEED = 104729; + // refer to https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala#L615 + static const uint32_t SPARK_MURMUR_32_SEED = 42; + + // modify from https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp static uint32_t murmur_hash3_32(const void* key, int32_t len, uint32_t seed) { uint32_t out = 0; @@ -140,6 +143,11 @@ class HashUtil { return out; } + static uint32_t murmur_hash3_32_null(uint32_t seed) { + static const int INT_VALUE = 0; + return murmur_hash3_32((const unsigned char*)(&INT_VALUE), 4, seed); + } + static const int MURMUR_R = 47; // Murmur2 hash implementation returning 64-bit hashes. diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h index bf869961544817..45dbfeac64205e 100644 --- a/be/src/vec/columns/column.h +++ b/be/src/vec/columns/column.h @@ -58,6 +58,18 @@ class SipHash; } \ } +#define DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(SEED) \ + if (null_data == nullptr) { \ + for (size_t i = 0; i < s; i++) { \ + hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \ + } \ + } else { \ + for (size_t i = 0; i < s; i++) { \ + if (null_data[i] == 0) \ + hashes[i] = HashUtil::murmur_hash3_32(&data[i], sizeof(T), SEED); \ + } \ + } + namespace doris::vectorized { class Arena; @@ -379,6 +391,21 @@ class IColumn : public COW<IColumn> { LOG(FATAL) << get_name() << " update_crc_with_value not supported"; } + /// Update the state of the Spark-compatible murmur3 hash (used for Spark bucketed files) for n elements + /// without a per-element virtual call. null_data marks which rows need hashing: null_data == nullptr + /// means every element is hashed, otherwise only elements whose null_data[i] == 0 (non-null rows) are hashed. + virtual void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const { + LOG(FATAL) << get_name() << " update_murmurs_with_value not supported"; + } + + // use range for one hash value to avoid virtual function call in loop + virtual void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + LOG(FATAL) << get_name() << " update_murmur_with_value not supported"; + } + /** Removes elements that don't match the filter. * Is used in WHERE and HAVING operations.
* If result_size_hint > 0, then makes advance reserve(result_size_hint) for the result column; diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp index 86d31c9223baf0..84dd75f7e1f9c1 100644 --- a/be/src/vec/columns/column_array.cpp +++ b/be/src/vec/columns/column_array.cpp @@ -389,6 +389,60 @@ void ColumnArray::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTyp } } +// for every array row calculate murmurHash +void ColumnArray::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + auto& offsets_column = get_offsets(); + if (hash == 0) { + hash = HashUtil::SPARK_MURMUR_32_SEED; + } + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + size_t elem_size = offsets_column[i] - offsets_column[i - 1]; + if (elem_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&elem_size), + sizeof(elem_size), hash); + } else { + get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i], + hash, nullptr); + } + } + } + } else { + for (size_t i = start; i < end; ++i) { + size_t elem_size = offsets_column[i] - offsets_column[i - 1]; + if (elem_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&elem_size), + sizeof(elem_size), hash); + } else { + get_data().update_murmur_with_value(offsets_column[i - 1], offsets_column[i], hash, + nullptr); + } + } + } +} + +void ColumnArray::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } +} + void ColumnArray::insert(const Field& x) { if (x.is_null()) { get_data().insert(Null()); diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h index 17408bfa63311e..50aa4322ec2f1e 100644 --- a/be/src/vec/columns/column_array.h +++ b/be/src/vec/columns/column_array.h @@ -141,6 +141,8 @@ class ColumnArray final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data = nullptr) const override; @@ -149,6 +151,10 @@ class ColumnArray final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + void insert_range_from(const IColumn& src, size_t start, size_t length) override; void insert(const Field& x) override; void insert_from(const IColumn& src_, size_t n) override; diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp index e06e53b42897b9..95bffcb277e2ec 100644 --- a/be/src/vec/columns/column_const.cpp +++ b/be/src/vec/columns/column_const.cpp @@ -126,6 +126,24 @@ void ColumnConst::update_hashes_with_value(uint64_t* __restrict hashes, } } +void 
ColumnConst::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + DCHECK(rows == size()); + auto real_data = data->get_data_at(0); + if (real_data.data == nullptr) { + for (int i = 0; i < rows; ++i) { + hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + } + } else { + for (int i = 0; i < rows; ++i) { + hashes[i] = HashUtil::murmur_hash3_32(real_data.data, real_data.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } +} + MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector& selector) const { if (s != selector.size()) { LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of column ({})", diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h index 746cb00fd5dd38..b5b1993c8ca07b 100644 --- a/be/src/vec/columns/column_const.h +++ b/be/src/vec/columns/column_const.h @@ -168,6 +168,11 @@ class ColumnConst final : public COWHelper { get_data_column_ptr()->update_crc_with_value(start, end, hash, nullptr); } + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override { + get_data_column_ptr()->update_murmur_with_value(start, end, hash, nullptr); + } + void serialize_vec_with_null_map(std::vector& keys, size_t num_rows, const uint8_t* null_map) const override { data->serialize_vec_with_null_map(keys, num_rows, null_map); @@ -185,6 +190,10 @@ class ColumnConst final : public COWHelper { void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const override; size_t filter(const Filter& filter) override; diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp index 9a05a8d68dfd3c..77796c6ab5bcf5 100644 --- a/be/src/vec/columns/column_decimal.cpp +++ b/be/src/vec/columns/column_decimal.cpp @@ -177,6 +177,56 @@ void ColumnDecimal::update_crcs_with_value(uint32_t* __restrict hashes, Primi } } +template +void ColumnDecimal::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + if (null_data == nullptr) { + for (size_t i = start; i < end; i++) { + if constexpr (!IsDecimalV2) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + decimalv2_do_murmur(i, hash); + } + } + } else { + for (size_t i = start; i < end; i++) { + if (null_data[i] == 0) { + if constexpr (!IsDecimalV2) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), + HashUtil::SPARK_MURMUR_32_SEED); + } else { + decimalv2_do_murmur(i, hash); + } + } + } + } +} + +template +void ColumnDecimal::update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if constexpr (!IsDecimalV2) { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } else { + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + decimalv2_do_murmur(i, hashes[i]); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + decimalv2_do_murmur(i, 
hashes[i]); + } + } + } + } +} + template void ColumnDecimal::update_xxHash_with_value(size_t start, size_t end, uint64_t& hash, const uint8_t* __restrict null_data) const { diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h index 920f6ad8438fa5..eca1bd8708ef09 100644 --- a/be/src/vec/columns/column_decimal.h +++ b/be/src/vec/columns/column_decimal.h @@ -181,12 +181,16 @@ class ColumnDecimal final : public COWHelper diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp index c1c668ef07cd8b..fdd89d466c1b97 100644 --- a/be/src/vec/columns/column_map.cpp +++ b/be/src/vec/columns/column_map.cpp @@ -332,6 +332,40 @@ void ColumnMap::update_crc_with_value(size_t start, size_t end, uint32_t& hash, } } +void ColumnMap::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + auto& offsets = get_offsets(); + if (hash == 0) { + hash = HashUtil::SPARK_MURMUR_32_SEED; + } + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + size_t kv_size = offsets[i] - offsets[i - 1]; + if (kv_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&kv_size), + sizeof(kv_size), hash); + } else { + get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr); + get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash, + nullptr); + } + } + } + } else { + for (size_t i = start; i < end; ++i) { + size_t kv_size = offsets[i] - offsets[i - 1]; + if (kv_size == 0) { + hash = HashUtil::murmur_hash3_32(reinterpret_cast(&kv_size), + sizeof(kv_size), hash); + } else { + get_keys().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr); + get_values().update_murmur_with_value(offsets[i - 1], offsets[i], hash, nullptr); + } + } + } +} + void ColumnMap::update_hashes_with_value(uint64_t* hashes, const uint8_t* null_data) const { size_t s = size(); if (null_data) { @@ -367,6 +401,26 @@ void ColumnMap::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType } } +void ColumnMap::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data) { + for (size_t i = 0; i < s; ++i) { + // every row + if (null_data[i] == 0) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } + } else { + for (size_t i = 0; i < s; ++i) { + update_murmur_with_value(i, i + 1, hash[i], nullptr); + } + } +} + void ColumnMap::insert_range_from(const IColumn& src, size_t start, size_t length) { if (length == 0) { return; diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h index 0e6ad3c3d91cf2..b03249038071cc 100644 --- a/be/src/vec/columns/column_map.h +++ b/be/src/vec/columns/column_map.h @@ -181,6 +181,8 @@ class ColumnMap final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data = nullptr) const override; @@ -189,6 +191,10 @@ class ColumnMap final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(int32_t* 
__restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + /******************** keys and values ***************/ const ColumnPtr& get_keys_ptr() const { return keys_column; } ColumnPtr& get_keys_ptr() { return keys_column; } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index e20b87af826aba..0cdf3a6536eb76 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -85,6 +85,23 @@ void ColumnNullable::update_crc_with_value(size_t start, size_t end, uint32_t& h } } +void ColumnNullable::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + if (!has_null()) { + nested_column->update_murmur_with_value(start, end, hash, nullptr); + } else { + const auto* __restrict real_null_data = + assert_cast(*null_map).get_data().data(); + hash = HashUtil::SPARK_MURMUR_32_SEED; + for (int i = start; i < end; ++i) { + if (real_null_data[i] != 0) { + hash = HashUtil::murmur_hash3_32_null(hash); + } + } + nested_column->update_murmur_with_value(start, end, hash, real_null_data); + } +} + void ColumnNullable::update_hash_with_value(size_t n, SipHash& hash) const { if (is_null_at(n)) { hash.update(0); @@ -113,6 +130,27 @@ void ColumnNullable::update_crcs_with_value(uint32_t* __restrict hashes, doris:: } } +void ColumnNullable::update_murmurs_with_value(int32_t* __restrict hashes, + doris::PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const { + DCHECK(null_data == nullptr); + auto s = rows; + DCHECK(s == size()); + const auto* __restrict real_null_data = + assert_cast(*null_map).get_data().data(); + if (!has_null()) { + nested_column->update_murmurs_with_value(hashes, type, rows, offset, nullptr); + } else { + for (int i = 0; i < s; ++i) { + if (real_null_data[i] != 0) { + hashes[i] = HashUtil::murmur_hash3_32_null(HashUtil::SPARK_MURMUR_32_SEED); + } + } + nested_column->update_murmurs_with_value(hashes, type, rows, offset, real_null_data); + } +} + void ColumnNullable::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { DCHECK(null_data == nullptr); diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index de01907650e691..77ced88c012d61 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -217,6 +217,8 @@ class ColumnNullable final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hash_with_value(size_t n, SipHash& hash) const override; void update_crcs_with_value(uint32_t* __restrict hash, PrimitiveType type, uint32_t rows, @@ -224,6 +226,9 @@ class ColumnNullable final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) const override { return 
scatter_impl(num_columns, selector); diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp index 76db0e58c44580..d921b9015ec8f0 100644 --- a/be/src/vec/columns/column_string.cpp +++ b/be/src/vec/columns/column_string.cpp @@ -180,6 +180,29 @@ void ColumnString::update_crcs_with_value(uint32_t* __restrict hashes, doris::Pr } } +void ColumnString::update_murmurs_with_value(int32_t* __restrict hashes, doris::PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + auto data_ref = get_data_at(i); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + auto data_ref = get_data_at(i); + hashes[i] = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } +} + ColumnPtr ColumnString::filter(const Filter& filt, ssize_t result_size_hint) const { if (offsets.size() == 0) { return ColumnString::create(); diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h index 9bf43c9c627815..d7b7461560e9c0 100644 --- a/be/src/vec/columns/column_string.h +++ b/be/src/vec/columns/column_string.h @@ -442,6 +442,25 @@ class ColumnString final : public COWHelper { } } + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override { + if (null_data) { + for (size_t i = start; i < end; ++i) { + if (null_data[i] == 0) { + auto data_ref = get_data_at(i); + hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } else { + for (size_t i = start; i < end; ++i) { + auto data_ref = get_data_at(i); + hash = HashUtil::murmur_hash3_32(data_ref.data, data_ref.size, + HashUtil::SPARK_MURMUR_32_SEED); + } + } + } + void update_hash_with_value(size_t n, SipHash& hash) const override { size_t string_size = size_at(n); size_t offset = offset_at(n); @@ -455,6 +474,10 @@ class ColumnString final : public COWHelper { uint32_t offset, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override { auto s = size(); diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp index 290452a1c8b663..6b456290c06854 100644 --- a/be/src/vec/columns/column_struct.cpp +++ b/be/src/vec/columns/column_struct.cpp @@ -220,6 +220,13 @@ void ColumnStruct::update_crc_with_value(size_t start, size_t end, uint32_t& has } } +void ColumnStruct::update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const { + for (const auto& column : columns) { + column->update_murmur_with_value(start, end, hash, nullptr); + } +} + void ColumnStruct::update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const { for (const auto& column : columns) { @@ -235,6 +242,14 @@ void ColumnStruct::update_crcs_with_value(uint32_t* __restrict hash, PrimitiveTy } } +void ColumnStruct::update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* 
__restrict null_data) const { + for (const auto& column : columns) { + column->update_murmurs_with_value(hash, type, rows, offset, null_data); + } +} + void ColumnStruct::insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) { const auto& src_concrete = assert_cast(src); diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h index d91d38006485db..a81eac001abaa9 100644 --- a/be/src/vec/columns/column_struct.h +++ b/be/src/vec/columns/column_struct.h @@ -112,6 +112,8 @@ class ColumnStruct final : public COWHelper { const uint8_t* __restrict null_data) const override; void update_crc_with_value(size_t start, size_t end, uint32_t& hash, const uint8_t* __restrict null_data) const override; + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override; void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data = nullptr) const override; @@ -120,6 +122,10 @@ class ColumnStruct final : public COWHelper { uint32_t offset = 0, const uint8_t* __restrict null_data = nullptr) const override; + void update_murmurs_with_value(int32_t* __restrict hash, PrimitiveType type, int32_t rows, + uint32_t offset = 0, + const uint8_t* __restrict null_data = nullptr) const override; + void insert_indices_from(const IColumn& src, const uint32_t* indices_begin, const uint32_t* indices_end) override; diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp index 97fadcb4071052..e968ee6a9c4acb 100644 --- a/be/src/vec/columns/column_vector.cpp +++ b/be/src/vec/columns/column_vector.cpp @@ -196,6 +196,41 @@ void ColumnVector::update_crcs_with_value(uint32_t* __restrict hashes, Primit } } +template +void ColumnVector::update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, + int32_t rows, uint32_t offset, + const uint8_t* __restrict null_data) const { + auto s = rows; + DCHECK(s == size()); + + if constexpr (!std::is_same_v) { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } else { + if (type == TYPE_DATE || type == TYPE_DATETIME) { + char buf[64]; + auto date_convert_do_crc = [&](size_t i) { + const VecDateTimeValue& date_val = (const VecDateTimeValue&)data[i]; + auto len = date_val.to_buffer(buf); + hashes[i] = HashUtil::murmur_hash3_32(buf, len, HashUtil::SPARK_MURMUR_32_SEED); + }; + + if (null_data == nullptr) { + for (size_t i = 0; i < s; i++) { + date_convert_do_crc(i); + } + } else { + for (size_t i = 0; i < s; i++) { + if (null_data[i] == 0) { + date_convert_do_crc(i); + } + } + } + } else { + DO_MURMUR_HASHES_FUNCTION_COLUMN_IMPL(HashUtil::SPARK_MURMUR_32_SEED) + } + } +} + template struct ColumnVector::less { const Self& parent; diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h index ff9197df357a3c..3465530b87fe65 100644 --- a/be/src/vec/columns/column_vector.h +++ b/be/src/vec/columns/column_vector.h @@ -319,12 +319,35 @@ class ColumnVector final : public COWHelper> } } } + + void update_murmur_with_value(size_t start, size_t end, int32_t& hash, + const uint8_t* __restrict null_data) const override { + if (hash == 0) { + hash = HashUtil::SPARK_MURMUR_32_SEED; + } + if (null_data) { + for (size_t i = start; i < end; i++) { + if (null_data[i] == 0) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), hash); + } + } + } else { + for (size_t i = start; i < end; i++) { + hash = HashUtil::murmur_hash3_32(&data[i], sizeof(T), 
hash); + } + } + } + void update_hash_with_value(size_t n, SipHash& hash) const override; void update_crcs_with_value(uint32_t* __restrict hashes, PrimitiveType type, uint32_t rows, uint32_t offset, const uint8_t* __restrict null_data) const override; + void update_murmurs_with_value(int32_t* __restrict hashes, PrimitiveType type, int32_t rows, + uint32_t offset, + const uint8_t* __restrict null_data) const override; + void update_hashes_with_value(uint64_t* __restrict hashes, const uint8_t* __restrict null_data) const override; diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index db40610723cdb6..81085eda85f446 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -71,6 +71,13 @@ void XXHashPartitioner::_do_hash(const ColumnPtr& column, uint64_t* column->update_hashes_with_value(result); } +template +void Murmur32HashPartitioner::_do_hash(const ColumnPtr& column, + int32_t* __restrict result, int idx) const { + column->update_murmurs_with_value(result, Base::_partition_expr_ctxs[idx]->root()->type().type, + column->size()); +} + template Status XXHashPartitioner::clone(RuntimeState* state, std::unique_ptr& partitioner) { @@ -97,6 +104,19 @@ Status Crc32HashPartitioner::clone(RuntimeState* state, return Status::OK(); } +template +Status Murmur32HashPartitioner::clone(RuntimeState* state, + std::unique_ptr& partitioner) { + auto* new_partitioner = new Murmur32HashPartitioner(Base::_partition_count); + partitioner.reset(new_partitioner); + new_partitioner->_partition_expr_ctxs.resize(Base::_partition_expr_ctxs.size()); + for (size_t i = 0; i < Base::_partition_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(Base::_partition_expr_ctxs[i]->clone( + state, new_partitioner->_partition_expr_ctxs[i])); + } + return Status::OK(); +} + template class Partitioner; template class XXHashPartitioner; template class Partitioner; @@ -104,5 +124,7 @@ template class XXHashPartitioner; template class Partitioner; template class Crc32HashPartitioner; template class Crc32HashPartitioner; +template class Murmur32HashPartitioner; +template class Murmur32HashPartitioner; } // namespace doris::vectorized diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index 66ed8809d7ce7c..92d2698c1f4c80 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -112,5 +112,19 @@ class Crc32HashPartitioner final : public Partitioner { void _do_hash(const ColumnPtr& column, uint32_t* __restrict result, int idx) const override; }; +template +class Murmur32HashPartitioner final : public Partitioner { +public: + using Base = Partitioner; + Murmur32HashPartitioner(int partition_count) + : Partitioner(partition_count) {} + ~Murmur32HashPartitioner() override = default; + + Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; + +private: + void _do_hash(const ColumnPtr& column, int32_t* __restrict result, int idx) const override; +}; + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index b6e4d0b962b843..1036097689ca07 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -328,6 +328,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(sink.output_partition.type), + _hash_type(sink.output_partition.hash_type), _dest_node_id(sink.dest_node_id), 
_transfer_large_data_by_brpc(config::transfer_large_data_by_brpc), _serializer(this) { @@ -338,6 +339,9 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || sink.output_partition.type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); + DCHECK(sink.output_partition.hash_type == THashType::CRC32 || + sink.output_partition.hash_type == THashType::XXHASH64 || + sink.output_partition.hash_type == THashType::SPARK_MURMUR32); std::map fragment_id_to_channel_index; _enable_pipeline_exec = state->enable_pipeline_exec(); @@ -383,6 +387,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(TPartitionType::UNPARTITIONED), + _hash_type(THashType::XXHASH64), _dest_node_id(dest_node_id), _serializer(this) { _cur_pb_block = &_pb_block1; @@ -415,8 +420,13 @@ Status VDataStreamSender::init(const TDataSink& tsink) { RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { _partition_count = _channel_shared_ptrs.size(); - _partitioner.reset( - new Crc32HashPartitioner(_channel_shared_ptrs.size())); + if (_hash_type == THashType::CRC32) { + _partitioner.reset( + new Crc32HashPartitioner(_channel_shared_ptrs.size())); + } else { + _partitioner.reset(new Murmur32HashPartitioner( + _channel_shared_ptrs.size())); + } RETURN_IF_ERROR(_partitioner->init(t_stream_sink.output_partition.partition_exprs)); } else if (_part_type == TPartitionType::RANGE_PARTITIONED) { return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used"); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 88a948ed05cc0e..8222eb49dcce02 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -107,6 +107,13 @@ struct ShuffleChannelIds { } }; +struct ShufflePModChannelIds { + template + HashValueType operator()(HashValueType l, int32_t r) { + return (l % r + r) % r; + } +}; + class VDataStreamSender : public DataSink { public: friend class pipeline::ExchangeSinkOperator; @@ -185,6 +192,7 @@ class VDataStreamSender : public DataSink { int _current_channel_idx; // index of current channel to send to if _random == true TPartitionType::type _part_type; + THashType::type _hash_type; // serialized batches for broadcasting; we need two so we can write // one while the other one is still being sent diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql index 00d2a699410b92..349efc069bf583 100644 --- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql +++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql @@ -1897,7 +1897,51 @@ OUTPUTFORMAT LOCATION '/user/doris/preinstalled_data/parquet_table/parquet_decimal90_table'; -msck repair table parquet_decimal90_table; +CREATE TABLE `parquet_test2`( + `user_id` int, + `key` varchar(20)) +PARTITIONED BY ( + `part` varchar(10)) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
+LOCATION + '/user/doris/preinstalled_data/parquet_table/parquet_test2' +TBLPROPERTIES ( + 'spark.sql.statistics.colStats.user_id.nullCount'='0', + 'spark.sql.statistics.colStats.key.distinctCount'='4', + 'spark.sql.statistics.colStats.user_id.distinctCount'='4', + 'spark.sql.statistics.colStats.part.version'='2', + 'transient_lastDdlTime'='1697557072', + 'spark.sql.sources.schema.partCol.0'='part', + 'spark.sql.statistics.colStats.user_id.maxLen'='4', + 'spark.sql.create.version'='3.3.2', + 'spark.sql.statistics.colStats.part.avgLen'='2', + 'spark.sql.sources.schema.numBuckets'='3', + 'spark.sql.statistics.colStats.part.maxLen'='2', + 'spark.sql.sources.schema'='{"type":"struct","fields":[{"name":"user_id","type":"integer","nullable":true,"metadata":{}},{"name":"key","type":"string","nullable":true,"metadata":{"__CHAR_VARCHAR_TYPE_STRING":"varchar(20)"}},{"name":"part","type":"string","nullable":true,"metadata":{"__CHAR_VARCHAR_TYPE_STRING":"varchar(10)"}}]}', + 'spark.sql.partitionProvider'='catalog', + 'spark.sql.sources.schema.numBucketCols'='1', + 'spark.sql.statistics.colStats.user_id.max'='31', + 'spark.sql.sources.schema.bucketCol.0'='user_id', + 'spark.sql.statistics.colStats.user_id.version'='2', + 'spark.sql.statistics.numRows'='4', + 'spark.sql.statistics.colStats.part.nullCount'='0', + 'spark.sql.statistics.colStats.part.distinctCount'='1', + 'spark.sql.statistics.colStats.user_id.min'='1', + 'spark.sql.statistics.colStats.key.version'='2', + 'spark.sql.statistics.colStats.key.avgLen'='3', + 'spark.sql.sources.schema.numPartCols'='1', + 'spark.sql.statistics.colStats.user_id.avgLen'='4', + 'spark.sql.statistics.colStats.key.maxLen'='3', + 'spark.sql.statistics.colStats.key.nullCount'='0', + 'spark.sql.sources.provider'='PARQUET', + 'spark.sql.statistics.totalSize'='2873'); + +msck repair table parquet_test2; CREATE TABLE `fixed_length_byte_array_decimal_table`( `decimal_col1` decimal(7,2), @@ -1920,7 +1964,6 @@ msck repair table fixed_length_byte_array_decimal_table; show tables; - create database stats_test; use stats_test; create table stats_test1 (id INT, value STRING) STORED AS ORC; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java new file mode 100644 index 00000000000000..5b15874401908a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveExternalDistributionInfo.java @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.catalog; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; +import java.util.Objects; + +/* + * Hive Hash Distribution Info + */ +public class HiveExternalDistributionInfo extends HashDistributionInfo { + @SerializedName(value = "bucketingVersion") + private final int bucketingVersion; + + public HiveExternalDistributionInfo() { + bucketingVersion = 2; + } + + public HiveExternalDistributionInfo(int bucketNum, List distributionColumns, int bucketingVersion) { + super(bucketNum, distributionColumns); + this.bucketingVersion = bucketingVersion; + } + + public HiveExternalDistributionInfo(int bucketNum, boolean autoBucket, + List distributionColumns, int bucketingVersion) { + super(bucketNum, autoBucket, distributionColumns); + this.bucketingVersion = bucketingVersion; + } + + public int getBucketingVersion() { + return bucketingVersion; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + HiveExternalDistributionInfo that = (HiveExternalDistributionInfo) o; + return bucketNum == that.bucketNum + && sameDistributionColumns(that) + && bucketingVersion == that.bucketingVersion; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), bucketingVersion); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("type: ").append(type).append("; "); + + builder.append("distribution columns: ["); + for (Column column : getDistributionColumns()) { + builder.append(column.getName()).append(","); + } + builder.append("]; "); + + if (autoBucket) { + builder.append("bucket num: auto;"); + } else { + builder.append("bucket num: ").append(bucketNum).append(";"); + } + + builder.append("bucketingVersion: ").append(bucketingVersion).append(";"); + + return builder.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 73a49bb24a8c54..73b1ce9c8371f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -198,6 +198,8 @@ public void init(BeSelectionPolicy policy) throws UserException { } catch (ExecutionException e) { throw new UserException("failed to get consistent hash", e); } + /*consistentBucket = new ConsistentHash<>(Hashing.murmur3_128(), new BucketHash(), + new BackendHash(), backends, Config.virtual_node_number);*/ } public Backend getNextBe() { @@ -250,6 +252,7 @@ public Multimap computeScanRangeAssignment(List splits) t Optional chosenNode = candidateNodes.stream() .min(Comparator.comparingLong(ownerNode -> assignedWeightPerBackend.get(ownerNode))); + //ToDo(Nitin): group assignment based on the bucketId if (chosenNode.isPresent()) { Backend selectedBackend = chosenNode.get(); assignment.put(selectedBackend, split); @@ -507,4 +510,11 @@ public void funnel(Split split, PrimitiveSink primitiveSink) { primitiveSink.putLong(split.getLength()); } } + + private static class BucketHash implements Funnel { + @Override + public void funnel(Integer bucketId, PrimitiveSink primitiveSink) { + primitiveSink.putLong(bucketId); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 7afb04831ce029..a9d8797612256d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -36,10 +36,13 @@ import org.apache.doris.datasource.hive.AcidInfo; import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo; import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HiveBucketUtil; import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hive.source.HiveSplit; import org.apache.doris.datasource.iceberg.source.IcebergSplit; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; @@ -55,6 +58,7 @@ import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TFileScanSlotInfo; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.THdfsParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRange; @@ -67,6 +71,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -91,6 +96,8 @@ public abstract class FileQueryScanNode extends FileScanNode { protected Map destSlotDescByName; protected TFileScanRangeParams params; + public ArrayListMultimap bucketSeq2locations = ArrayListMultimap.create(); + @Getter protected TableSample tableSample; @@ -343,6 +350,15 @@ public void createScanRangeLocations() throws UserException { ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID) : fileSplit.getPartitionValues(); + boolean isBucketedHiveTable = false; + int bucketNum = 0; + TableIf targetTable = getTargetTable(); + if (targetTable instanceof HMSExternalTable) { + isBucketedHiveTable = ((HMSExternalTable) targetTable).isBucketedTable(); + if (isBucketedHiveTable) { + bucketNum = HiveBucketUtil.getBucketNumberFromPath(fileSplit.getPath().getName()).getAsInt(); + } + } TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys, locationType); TFileCompressType fileCompressType = getFileCompressType(fileSplit); @@ -381,6 +397,9 @@ public void createScanRangeLocations() throws UserException { fileSplit.getStart(), fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts())); } + if (isBucketedHiveTable) { + bucketSeq2locations.put(bucketNum, curLocations); + } scanRangeLocations.add(curLocations); this.totalFileSize += fileSplit.getLength(); } @@ -532,4 +551,12 @@ protected TFileAttributes getFileAttributes() throws UserException { protected abstract TableIf getTargetTable() throws UserException; protected abstract Map getLocationProperties() throws UserException; + + public DataPartition constructInputPartitionByDistributionInfo() { + return DataPartition.RANDOM; + } + + public THashType getHashType() { + return THashType.CRC32; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 47b684c264b83d..a770f688aa435f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -18,11 +18,15 @@ package org.apache.doris.datasource.hive; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -45,6 +49,7 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -59,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.logging.log4j.LogManager; @@ -68,6 +74,8 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.LocalDate; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -93,6 +101,22 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI private static 
final String TBL_PROP_TRANSIENT_LAST_DDL_TIME = "transient_lastDdlTime"; private static final String NUM_ROWS = "numRows"; + private static final String SPARK_BUCKET = "spark.sql.sources.schema.bucketCol."; + private static final String SPARK_NUM_BUCKET = "spark.sql.sources.schema.numBuckets"; + private static final String BUCKETING_VERSION = "bucketing_version"; + + private static final Set SUPPORTED_BUCKET_PROPERTIES; + + static { + SUPPORTED_BUCKET_PROPERTIES = Sets.newHashSet(); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "0"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "1"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "2"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "3"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_BUCKET + "4"); + SUPPORTED_BUCKET_PROPERTIES.add(SPARK_NUM_BUCKET); + SUPPORTED_BUCKET_PROPERTIES.add(BUCKETING_VERSION); + } private static final String SPARK_COL_STATS = "spark.sql.statistics.colStats."; private static final String SPARK_STATS_MAX = ".max"; @@ -140,8 +164,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI MAP_SPARK_STATS_TO_DORIS.put(StatsType.HISTOGRAM, SPARK_STATS_HISTOGRAM); } - private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; - private List partitionColumns; + protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null; + protected List partitionColumns; + private List bucketColumns; + private boolean isSparkTable; private DLAType dlaType = DLAType.UNKNOWN; @@ -152,6 +178,8 @@ public enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG } + private DistributionInfo distributionInfo; + /** * Create hive metastore external table. * @@ -228,6 +256,14 @@ public boolean isHoodieCowTable() { return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName); } + public boolean isSparkTable() { + return isSparkTable; + } + + public boolean isBucketedTable() { + return bucketColumns != null && !bucketColumns.isEmpty() && isSparkTable; + } + /** * Some data lakes (such as Hudi) will synchronize their partition information to HMS, * then we can quickly obtain the partition information of the table from HMS. @@ -464,9 +500,71 @@ public List initSchema() { columns = getHiveSchema(); } initPartitionColumns(columns); + initBucketingColumns(columns); return columns; } + private void initBucketingColumns(List columns) { + List bucketCols = new ArrayList<>(5); + int numBuckets = getBucketColumns(bucketCols); + if (bucketCols.isEmpty() || !isSparkTable) { + bucketColumns = ImmutableList.of(); + distributionInfo = new RandomDistributionInfo(1, true); + return; + } + + int bucketingVersion = Integer.valueOf(remoteTable.getParameters().getOrDefault(BUCKETING_VERSION, "2")); + ImmutableList.Builder bucketColBuilder = ImmutableList.builder(); + for (String colName : bucketCols) { + // do not use "getColumn()", which will cause dead loop + for (Column column : columns) { + if (colName.equalsIgnoreCase(column.getName())) { + // For partition/bucket column, if it is string type, change it to varchar(65535) + // to be same as doris managed table. + // This is to avoid some unexpected behavior such as different partition pruning result + // between doris managed table and external table. 
+ if (column.getType().getPrimitiveType() == PrimitiveType.STRING) { + column.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH)); + } + bucketColBuilder.add(column); + break; + } + } + } + + bucketColumns = bucketColBuilder.build(); + distributionInfo = new HiveExternalDistributionInfo(numBuckets, bucketColumns, bucketingVersion); + LOG.debug("get {} bucket columns for table: {}", bucketColumns.size(), name); + } + + private int getBucketColumns(List bucketCols) { + StorageDescriptor descriptor = remoteTable.getSd(); + int numBuckets = -1; + if (descriptor.isSetBucketCols() && !descriptor.getBucketCols().isEmpty()) { + /* Hive Bucketed Table */ + bucketCols.addAll(descriptor.getBucketCols()); + numBuckets = descriptor.getNumBuckets(); + } else if (remoteTable.isSetParameters() + && !Collections.disjoint(SUPPORTED_BUCKET_PROPERTIES, remoteTable.getParameters().keySet())) { + Map parameters = remoteTable.getParameters(); + for (Map.Entry param : parameters.entrySet()) { + if (param.getKey().startsWith(SPARK_BUCKET)) { + int index = Integer.valueOf(param.getKey() + .substring(param.getKey().lastIndexOf(".") + 1)); + bucketCols.add(index, param.getValue()); + } else if (param.getKey().equals(SPARK_NUM_BUCKET)) { + numBuckets = Integer.valueOf(param.getValue()); + } + } + + if (numBuckets > 0) { + isSparkTable = true; + } + } + + return numBuckets; + } + private List getIcebergSchema() { return IcebergUtils.getSchema(catalog, dbName, name); } @@ -575,6 +673,19 @@ public Optional getColumnStatistic(String colName) { return Optional.empty(); } + public DistributionInfo getDefaultDistributionInfo() { + makeSureInitialized(); + if (distributionInfo != null) { + return distributionInfo; + } + + return new RandomDistributionInfo(1, true); + } + + public Map getTableParameters() { + return remoteTable.getParameters(); + } + private Optional getHiveColumnStats(String colName) { List tableStats = getHiveTableColumnStats(Lists.newArrayList(colName)); if (tableStats == null || tableStats.isEmpty()) { @@ -751,14 +862,23 @@ public long getDataSize(boolean singleReplica) { @Override public boolean isDistributionColumn(String columnName) { - return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()).contains(columnName.toLowerCase()); + Set distributeColumns = getDistributionColumnNames() + .stream().map(String::toLowerCase).collect(Collectors.toSet()); + return distributeColumns.contains(columnName.toLowerCase()); } @Override public Set getDistributionColumnNames() { - return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase) - .collect(Collectors.toSet()); + Set distributionColumnNames = Sets.newHashSet(); + if (distributionInfo instanceof RandomDistributionInfo) { + return distributionColumnNames; + } + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List distColumn = hashDistributionInfo.getDistributionColumns(); + for (Column column : distColumn) { + distributionColumnNames.add(column.getName().toLowerCase()); + } + return distributionColumnNames; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java index 7435a3d58dc911..ce0d9cfba98bf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveBucketUtil.java @@ -96,6 +96,9 @@ private static 
PrimitiveTypeInfo convertToHiveColType(PrimitiveType dorisType) t Pattern.compile("bucket_(\\d+)(_\\d+)?$"); private static final Iterable BUCKET_PATTERNS = ImmutableList.of( + // spark/parquet pattern + // format: f"part-[paritionId]-[tid]-[txnId]-[jobId]-[taskAttemptId]-[fileCount].c000.snappy.parquet" + Pattern.compile("part-\\d{5}-\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12}_(\\d{5})(?:[-_.].*)?"), // legacy Presto naming pattern (current version matches Hive) Pattern.compile("\\d{8}_\\d{6}_\\d{5}_[a-z0-9]{5}_bucket-(\\d+)(?:[-_.].*)?"), // Hive naming pattern per `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()` @@ -398,7 +401,7 @@ private static int hashCodeV2(Object o, ObjectInspector objIns, ByteBuffer byteB throw new DdlException("Unknown type: " + objIns.getTypeName()); } - private static OptionalInt getBucketNumberFromPath(String name) { + public static OptionalInt getBucketNumberFromPath(String name) { for (Pattern pattern : BUCKET_PATTERNS) { Matcher matcher = pattern.matcher(name); if (matcher.matches()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index c901ee654ae82d..4264b8c50f4e85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -17,10 +17,15 @@ package org.apache.doris.datasource.hive.source; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.TableIf; @@ -42,6 +47,7 @@ import org.apache.doris.datasource.hive.HiveTransaction; import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; +import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.ListPartitionPrunerV2; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.ConnectContext; @@ -52,6 +58,7 @@ import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileTextScanRangeParams; import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.THashType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -415,4 +422,37 @@ protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws User } return compressType; } + + @Override + public DataPartition constructInputPartitionByDistributionInfo() { + if (hmsTable.isBucketedTable()) { + DistributionInfo distributionInfo = hmsTable.getDefaultDistributionInfo(); + if (!(distributionInfo instanceof HashDistributionInfo)) { + return DataPartition.RANDOM; + } + List distributeColumns = ((HiveExternalDistributionInfo) distributionInfo).getDistributionColumns(); + List dataDistributeExprs = Lists.newArrayList(); + for (Column column : distributeColumns) { + SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); + dataDistributeExprs.add(slotRef); + } + return 
DataPartition.hashPartitioned(dataDistributeExprs, THashType.SPARK_MURMUR32); + } + + return DataPartition.RANDOM; + } + + public HMSExternalTable getHiveTable() { + return hmsTable; + } + + @Override + public THashType getHashType() { + if (hmsTable.isBucketedTable() + && hmsTable.getDefaultDistributionInfo() instanceof HashDistributionInfo) { + return THashType.SPARK_MURMUR32; + } + + return THashType.CRC32; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index c64965080fbe3c..a43903f9481721 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -48,6 +48,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.es.source.EsScanNode; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.hive.source.HiveScanNode; @@ -184,6 +185,7 @@ import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.tablefunction.TableValuedFunctionIf; import org.apache.doris.thrift.TFetchOption; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPushAggOp; import org.apache.doris.thrift.TResultSinkType; @@ -465,7 +467,8 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); // TODO(cmy): determine the needCheckColumnPriv param - ScanNode scanNode; + FileQueryScanNode scanNode; + DataPartition dataPartition = DataPartition.RANDOM; if (table instanceof HMSExternalTable) { switch (((HMSExternalTable) table).getDlaType()) { case HUDI: @@ -515,8 +518,14 @@ public PlanFragment visitPhysicalFileScan(PhysicalFileScan fileScan, PlanTransla ) ); Utils.execWithUncheckedException(scanNode::finalizeForNereids); + if (fileScan.getDistributionSpec() instanceof DistributionSpecHash) { + DistributionSpecHash distributionSpecHash = (DistributionSpecHash) fileScan.getDistributionSpec(); + List partitionExprs = distributionSpecHash.getOrderedShuffledColumns().stream() + .map(context::findSlotRef).collect(Collectors.toList()); + dataPartition = new DataPartition(TPartitionType.HASH_PARTITIONED, + partitionExprs, scanNode.getHashType()); + } // Create PlanFragment - DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); context.addPlanFragment(planFragment); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan); @@ -2419,7 +2428,7 @@ private void addPlanRoot(PlanFragment fragment, PlanNode planNode, AbstractPlan } private DataPartition toDataPartition(DistributionSpec distributionSpec, - List childOutputIds, PlanTranslatorContext context) { + List childOutputIds, PlanTranslatorContext context) { if (distributionSpec instanceof DistributionSpecAny || distributionSpec instanceof DistributionSpecStorageAny || distributionSpec instanceof DistributionSpecExecutionAny) { @@ -2446,8 +2455,20 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec, } } TPartitionType partitionType; + THashType hashType = 
THashType.XXHASH64; switch (distributionSpecHash.getShuffleType()) { case STORAGE_BUCKETED: + switch (distributionSpecHash.getShuffleFunction()) { + case STORAGE_BUCKET_SPARK_MURMUR32: + hashType = THashType.SPARK_MURMUR32; + break; + case STORAGE_BUCKET_CRC32: + hashType = THashType.CRC32; + break; + case STORAGE_BUCKET_XXHASH64: + default: + break; + } partitionType = TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED; break; case EXECUTION_BUCKETED: @@ -2458,7 +2479,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec, throw new RuntimeException("Do not support shuffle type: " + distributionSpecHash.getShuffleType()); } - return new DataPartition(partitionType, partitionExprs); + return new DataPartition(partitionType, partitionExprs, hashType); } else if (distributionSpec instanceof DistributionSpecTabletIdShuffle) { return DataPartition.TABLET_ID; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 3756c1bcfe3518..88866cbda323cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -141,7 +141,7 @@ public PhysicalProperties visitPhysicalEsScan(PhysicalEsScan esScan, PlanContext @Override public PhysicalProperties visitPhysicalFileScan(PhysicalFileScan fileScan, PlanContext context) { - return PhysicalProperties.STORAGE_ANY; + return new PhysicalProperties(fileScan.getDistributionSpec()); } /** @@ -285,7 +285,7 @@ public PhysicalProperties visitPhysicalHashJoin( // retain left shuffle type, since coordinator use left most node to schedule fragment // forbid colocate join, since right table already shuffle return new PhysicalProperties(rightHashSpec.withShuffleTypeAndForbidColocateJoin( - leftHashSpec.getShuffleType())); + leftHashSpec.getShuffleType(), leftHashSpec.getShuffleFunction())); } case FULL_OUTER_JOIN: return PhysicalProperties.createAnyFromHash(leftHashSpec); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index a70d72c565c707..a4c86552fcc6da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -551,7 +551,7 @@ private PhysicalProperties calAnotherSideRequired(ShuffleType shuffleType, notShuffleSideRequired, shuffleSideRequired); return new PhysicalProperties(new DistributionSpecHash(shuffleSideIds, shuffleType, shuffleSideOutput.getTableId(), shuffleSideOutput.getSelectedIndexId(), - shuffleSideOutput.getPartitionIds())); + shuffleSideOutput.getPartitionIds(), notShuffleSideOutput.getShuffleFunction())); } private void updateChildEnforceAndCost(int index, PhysicalProperties targetProperties) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java index 6ab8e054f8aaef..5bf1a7f52472bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecHash.java @@ -54,6 +54,8 @@ public class 
DistributionSpecHash extends DistributionSpec { private final Set partitionIds; private final long selectedIndexId; + private final StorageBucketHashType storageBucketHashType; + /** * Use for no need set table related attributes. */ @@ -70,10 +72,19 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu } /** - * Normal constructor. + * Use when no need set shuffle hash function */ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, long tableId, long selectedIndexId, Set partitionIds) { + this(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds, + StorageBucketHashType.STORAGE_BUCKET_CRC32); + } + + /** + * Normal constructor. + */ + public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, + long tableId, long selectedIndexId, Set partitionIds, StorageBucketHashType storageBucketHashType) { this.orderedShuffledColumns = ImmutableList.copyOf( Objects.requireNonNull(orderedShuffledColumns, "orderedShuffledColumns should not null")); this.shuffleType = Objects.requireNonNull(shuffleType, "shuffleType should not null"); @@ -92,6 +103,7 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu } this.equivalenceExprIds = equivalenceExprIdsBuilder.build(); this.exprIdToEquivalenceSet = exprIdToEquivalenceSetBuilder.buildKeepingLast(); + this.storageBucketHashType = storageBucketHashType; } /** @@ -101,7 +113,7 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu long tableId, Set partitionIds, List> equivalenceExprIds, Map exprIdToEquivalenceSet) { this(orderedShuffledColumns, shuffleType, tableId, -1L, partitionIds, - equivalenceExprIds, exprIdToEquivalenceSet); + equivalenceExprIds, exprIdToEquivalenceSet, StorageBucketHashType.STORAGE_BUCKET_XXHASH64); } /** @@ -109,7 +121,7 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu */ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shuffleType, long tableId, long selectedIndexId, Set partitionIds, List> equivalenceExprIds, - Map exprIdToEquivalenceSet) { + Map exprIdToEquivalenceSet, StorageBucketHashType storageBucketHashType) { this.orderedShuffledColumns = ImmutableList.copyOf(Objects.requireNonNull(orderedShuffledColumns, "orderedShuffledColumns should not null")); this.shuffleType = Objects.requireNonNull(shuffleType, "shuffleType should not null"); @@ -121,6 +133,7 @@ public DistributionSpecHash(List orderedShuffledColumns, ShuffleType shu Objects.requireNonNull(equivalenceExprIds, "equivalenceExprIds should not null")); this.exprIdToEquivalenceSet = ImmutableMap.copyOf( Objects.requireNonNull(exprIdToEquivalenceSet, "exprIdToEquivalenceSet should not null")); + this.storageBucketHashType = storageBucketHashType; } static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHash right, ShuffleType shuffleType) { @@ -140,7 +153,7 @@ static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHas exprIdToEquivalenceSet.putAll(right.getExprIdToEquivalenceSet()); return new DistributionSpecHash(orderedShuffledColumns, shuffleType, left.getTableId(), left.getSelectedIndexId(), left.getPartitionIds(), equivalenceExprIds.build(), - exprIdToEquivalenceSet.buildKeepingLast()); + exprIdToEquivalenceSet.buildKeepingLast(), left.getShuffleFunction()); } static DistributionSpecHash merge(DistributionSpecHash left, DistributionSpecHash right) { @@ -175,6 +188,10 @@ public Map getExprIdToEquivalenceSet() { return exprIdToEquivalenceSet; } 
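For reference, the bucket layout that STORAGE_BUCKET_SPARK_MURMUR32 (planner side) and THashType.SPARK_MURMUR32 (thrift/BE side) are meant to reproduce is Spark's: hash the bucketing column with Murmur3 x86 32-bit using the fixed seed 42, then take the non-negative modulo of the bucket count. Below is a minimal, self-contained sketch of that computation for a single INT bucketing column; it is illustrative only, the class and method names are invented, and it is neither the Doris nor the Spark implementation.

// Minimal sketch: Spark-style bucket id for one INT bucketing column.
// Murmur3 x86 32-bit over the 4-byte value with seed 42, then a non-negative
// modulo over the bucket count. This is what SPARK_MURMUR32 is expected to
// reproduce so shuffled rows land on the same bucket as the files Spark wrote.
public final class SparkBucketSketch {
    private static final int SPARK_SEED = 42;

    private static int mixK1(int k1) {
        k1 *= 0xcc9e2d51;
        k1 = Integer.rotateLeft(k1, 15);
        k1 *= 0x1b873593;
        return k1;
    }

    private static int mixH1(int h1, int k1) {
        h1 ^= k1;
        h1 = Integer.rotateLeft(h1, 13);
        return h1 * 5 + 0xe6546b64;
    }

    private static int fmix(int h1, int length) {
        h1 ^= length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    /** Murmur3 x86 32-bit over a single 4-byte int (the "hashInt" fast path). */
    static int murmur3HashInt(int input, int seed) {
        int k1 = mixK1(input);
        int h1 = mixH1(seed, k1);
        return fmix(h1, 4);
    }

    /** Non-negative modulo: result is always in [0, numBuckets). */
    static int bucketId(int columnValue, int numBuckets) {
        int hash = murmur3HashInt(columnValue, SPARK_SEED);
        int mod = hash % numBuckets;
        return mod < 0 ? mod + numBuckets : mod;
    }

    public static void main(String[] args) {
        // e.g. user_id = 11 into 4 buckets; the concrete value is illustrative.
        System.out.println(bucketId(11, 4));
    }
}

Multi-column and non-INT cases fold the hash of each bucketing column into the next one as the running seed; the sketch stays with the single-int case to keep it short.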
+ public StorageBucketHashType getShuffleFunction() { + return storageBucketHashType; + } + public Set getEquivalenceExprIdsOf(ExprId exprId) { if (exprIdToEquivalenceSet.containsKey(exprId)) { return equivalenceExprIds.get(exprIdToEquivalenceSet.get(exprId)); @@ -227,14 +244,15 @@ private boolean equalsSatisfy(List required) { return true; } - public DistributionSpecHash withShuffleType(ShuffleType shuffleType) { + public DistributionSpecHash withShuffleType(ShuffleType shuffleType, StorageBucketHashType storageBucketHashType) { return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds, - equivalenceExprIds, exprIdToEquivalenceSet); + equivalenceExprIds, exprIdToEquivalenceSet, storageBucketHashType); } - public DistributionSpecHash withShuffleTypeAndForbidColocateJoin(ShuffleType shuffleType) { + public DistributionSpecHash withShuffleTypeAndForbidColocateJoin(ShuffleType shuffleType, + StorageBucketHashType storageBucketHashType) { return new DistributionSpecHash(orderedShuffledColumns, shuffleType, -1, -1, partitionIds, - equivalenceExprIds, exprIdToEquivalenceSet); + equivalenceExprIds, exprIdToEquivalenceSet, storageBucketHashType); } /** @@ -272,7 +290,7 @@ public DistributionSpec project(Map projections, } } return new DistributionSpecHash(orderedShuffledColumns, shuffleType, tableId, selectedIndexId, partitionIds, - equivalenceExprIds, exprIdToEquivalenceSet); + equivalenceExprIds, exprIdToEquivalenceSet, storageBucketHashType); } @Override @@ -281,12 +299,13 @@ public boolean equals(Object o) { return false; } DistributionSpecHash that = (DistributionSpecHash) o; - return shuffleType == that.shuffleType && orderedShuffledColumns.equals(that.orderedShuffledColumns); + return shuffleType == that.shuffleType && storageBucketHashType == that.storageBucketHashType + && orderedShuffledColumns.equals(that.orderedShuffledColumns); } @Override public int hashCode() { - return Objects.hash(shuffleType, orderedShuffledColumns); + return Objects.hash(shuffleType, storageBucketHashType, orderedShuffledColumns); } @Override @@ -315,4 +334,13 @@ public enum ShuffleType { STORAGE_BUCKETED, } + /** + * Enums for concrete shuffle functions. 
+ */ + public enum StorageBucketHashType { + STORAGE_BUCKET_CRC32, + STORAGE_BUCKET_XXHASH64, + STORAGE_BUCKET_SPARK_MURMUR32 + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java index b08db2aeba2b15..c7c40fe1e98189 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java @@ -26,6 +26,7 @@ import org.apache.doris.nereids.metrics.event.EnforcerEvent; import org.apache.doris.nereids.minidump.NereidsTracer; import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType; +import org.apache.doris.nereids.properties.DistributionSpecHash.StorageBucketHashType; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.qe.ConnectContext; @@ -117,7 +118,8 @@ private PhysicalProperties enforceDistribution(PhysicalProperties oldOutputPrope DistributionSpec requiredDistributionSpec = required.getDistributionSpec(); if (requiredDistributionSpec instanceof DistributionSpecHash) { DistributionSpecHash requiredDistributionSpecHash = (DistributionSpecHash) requiredDistributionSpec; - outputDistributionSpec = requiredDistributionSpecHash.withShuffleType(ShuffleType.EXECUTION_BUCKETED); + outputDistributionSpec = requiredDistributionSpecHash.withShuffleType(ShuffleType.EXECUTION_BUCKETED, + StorageBucketHashType.STORAGE_BUCKET_XXHASH64); } else { outputDistributionSpec = requiredDistributionSpec; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java index d86e1d1667e18a..682b5eb2659b19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java @@ -17,11 +17,27 @@ package org.apache.doris.nereids.rules.implementation; -import org.apache.doris.nereids.properties.DistributionSpecAny; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DistributionInfo; +import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.nereids.properties.DistributionSpec; +import org.apache.doris.nereids.properties.DistributionSpecHash; +import org.apache.doris.nereids.properties.DistributionSpecHash.StorageBucketHashType; +import org.apache.doris.nereids.properties.DistributionSpecStorageAny; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan; +import com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; import java.util.Optional; /** @@ -35,7 +51,7 @@ public Rule build() { fileScan.getRelationId(), fileScan.getTable(), fileScan.getQualifier(), - DistributionSpecAny.INSTANCE, + 
convertDistribution(fileScan), Optional.empty(), fileScan.getLogicalProperties(), fileScan.getConjuncts(), @@ -43,4 +59,34 @@ public Rule build() { fileScan.getTableSample()) ).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE); } + + private DistributionSpec convertDistribution(LogicalFileScan fileScan) { + TableIf table = fileScan.getTable(); + if (!(table instanceof HMSExternalTable)) { + return DistributionSpecStorageAny.INSTANCE; + } + + HMSExternalTable hmsExternalTable = (HMSExternalTable) table; + DistributionInfo distributionInfo = hmsExternalTable.getDefaultDistributionInfo(); + if (distributionInfo instanceof HashDistributionInfo) { + HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo; + List output = fileScan.getOutput(); + List hashColumns = Lists.newArrayList(); + for (Slot slot : output) { + for (Column column : hashDistributionInfo.getDistributionColumns()) { + if (((SlotReference) slot).getColumn().get().equals(column)) { + hashColumns.add(slot.getExprId()); + } + } + } + StorageBucketHashType function = StorageBucketHashType.STORAGE_BUCKET_CRC32; + if (hmsExternalTable.isBucketedTable()) { + function = StorageBucketHashType.STORAGE_BUCKET_SPARK_MURMUR32; + } + return new DistributionSpecHash(hashColumns, DistributionSpecHash.ShuffleType.NATURAL, + fileScan.getTable().getId(), -1, Collections.emptySet(), function); + } + + return DistributionSpecStorageAny.INSTANCE; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 9c5c375a35c06d..339de61cf9c9fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -26,6 +26,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.thrift.TDataPartition; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Joiner; @@ -50,10 +51,16 @@ public class DataPartition { public static final DataPartition TABLET_ID = new DataPartition(TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED); private final TPartitionType type; + private final THashType hashType; + // for hash partition: exprs used to compute hash value private ImmutableList partitionExprs; public DataPartition(TPartitionType type, List exprs) { + this(type, exprs, THashType.CRC32); + } + + public DataPartition(TPartitionType type, List exprs, THashType hashType) { Preconditions.checkNotNull(exprs); Preconditions.checkState(!exprs.isEmpty()); Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED @@ -61,6 +68,7 @@ public DataPartition(TPartitionType type, List exprs) { || type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED); this.type = type; this.partitionExprs = ImmutableList.copyOf(exprs); + this.hashType = hashType; } public DataPartition(TPartitionType type) { @@ -69,10 +77,15 @@ public DataPartition(TPartitionType type) { || type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED); this.type = type; this.partitionExprs = ImmutableList.of(); + this.hashType = THashType.CRC32; + } + + public static DataPartition hashPartitioned(List exprs, THashType hashType) { + return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs, hashType); } public static DataPartition hashPartitioned(List exprs) { - return new DataPartition(TPartitionType.HASH_PARTITIONED, exprs); + return new 
DataPartition(TPartitionType.HASH_PARTITIONED, exprs, THashType.CRC32); } public void substitute(ExprSubstitutionMap smap, Analyzer analyzer) throws AnalysisException { @@ -96,17 +109,25 @@ public List getPartitionExprs() { return partitionExprs; } + public THashType getHashType() { + return hashType; + } + public TDataPartition toThrift() { TDataPartition result = new TDataPartition(type); if (partitionExprs != null) { result.setPartitionExprs(Expr.treesToThrift(partitionExprs)); } + result.setHashType(hashType); return result; } public String getExplainString(TExplainLevel explainLevel) { StringBuilder str = new StringBuilder(); str.append(type.toString()); + if (type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) { + str.append("(").append(hashType.toString()).append(")"); + } if (explainLevel == TExplainLevel.BRIEF) { return str.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index e1a8d36424eebe..4cc9608088cb81 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -33,17 +33,22 @@ import org.apache.doris.catalog.DistributionInfo; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HashDistributionInfo; +import org.apache.doris.catalog.HiveExternalDistributionInfo; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hive.common.util.Ref; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -285,6 +290,10 @@ private PlanFragment createScanFragment(PlanNode node) throws UserException { OlapScanNode olapScanNode = (OlapScanNode) node; return new PlanFragment(ctx.getNextFragmentId(), node, olapScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); + } else if (node instanceof HiveScanNode) { + HiveScanNode hiveScanNode = (HiveScanNode) node; + return new PlanFragment(ctx.getNextFragmentId(), node, + hiveScanNode.constructInputPartitionByDistributionInfo(), DataPartition.RANDOM); } else { // other scan nodes are random partitioned: es, broker return new PlanFragment(ctx.getNextFragmentId(), node, DataPartition.RANDOM); @@ -327,10 +336,12 @@ private PlanFragment createHashJoinFragment( // bucket shuffle join is better than broadcast and shuffle join // it can reduce the network cost of join, so doris chose it first List rhsPartitionExprs = Lists.newArrayList(); - if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs)) { + Ref hashType = Ref.from(THashType.CRC32); + if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionExprs, hashType)) { node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); DataPartition rhsJoinPartition = - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionExprs); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, + 
rhsPartitionExprs, hashType.value); ExchangeNode rhsExchange = new ExchangeNode(ctx.getNextNodeId(), rightChildFragment.getPlanRoot(), false); rhsExchange.setNumInstances(rightChildFragment.getPlanRoot().getNumInstances()); @@ -600,7 +611,7 @@ private boolean dataDistributionMatchEqPredicate(List eqJoinPre } private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, - List rhsHashExprs) { + List rhsHashExprs, Ref hashType) { if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { return false; } @@ -616,7 +627,9 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr PlanNode leftRoot = leftChildFragment.getPlanRoot(); // 1.leftRoot be OlapScanNode if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); + } else if (leftRoot instanceof HiveScanNode) { + return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); } // 2.leftRoot be hashjoin node @@ -625,17 +638,83 @@ private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFr leftRoot = leftRoot.getChild(0); } if (leftRoot instanceof OlapScanNode) { - return canBucketShuffleJoin(node, leftRoot, rhsHashExprs); + return canBucketShuffleJoin(node, (OlapScanNode) leftRoot, rhsHashExprs); + } else if (leftRoot instanceof HiveScanNode) { + return canBucketShuffleJoin(node, (HiveScanNode) leftRoot, rhsHashExprs, hashType); } } return false; } + private boolean canBucketShuffleJoin(HashJoinNode node, HiveScanNode leftScanNode, + List rhsJoinExprs, Ref hashType) { + HMSExternalTable leftTable = leftScanNode.getHiveTable(); + + DistributionInfo leftDistribution = leftTable.getDefaultDistributionInfo(); + if (leftDistribution == null || !(leftDistribution instanceof HiveExternalDistributionInfo)) { + return false; + } + + HiveExternalDistributionInfo hiveDistributionInfo = (HiveExternalDistributionInfo) leftDistribution; + + List leftDistributeColumns = hiveDistributionInfo.getDistributionColumns(); + List leftDistributeColumnNames = leftDistributeColumns.stream() + .map(col -> leftTable.getName() + "." + col.getName().toLowerCase()).collect(Collectors.toList()); + + List leftJoinColumnNames = new ArrayList<>(); + List rightExprs = new ArrayList<>(); + List eqJoinConjuncts = node.getEqJoinConjuncts(); + + for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) { + Expr lhsJoinExpr = eqJoinPredicate.getChild(0); + Expr rhsJoinExpr = eqJoinPredicate.getChild(1); + if (lhsJoinExpr.unwrapSlotRef() == null || rhsJoinExpr.unwrapSlotRef() == null) { + continue; + } + + SlotRef leftSlot = node.getChild(0).findSrcSlotRef(lhsJoinExpr.unwrapSlotRef()); + if (leftSlot.getTable() instanceof HMSExternalTable + && leftScanNode.desc.getSlots().contains(leftSlot.getDesc())) { + // table name in SlotRef is not the really name. `select * from test as t` + // table name in SlotRef is `t`, but here we need is `test`. + leftJoinColumnNames.add(leftSlot.getTable().getName() + "." 
+ + leftSlot.getColumnName().toLowerCase()); + rightExprs.add(rhsJoinExpr); + } + } + + //2 the join columns should contains all left table distribute columns to enable bucket shuffle join + for (int i = 0; i < leftDistributeColumnNames.size(); i++) { + String distributeColumnName = leftDistributeColumnNames.get(i); + boolean findRhsExprs = false; + // check the join column name is same as distribute column name and + // check the rhs join expr type is same as distribute column + for (int j = 0; j < leftJoinColumnNames.size(); j++) { + if (leftJoinColumnNames.get(j).equals(distributeColumnName)) { + // varchar and string type don't need to check the length property + if ((rightExprs.get(j).getType().isVarcharOrStringType() + && leftDistributeColumns.get(i).getType().isVarcharOrStringType()) + || (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType()))) { + rhsJoinExprs.add(rightExprs.get(j)); + findRhsExprs = true; + break; + } + } + } + + if (!findRhsExprs) { + return false; + } + } + + hashType.value = leftScanNode.getHashType(); + return true; + } + //the join expr must contian left table distribute column - private boolean canBucketShuffleJoin(HashJoinNode node, PlanNode leftRoot, + private boolean canBucketShuffleJoin(HashJoinNode node, OlapScanNode leftScanNode, List rhsJoinExprs) { - OlapScanNode leftScanNode = ((OlapScanNode) leftRoot); OlapTable leftTable = leftScanNode.getOlapTable(); //1 the left table has more than one partition or left table is not a stable colocate table diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 274725f3f22671..7561a7cef4dae5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -74,6 +74,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TOlapScanNode; import org.apache.doris.thrift.TOlapTableIndex; @@ -1683,7 +1684,7 @@ public DataPartition constructInputPartitionByDistributionInfo() throws UserExce SlotRef slotRef = new SlotRef(desc.getRef().getName(), column.getName()); dataDistributeExprs.add(slotRef); } - return DataPartition.hashPartitioned(dataDistributeExprs); + return DataPartition.hashPartitioned(dataDistributeExprs, THashType.CRC32); } else { return DataPartition.RANDOM; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 17e62d40f1f9f6..bfaec73287d167 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.cloud.catalog.CloudPartition; import org.apache.doris.common.Config; +import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.Status; @@ -36,6 +37,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalScanNode; import org.apache.doris.datasource.FileQueryScanNode; +import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.load.loadv2.LoadJob; import 
org.apache.doris.metric.MetricRepo; import org.apache.doris.nereids.NereidsPlanner; @@ -2289,8 +2291,13 @@ private void computeScanRangeAssignment() throws Exception { computeScanRangeAssignmentByColocate((OlapScanNode) scanNode, assignedBytesPerHost, replicaNumPerHost); } if (fragmentContainsBucketShuffleJoin) { - bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, - idToBackend, addressToBackendID, replicaNumPerHost); + if (scanNode instanceof OlapScanNode) { + bucketShuffleJoinController.computeScanRangeAssignmentByBucket((OlapScanNode) scanNode, + idToBackend, addressToBackendID, replicaNumPerHost); + } else if (scanNode instanceof HiveScanNode) { + bucketShuffleJoinController.computeScanRangeAssignmentByBucket((HiveScanNode) scanNode, + idToBackend, addressToBackendID, replicaNumPerHost); + } } if (!(fragmentContainsColocateJoin || fragmentContainsBucketShuffleJoin)) { computeScanRangeAssignmentByScheduler(scanNode, locations, assignment, assignedBytesPerHost, @@ -2967,6 +2974,50 @@ private void computeScanRangeAssignmentByBucket( } } + private void computeScanRangeAssignmentByBucket( + final HiveScanNode scanNode, ImmutableMap idToBackend, + Map addressToBackendID, + Map replicaNumPerHost) throws Exception { + if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) { + int bucketNum = 0; + if (scanNode.getHiveTable().isBucketedTable()) { + bucketNum = scanNode.getHiveTable().getDefaultDistributionInfo().getBucketNum(); + } else { + throw new NotImplementedException("bucket shuffle for non-bucketed table not supported"); + } + fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), bucketNum); + fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>()); + fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange()); + fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>()); + } + Map bucketSeqToAddress + = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId()); + BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(scanNode.getFragmentId()); + + for (Integer bucketSeq : scanNode.bucketSeq2locations.keySet()) { + //fill scanRangeParamsList + List locations = scanNode.bucketSeq2locations.get(bucketSeq); + if (!bucketSeqToAddress.containsKey(bucketSeq)) { + getExecHostPortForFragmentIDAndBucketSeq(locations.get(0), scanNode.getFragmentId(), + bucketSeq, idToBackend, addressToBackendID, replicaNumPerHost); + } + + for (TScanRangeLocations location : locations) { + Map> scanRanges = + findOrInsert(bucketSeqToScanRange, bucketSeq, new HashMap<>()); + + List scanRangeParamsList = + findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<>()); + + // add scan range + TScanRangeParams scanRangeParams = new TScanRangeParams(); + scanRangeParams.scan_range = location.scan_range; + scanRangeParamsList.add(scanRangeParams); + updateScanRangeNumByScanRange(scanRangeParams); + } + } + } + private void computeInstanceParam(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params) { assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap, diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index c39724efb4bdf6..3a5c38bdecd541 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -1125,7 +1125,7 @@ public void testBucketShuffleJoin() throws Exception { + " and t1.k1 = t2.k2"; String explainString = getSQLPlanOrErrorMsg(queryStr); Assert.assertTrue(explainString.contains("BUCKET_SHFFULE")); - Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); + Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t1`.`k1`, `t1`.`k1`")); // not bucket shuffle join do not support different type queryStr = "explain select /*+ SET_VAR(enable_nereids_planner=false) */ * from test.jointest t1, test.bucket_shuffle1 t2 where cast (t1.k1 as tinyint)" @@ -1156,24 +1156,24 @@ public void testBucketShuffleJoin() throws Exception { + " on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3" + " on t2.k1 = t3.k1 and t2.k2 = t3.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`")); + // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t1`.`k1`, `t1`.`k1`")); + // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t3`.`k1`, `t3`.`k2`")); // support recurse of bucket shuffle because t4 join t2 and join column name is same as t2 distribute column name queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and" + " t1.k1 = t2.k2 join test.colocate1 t3 on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and" + " t4.k1 = t2.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`")); + //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t1`.`k1`, `t1`.`k1`")); + //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t4`.`k1`, `t4`.`k1`")); // some column name in join expr t3 join t4 and t1 distribute column name, so should not be bucket shuffle join queryStr = "explain select /*+ SET_VAR(enable_nereids_planner=false) */ * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 =" + " t2.k2 join test.colocate1 t3 on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t3.k1 and" + " t4.k2 = t3.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`")); + Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t1`.`k1`, `t1`.`k1`")); + Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): `t4`.`k1`, `t4`.`k1`")); // here only a bucket shuffle + broadcast jost join queryStr = "explain SELECT /*+ SET_VAR(enable_nereids_planner=false) */ * FROM test.bucket_shuffle1 T LEFT JOIN test.bucket_shuffle1 T1 ON T1.k2 = T.k1 and T.k2 = T1.k3 LEFT JOIN" diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java index b47bdbb78547e6..71ebc730fc995e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java +++ 
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java @@ -41,6 +41,7 @@ import org.apache.doris.planner.ScanNode; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; +import org.apache.doris.thrift.THashType; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TScanRangeLocation; @@ -175,7 +176,7 @@ public void testIsBucketShuffleJoin() { new ArrayList<>()); hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode, - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32))); // hash join node is not bucket shuffle join Assert.assertEquals(false, @@ -183,13 +184,13 @@ public void testIsBucketShuffleJoin() { // the fragment id is different from hash join node hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-2), hashJoinNode, - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32))); hashJoinNode.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE); Assert.assertEquals(false, Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode)); hashJoinNode.setFragment(new PlanFragment(new PlanFragmentId(-1), hashJoinNode, - new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs))); + new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, testJoinexprs, THashType.CRC32))); Assert.assertEquals(true, Deencapsulation.invoke(bucketShuffleJoinController, "isBucketShuffleJoin", -1, hashJoinNode)); diff --git a/gensrc/thrift/Ddl.thrift b/gensrc/thrift/Ddl.thrift index 9696230af909ed..f733637bc7791a 100644 --- a/gensrc/thrift/Ddl.thrift +++ b/gensrc/thrift/Ddl.thrift @@ -76,10 +76,6 @@ enum TAggType { // 4: optional string default_value //} -enum THashType { - CRC32 -} - // random partition info struct TRandomPartitionDesc { } @@ -93,7 +89,7 @@ struct THashPartitionDesc { 2: required i32 hash_buckets // type to compute hash value. if not set, use CRC32 - 3: optional THashType hash_type + 3: optional Partitions.THashType hash_type } // value used to represents one column value in one range value diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift index 0a7e70c0a4f2a2..bff6f2867e4d34 100644 --- a/gensrc/thrift/Partitions.thrift +++ b/gensrc/thrift/Partitions.thrift @@ -21,6 +21,12 @@ namespace java org.apache.doris.thrift include "Exprs.thrift" include "Types.thrift" +enum THashType { + CRC32, + XXHASH64, + SPARK_MURMUR32 +} + enum TPartitionType { UNPARTITIONED, @@ -90,6 +96,7 @@ struct TDataPartition { 1: required TPartitionType type 2: optional list partition_exprs 3: optional list partition_infos + 4: optional THashType hash_type } diff --git a/regression-test/data/external_table_p0/hive/test_hive_spark_clustered_table.out b/regression-test/data/external_table_p0/hive/test_hive_spark_clustered_table.out new file mode 100644 index 00000000000000..23f03f24aea961 --- /dev/null +++ b/regression-test/data/external_table_p0/hive/test_hive_spark_clustered_table.out @@ -0,0 +1,158 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !q01 -- +1 U1 IN 1 U1 IN +11 U11 IN 11 U11 IN +21 U21 IN 21 U21 IN +31 U31 IN 31 U31 IN + +-- !q02 -- +PLAN FRAGMENT 0 + OUTPUT EXPRS: + user_id[#12] + key[#13] + part[#14] + user_id[#15] + key[#16] + part[#17] + PARTITION: UNPARTITIONED + + HAS_COLO_PLAN_NODE: false + + VRESULT SINK + MYSQL_PROTOCAL + + 4:VEXCHANGE + offset: 0 + distribute expr lists: user_id[#12] + +PLAN FRAGMENT 1 + + PARTITION: HASH_PARTITIONED: user_id[#3] + + HAS_COLO_PLAN_NODE: false + + STREAM DATA SINK + EXCHANGE ID: 04 + UNPARTITIONED + + 3:VHASH JOIN(165) + | join op: INNER JOIN(BUCKET_SHUFFLE)[] + | equal join conjunct: (user_id[#3] = user_id[#0]) + | cardinality=143 + | vec output tuple id: 3 + | vIntermediate tuple ids: 2 + | hash output slot ids: 0 1 2 3 4 5 + | distribute expr lists: user_id[#3] + | distribute expr lists: user_id[#0] + | + |----1:VEXCHANGE + | offset: 0 + | distribute expr lists: user_id[#0] + | + 2:VHIVE_SCAN_NODE(158) + table: parquet_test2 + inputSplitNum=4, totalFileSize=2873, scanRanges=4 + partition=1/1 + cardinality=143, numNodes=1 + pushdown agg=NONE + +PLAN FRAGMENT 2 + + PARTITION: HASH_PARTITIONED: user_id[#0] + + HAS_COLO_PLAN_NODE: false + + STREAM DATA SINK + EXCHANGE ID: 01 + BUCKET_SHFFULE_HASH_PARTITIONED(SPARK_MURMUR32): user_id[#0] + + 0:VHIVE_SCAN_NODE(159) + table: parquet_test2 + inputSplitNum=4, totalFileSize=2873, scanRanges=4 + partition=1/1 + cardinality=143, numNodes=1 + pushdown agg=NONE + +-- !q03 -- +1 U1 IN 1 U1 IN +11 U11 IN 11 U11 IN +21 U21 IN 21 U21 IN +31 U31 IN 31 U31 IN + +-- !q01 -- +1 U1 IN 1 U1 IN +11 U11 IN 11 U11 IN +21 U21 IN 21 U21 IN +31 U31 IN 31 U31 IN + +-- !q02 -- +PLAN FRAGMENT 0 + OUTPUT EXPRS: + + + + + + + PARTITION: UNPARTITIONED + + HAS_COLO_PLAN_NODE: false + + VRESULT SINK + MYSQL_PROTOCAL + + 4:VEXCHANGE + offset: 0 + +PLAN FRAGMENT 1 + + PARTITION: HASH_PARTITIONED: `hive_test_parquet`.`default`.`parquet_test2`.`user_id` + + HAS_COLO_PLAN_NODE: false + + STREAM DATA SINK + EXCHANGE ID: 04 + UNPARTITIONED + + 2:VHASH JOIN + | join op: INNER JOIN(BUCKET_SHUFFLE)[Only olap table support colocate plan] + | equal join conjunct: (`t1`.`user_id` = `t2`.`user_id`) + | cardinality=-1 + | vec output tuple id: 2 + | vIntermediate tuple ids: 3 4 + | output slot ids: 6 7 8 9 10 11 + | hash output slot ids: 0 1 2 3 4 5 + | + |----3:VEXCHANGE + | offset: 0 + | + 0:VHIVE_SCAN_NODE + table: parquet_test2 + inputSplitNum=4, totalFileSize=2873, scanRanges=4 + partition=1/1 + numNodes=1 + pushdown agg=NONE + +PLAN FRAGMENT 2 + + PARTITION: HASH_PARTITIONED: `hive_test_parquet`.`default`.`parquet_test2`.`user_id` + + HAS_COLO_PLAN_NODE: false + + STREAM DATA SINK + EXCHANGE ID: 03 + BUCKET_SHFFULE_HASH_PARTITIONED(SPARK_MURMUR32): `t2`.`user_id` + + 1:VHIVE_SCAN_NODE + table: parquet_test2 + inputSplitNum=4, totalFileSize=2873, scanRanges=4 + partition=1/1 + numNodes=1 + pushdown agg=NONE + +-- !q03 -- +1 U1 IN 1 U1 IN +11 U11 IN 11 U11 IN +21 U21 IN 21 U21 IN +31 U31 IN 31 U31 IN + diff --git a/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy b/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy new file mode 100644 index 00000000000000..bf7f5c1794a96f --- /dev/null +++ b/regression-test/suites/external_table_p0/hive/test_hive_spark_clustered_table.groovy @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_hive_spark_clustered_table", "p0,external,hive,external_docker,external_docker_hive") { + def q01 = { + qt_q01 """ select * from parquet_test2 t1, parquet_test2 t2 WHERE t1.user_id = t2.user_id ORDER BY 1,2 ;""" + + qt_q02 """explain select * from parquet_test2 t1, parquet_test2 t2 WHERE t1.user_id = t2.user_id ;""" + + qt_q03 """select * from parquet_test2 t1, `internal`.`regression_test`.doris_dist_test t2 WHERE t1.user_id = t2.user_id ORDER BY 1,2 ;""" + + explain { + sql("""select * from parquet_test2 t1, `internal`.`regression_test`.doris_dist_test t2 WHERE t1.user_id = t2.user_id;""") + contains "join op: INNER JOIN(BUCKET_SHUFFLE)" + contains "BUCKET_SHFFULE_HASH_PARTITIONED(SPARK_MURMUR32)" + } + } + + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + String hms_port = context.config.otherConfigs.get("hms_port") + String catalog_name = "hive_test_parquet" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + );""" + + sql """use `regression_test`""" + sql """drop table if exists doris_dist_test;""" + sql """create table doris_dist_test properties("replication_num"="1") + as select * from `${catalog_name}`.`default`.parquet_test2; """ + + sql """use `${catalog_name}`.`default`""" + + sql """set enable_fallback_to_original_planner=false;""" + + q01() + + sql """set enable_nereids_planner=false;""" + + q01() + + sql """use `internal`.`regression_test`""" + sql """drop table if exists doris_dist_test; """ + sql """drop catalog if exists ${catalog_name}; """ + } finally { + } + } +} diff --git a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy index e5334010d7b0ca..43af8f8d9d024b 100644 --- a/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy +++ b/regression-test/suites/nereids_p0/join/bucket_shuffle_join.groovy @@ -75,7 +75,7 @@ suite("bucket-shuffle-join") { explain { sql("select * from shuffle_join_t1 t1 left join shuffle_join_t2 t2 on t1.a = t2.c;") contains "BUCKET_SHUFFLE" - contains "BUCKET_SHFFULE_HASH_PARTITIONED: c" + contains "BUCKET_SHFFULE_HASH_PARTITIONED(CRC32): c" } }
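The DistributedPlanner change above only enables bucket shuffle against a Hive scan when every clustering (distribution) column of the left table is matched by an equi-join conjunct whose right-hand expression has a compatible type; only then is the right side repartitioned with the scan node's hash type (SPARK_MURMUR32 for Spark-bucketed tables, CRC32 otherwise). A simplified sketch of that coverage rule follows; the class, the record, and the map-based type lookup are invented for illustration and skip the slot resolution and the varchar/string length handling done by the real check.

import java.util.List;
import java.util.Map;

// Simplified sketch of the bucket-shuffle eligibility rule: every clustering
// column of the left (bucketed) table must be covered by an equi-join conjunct
// whose right-hand side has a compatible type, otherwise fall back to a
// regular shuffle or broadcast join.
public final class BucketShuffleEligibilitySketch {

    /** A toy equi-join conjunct: left column name and the right-hand side's type name. */
    record EquiJoin(String leftColumn, String rightType) {}

    static boolean canBucketShuffle(Map<String, String> leftClusteringColumnTypes,
                                    List<EquiJoin> equiJoins) {
        for (Map.Entry<String, String> clusterCol : leftClusteringColumnTypes.entrySet()) {
            boolean covered = false;
            for (EquiJoin ej : equiJoins) {
                if (ej.leftColumn().equalsIgnoreCase(clusterCol.getKey())
                        && ej.rightType().equalsIgnoreCase(clusterCol.getValue())) {
                    covered = true;
                    break;
                }
            }
            if (!covered) {
                return false; // a clustering column is not a join key: fall back
            }
        }
        return true; // join keys cover all clustering columns: bucket shuffle is safe
    }

    public static void main(String[] args) {
        // parquet_test2 is clustered by user_id (int) and the join is on user_id,
        // so the check passes and the explain output shows
        // BUCKET_SHFFULE_HASH_PARTITIONED(SPARK_MURMUR32), as asserted above.
        System.out.println(canBucketShuffle(
                Map.of("user_id", "int"),
                List.of(new EquiJoin("user_id", "int"))));
    }
}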