Skip to content

Commit

Permalink
Merge branch 'load-stream-no-heavy' of github.com:kaijchen/doris into…
Browse files Browse the repository at this point in the history
… load-stream-no-heavy
  • Loading branch information
kaijchen committed Dec 21, 2023
2 parents 062930b + b6daf41 commit 234b5b0
Show file tree
Hide file tree
Showing 180 changed files with 4,269 additions and 3,238 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,8 @@ DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
DEFINE_Int32(load_stream_messages_in_batch, "128");
// brpc streaming StreamWait seconds on EAGAIN
DEFINE_Int32(load_stream_eagain_wait_seconds, "60");
// max tasks per flush token in load stream
DEFINE_Int32(load_stream_flush_token_max_tasks, "2");

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,8 @@ DECLARE_Int64(load_stream_max_buf_size);
DECLARE_Int32(load_stream_messages_in_batch);
// brpc streaming StreamWait seconds on EAGAIN
DECLARE_Int32(load_stream_eagain_wait_seconds);
// max tasks per flush token in load stream
DECLARE_Int32(load_stream_flush_token_max_tasks);

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
Expand Down
38 changes: 34 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,10 +946,11 @@ class RuntimePredicateWrapper {
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly) {
*res = pool->add(new IRuntimeFilter(state, pool, desc));
bool build_bf_exactly, bool is_global, int parallel_tasks) {
*res = pool->add(new IRuntimeFilter(state, pool, desc, is_global, parallel_tasks));
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
return (*res)->init_with_desc(desc, query_options, node_id,
is_global ? false : build_bf_exactly);
}

void IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context) {
Expand All @@ -972,9 +973,35 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta
_wrapper->insert_batch(column, start);
}

Status IRuntimeFilter::merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num) {
SCOPED_TIMER(_merge_local_rf_timer);
std::unique_lock lock(_local_merge_mutex);
if (_merged_rf_num == 0) {
_wrapper = wrapper;
} else {
RETURN_IF_ERROR(merge_from(wrapper));
}
*merged_num = ++_merged_rf_num;
return Status::OK();
}

Status IRuntimeFilter::publish() {
DCHECK(is_producer());
if (_has_local_target) {
if (_is_global) {
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filters(
_filter_id, filters));
// push down
for (auto filter : filters) {
int merged_num = 0;
RETURN_IF_ERROR(filter->merge_local_filter(_wrapper, &merged_num));
if (merged_num == _parallel_build_tasks) {
filter->update_runtime_filter_type_to_profile();
filter->signal();
}
}
return Status::OK();
} else if (_has_local_target) {
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
// push down
Expand Down Expand Up @@ -1297,6 +1324,9 @@ void IRuntimeFilter::init_profile(RuntimeProfile* parent_profile) {
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
_profile->add_info_string("Info", _format_status());
if (_is_global) {
_merge_local_rf_timer = ADD_TIMER(_profile.get(), "MergeLocalRuntimeFilterTime");
}
if (_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
update_runtime_filter_type_to_profile();
}
Expand Down
22 changes: 19 additions & 3 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ enum RuntimeFilterState {
class IRuntimeFilter {
public:
IRuntimeFilter(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc)
const TRuntimeFilterDesc* desc, bool is_global = false, int parallel_tasks = -1)
: _state(state),
_pool(pool),
_filter_id(desc->filter_id),
Expand All @@ -206,14 +206,17 @@ class IRuntimeFilter {
_runtime_filter_type(get_runtime_filter_type(desc)),
_name(fmt::format("RuntimeFilter: (id = {}, type = {})", _filter_id,
to_string(_runtime_filter_type))),
_profile(new RuntimeProfile(_name)) {}
_profile(new RuntimeProfile(_name)),
_is_global(is_global),
_parallel_build_tasks(parallel_tasks) {}

~IRuntimeFilter() = default;

static Status create(RuntimeFilterParamsContext* state, ObjectPool* pool,
const TRuntimeFilterDesc* desc, const TQueryOptions* query_options,
const RuntimeFilterRole role, int node_id, IRuntimeFilter** res,
bool build_bf_exactly = false);
bool build_bf_exactly = false, bool is_global = false,
int parallel_tasks = 0);

void copy_to_shared_context(vectorized::SharedRuntimeFilterContext& context);
Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext& context);
Expand Down Expand Up @@ -359,6 +362,8 @@ class IRuntimeFilter {

void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);

Status merge_local_filter(RuntimePredicateWrapper* wrapper, int* merged_num);

protected:
// serialize _wrapper to protobuf
void to_protobuf(PInFilter* filter);
Expand Down Expand Up @@ -452,7 +457,18 @@ class IRuntimeFilter {
// parent profile
// only effect on consumer
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile::Counter* _merge_local_rf_timer = nullptr;
bool _opt_remote_rf;
// `_is_global` indicates whether this runtime filter is global on this BE.
// All runtime filters should be merged on each BE if it is global.
// This is improvement for pipelineX.
const bool _is_global = false;
std::mutex _local_merge_mutex;
// There are `_parallel_build_tasks` pipeline tasks to build runtime filter.
// We should call `signal` once all runtime filters are done and merged to one
// (e.g. `_merged_rf_num` is equal to `_parallel_build_tasks`).
int _merged_rf_num = 0;
const int _parallel_build_tasks = -1;

std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
};
Expand Down
12 changes: 9 additions & 3 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ class VRuntimeFilterSlots {
public:
VRuntimeFilterSlots(
const std::vector<std::shared_ptr<vectorized::VExprContext>>& build_expr_ctxs,
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs)
: _build_expr_context(build_expr_ctxs), _runtime_filter_descs(runtime_filter_descs) {}
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs, bool is_global = false)
: _build_expr_context(build_expr_ctxs),
_runtime_filter_descs(runtime_filter_descs),
_is_global(is_global) {}

Status init(RuntimeState* state, int64_t hash_table_size) {
// runtime filter effect strategy
Expand All @@ -45,7 +47,10 @@ class VRuntimeFilterSlots {

std::map<int, bool> has_in_filter;

auto ignore_local_filter = [state](int filter_id) {
auto ignore_local_filter = [&](int filter_id) {
if (_is_global) {
return Status::OK();
}
std::vector<IRuntimeFilter*> filters;
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filters(filter_id, filters));
if (filters.empty()) {
Expand Down Expand Up @@ -236,6 +241,7 @@ class VRuntimeFilterSlots {
private:
const std::vector<std::shared_ptr<vectorized::VExprContext>>& _build_expr_context;
const std::vector<TRuntimeFilterDesc>& _runtime_filter_descs;
const bool _is_global = false;
// prob_contition index -> [IRuntimeFilter]
std::map<int, std::list<IRuntimeFilter*>> _runtime_filters;
};
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1230,14 +1230,15 @@ void StorageEngine::notify_listeners() {
}

bool StorageEngine::notify_listener(std::string_view name) {
bool found = false;
std::lock_guard<std::mutex> l(_report_mtx);
for (auto& listener : _report_listeners) {
if (listener->name() == name) {
listener->notify();
return true;
found = true;
}
}
return false;
return found;
}

// check whether any unused rowsets's id equal to rowset_id
Expand Down
27 changes: 20 additions & 7 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Status EnginePublishVersionTask::execute() {
}
#endif

std::vector<std::shared_ptr<TabletPublishTxnTask>> tablet_tasks;
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
Expand Down Expand Up @@ -242,6 +243,7 @@ Status EnginePublishVersionTask::execute() {

auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
tablet_tasks.push_back(tablet_publish_txn_ptr);
auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
#ifndef NDEBUG
LOG(INFO) << "transaction_id: " << transaction_id << ", partition id: " << partition_id
Expand All @@ -254,6 +256,15 @@ Status EnginePublishVersionTask::execute() {
}
token->wait();

if (res.ok()) {
for (const auto& tablet_task : tablet_tasks) {
res = tablet_task->result();
if (!res.ok()) {
break;
}
}
}

_succ_tablets->clear();
// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
Expand Down Expand Up @@ -343,24 +354,24 @@ void TabletPublishTxnTask::handle() {
rowset_update_lock.lock();
}
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_result = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version, &_stats);
if (!publish_status.ok()) {
if (!_result.ok()) {
LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << publish_status;
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}

// add visible rowset to tablet
int64_t t1 = MonotonicMicros();
publish_status = _tablet->add_inc_rowset(_rowset);
_result = _tablet->add_inc_rowset(_rowset);
_stats.add_inc_rowset_us = MonotonicMicros() - t1;
if (!publish_status.ok() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
if (!_result.ok() && !_result.is<PUSH_VERSION_ALREADY_EXIST>()) {
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << publish_status;
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
Expand All @@ -370,9 +381,11 @@ void TabletPublishTxnTask::handle() {
LOG(INFO) << "publish version successfully on tablet"
<< ", table_id=" << _tablet->table_id() << ", tablet=" << _tablet->tablet_id()
<< ", transaction_id=" << _transaction_id << ", version=" << _version.first
<< ", num_rows=" << _rowset->num_rows() << ", res=" << publish_status
<< ", num_rows=" << _rowset->num_rows() << ", res=" << _result
<< ", cost: " << cost_us << "(us) "
<< (cost_us > 500 * 1000 ? _stats.to_string() : "");

_result = Status::OK();
}

void AsyncTabletPublishTask::handle() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TabletPublishTxnTask {
~TabletPublishTxnTask() = default;

void handle();
Status result() { return _result; }

private:
EnginePublishVersionTask* _engine_publish_version_task = nullptr;
Expand All @@ -80,6 +81,7 @@ class TabletPublishTxnTask {
Version _version;
TabletInfo _tablet_info;
TabletPublishStatistics _stats;
Status _result;
};

class EnginePublishVersionTask : public EngineTask {
Expand Down
10 changes: 5 additions & 5 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,15 +366,15 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<LocalStateType>::_child_x
->ignore_data_distribution()
? ExchangeType::PASSTHROUGH
: ExchangeType::NOOP;
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<LocalStateType>::get_local_exchange_type();
}
return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}

using DataSinkOperatorX<LocalStateType>::id;
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,9 @@ Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::B
if (!column_type->is_nullable() || data_types[i]->is_nullable() ||
!remove_nullable(column_type)->equals(*data_types[i])) {
return Status::InternalError(
"column_type not match data_types, column_type={}, data_types={}",
column_type->get_name(), data_types[i]->get_name());
"node id = {}, column_type not match data_types, column_type={}, "
"data_types={}",
_parent->node_id(), column_type->get_name(), data_types[i]->get_name());
}
}

Expand Down
11 changes: 6 additions & 5 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
std::vector<TExpr> get_local_shuffle_exprs() const override { return _partition_exprs; }
ExchangeType get_local_exchange_type() const override {
DataDistribution get_local_exchange_type() const override {
if (_partition_by_eq_expr_ctxs.empty()) {
return ExchangeType::PASSTHROUGH;
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
return _is_colocate
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return ExchangeType::NOOP;
return DataSinkOperatorX<AnalyticSinkLocalState>::get_local_exchange_type();
}

private:
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/assert_num_rows_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class AssertNumRowsOperatorX final : public StreamingOperatorX<AssertNumRowsLoca

[[nodiscard]] bool is_source() const override { return false; }

ExchangeType get_local_exchange_type() const override { return ExchangeType::PASSTHROUGH; }
DataDistribution get_local_exchange_type() const override {
return {ExchangeType::PASSTHROUGH};
}

private:
friend class AssertNumRowsLocalState;
Expand Down
Loading

0 comments on commit 234b5b0

Please sign in to comment.