Skip to content

Commit

Permalink
[feature](pipelineX) add runtimefliter in pipelineX multicast sink (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Oct 10, 2023
1 parent 59dee6b commit 39669c6
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
10 changes: 10 additions & 0 deletions be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,25 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const
return _multi_cast_data_streamer->profile();
}

MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
vectorized::RuntimeFilterConsumer(
static_cast<Parent*>(parent)->dest_id_from_sink(), parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};

Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
_output_expr_contexts.resize(p._output_expr_contexts.size());
for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
}
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
return Status::OK();
}

Expand Down
26 changes: 22 additions & 4 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,21 @@ class MultiCastDataStreamerSourceOperator final : public OperatorBase,

class MultiCastDataStreamerSourceOperatorX;

class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency> {
class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastDependency>,
public vectorized::RuntimeFilterConsumer {
public:
ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState);
using Base = PipelineXLocalState<MultiCastDependency>;
using Parent = MultiCastDataStreamerSourceOperatorX;
MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent) {};

MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent);
Status init(RuntimeState* state, LocalStateInfo& info) override;

Status open(RuntimeState* state) override {
RETURN_IF_ERROR(Base::open(state));
RETURN_IF_ERROR(_acquire_runtime_filter());
return Status::OK();
}

friend class MultiCastDataStreamerSourceOperatorX;

private:
Expand Down Expand Up @@ -163,6 +169,18 @@ class MultiCastDataStreamerSourceOperatorX final

bool is_source() const override { return true; }

const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
return _t_data_stream_sink.runtime_filters;
}

int dest_id_from_sink() const { return _t_data_stream_sink.dest_node_id; }

bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const override {
return state->get_local_state(id())
->template cast<MultiCastDataStreamSourceLocalState>()
.runtime_filters_are_ready_or_timeout();
}

private:
friend class MultiCastDataStreamSourceLocalState;
const int _consumer_id;
Expand Down

0 comments on commit 39669c6

Please sign in to comment.