Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed May 22, 2024
1 parent 2064b33 commit 0c243c8
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 6 deletions.
116 changes: 116 additions & 0 deletions be/src/pipeline/exec/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "memory_scratch_sink_operator.h"

#include <memory>

#include "common/object_pool.h"
#include "pipeline/exec/operator.h"
#include "runtime/record_batch_queue.h"
#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

namespace doris::pipeline {

// Initializes the per-instance sink state.
//
// After the base-class init succeeds this:
//   1. asks the result queue manager to create the queue for this fragment
//      instance and stores it in `_queue`;
//   2. clones every output expression context of the parent operator into
//      `_output_vexpr_ctxs`;
//   3. creates the "QueueDependency" and hands it to the queue via set_dep().
//
// Returns the first error reported by Base::init() or by an expression clone.
Status MemoryScratchSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_init_timer);

    // The queue is registered under the fragment instance id.
    state->exec_env()->result_queue_mgr()->create_queue(state->fragment_instance_id(),
                                                        &_queue);

    auto& p = _parent->cast<MemoryScratchSinkOperatorX>();
    const size_t expr_count = p._output_vexpr_ctxs.size();
    _output_vexpr_ctxs.resize(expr_count);
    for (size_t idx = 0; idx != expr_count; ++idx) {
        RETURN_IF_ERROR(p._output_vexpr_ctxs[idx]->clone(state, _output_vexpr_ctxs[idx]));
    }

    // NOTE(review): the trailing `true` presumably creates the dependency in
    // the ready state -- confirm against Dependency::create_shared.
    _queue_dependency =
            Dependency::create_shared(p.operator_id(), p.node_id(), "QueueDependency", true);
    _queue->set_dep(_queue_dependency);
    return Status::OK();
}

// Closes the local state. Does nothing if it is already closed; otherwise
// pushes a null record batch into the queue (when a queue exists) and then
// delegates to Base::close(), whose status is returned.
Status MemoryScratchSinkLocalState::close(RuntimeState* state, Status exec_status) {
    if (_closed) {
        return Status::OK();
    }
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_close_timer);

    // NOTE(review): the null batch looks like an end-of-stream sentinel for
    // the consumer -- confirm with the reader of the queue.
    if (_queue) {
        _queue->blocking_put(nullptr);
    }
    return Base::close(state, exec_status);
}

// Builds the operator. `row_desc` and `t_output_expr` are bound to reference
// members, not copied, so the caller must keep both alive for as long as this
// operator is in use. The second base-constructor argument is fixed at 0.
MemoryScratchSinkOperatorX::MemoryScratchSinkOperatorX(
        const RowDescriptor& row_desc, int operator_id, const std::vector<TExpr>& t_output_expr)
        : DataSinkOperatorX(operator_id, 0),
          _row_desc(row_desc),
          _t_output_expr(t_output_expr) {}

// Runs the base-class init, then builds the output expression trees from the
// thrift expressions captured at construction time. Returns the first error.
Status MemoryScratchSinkOperatorX::init(const TDataSink& thrift_sink) {
    using SinkBase = DataSinkOperatorX<MemoryScratchSinkLocalState>;
    RETURN_IF_ERROR(SinkBase::init(thrift_sink));
    // Turn the thrift expressions into executable expression contexts.
    return vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs);
}

// Runs the base-class prepare, prepares the output expressions against the
// row descriptor, and caches the runtime state's time zone in `_timezone_obj`.
// The time zone is only stored when both preceding steps succeed.
Status MemoryScratchSinkOperatorX::prepare(RuntimeState* state) {
    using SinkBase = DataSinkOperatorX<MemoryScratchSinkLocalState>;
    RETURN_IF_ERROR(SinkBase::prepare(state));

    // Get the output expressions ready for execution.
    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));

    _timezone_obj = state->timezone_obj();
    return Status::OK();
}

// Runs the base-class open, then opens the output expression contexts.
// Returns the first error encountered.
Status MemoryScratchSinkOperatorX::open(RuntimeState* state) {
    using SinkBase = DataSinkOperatorX<MemoryScratchSinkLocalState>;
    RETURN_IF_ERROR(SinkBase::open(state));
    // Open the output expressions.
    return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}

// Converts one input block into an Arrow record batch and enqueues it.
//
// Steps:
//   1. A null or empty `input_block` is a no-op.
//   2. The local output expressions are evaluated on the input block.
//   3. The resulting block is converted to an Arrow schema and record batch
//      using the operator's cached time zone.
//   4. The batch is pushed into the local state's queue.
//   5. Back-pressure: once the queue holds more than `kMaxQueuedBatches`
//      batches the queue dependency is blocked. RecordBatchQueue::blocking_get()
//      sets it ready again when the queue has drained to that level.
//
// `eos` is not consulted here. Returns the first error from expression
// execution or Arrow conversion, otherwise OK.
Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block* input_block,
                                        bool eos) {
    // Must match the `size() <= 10` re-arm level in
    // RecordBatchQueue::blocking_get().
    constexpr size_t kMaxQueuedBatches = 10;

    auto& local_state = get_local_state(state);
    SCOPED_TIMER(local_state.exec_time_counter());
    if (nullptr == input_block || 0 == input_block->rows()) {
        return Status::OK();
    }
    COUNTER_UPDATE(local_state.rows_input_counter(), static_cast<int64_t>(input_block->rows()));

    // Evaluate the output expressions here; any failure is propagated as-is.
    vectorized::Block block;
    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
            local_state._output_vexpr_ctxs, *input_block, &block));

    // After the expressions ran, use the recalculated schema as the final schema.
    std::shared_ptr<arrow::Schema> block_arrow_schema;
    RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema));

    std::shared_ptr<arrow::RecordBatch> result;
    RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
                                           &result, _timezone_obj));
    local_state._queue->blocking_put(result);

    // Block only when the queue is filling up. The previous `size() < 10` test
    // blocked the sink while the queue was nearly empty; if the consumer had
    // already drained the queue, nothing would ever call set_ready() again.
    if (local_state._queue->size() > kMaxQueuedBatches) {
        local_state._queue_dependency->block();
        // The consumer may have drained the queue (and already called
        // set_ready()) between the size check and block(). Re-check so that
        // wake-up is not lost.
        if (local_state._queue->size() <= kMaxQueuedBatches) {
            local_state._queue_dependency->set_ready();
        }
    }
    return Status::OK();
}

} // namespace doris::pipeline
68 changes: 68 additions & 0 deletions be/src/pipeline/exec/memory_scratch_sink_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdint.h>

#include "operator.h"
#include "runtime/result_queue_mgr.h"

namespace doris::pipeline {

class MemoryScratchSinkOperatorX;
// Per-instance state of the memory scratch sink.
//
// Holds the record-batch queue that converted batches are pushed into, this
// instance's own copies of the output expression contexts, and the dependency
// that is attached to the queue in init().
class MemoryScratchSinkLocalState final : public PipelineXSinkLocalState<FakeSharedState> {
ENABLE_FACTORY_CREATOR(MemoryScratchSinkLocalState);

public:
using Base = PipelineXSinkLocalState<FakeSharedState>;
MemoryScratchSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {}

// Creates the queue, clones the operator's output expression contexts and
// creates the queue dependency.
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
// Pushes a null batch into the queue (if one exists) and closes the base state.
Status close(RuntimeState* state, Status exec_status) override;
// Returns the queue dependency as the only dependency of this sink.
// The pointer is null until init() has created the dependency.
std::vector<Dependency*> dependencies() const override { return {_queue_dependency.get()}; }

private:
friend class MemoryScratchSinkOperatorX;
// Queue that receives the converted record batches; filled in by init().
BlockQueueSharedPtr _queue;

// Output expression contexts of this instance; init() fills them by cloning
// the contexts held by the parent operator.
VExprContextSPtrs _output_vexpr_ctxs;

// Dependency handed to `_queue` via set_dep(); created in init().
std::shared_ptr<Dependency> _queue_dependency = nullptr;
};

// Sink operator that evaluates the output expressions on each incoming block,
// converts the result to an Arrow record batch and pushes it into the queue
// owned by the local state.
class MemoryScratchSinkOperatorX final : public DataSinkOperatorX<MemoryScratchSinkLocalState> {
public:
// `row_desc` and `t_output_expr` are stored by reference (see the members
// below); the referenced objects must outlive this operator.
MemoryScratchSinkOperatorX(const RowDescriptor& row_desc, int operator_id,
const std::vector<TExpr>& t_output_expr);
// Builds the expression trees from the thrift expressions.
Status init(const TDataSink& thrift_sink) override;
// Prepares the expressions and caches the runtime state's time zone.
Status prepare(RuntimeState* state) override;
// Opens the expressions.
Status open(RuntimeState* state) override;

// Converts `in_block` to an Arrow record batch and enqueues it; a null or
// empty block is a no-op.
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;

private:
friend class MemoryScratchSinkLocalState;
// Reference member: not owned, bound in the constructor.
const RowDescriptor& _row_desc;
// Time zone passed to the Arrow batch conversion; set in prepare().
cctz::time_zone _timezone_obj;
// Reference member: not owned, bound in the constructor.
const std::vector<TExpr>& _t_output_expr;
// Expression contexts created in init(); each local state clones them.
VExprContextSPtrs _output_vexpr_ctxs;
};

} // namespace doris::pipeline
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "pipeline/exec/hive_table_sink_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/memory_scratch_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
Expand Down Expand Up @@ -645,6 +646,7 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
DECLARE_OPERATOR_X(MemoryScratchSinkLocalState)
DECLARE_OPERATOR_X(ResultFileSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkLocalState)
DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
Expand Down
10 changes: 10 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "pipeline/exec/hive_table_sink_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/memory_scratch_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
Expand Down Expand Up @@ -973,6 +974,15 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}
break;
}
case TDataSinkType::MEMORY_SCRATCH_SINK: {
if (!thrift_sink.__isset.memory_scratch_sink) {
return Status::InternalError("Missing data buffer sink.");
}

_sink.reset(
new MemoryScratchSinkOperatorX(row_desc, next_sink_operator_id(), output_exprs));
break;
}
case TDataSinkType::RESULT_FILE_SINK: {
if (!thrift_sink.__isset.result_file_sink) {
return Status::InternalError("Missing result file sink.");
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/record_batch_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@

#include "runtime/record_batch_queue.h"

#include "pipeline/dependency.h"
#include "util/spinlock.h"

namespace doris {

// Takes the next record batch from the underlying blocking queue and returns
// that queue's result. If a dependency has been attached and at most 10
// batches remain queued afterwards, the dependency is marked ready.
bool RecordBatchQueue::blocking_get(std::shared_ptr<arrow::RecordBatch>* result) {
    const bool fetched = _queue.blocking_get(result);
    const bool has_dependency = (_dep != nullptr);
    if (has_dependency && size() <= 10) {
        _dep->set_ready();
    }
    return fetched;
}

void RecordBatchQueue::update_status(const Status& status) {
if (status.ok()) {
return;
Expand Down
12 changes: 9 additions & 3 deletions be/src/runtime/record_batch_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class RecordBatch;

namespace doris {

namespace pipeline {
class Dependency;
}

// The RecordBatchQueue is created and managed by the ResultQueueMgr to
// cache external query results, as well as query status. Where both
// BlockingGet and BlockingPut operations block if the queue is empty or
Expand All @@ -48,9 +52,7 @@ class RecordBatchQueue {

void update_status(const Status& status);

bool blocking_get(std::shared_ptr<arrow::RecordBatch>* result) {
return _queue.blocking_get(result);
}
bool blocking_get(std::shared_ptr<arrow::RecordBatch>* result);

bool blocking_put(const std::shared_ptr<arrow::RecordBatch>& val) {
return _queue.blocking_put(val);
Expand All @@ -60,11 +62,15 @@ class RecordBatchQueue {
void shutdown();

size_t size() { return _queue.get_size(); }
// Attaches the dependency that blocking_get() marks ready once the queue has
// drained. Any previously attached dependency is released.
void set_dep(std::shared_ptr<pipeline::Dependency> dep) {
    // `dep` is already our own copy, so exchange it with the member; the old
    // value is dropped when the parameter is destroyed.
    _dep.swap(dep);
}

private:
BlockingQueue<std::shared_ptr<arrow::RecordBatch>> _queue;
SpinLock _status_lock;
Status _status;
std::shared_ptr<pipeline::Dependency> _dep = nullptr;
};

} // namespace doris
Loading

0 comments on commit 0c243c8

Please sign in to comment.