Skip to content

Commit

Permalink
[pipelineX](fix) Fix nested loop join operator (apache#24885)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Sep 26, 2023
1 parent 733b718 commit a3427cb
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 24 deletions.
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
profile()->add_info_string("ShareHashTableEnabled", "false");
}
}
if (!_should_build_hash_table) {
_shared_hash_table_dependency->block_writing();
p._shared_hashtable_controller->append_dependency(p.id(), _shared_hash_table_dependency);
}

_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");

Expand Down Expand Up @@ -445,7 +449,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
SCOPED_TIMER(local_state._build_timer);

// Make one block for every 4 gigabytes
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
Expand Down
5 changes: 1 addition & 4 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class SharedHashTableDependency : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(SharedHashTableDependency);
SharedHashTableDependency(int id) : WriteDependency(id, "SharedHashTableDependency") {}
~SharedHashTableDependency() = default;
~SharedHashTableDependency() override = default;

void* shared_state() override { return nullptr; }
};
Expand Down Expand Up @@ -135,9 +135,6 @@ class HashJoinBuildSinkOperatorX final

WriteDependency* wait_for_dependency(RuntimeState* state) override {
CREATE_SINK_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
if (local_state._should_build_hash_table) {
return nullptr;
}
return local_state._shared_hash_table_dependency->write_blocked_by();
}

Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/join_build_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ Status JoinBuildSinkLocalState<DependencyType, Derived>::init(RuntimeState* stat

PipelineXSinkLocalState<DependencyType>::profile()->add_info_string("JoinType",
to_string(p._join_op));
_build_get_next_timer =
ADD_TIMER(PipelineXSinkLocalState<DependencyType>::profile(), "BuildGetNextTime");
_build_timer = ADD_TIMER(PipelineXSinkLocalState<DependencyType>::profile(), "BuildTime");
_build_rows_counter = ADD_COUNTER(PipelineXSinkLocalState<DependencyType>::profile(),
"BuildRows", TUnit::UNIT);

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/join_build_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ class JoinBuildSinkLocalState : public PipelineXSinkLocalState<DependencyType> {

bool _short_circuit_for_null_in_probe_side = false;

RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _build_get_next_timer;
RuntimeProfile::Counter* _build_rows_counter;
RuntimeProfile::Counter* _push_down_timer;
RuntimeProfile::Counter* _push_compute_timer;
Expand Down
8 changes: 5 additions & 3 deletions be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
_filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size());
for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i]));
}
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
p._runtime_filter_descs[i], state->query_options()));
}
return Status::OK();
}

Expand All @@ -61,8 +66,6 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeSta
std::vector<TExpr> filter_src_exprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
RETURN_IF_ERROR(state->runtime_filter_mgr()->register_producer_filter(
_runtime_filter_descs[i], state->query_options()));
}
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(filter_src_exprs, _filter_src_expr_ctxs));
return Status::OK();
Expand Down Expand Up @@ -90,7 +93,6 @@ Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, vector
CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
SCOPED_TIMER(local_state._build_timer);
auto rows = block->rows();
auto mem_usage = block->allocated_bytes();

Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
auto& deps = get_downstream_dependency();
std::vector<LocalSinkStateInfo> infos;
for (auto& dep : deps) {
infos.push_back(LocalSinkStateInfo {_pipeline->pipeline_profile(),
local_params.sender_id, dep.get(), tsink});
infos.emplace_back(
LocalSinkStateInfo {_parent_profile, local_params.sender_id, dep.get(), tsink});
}
RETURN_IF_ERROR(_sink->setup_local_states(state, infos));
}
Expand All @@ -84,10 +84,10 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
for (auto& dep : deps) {
LocalStateInfo info {
op_idx == _operators.size() - 1
? _pipeline->pipeline_profile()
? _parent_profile
: state->get_local_state(_operators[op_idx + 1]->id())->profile(),
scan_ranges, dep.get()};
infos.push_back(info);
infos.emplace_back(info);
}
RETURN_IF_ERROR(_operators[op_idx]->setup_local_states(state, infos));
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/runtime/shared_hash_table_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ void SharedHashTableController::set_builder_and_consumers(TUniqueId builder, int
std::lock_guard<std::mutex> lock(_mutex);
DCHECK(_builder_fragment_ids.find(node_id) == _builder_fragment_ids.cend());
_builder_fragment_ids.insert({node_id, builder});
_dependencies.insert({node_id, {}});
}

bool SharedHashTableController::should_build_hash_table(const TUniqueId& fragment_instance_id,
Expand Down Expand Up @@ -70,7 +71,7 @@ void SharedHashTableController::signal(int my_node_id, Status status) {
it->second->status = status;
_shared_contexts.erase(it);
}
for (auto& dep : _dependencies) {
for (auto& dep : _dependencies[my_node_id]) {
dep->set_ready_for_write();
}
_cv.notify_all();
Expand All @@ -83,7 +84,7 @@ void SharedHashTableController::signal(int my_node_id) {
it->second->signaled = true;
_shared_contexts.erase(it);
}
for (auto& dep : _dependencies) {
for (auto& dep : _dependencies[my_node_id]) {
dep->set_ready_for_write();
}
_cv.notify_all();
Expand Down
10 changes: 5 additions & 5 deletions be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,20 @@ class SharedHashTableController {
Status wait_for_signal(RuntimeState* state, const SharedHashTableContextPtr& context);
bool should_build_hash_table(const TUniqueId& fragment_instance_id, int my_node_id);
void set_pipeline_engine_enabled(bool enabled) { _pipeline_engine_enabled = enabled; }
void append_dependency(std::shared_ptr<pipeline::SharedHashTableDependency> dep) {
void append_dependency(int node_id, std::shared_ptr<pipeline::SharedHashTableDependency> dep) {
std::lock_guard<std::mutex> lock(_mutex);
_dependencies.push_back(dep);
_dependencies[node_id].push_back(dep);
}

private:
bool _pipeline_engine_enabled = false;
std::mutex _mutex;
// For pipelineX, we update all dependencies once the hash table is built.
std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::SharedHashTableDependency>>>
_dependencies;
std::condition_variable _cv;
std::map<int /*node id*/, TUniqueId /*fragment instance id*/> _builder_fragment_ids;
std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;

// For pipelineX, we update all dependencies once the hash table is built.
std::vector<std::shared_ptr<pipeline::SharedHashTableDependency>> _dependencies;
};

} // namespace vectorized
Expand Down

0 comments on commit a3427cb

Please sign in to comment.