Skip to content

Commit

Permalink
[fix](join) incorrect result of mark join
Browse files Browse the repository at this point in the history
  • Loading branch information
mrhhsg committed Jan 30, 2024
1 parent 732194d commit c8b0275
Show file tree
Hide file tree
Showing 11 changed files with 265 additions and 90 deletions.
8 changes: 0 additions & 8 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_shared_hash_table_dependency->block();
p._shared_hashtable_controller->append_dependency(p.node_id(),
_shared_hash_table_dependency);
} else {
if ((p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
p._join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
p._have_other_join_conjunct) {
_build_indexes_null = std::make_shared<std::vector<uint32_t>>();
}
}

_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
Expand Down Expand Up @@ -492,7 +486,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
state, local_state._shared_state->build_block.get(), &local_state, use_global_rf));
RETURN_IF_ERROR(
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
local_state._shared_state->build_indexes_null = local_state._build_indexes_null;
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.
Expand Down Expand Up @@ -538,7 +531,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
_shared_hash_table_context->hash_table_variants));

local_state._shared_state->build_block = _shared_hash_table_context->block;
local_state._build_indexes_null = _shared_hash_table_context->build_indexes_null;
local_state._shared_state->build_indexes_null =
_shared_hash_table_context->build_indexes_null;
const bool use_global_rf =
Expand Down
7 changes: 0 additions & 7 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,6 @@ class HashJoinBuildSinkLocalState final
std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
std::vector<int> _build_col_ids;

/*
* For null aware anti/semi join with other join conjuncts, we do need to care about the rows in
* build side with null keys,
* because the other join conjuncts' result may be changed from null to false(null & false == false).
*/
std::shared_ptr<std::vector<uint32_t>> _build_indexes_null;

RuntimeProfile::Counter* _build_table_timer = nullptr;
RuntimeProfile::Counter* _build_expr_call_timer = nullptr;
RuntimeProfile::Counter* _build_table_insert_timer = nullptr;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc

Status st;
if (local_state._probe_index < local_state._probe_block.rows()) {
local_state._build_indexes_null = local_state._shared_state->build_indexes_null;
DCHECK(local_state._has_set_need_null_map_for_probe);
RETURN_IF_CATCH_EXCEPTION({
std::visit(
Expand Down
7 changes: 0 additions & 7 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,6 @@ class HashJoinProbeLocalState final
// For mark join, last probe index of null mark
int _last_probe_null_mark;

/*
* For null aware anti/semi join with other join conjuncts, we do need to care about the rows in
* build side with null keys,
* because the other join conjuncts' result may be changed from null to false(null & false == false).
*/
std::shared_ptr<std::vector<uint32_t>> _build_indexes_null;

vectorized::Block _probe_block;
vectorized::ColumnRawPtrs _probe_columns;
// other expr
Expand Down
85 changes: 50 additions & 35 deletions be/src/vec/common/hash_table/join_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class JoinHashTable {

std::vector<uint8_t>& get_visited() { return visited; }

template <int JoinOpType, bool with_other_conjuncts>
void build(const Key* __restrict keys, const uint32_t* __restrict bucket_nums,
size_t num_elem) {
build_keys = keys;
Expand All @@ -76,7 +77,12 @@ class JoinHashTable {
next[i] = first[bucket_num];
first[bucket_num] = i;
}
first[bucket_size] = 0; // index = bucket_num means null
if constexpr ((JoinOpType != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
JoinOpType != TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) ||
!with_other_conjuncts) {
/// Only null aware join with other conjuncts need to access the null value in hash table
first[bucket_size] = 0; // index = bucket_num means null
}
}

template <int JoinOpType, bool with_other_conjuncts, bool is_mark_join, bool need_judge_null>
Expand Down Expand Up @@ -128,51 +134,48 @@ class JoinHashTable {
* select 'a' not in ('b', null) => null => 'a' != 'b' and 'a' != null => true and null => null
* select 'a' not in ('a', 'b', null) => false
*/
auto find_null_aware_with_other_conjuncts(
const Key* __restrict keys, const uint32_t* __restrict build_idx_map, int probe_idx,
uint32_t build_idx, int probe_rows, uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs, std::set<uint32_t>& null_result,
const std::vector<uint32_t>& build_indexes_null, const size_t build_block_count) {
auto find_null_aware_with_other_conjuncts(const Key* __restrict keys,
const uint32_t* __restrict build_idx_map,
int probe_idx, uint32_t build_idx, int probe_rows,
uint32_t* __restrict probe_idxs,
uint32_t* __restrict build_idxs,
uint8_t* __restrict null_flags,
bool picking_null_keys) {
auto matched_cnt = 0;
const auto batch_size = max_batch_size;

bool has_matched = false;
auto do_the_probe = [&]() {
/// If no any rows match the probe key, here start to handle null keys in build side.
/// The result of "Any = null" is null.
if (build_idx == 0 && !picking_null_keys) {
build_idx = first[bucket_size];
picking_null_keys = true; // now pick null from build side
}

while (build_idx && matched_cnt < batch_size) {
if (build_idx == bucket_size) {
/// All rows in build side should be executed with other join conjuncts.
for (size_t i = 1; i != build_block_count; ++i) {
build_idxs[matched_cnt] = i;
probe_idxs[matched_cnt] = probe_idx;
matched_cnt++;
}
null_result.emplace(probe_idx);
build_idx = 0;
has_matched = true;
break;
} else if (keys[probe_idx] == build_keys[build_idx]) {
if (picking_null_keys || keys[probe_idx] == build_keys[build_idx]) {
build_idxs[matched_cnt] = build_idx;
probe_idxs[matched_cnt] = probe_idx;
null_flags[matched_cnt] = picking_null_keys;
matched_cnt++;
has_matched = true;
}

build_idx = next[build_idx];

// If `build_idx` is 0, all matched keys are handled,
// now need to handle null keys in build side.
if (!build_idx && !picking_null_keys) {
build_idx = first[bucket_size];
picking_null_keys = true; // now pick null keys from build side
}
}

// may over batch_size when emplace 0 into build_idxs
if (!build_idx) {
if (!has_matched) { // has no any row matched
for (auto index : build_indexes_null) {
build_idxs[matched_cnt] = index;
probe_idxs[matched_cnt] = probe_idx;
matched_cnt++;
}
}
probe_idxs[matched_cnt] = probe_idx;
build_idxs[matched_cnt] = 0;
picking_null_keys = false;
matched_cnt++;
has_matched = false;
}

probe_idx++;
Expand All @@ -184,11 +187,21 @@ class JoinHashTable {

while (probe_idx < probe_rows && matched_cnt < batch_size) {
build_idx = build_idx_map[probe_idx];
if (build_idx == bucket_size) {
build_idxs[matched_cnt] = build_idx;
probe_idxs[matched_cnt] = probe_idx;
matched_cnt++;
probe_idx++;
break;
}
do_the_probe();
if (picking_null_keys) {
break;
}
}

probe_idx -= (build_idx != 0);
return std::tuple {probe_idx, build_idx, matched_cnt};
return std::tuple {probe_idx, build_idx, matched_cnt, picking_null_keys};
}

template <int JoinOpType>
Expand All @@ -215,13 +228,15 @@ class JoinHashTable {

bool has_null_key() { return _has_null_key; }

void pre_build_idxs(std::vector<uint32>& bucksets, const uint8_t* null_map) {
void pre_build_idxs(std::vector<uint32>& buckets, const uint8_t* null_map) {
if (null_map) {
first[bucket_size] = bucket_size; // distinguish between not matched and null
}

for (uint32_t i = 0; i < bucksets.size(); i++) {
bucksets[i] = first[bucksets[i]];
for (unsigned int& bucket : buckets) {
bucket = bucket == bucket_size ? bucket_size : first[bucket];
}
} else {
for (unsigned int& bucket : buckets) {
bucket = first[bucket];
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/join/process_hash_table_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ struct ProcessHashTableProbe {

std::vector<uint32_t> _probe_indexs;
bool _probe_visited = false;
bool _picking_null_keys = false;
std::vector<uint32_t> _build_indexs;
std::vector<uint8_t> _null_flags;
std::vector<int> _build_blocks_locs;
// only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr;
Expand Down
51 changes: 41 additions & 10 deletions be/src/vec/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ typename HashTableType::State ProcessHashTableProbe<JoinOpType, Parent>::_init_p
// may over batch size 1 for some outer join case
_probe_indexs.resize(_batch_size + 1);
_build_indexs.resize(_batch_size + 1);
if constexpr (JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) {
_null_flags.resize(_batch_size + 1);
memset(_null_flags.data(), 0, _batch_size + 1);
}

if (!_parent->_ready_probe) {
_parent->_ready_probe = true;
Expand Down Expand Up @@ -185,14 +190,41 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_process(HashTableType& hash
JoinOpType == doris::TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
with_other_conjuncts) {
SCOPED_TIMER(_search_hashtable_timer);
auto [new_probe_idx, new_build_idx, new_current_offset] =
hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts(
hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), probe_index,
build_index, probe_rows, _probe_indexs.data(), _build_indexs.data(),
null_result, *(_parent->_build_indexes_null), _build_block->rows());
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;

// If the key of one probe row is null,
// this probe row should match with all rows in build side(match result: null).
if (build_index == hash_table_ctx.hash_table->get_bucket_size() && !_picking_null_keys) {
const auto rows = _build_block->rows();
/// FIXME: Memory allocation issue due to the possibility of rows being a huge value.
if (rows > _batch_size + 1) {
_build_indexs.resize(rows);
_probe_indexs.resize(rows);
_null_flags.resize(rows);
}

for (size_t i = 0; i != rows - 1; ++i) {
_probe_indexs[i] = probe_index;
_build_indexs[i] = i + 1;
_null_flags[i] = 1;
}

_probe_indexs[rows - 1] = probe_index;
_build_indexs[rows - 1] = 0;
_null_flags[rows - 1] = 0;
current_offset = rows;
build_index = 0;
probe_index++;
} else {
auto [new_probe_idx, new_build_idx, new_current_offset, picking_null_keys] =
hash_table_ctx.hash_table->find_null_aware_with_other_conjuncts(
hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), probe_index,
build_index, probe_rows, _probe_indexs.data(), _build_indexs.data(),
_null_flags.data(), _picking_null_keys);
probe_index = new_probe_idx;
build_index = new_build_idx;
current_offset = new_current_offset;
_picking_null_keys = picking_null_keys;
}
} else {
SCOPED_TIMER(_search_hashtable_timer);
auto [new_probe_idx, new_build_idx,
Expand Down Expand Up @@ -279,8 +311,7 @@ Status ProcessHashTableProbe<JoinOpType, Parent>::do_mark_join_conjuncts(
filter_data[i] = _build_indexs[i] != 0 && _build_indexs[i] != hash_table_bucket_size;
if constexpr (is_null_aware_join) {
if constexpr (with_other_conjuncts) {
mark_null_map[i] =
null_result.contains(_probe_indexs[i]) && _build_indexs[i] != 0;
mark_null_map[i] = _null_flags[i];
} else {
if (filter_data[i]) {
last_probe_matched = _probe_indexs[i];
Expand Down
8 changes: 0 additions & 8 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,6 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
}
#endif

if ((_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
_join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN) &&
_have_other_join_conjunct) {
_build_indexes_null = std::make_shared<std::vector<uint32_t>>();
}

_runtime_filters.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
Expand Down Expand Up @@ -761,7 +755,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
// arena will be shared with other instances.
_shared_hash_table_context->arena = _arena;
_shared_hash_table_context->block = _build_block;
_shared_hash_table_context->build_indexes_null = _build_indexes_null;
_shared_hash_table_context->hash_table_variants = _hash_table_variants;
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
_has_null_in_build_side;
Expand Down Expand Up @@ -794,7 +787,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
*std::static_pointer_cast<HashTableVariants>(
_shared_hash_table_context->hash_table_variants));
_build_block = _shared_hash_table_context->block;
_build_indexes_null = _shared_hash_table_context->build_indexes_null;

if (!_shared_hash_table_context->runtime_filters.empty()) {
auto ret = std::visit(
Expand Down
16 changes: 2 additions & 14 deletions be/src/vec/exec/join/vhash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ struct ProcessHashTableBuild {
for (uint32_t i = 1; i < _rows; i++) {
if ((*null_map)[i]) {
*has_null_key = true;
if constexpr (with_other_conjuncts &&
(JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN)) {
_parent->_build_indexes_null->emplace_back(i);
}
}
}
if (short_circuit_for_null && *has_null_key) {
Expand All @@ -136,8 +131,8 @@ struct ProcessHashTableBuild {
hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
null_map ? null_map->data() : nullptr, true, true,
hash_table_ctx.hash_table->get_bucket_size());
hash_table_ctx.hash_table->build(hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(),
_rows);
hash_table_ctx.hash_table->template build<JoinOpType, with_other_conjuncts>(
hash_table_ctx.keys, hash_table_ctx.bucket_nums.data(), _rows);
hash_table_ctx.bucket_nums.resize(_batch_size);
hash_table_ctx.bucket_nums.shrink_to_fit();

Expand Down Expand Up @@ -301,13 +296,6 @@ class HashJoinNode final : public VJoinNodeBase {
std::vector<uint16_t> _probe_column_disguise_null;
std::vector<uint16_t> _probe_column_convert_to_null;

/*
* For null aware anti/semi join with other join conjuncts, we do need to care about the rows in
* build side with null keys,
* because the other join conjuncts' result maybe change null to false(null & false == false).
*/
std::shared_ptr<std::vector<uint32_t>> _build_indexes_null;

DataTypes _right_table_data_types;
DataTypes _left_table_data_types;
std::vector<std::string> _right_table_column_names;
Expand Down
43 changes: 43 additions & 0 deletions regression-test/data/nereids_p0/join/test_mark_join.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !mark_join1 --
1 1 true
2 2 true
3 \N true
3 \N true
4 \N \N

-- !mark_join2 --
1 1 \N
2 2 \N
3 \N \N
3 \N true
4 \N true

-- !mark_join3 --
1 1 false
2 2 false
3 \N false
3 \N false
4 \N false

-- !mark_join4 --
1 1 false
2 2 false
3 \N \N
3 \N true
4 \N true

-- !mark_join5 --
1 1 false
2 2 false
3 \N true
3 \N true
4 \N \N

-- !mark_join6 --
1 1 true
2 2 true
3 \N false
3 \N true
4 \N false

Loading

0 comments on commit c8b0275

Please sign in to comment.