[opt](spill) handle oom exception in spill tasks (#35025) #35171

Merged · 1 commit · Jul 15, 2024
be/src/pipeline/exec/partition_sort_sink_operator.cpp (40 additions, 33 deletions)

```diff
@@ -180,42 +180,49 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
         const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block,
         PartitionSortSinkLocalState& local_state, bool eos) {
     return std::visit(
-            [&](auto&& agg_method) -> Status {
-                SCOPED_TIMER(local_state._build_timer);
-                using HashMethodType = std::decay_t<decltype(agg_method)>;
-                using AggState = typename HashMethodType::State;
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> Status {
+                        return Status::InternalError("Unit hash table");
+                    },
+                    [&](auto& agg_method) -> Status {
+                        SCOPED_TIMER(local_state._build_timer);
+                        using HashMethodType = std::decay_t<decltype(agg_method)>;
+                        using AggState = typename HashMethodType::State;
 
-                AggState state(key_columns);
-                size_t num_rows = input_block->rows();
-                agg_method.init_serialized_keys(key_columns, num_rows);
+                        AggState state(key_columns);
+                        size_t num_rows = input_block->rows();
+                        agg_method.init_serialized_keys(key_columns, num_rows);
 
-                auto creator = [&](const auto& ctor, auto& key, auto& origin) {
-                    HashMethodType::try_presis_key(key, origin, *local_state._agg_arena_pool);
-                    auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks(
-                            local_state._partition_sort_info, local_state._value_places.empty()));
-                    local_state._value_places.push_back(aggregate_data);
-                    ctor(key, aggregate_data);
-                    local_state._num_partition++;
-                };
-                auto creator_for_null_key = [&](auto& mapped) {
-                    mapped = _pool->add(new vectorized::PartitionBlocks(
-                            local_state._partition_sort_info, local_state._value_places.empty()));
-                    local_state._value_places.push_back(mapped);
-                    local_state._num_partition++;
-                };
+                        auto creator = [&](const auto& ctor, auto& key, auto& origin) {
+                            HashMethodType::try_presis_key(key, origin,
+                                                           *local_state._agg_arena_pool);
+                            auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks(
+                                    local_state._partition_sort_info,
+                                    local_state._value_places.empty()));
+                            local_state._value_places.push_back(aggregate_data);
+                            ctor(key, aggregate_data);
+                            local_state._num_partition++;
+                        };
+                        auto creator_for_null_key = [&](auto& mapped) {
+                            mapped = _pool->add(new vectorized::PartitionBlocks(
+                                    local_state._partition_sort_info,
+                                    local_state._value_places.empty()));
+                            local_state._value_places.push_back(mapped);
+                            local_state._num_partition++;
+                        };
 
-                SCOPED_TIMER(local_state._emplace_key_timer);
-                for (size_t row = 0; row < num_rows; ++row) {
-                    auto& mapped =
-                            agg_method.lazy_emplace(state, row, creator, creator_for_null_key);
-                    mapped->add_row_idx(row);
-                }
-                for (auto* place : local_state._value_places) {
-                    SCOPED_TIMER(local_state._selector_block_timer);
-                    RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
-                }
-                return Status::OK();
-            },
+                        SCOPED_TIMER(local_state._emplace_key_timer);
+                        for (size_t row = 0; row < num_rows; ++row) {
+                            auto& mapped = agg_method.lazy_emplace(state, row, creator,
+                                                                   creator_for_null_key);
+                            mapped->add_row_idx(row);
+                        }
+                        for (auto* place : local_state._value_places) {
+                            SCOPED_TIMER(local_state._selector_block_timer);
+                            RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
+                        }
+                        return Status::OK();
+                    }},
             local_state._partitioned_data->method_variant);
 }
```

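The substantive change in this file is the switch from a single generic visitor to `vectorized::Overload` with an explicit `std::monostate` arm, so an uninitialized hash-table variant now surfaces as `Status::InternalError` instead of being matched by the generic lambda. Below is a minimal, self-contained sketch of the overloaded-lambdas idiom, assuming `vectorized::Overload` is the standard `std::visit` helper; the `Overload` template here is a local stand-in, not Doris's actual header.

```cpp
#include <iostream>
#include <string>
#include <variant>

// Stand-in for vectorized::Overload (assumption): the classic
// "overloaded lambdas" helper used with std::visit.
template <typename... Ts>
struct Overload : Ts... {
    using Ts::operator()...;
};
template <typename... Ts>
Overload(Ts...) -> Overload<Ts...>; // deduction guide (implicit in C++20)

int main() {
    // A default-constructed variant holds std::monostate, modeling an
    // uninitialized hash-table method_variant.
    std::variant<std::monostate, int> method_variant;

    std::string result = std::visit(
            Overload {[](std::monostate&) -> std::string {
                          return "InternalError: Unit hash table"; // explicit error arm
                      },
                      [](int& v) -> std::string { return "visited " + std::to_string(v); }},
            method_variant);
    std::cout << result << '\n'; // prints the error arm's message
}
```

The dedicated `std::monostate` arm gives the empty alternative a recoverable error path of its own rather than funneling every alternative through one generic lambda.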
be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp (11 additions, 6 deletions)

```diff
@@ -298,12 +298,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     }};
     auto* runtime_state = _runtime_state.get();
     auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
-    Base::_shared_state->sink_status = std::visit(
-            [&](auto&& agg_method) -> Status {
-                auto& hash_table = *agg_method.hash_table;
-                return _spill_hash_table(state, agg_method, hash_table, _eos);
-            },
-            agg_data->method_variant);
+    Base::_shared_state->sink_status =
+            std::visit(vectorized::Overload {
+                               [&](std::monostate& arg) -> Status {
+                                   return Status::InternalError("Unit hash table");
+                               },
+                               [&](auto& agg_method) -> Status {
+                                   auto& hash_table = *agg_method.hash_table;
+                                   RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
+                                           state, agg_method, hash_table, _eos));
+                               }},
+                       agg_data->method_variant);
     RETURN_IF_ERROR(Base::_shared_state->sink_status);
     Base::_shared_state->sink_status =
             parent._agg_sink_operator->reset_hash_table(runtime_state);
```
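The behavioral fix in this file is wrapping `_spill_hash_table` in `RETURN_IF_CATCH_EXCEPTION`, so an exception thrown mid-spill (typically on allocation failure) becomes an error `Status` instead of escaping the spill task. The sketch below shows the shape of that pattern with hypothetical stand-ins; Doris's real `Status` and macro live in its `common/` headers and differ in detail.

```cpp
#include <exception>
#include <iostream>
#include <new>
#include <string>

// Hypothetical minimal Status, standing in for doris::Status.
struct Status {
    std::string msg;
    bool ok() const { return msg.empty(); }
    static Status OK() { return {}; }
    static Status MemoryLimitExceeded(std::string m) { return {std::move(m)}; }
};

// Sketch of the macro's shape: run the statement; if it throws, return an
// error Status instead of letting the exception escape the operator.
#define RETURN_IF_CATCH_EXCEPTION(stmt)                   \
    do {                                                  \
        try {                                             \
            stmt;                                         \
        } catch (const std::exception& e) {               \
            return Status::MemoryLimitExceeded(e.what()); \
        }                                                 \
    } while (0)

// Mirrors the diff's call site: the wrapped function may throw under
// memory pressure (simulated here with std::bad_alloc).
Status spill_hash_table(bool simulate_oom) {
    if (simulate_oom) {
        throw std::bad_alloc();
    }
    return Status::OK();
}

Status revoke_memory(bool simulate_oom) {
    RETURN_IF_CATCH_EXCEPTION(return spill_hash_table(simulate_oom));
    // Unreachable: the statement above returns on both the OK and error paths.
}

int main() {
    std::cout << revoke_memory(true).msg << '\n'; // typically "std::bad_alloc"
}
```

Because the macro's `return` fires inside the visitor lambda, the error `Status` flows out through `std::visit`'s return value, exactly where the old code returned `_spill_hash_table` directly.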
be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp (74 additions, 71 deletions)

```diff
@@ -214,80 +214,83 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
+    auto spill_func = [this, state, query_id, execution_context, submit_timer] {
+        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        Defer defer {[&]() {
+            if (!_status.ok() || state->is_cancelled()) {
+                if (!_status.ok()) {
+                    LOG(WARNING) << "query " << print_id(query_id) << " agg node "
+                                 << _parent->node_id()
+                                 << " merge spilled agg data error: " << _status;
+                }
+                _shared_state->close();
+            } else if (_shared_state->spill_partitions.empty()) {
+                VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << _parent->node_id()
+                           << " merge spilled agg data finish";
+            }
+            Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
+            _is_merging = false;
+            _dependency->Dependency::set_ready();
+        }};
+        bool has_agg_data = false;
+        auto& parent = Base::_parent->template cast<Parent>();
+        while (!state->is_cancelled() && !has_agg_data &&
+               !_shared_state->spill_partitions.empty()) {
+            for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) {
+                stream->set_read_counters(Base::_spill_read_data_time,
+                                          Base::_spill_deserialize_time, Base::_spill_read_bytes,
+                                          Base::_spill_read_wait_io_timer);
+                vectorized::Block block;
+                bool eos = false;
+                while (!eos && !state->is_cancelled()) {
+                    {
+                        SCOPED_TIMER(Base::_spill_recover_time);
+                        _status = stream->read_next_block_sync(&block, &eos);
+                    }
+                    RETURN_IF_ERROR(_status);
+
+                    if (!block.empty()) {
+                        has_agg_data = true;
+                        _status = parent._agg_source_operator
+                                          ->merge_with_serialized_key_helper<false>(
+                                                  _runtime_state.get(), &block);
+                        RETURN_IF_ERROR(_status);
+                    }
+                }
+                (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+            }
+            _shared_state->spill_partitions.pop_front();
+        }
+        if (_shared_state->spill_partitions.empty()) {
+            _shared_state->close();
+        }
+        return _status;
+    };
+
+    auto exception_catch_func = [spill_func, query_id, mem_tracker, shared_state_holder,
+                                 execution_context, this]() {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock) {
+            LOG(INFO) << "query " << print_id(query_id)
+                      << " execution_context released, maybe query was cancelled.";
+            return;
+        }
+
+        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
+
+        if (!status.ok()) {
+            _status = status;
+        }
+    };
+
     RETURN_IF_ERROR(
             ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-                    [this, state, query_id, mem_tracker, shared_state_holder, execution_context,
-                     submit_timer] {
-                        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-                        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-                        auto shared_state_sptr = shared_state_holder.lock();
-                        if (shared_state_sptr) {
-                            execution_context_lock = execution_context.lock();
-                        }
-                        if (!shared_state_sptr || !execution_context_lock) {
-                            LOG(INFO) << "query " << print_id(query_id)
-                                      << " execution_context released, maybe query was cancelled.";
-                            // FIXME: return status is meaningless?
-                            return Status::Cancelled("Cancelled");
-                        }
-
-                        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                        Defer defer {[&]() {
-                            if (!_status.ok() || state->is_cancelled()) {
-                                if (!_status.ok()) {
-                                    LOG(WARNING) << "query " << print_id(query_id) << " agg node "
-                                                 << _parent->node_id()
-                                                 << " merge spilled agg data error: " << _status;
-                                }
-                                _shared_state->close();
-                            } else if (_shared_state->spill_partitions.empty()) {
-                                VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
-                                           << _parent->node_id()
-                                           << " merge spilled agg data finish";
-                            }
-                            Base::_shared_state->in_mem_shared_state->aggregate_data_container
-                                    ->init_once();
-                            _is_merging = false;
-                            _dependency->Dependency::set_ready();
-                        }};
-                        bool has_agg_data = false;
-                        auto& parent = Base::_parent->template cast<Parent>();
-                        while (!state->is_cancelled() && !has_agg_data &&
-                               !_shared_state->spill_partitions.empty()) {
-                            for (auto& stream :
-                                 _shared_state->spill_partitions[0]->spill_streams_) {
-                                stream->set_read_counters(
-                                        Base::_spill_read_data_time, Base::_spill_deserialize_time,
-                                        Base::_spill_read_bytes, Base::_spill_read_wait_io_timer);
-                                vectorized::Block block;
-                                bool eos = false;
-                                while (!eos && !state->is_cancelled()) {
-                                    {
-                                        SCOPED_TIMER(Base::_spill_recover_time);
-                                        _status = stream->read_next_block_sync(&block, &eos);
-                                    }
-                                    RETURN_IF_ERROR(_status);
-
-                                    if (!block.empty()) {
-                                        has_agg_data = true;
-                                        _status = parent._agg_source_operator
-                                                          ->merge_with_serialized_key_helper<false>(
-                                                                  _runtime_state.get(), &block);
-                                        RETURN_IF_ERROR(_status);
-                                    }
-                                }
-                                (void)ExecEnv::GetInstance()
-                                        ->spill_stream_mgr()
-                                        ->delete_spill_stream(stream);
-                            }
-                            _shared_state->spill_partitions.pop_front();
-                        }
-                        if (_shared_state->spill_partitions.empty()) {
-                            _shared_state->close();
-                        }
-                        return _status;
-                    }));
+                    exception_catch_func));
     return Status::OK();
 }
 } // namespace doris::pipeline
```
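The refactor in this file is mostly structural: the merge work moves into a named `spill_func`, and what is actually submitted to the spill I/O thread pool is `exception_catch_func`, which attaches the memory tracker, verifies the query context is still alive, and converts any exception from `spill_func` into `_status`. A compact sketch of that shape, with `std::async` and a plain `std::string` standing in for Doris's thread pool and `Status` (names illustrative):

```cpp
#include <future>
#include <iostream>
#include <stdexcept>
#include <string>

int main() {
    std::string status;       // stands in for the operator's _status member
    bool simulate_oom = true; // force the error path for the demo

    // The unit of work, factored out so it can be wrapped (mirrors spill_func).
    auto spill_func = [simulate_oom]() -> std::string {
        if (simulate_oom) {
            throw std::runtime_error("merge spilled agg data error: OOM");
        }
        return {}; // empty string == OK
    };

    // What actually runs on the pool thread (mirrors exception_catch_func):
    // exceptions are caught and recorded, never propagated into the pool.
    auto exception_catch_func = [&status, spill_func]() {
        try {
            status = spill_func();
        } catch (const std::exception& e) {
            status = e.what();
        }
    };

    // std::async stands in for get_spill_io_thread_pool()->submit_func(...).
    std::async(std::launch::async, exception_catch_func).wait();
    std::cout << (status.empty() ? "OK" : status) << '\n';
}
```

This also resolves the old lambda's `// FIXME: return status is meaningless?`: a detached pool task has no caller to receive a return value, so the new wrapper returns `void` and reports only through `_status`.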