Skip to content

Commit

Permalink
[Feature](executor)Insert select limited by WorkloadGroup #30610
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo authored and Doris-Extras committed Jan 31, 2024
1 parent 8154b8a commit eacef8b
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 31 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Status BaseDeltaWriter::init() {
RETURN_IF_ERROR(_rowset_builder->init());
RETURN_IF_ERROR(_memtable_writer->init(
_rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(),
_rowset_builder->get_partial_update_info(),
_rowset_builder->get_partial_update_info(), nullptr,
_rowset_builder->tablet()->enable_unique_key_merge_on_write()));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "olap/tablet_manager.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
Expand Down Expand Up @@ -134,7 +135,12 @@ Status DeltaWriterV2::init() {

_rowset_writer = std::make_shared<BetaRowsetWriterV2>(_streams);
RETURN_IF_ERROR(_rowset_writer->init(context));
ThreadPool* wg_thread_pool_ptr = nullptr;
if (_state->get_query_ctx()) {
wg_thread_pool_ptr = _state->get_query_ctx()->get_non_pipe_exec_thread_pool();
}
RETURN_IF_ERROR(_memtable_writer->init(_rowset_writer, _tablet_schema, _partial_update_info,
wg_thread_pool_ptr,
_streams[0]->enable_unique_mow(_req.index_id)));
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
_is_init = true;
Expand Down
12 changes: 12 additions & 0 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& fl
}
}

Status MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>& flush_token,
RowsetWriter* rowset_writer,
ThreadPool* wg_flush_pool_ptr) {
if (rowset_writer->type() == BETA_ROWSET) {
flush_token = std::make_unique<FlushToken>(wg_flush_pool_ptr);
} else {
return Status::InternalError<false>("not support alpha rowset load now.");
}
flush_token->set_rowset_writer(rowset_writer);
return Status::OK();
}

void MemTableFlushExecutor::_register_metrics() {
REGISTER_HOOK_METRIC(flush_thread_pool_queue_size,
[this]() { return _flush_pool->get_queue_size(); });
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class MemTableFlushExecutor {
Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer,
bool is_high_priority);

Status create_flush_token(std::unique_ptr<FlushToken>& flush_token, RowsetWriter* rowset_writer,
ThreadPool* wg_flush_pool_ptr);

private:
void _register_metrics();
static void _deregister_metrics();
Expand Down
11 changes: 8 additions & 3 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ MemTableWriter::~MemTableWriter() {
Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow) {
ThreadPool* wg_flush_pool_ptr, bool unique_key_mow) {
_rowset_writer = rowset_writer;
_tablet_schema = tablet_schema;
_unique_key_mow = unique_key_mow;
Expand All @@ -76,8 +76,13 @@ Status MemTableWriter::init(std::shared_ptr<RowsetWriter> rowset_writer,
// create flush handler
// by assigning segment_id to memtable before submiting to flush executor,
// we can make sure same keys sort in the same order in all replicas.
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
_flush_token, _rowset_writer.get(), _req.is_high_priority));
if (wg_flush_pool_ptr) {
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
_flush_token, _rowset_writer.get(), wg_flush_pool_ptr));
} else {
RETURN_IF_ERROR(StorageEngine::instance()->memtable_flush_executor()->create_flush_token(
_flush_token, _rowset_writer.get(), _req.is_high_priority));
}

_is_init = true;
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class MemTableWriter {

Status init(std::shared_ptr<RowsetWriter> rowset_writer, TabletSchemaSPtr tablet_schema,
std::shared_ptr<PartialUpdateInfo> partial_update_info,
bool unique_key_mow = false);
ThreadPool* wg_flush_pool_ptr, bool unique_key_mow = false);

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs,
bool is_append = false);
Expand Down
30 changes: 21 additions & 9 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,27 @@ void AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profil
// This is a async thread, should lock the task ctx, to make sure runtimestate and profile
// not deconstructed before the thread exit.
auto task_ctx = state->get_task_execution_context();
static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this, state, profile, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
_writer_thread_closed = true;
return;
}
this->process_block(state, profile);
}));
if (state->get_query_ctx() && state->get_query_ctx()->get_non_pipe_exec_thread_pool()) {
ThreadPool* pool_ptr = state->get_query_ctx()->get_non_pipe_exec_thread_pool();
static_cast<void>(pool_ptr->submit_func([this, state, profile, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
_writer_thread_closed = true;
return;
}
this->process_block(state, profile);
}));
} else {
static_cast<void>(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this, state, profile, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
_writer_thread_closed = true;
return;
}
this->process_block(state, profile);
}));
}
}

void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profile) {
Expand Down
31 changes: 19 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -599,18 +599,25 @@ public TExecPlanFragmentParams getStreamLoadPlan() throws Exception {
@Override
public void exec() throws Exception {
// LoadTask does not have context, not controlled by queue now
if (Config.enable_workload_group && Config.enable_query_queue && context != null) {
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
if (queryQueue == null) {
// This logic is actually useless, because when could not find query queue, it will
// throw exception during workload group manager.
throw new UserException("could not find query queue");
}
queueToken = queryQueue.getToken();
if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
queryQueue.returnToken(queueToken);
throw new UserException(queueToken.getOfferResultDetail());
if (context != null) {
if (Config.enable_workload_group) {
this.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
if (Config.enable_query_queue) {
queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
if (queryQueue == null) {
// This logic is actually useless, because when could not find query queue, it will
// throw exception during workload group manager.
throw new UserException("could not find query queue");
}
queueToken = queryQueue.getToken();
if (!queueToken.waitSignal(this.queryOptions.getExecutionTimeout() * 1000)) {
LOG.error("query (id=" + DebugUtil.printId(queryId) + ") " + queueToken.getOfferResultDetail());
queryQueue.returnToken(queueToken);
throw new UserException(queueToken.getOfferResultDetail());
}
}
} else {
context.setWorkloadGroupName("");
}
}
execInternal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1525,11 +1525,6 @@ private void sendResult(boolean isOutfileQuery, boolean isSendFields, Queriable
coordBase = new PointQueryExec(planner, analyzer);
} else {
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
if (Config.enable_workload_group) {
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
} else {
context.setWorkloadGroupName("");
}
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
profile.setExecutionProfile(coord.getExecutionProfile());
Expand Down

0 comments on commit eacef8b

Please sign in to comment.