Skip to content

Commit

Permalink
[Enhancement](auto-partition) Re-add deduplication to auto partition …
Browse files Browse the repository at this point in the history
…rpc (#40580)

Issue Number: close #xxx

removed in #27817. we need it so re
add it.
  • Loading branch information
zclllyybb committed Sep 11, 2024
1 parent b0ab02b commit 561ca47
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
6 changes: 4 additions & 2 deletions be/src/vec/sink/vrow_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ Status VRowDistribution::_save_missing_values(
}
cur_row_values.push_back(node);
}
//For duplicate cur_values, they will be filtered in FE
_partitions_need_create.emplace_back(cur_row_values);
if (!_deduper.contains(cur_row_values)) {
_deduper.insert(cur_row_values);
_partitions_need_create.emplace_back(cur_row_values);
}
}

// to avoid too large mem use
Expand Down
28 changes: 23 additions & 5 deletions be/src/vec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
#include <gen_cpp/PaloInternalService_types.h>

#include <cstdint>
#include <functional>
#include <string>
#include <unordered_set>
#include <vector>

#include "common/status.h"
Expand Down Expand Up @@ -133,6 +135,10 @@ class VRowDistribution {
Status automatic_create_partition();
void clear_batching_stats();

// for auto partition
std::unique_ptr<MutableBlock> _batching_block;
bool _deal_batched = false; // If true, send batched block before any block's append.

private:
std::pair<vectorized::VExprContextSPtrs, vectorized::VExprSPtrs> _get_partition_function();

Expand Down Expand Up @@ -170,17 +176,29 @@ class VRowDistribution {
int64_t rows);
void _reset_find_tablets(int64_t rows);

struct NullableStringListHash {
std::size_t _hash(const TNullableStringLiteral& arg) const {
if (arg.is_null) {
return 0;
}
return std::hash<std::string>()(arg.value);
}
std::size_t operator()(const std::vector<TNullableStringLiteral>& arg) const {
std::size_t result = 0;
for (const auto& v : arg) {
result = (result << 1) ^ _hash(v);
}
return result;
}
};

RuntimeState* _state = nullptr;
int _batch_size = 0;

// for auto partitions
std::vector<std::vector<TNullableStringLiteral>> _partitions_need_create;

public:
std::unique_ptr<MutableBlock> _batching_block;
bool _deal_batched = false; // If true, send batched block before any block's append.
private:
size_t _batching_rows = 0, _batching_bytes = 0;
std::unordered_set<std::vector<TNullableStringLiteral>, NullableStringListHash> _deduper;

OlapTableBlockConvertor* _block_convertor = nullptr;
OlapTabletFinder* _tablet_finder = nullptr;
Expand Down

0 comments on commit 561ca47

Please sign in to comment.