Commit 15e9c3b

[opt](spill) handle oom exception in spill tasks (#35025)
mrhhsg committed Jun 6, 2024
1 parent b6ab0c4 commit 15e9c3b
Showing 11 changed files with 458 additions and 503 deletions.
73 changes: 40 additions & 33 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -180,42 +180,49 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
         const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block,
         PartitionSortSinkLocalState& local_state, bool eos) {
     return std::visit(
-            [&](auto&& agg_method) -> Status {
-                SCOPED_TIMER(local_state._build_timer);
-                using HashMethodType = std::decay_t<decltype(agg_method)>;
-                using AggState = typename HashMethodType::State;
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> Status {
+                        return Status::InternalError("Unit hash table");
+                    },
+                    [&](auto& agg_method) -> Status {
+                        SCOPED_TIMER(local_state._build_timer);
+                        using HashMethodType = std::decay_t<decltype(agg_method)>;
+                        using AggState = typename HashMethodType::State;
 
-                AggState state(key_columns);
-                size_t num_rows = input_block->rows();
-                agg_method.init_serialized_keys(key_columns, num_rows);
+                        AggState state(key_columns);
+                        size_t num_rows = input_block->rows();
+                        agg_method.init_serialized_keys(key_columns, num_rows);
 
-                auto creator = [&](const auto& ctor, auto& key, auto& origin) {
-                    HashMethodType::try_presis_key(key, origin, *local_state._agg_arena_pool);
-                    auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks(
-                            local_state._partition_sort_info, local_state._value_places.empty()));
-                    local_state._value_places.push_back(aggregate_data);
-                    ctor(key, aggregate_data);
-                    local_state._num_partition++;
-                };
-                auto creator_for_null_key = [&](auto& mapped) {
-                    mapped = _pool->add(new vectorized::PartitionBlocks(
-                            local_state._partition_sort_info, local_state._value_places.empty()));
-                    local_state._value_places.push_back(mapped);
-                    local_state._num_partition++;
-                };
+                        auto creator = [&](const auto& ctor, auto& key, auto& origin) {
+                            HashMethodType::try_presis_key(key, origin,
+                                                           *local_state._agg_arena_pool);
+                            auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks(
+                                    local_state._partition_sort_info,
+                                    local_state._value_places.empty()));
+                            local_state._value_places.push_back(aggregate_data);
+                            ctor(key, aggregate_data);
+                            local_state._num_partition++;
+                        };
+                        auto creator_for_null_key = [&](auto& mapped) {
+                            mapped = _pool->add(new vectorized::PartitionBlocks(
+                                    local_state._partition_sort_info,
+                                    local_state._value_places.empty()));
+                            local_state._value_places.push_back(mapped);
+                            local_state._num_partition++;
+                        };
 
-                SCOPED_TIMER(local_state._emplace_key_timer);
-                for (size_t row = 0; row < num_rows; ++row) {
-                    auto& mapped =
-                            agg_method.lazy_emplace(state, row, creator, creator_for_null_key);
-                    mapped->add_row_idx(row);
-                }
-                for (auto* place : local_state._value_places) {
-                    SCOPED_TIMER(local_state._selector_block_timer);
-                    RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
-                }
-                return Status::OK();
-            },
+                        SCOPED_TIMER(local_state._emplace_key_timer);
+                        for (size_t row = 0; row < num_rows; ++row) {
+                            auto& mapped = agg_method.lazy_emplace(state, row, creator,
+                                                                   creator_for_null_key);
+                            mapped->add_row_idx(row);
+                        }
+                        for (auto* place : local_state._value_places) {
+                            SCOPED_TIMER(local_state._selector_block_timer);
+                            RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
+                        }
+                        return Status::OK();
+                    }},
             local_state._partitioned_data->method_variant);
 }

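Note on the pattern: the hunk above swaps a single generic visitor lambda for `vectorized::Overload`, so the `std::monostate` alternative of the hash-method variant (a hash table that was never initialized) hits a dedicated overload and returns an explicit error instead of flowing through the generic path. Below is a minimal, self-contained sketch of this overloaded-visitor idiom; the `Overload` helper is a hypothetical stand-in for Doris's `vectorized::Overload`, not its actual definition.

#include <iostream>
#include <string>
#include <variant>

// Hypothetical stand-in for vectorized::Overload: inherit operator()
// from every lambda so std::visit can dispatch one overload per
// variant alternative.
template <typename... Fs>
struct Overload : Fs... {
    using Fs::operator()...;
};
template <typename... Fs>
Overload(Fs...) -> Overload<Fs...>; // deduction guide, required in C++17

int main() {
    // std::monostate plays the same role as in the diff: a variant state
    // that means "no hash table has been chosen yet".
    std::variant<std::monostate, int, std::string> method_variant = 42;
    std::visit(Overload {
                       [](std::monostate&) { std::cout << "error: uninitialized\n"; },
                       [](int& v) { std::cout << "int method: " << v << '\n'; },
                       [](std::string& s) { std::cout << "string method: " << s << '\n'; }},
               method_variant);
    return 0;
}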
17 changes: 11 additions & 6 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -298,12 +298,17 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     }};
     auto* runtime_state = _runtime_state.get();
     auto* agg_data = parent._agg_sink_operator->get_agg_data(runtime_state);
-    Base::_shared_state->sink_status = std::visit(
-            [&](auto&& agg_method) -> Status {
-                auto& hash_table = *agg_method.hash_table;
-                return _spill_hash_table(state, agg_method, hash_table, _eos);
-            },
-            agg_data->method_variant);
+    Base::_shared_state->sink_status =
+            std::visit(vectorized::Overload {
+                               [&](std::monostate& arg) -> Status {
+                                   return Status::InternalError("Unit hash table");
+                               },
+                               [&](auto& agg_method) -> Status {
+                                   auto& hash_table = *agg_method.hash_table;
+                                   RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
+                                           state, agg_method, hash_table, _eos));
+                               }},
+                       agg_data->method_variant);
     RETURN_IF_ERROR(Base::_shared_state->sink_status);
     Base::_shared_state->sink_status =
             parent._agg_sink_operator->reset_hash_table(runtime_state);
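The behavioral change in this hunk is `RETURN_IF_CATCH_EXCEPTION`: spilling a hash table can itself run out of memory, and the macro converts the resulting exception into an error `Status` stored in `sink_status` rather than letting it propagate out of the sink operator. A rough sketch of what such a macro can do is below; the `Status` type and the macro body are simplified illustrations, not Doris's actual definitions.

#include <exception>
#include <iostream>
#include <new>
#include <string>

// Simplified stand-in for doris::Status, for illustration only.
struct Status {
    bool ok_ = true;
    std::string msg_;
    static Status OK() { return {}; }
    static Status MemoryLimitExceeded(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_; }
    const std::string& msg() const { return msg_; }
};

// Hypothetical sketch of a RETURN_IF_CATCH_EXCEPTION-style macro: run the
// statement and turn a thrown exception (e.g. std::bad_alloc on OOM) into
// an error Status returned from the enclosing function.
#define RETURN_IF_CATCH_EXCEPTION(stmt)                             \
    do {                                                            \
        try {                                                       \
            stmt;                                                   \
        } catch (const std::bad_alloc&) {                           \
            return Status::MemoryLimitExceeded("caught bad_alloc"); \
        } catch (const std::exception& e) {                         \
            return Status::MemoryLimitExceeded(e.what());           \
        }                                                           \
    } while (false)

Status spill_hash_table(bool simulate_oom) {
    // Mirrors the call shape in the diff: on success the return happens
    // inside the macro; on failure a catch clause returns an error Status.
    RETURN_IF_CATCH_EXCEPTION({
        if (simulate_oom) {
            throw std::bad_alloc();
        }
        return Status::OK();
    });
    return Status::OK(); // unreachable; keeps compilers happy
}

int main() {
    std::cout << spill_hash_table(false).ok() << '\n'; // 1: spill succeeded
    std::cout << spill_hash_table(true).msg() << '\n'; // OOM became a Status
    return 0;
}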
145 changes: 74 additions & 71 deletions be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -214,80 +214,83 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime

     MonotonicStopWatch submit_timer;
     submit_timer.start();
+    auto spill_func = [this, state, query_id, execution_context, submit_timer] {
+        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+        Defer defer {[&]() {
+            if (!_status.ok() || state->is_cancelled()) {
+                if (!_status.ok()) {
+                    LOG(WARNING) << "query " << print_id(query_id) << " agg node "
+                                 << _parent->node_id()
+                                 << " merge spilled agg data error: " << _status;
+                }
+                _shared_state->close();
+            } else if (_shared_state->spill_partitions.empty()) {
+                VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << _parent->node_id()
+                           << " merge spilled agg data finish";
+            }
+            Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
+            _is_merging = false;
+            _dependency->Dependency::set_ready();
+        }};
+        bool has_agg_data = false;
+        auto& parent = Base::_parent->template cast<Parent>();
+        while (!state->is_cancelled() && !has_agg_data &&
+               !_shared_state->spill_partitions.empty()) {
+            for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) {
+                stream->set_read_counters(Base::_spill_read_data_time,
+                                          Base::_spill_deserialize_time, Base::_spill_read_bytes,
+                                          Base::_spill_read_wait_io_timer);
+                vectorized::Block block;
+                bool eos = false;
+                while (!eos && !state->is_cancelled()) {
+                    {
+                        SCOPED_TIMER(Base::_spill_recover_time);
+                        _status = stream->read_next_block_sync(&block, &eos);
+                    }
+                    RETURN_IF_ERROR(_status);
+
+                    if (!block.empty()) {
+                        has_agg_data = true;
+                        _status = parent._agg_source_operator
+                                          ->merge_with_serialized_key_helper<false>(
+                                                  _runtime_state.get(), &block);
+                        RETURN_IF_ERROR(_status);
+                    }
+                }
+                (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+            }
+            _shared_state->spill_partitions.pop_front();
+        }
+        if (_shared_state->spill_partitions.empty()) {
+            _shared_state->close();
+        }
+        return _status;
+    };
+
+    auto exception_catch_func = [spill_func, query_id, mem_tracker, shared_state_holder,
+                                 execution_context, this]() {
+        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+        std::shared_ptr<TaskExecutionContext> execution_context_lock;
+        auto shared_state_sptr = shared_state_holder.lock();
+        if (shared_state_sptr) {
+            execution_context_lock = execution_context.lock();
+        }
+        if (!shared_state_sptr || !execution_context_lock) {
+            LOG(INFO) << "query " << print_id(query_id)
+                      << " execution_context released, maybe query was cancelled.";
+            return;
+        }
+
+        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
+
+        if (!status.ok()) {
+            _status = status;
+        }
+    };
+
     RETURN_IF_ERROR(
             ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
-                    [this, state, query_id, mem_tracker, shared_state_holder, execution_context,
-                     submit_timer] {
-                        SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
-                        std::shared_ptr<TaskExecutionContext> execution_context_lock;
-                        auto shared_state_sptr = shared_state_holder.lock();
-                        if (shared_state_sptr) {
-                            execution_context_lock = execution_context.lock();
-                        }
-                        if (!shared_state_sptr || !execution_context_lock) {
-                            LOG(INFO) << "query " << print_id(query_id)
-                                      << " execution_context released, maybe query was cancelled.";
-                            // FIXME: return status is meaningless?
-                            return Status::Cancelled("Cancelled");
-                        }
-
-                        _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
-                        Defer defer {[&]() {
-                            if (!_status.ok() || state->is_cancelled()) {
-                                if (!_status.ok()) {
-                                    LOG(WARNING) << "query " << print_id(query_id) << " agg node "
-                                                 << _parent->node_id()
-                                                 << " merge spilled agg data error: " << _status;
-                                }
-                                _shared_state->close();
-                            } else if (_shared_state->spill_partitions.empty()) {
-                                VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
-                                           << _parent->node_id()
-                                           << " merge spilled agg data finish";
-                            }
-                            Base::_shared_state->in_mem_shared_state->aggregate_data_container
-                                    ->init_once();
-                            _is_merging = false;
-                            _dependency->Dependency::set_ready();
-                        }};
-                        bool has_agg_data = false;
-                        auto& parent = Base::_parent->template cast<Parent>();
-                        while (!state->is_cancelled() && !has_agg_data &&
-                               !_shared_state->spill_partitions.empty()) {
-                            for (auto& stream :
-                                 _shared_state->spill_partitions[0]->spill_streams_) {
-                                stream->set_read_counters(
-                                        Base::_spill_read_data_time, Base::_spill_deserialize_time,
-                                        Base::_spill_read_bytes, Base::_spill_read_wait_io_timer);
-                                vectorized::Block block;
-                                bool eos = false;
-                                while (!eos && !state->is_cancelled()) {
-                                    {
-                                        SCOPED_TIMER(Base::_spill_recover_time);
-                                        _status = stream->read_next_block_sync(&block, &eos);
-                                    }
-                                    RETURN_IF_ERROR(_status);
-
-                                    if (!block.empty()) {
-                                        has_agg_data = true;
-                                        _status = parent._agg_source_operator
-                                                          ->merge_with_serialized_key_helper<false>(
-                                                                  _runtime_state.get(), &block);
-                                        RETURN_IF_ERROR(_status);
-                                    }
-                                }
-                                (void)ExecEnv::GetInstance()
-                                        ->spill_stream_mgr()
-                                        ->delete_spill_stream(stream);
-                            }
-                            _shared_state->spill_partitions.pop_front();
-                        }
-                        if (_shared_state->spill_partitions.empty()) {
-                            _shared_state->close();
-                        }
-                        return _status;
-                    }));
+                    exception_catch_func));
     return Status::OK();
 }
 } // namespace doris::pipeline
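Shape of the refactor: the spill task body moves into `spill_func`, and only `exception_catch_func` is submitted to the spill IO thread pool; the wrapper attaches the task's memory tracker, bails out if the query context is already gone, and funnels any exception thrown by `spill_func` into `_status` (also resolving the old FIXME, since a thread-pool callback's return value went nowhere). A toy sketch of that wrapping, using hypothetical stand-ins (`std::thread` for the pool, a string for `_status`):

#include <exception>
#include <iostream>
#include <new>
#include <string>
#include <thread>

int main() {
    std::string recorded_error; // stand-in for PartitionedAggLocalState::_status

    // The work itself: real code merges spilled agg data; here it just
    // simulates an allocation failure mid-merge.
    auto spill_func = [] { throw std::bad_alloc(); };

    // The only thing handed to the "pool": catch everything spill_func
    // throws and record it, so no exception ever escapes the worker thread.
    auto exception_catch_func = [&recorded_error, spill_func] {
        try {
            spill_func();
        } catch (const std::exception& e) {
            recorded_error = e.what(); // later surfaced as an error Status
        }
    };

    // Stand-in for get_spill_io_thread_pool()->submit_func(exception_catch_func).
    std::thread worker(exception_catch_func);
    worker.join();

    std::cout << "spill task error: " << recorded_error << '\n';
    return 0;
}

If the exception escaped the thread body, `std::terminate` would bring down the process, which is the failure mode this commit guards against in the spill tasks.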