Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](join) incorrect result of mark join #30543

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_shared_hash_table_dependency->block();
p._shared_hashtable_controller->append_dependency(p.node_id(),
_shared_hash_table_dependency);
} else {
if ((p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
p._join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
p._have_other_join_conjunct) {
_build_indexes_null = std::make_shared<std::vector<uint32_t>>();
}
}

_build_blocks_memory_usage =
Expand Down Expand Up @@ -496,7 +490,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
state, local_state._shared_state->build_block.get(), &local_state, use_global_rf));
RETURN_IF_ERROR(
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
local_state._shared_state->build_indexes_null = local_state._build_indexes_null;
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.
Expand Down Expand Up @@ -542,7 +535,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
_shared_hash_table_context->hash_table_variants));

local_state._shared_state->build_block = _shared_hash_table_context->block;
local_state._build_indexes_null = _shared_hash_table_context->build_indexes_null;
local_state._shared_state->build_indexes_null =
_shared_hash_table_context->build_indexes_null;
const bool use_global_rf =
Expand Down
7 changes: 0 additions & 7 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,6 @@ class HashJoinBuildSinkLocalState final
std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
std::vector<int> _build_col_ids;

/*
* For null aware anti/semi join with other join conjuncts, we do need to care about the rows in
* build side with null keys,
* because the other join conjuncts' result may be changed from null to false(null & false == false).
*/
std::shared_ptr<std::vector<uint32_t>> _build_indexes_null;

RuntimeProfile::Counter* _build_table_timer = nullptr;
RuntimeProfile::Counter* _build_expr_call_timer = nullptr;
RuntimeProfile::Counter* _build_table_insert_timer = nullptr;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc

Status st;
if (local_state._probe_index < local_state._probe_block.rows()) {
local_state._build_indexes_null = local_state._shared_state->build_indexes_null;
DCHECK(local_state._has_set_need_null_map_for_probe);
RETURN_IF_CATCH_EXCEPTION({
std::visit(
Expand Down
7 changes: 0 additions & 7 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,6 @@ class HashJoinProbeLocalState final
// For mark join, last probe index of null mark
int _last_probe_null_mark;

/*
* For null aware anti/semi join with other join conjuncts, we do need to care about the rows in
* build side with null keys,
* because the other join conjuncts' result may be changed from null to false(null & false == false).
*/
std::shared_ptr<std::vector<uint32_t>> _build_indexes_null;

vectorized::Block _probe_block;
vectorized::ColumnRawPtrs _probe_columns;
// other expr
Expand Down
101 changes: 55 additions & 46 deletions be/src/vec/common/hash_table/join_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class JoinHashTable {

std::vector<uint8_t>& get_visited() { return visited; }

template <int JoinOpType, bool with_other_conjuncts>
void build(const Key* __restrict keys, const uint32_t* __restrict bucket_nums,
size_t num_elem) {
build_keys = keys;
Expand All @@ -76,19 +77,24 @@ class JoinHashTable {
next[i] = first[bucket_num];
first[bucket_num] = i;
}
first[bucket_size] = 0; // index = bucket_num means null
if constexpr ((JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) ||
!with_other_conjuncts) {
/// Only null aware join with other conjuncts need to access the null value in hash table
first[bucket_size] = 0; // index = bucket_num means null
}
}

template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join, bool need_judge_null>
auto find_batch(const Key* __restrict keys, const uint32_t* __restrict build_idx_map,
int probe_idx, uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs, bool& probe_visited,
uint32_t* __restrict build_idxs, vectorized::ColumnFilterHelper* mark_column) {
uint32_t* __restrict build_idxs) {
if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
if (_empty_build_side) {
return _process_null_aware_left_anti_join_for_empty_build_side<
JoinOpType, with_other_conjuncts, is_mark_join>(
probe_idx, probe_rows, probe_idxs, build_idxs, mark_column);
JoinOpType, with_other_conjuncts, is_mark_join>(probe_idx, probe_rows,
probe_idxs, build_idxs);
}
}

Expand Down Expand Up @@ -128,51 +134,48 @@ class JoinHashTable {
* select 'a' not in ('b', null) => null => 'a' != 'b' and 'a' != null => true and null => null
* select 'a' not in ('a', 'b', null) => false
*/
auto find_null_aware_with_other_conjuncts(
const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx,
uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs, std::set<uint32_t>& null_result,
const std::vector<uint32_t>& build_indexes_null, const size_t build_block_count) {
auto find_null_aware_with_other_conjuncts(const Key* __restrict keys,
const uint32_t* __restrict build_idx_map,
int probe_idx, uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs,
uint8_t* __restrict null_flags,
bool picking_null_keys) {
auto matched_cnt = 0;
const auto batch_size = max_batch_size;

bool has_matched = false;
auto do_the_probe = [&]() {
/// If no any rows match the probe key, here start to handle null keys in build side.
/// The result of "Any = null" is null.
if (build_idx == 0 && !picking_null_keys) {
build_idx = first[bucket_size];
picking_null_keys = true; // now pick null from build side
}

while (build_idx && matched_cnt < batch_size) {
if (build_idx == bucket_size) {
/// All rows in build side should be executed with other join conjuncts.
for (size_t i = 1; i != build_block_count; ++i) {
build_idxs[matched_cnt] = i;
probe_idxs[matched_cnt] = probe_idx;
matched_cnt++;
}
null_result.emplace(probe_idx);
build_idx = 0;
has_matched = true;
break;
} else if (keys[probe_idx] == build_keys[build_idx]) {
if (picking_null_keys || keys[probe_idx] == build_keys[build_idx]) {
build_idxs[matched_cnt] = build_idx;
probe_idxs[matched_cnt] = probe_idx;
null_flags[matched_cnt] = picking_null_keys;
matched_cnt++;
has_matched = true;
}

build_idx = next[build_idx];

// If `build_idx` is 0, all matched keys are handled,
// now need to handle null keys in build side.
if (!build_idx && !picking_null_keys) {
build_idx = first[bucket_size];
picking_null_keys = true; // now pick null keys from build side
}
}

// may over batch_size when emplace 0 into build_idxs
if (!build_idx) {
if (!has_matched) { // has no any row matched
for (auto index : build_indexes_null) {
build_idxs[matched_cnt] = index;
probe_idxs[matched_cnt] = probe_idx;
matched_cnt++;
}
}
probe_idxs[matched_cnt] = probe_idx;
build_idxs[matched_cnt] = 0;
picking_null_keys = false;
matched_cnt++;
has_matched = false;
}

probe_idx++;
Expand All @@ -184,11 +187,20 @@ class JoinHashTable {

while (probe_idx < probe_rows && matched_cnt < batch_size) {
build_idx = build_idx_map[probe_idx];

/// If the probe key is null
if (build_idx == bucket_size) {
probe_idx++;
break;
}
do_the_probe();
if (picking_null_keys) {
break;
}
}

probe_idx -= (build_idx != 0);
return std::tuple {probe_idx, build_idx, matched_cnt};
return std::tuple {probe_idx, build_idx, matched_cnt, picking_null_keys};
}

template <int JoinOpType>
Expand All @@ -215,21 +227,23 @@ class JoinHashTable {

bool has_null_key() { return _has_null_key; }

void pre_build_idxs(std::vector<uint32>& bucksets, const uint8_t* null_map) {
void pre_build_idxs(std::vector<uint32>& buckets, const uint8_t* null_map) {
if (null_map) {
first[bucket_size] = bucket_size; // distinguish between not matched and null
}

for (uint32_t i = 0; i < bucksets.size(); i++) {
bucksets[i] = first[bucksets[i]];
for (unsigned int& bucket : buckets) {
bucket = bucket == bucket_size ? bucket_size : first[bucket];
}
} else {
for (unsigned int& bucket : buckets) {
bucket = first[bucket];
}
}
}

private:
template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join>
auto _process_null_aware_left_anti_join_for_empty_build_side(
int probe_idx, int probe_rows, uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs, vectorized::ColumnFilterHelper* mark_column) {
auto _process_null_aware_left_anti_join_for_empty_build_side(int probe_idx, int probe_rows,
uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs) {
static_assert(JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
auto matched_cnt = 0;
const auto batch_size = max_batch_size;
Expand All @@ -240,11 +254,6 @@ class JoinHashTable {
++matched_cnt;
}

if constexpr (is_mark_join && !with_other_conjuncts) {
// we will flip the mark column later for anti join, so here set 0 into mark column.
mark_column->resize_fill(matched_cnt, 0);
}

return std::tuple {probe_idx, 0U, matched_cnt};
}

Expand Down
19 changes: 15 additions & 4 deletions be/src/vec/exec/join/process_hash_table_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,11 @@ struct ProcessHashTableProbe {
// each matching join column need to be processed by other join conjunct. so the struct of mutable block
// and output block may be different
// The output result is determined by the other join conjunct result and same_to_prev struct
Status do_other_join_conjuncts(Block* output_block, bool is_mark_join,
std::vector<uint8_t>& visited, bool has_null_in_build_side);
Status do_other_join_conjuncts(Block* output_block, std::vector<uint8_t>& visited,
bool has_null_in_build_side);

template <bool with_other_conjuncts>
Status do_mark_join_conjuncts(Block* output_block, size_t hash_table_bucket_size,
const std::set<uint32_t>& null_result);
Status do_mark_join_conjuncts(Block* output_block, size_t hash_table_bucket_size);

template <typename HashTableType>
typename HashTableType::State _init_probe_side(HashTableType& hash_table_ctx, size_t probe_rows,
Expand All @@ -85,6 +84,10 @@ struct ProcessHashTableProbe {
Status process_data_in_hashtable(HashTableType& hash_table_ctx, MutableBlock& mutable_block,
Block* output_block, bool* eos);

/// For null aware join with other conjuncts, if the probe key of one row on left side is null,
/// we should make this row match with all rows in build side.
size_t _process_probe_null_key(uint32_t probe_idx);

Parent* _parent = nullptr;
const int _batch_size;
const std::shared_ptr<Block>& _build_block;
Expand All @@ -93,7 +96,15 @@ struct ProcessHashTableProbe {

std::vector<uint32_t> _probe_indexs;
bool _probe_visited = false;
bool _picking_null_keys = false;
std::vector<uint32_t> _build_indexs;
std::vector<uint8_t> _null_flags;

/// If the probe key of one row on left side is null,
/// we will make all rows in build side match with this row,
/// `_build_index_for_null_probe_key` is used to record the progress if the build block is too big.
uint32_t _build_index_for_null_probe_key {0};

std::vector<int> _build_blocks_locs;
// only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr;
Expand Down
Loading
Loading