diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
new file mode 100644
index 000000000000000..e59ccf0333eed87
--- /dev/null
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "memory_scratch_sink_operator.h"
+
+#include <memory>
+
+#include "common/object_pool.h"
+#include "pipeline/exec/operator.h"
+#include "runtime/record_batch_queue.h"
+#include "util/arrow/block_convertor.h"
+#include "util/arrow/row_batch.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::pipeline {
+
+Status MemoryScratchSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+    // create queue
+    state->exec_env()->result_queue_mgr()->create_queue(state->fragment_instance_id(), &_queue);
+
+    auto& p = _parent->cast<MemoryScratchSinkOperatorX>();
+    _output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
+    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
+    }
+    _queue_dependency =
+            Dependency::create_shared(p.operator_id(), p.node_id(), "QueueDependency", true);
+    _queue->set_dep(_queue_dependency);
+    return Status::OK();
+}
+
+Status MemoryScratchSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_close_timer);
+    if (_queue != nullptr) {
+        _queue->blocking_put(nullptr);
+    }
+    RETURN_IF_ERROR(Base::close(state, exec_status));
+    return Status::OK();
+}
+
+MemoryScratchSinkOperatorX::MemoryScratchSinkOperatorX(const RowDescriptor& row_desc,
+                                                       int operator_id,
+                                                       const std::vector<TExpr>& t_output_expr)
+        : DataSinkOperatorX(operator_id, 0), _row_desc(row_desc), _t_output_expr(t_output_expr) {}
+
+Status MemoryScratchSinkOperatorX::init(const TDataSink& thrift_sink) {
+    RETURN_IF_ERROR(DataSinkOperatorX::init(thrift_sink));
+    // From the thrift expressions create the real exprs.
+    RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs));
+    return Status::OK();
+}
+
+Status MemoryScratchSinkOperatorX::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX::prepare(state));
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
+    _timezone_obj = state->timezone_obj();
+    return Status::OK();
+}
+
+Status MemoryScratchSinkOperatorX::open(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSinkOperatorX::open(state));
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
+    return Status::OK();
+}
+
+Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block* input_block,
+                                        bool eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+    if (nullptr == input_block || 0 == input_block->rows()) {
+        return Status::OK();
+    }
+    COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
+    std::shared_ptr<arrow::RecordBatch> result;
+    // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
+    // failed, just return the error status
+    vectorized::Block block;
+    RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
+            local_state._output_vexpr_ctxs, *input_block, &block));
+    std::shared_ptr<arrow::Schema> block_arrow_schema;
+    // After expr executed, use recalculated schema as final schema
+    RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema));
+    RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
+                                           &result, _timezone_obj));
+    local_state._queue->blocking_put(result);
+    if (local_state._queue->size() > 10) {
+        local_state._queue_dependency->block();
+    }
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
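For readers who have not used the Arrow C++ API that `convert_to_arrow_batch` ultimately drives, the sketch below hand-builds the kind of `arrow::RecordBatch` that `sink()` puts on the queue, for a single BIGINT column. It is a minimal illustration using only the public Arrow builder API; the column name `id` and the helper `make_batch` are made up for the example and are not part of this patch.

```cpp
#include <arrow/api.h>

#include <memory>

// One arrow::Array per column, combined under a shared schema: this is the
// shape of the batches MemoryScratchSinkOperatorX::sink() enqueues.
arrow::Status make_batch(std::shared_ptr<arrow::RecordBatch>* out) {
    arrow::Int64Builder builder;
    ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 3}));
    std::shared_ptr<arrow::Array> column;
    ARROW_RETURN_NOT_OK(builder.Finish(&column));
    auto schema = arrow::schema({arrow::field("id", arrow::int64())});
    *out = arrow::RecordBatch::Make(schema, column->length(), {column});
    return arrow::Status::OK();
}
```

The schema is recomputed from the post-expression block rather than taken from `_row_desc`, which is why `sink()` calls `convert_block_arrow_schema` for every batch; the timezone captured in `prepare()` is passed along so datetime columns can be converted consistently.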
diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.h b/be/src/pipeline/exec/memory_scratch_sink_operator.h
new file mode 100644
index 000000000000000..114e1e40fb17d65
--- /dev/null
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "operator.h"
+#include "runtime/result_queue_mgr.h"
+
+namespace doris::pipeline {
+
+class MemoryScratchSinkOperatorX;
+class MemoryScratchSinkLocalState final : public PipelineXSinkLocalState<BasicSharedState> {
+    ENABLE_FACTORY_CREATOR(MemoryScratchSinkLocalState);
+
+public:
+    using Base = PipelineXSinkLocalState<BasicSharedState>;
+    MemoryScratchSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status close(RuntimeState* state, Status exec_status) override;
+    std::vector<Dependency*> dependencies() const override { return {_queue_dependency.get()}; }
+
+private:
+    friend class MemoryScratchSinkOperatorX;
+    BlockQueueSharedPtr _queue;
+
+    // Owned by the RuntimeState.
+    VExprContextSPtrs _output_vexpr_ctxs;
+
+    std::shared_ptr<Dependency> _queue_dependency = nullptr;
+};
+
+class MemoryScratchSinkOperatorX final : public DataSinkOperatorX<MemoryScratchSinkLocalState> {
+public:
+    MemoryScratchSinkOperatorX(const RowDescriptor& row_desc, int operator_id,
+                               const std::vector<TExpr>& t_output_expr);
+    Status init(const TDataSink& thrift_sink) override;
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
+
+private:
+    friend class MemoryScratchSinkLocalState;
+    const RowDescriptor& _row_desc;
+    cctz::time_zone _timezone_obj;
+    const std::vector<TExpr>& _t_output_expr;
+    VExprContextSPtrs _output_vexpr_ctxs;
+};
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 22e25518daa8ff8..fdbe1e9f86f8ec8 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -40,6 +40,7 @@
 #include "pipeline/exec/hive_table_sink_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
+#include "pipeline/exec/memory_scratch_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/multi_cast_data_stream_sink.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
@@ -645,6 +646,7 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
 DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
 DECLARE_OPERATOR_X(ResultSinkLocalState)
 DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
+DECLARE_OPERATOR_X(MemoryScratchSinkLocalState)
 DECLARE_OPERATOR_X(ResultFileSinkLocalState)
 DECLARE_OPERATOR_X(OlapTableSinkLocalState)
 DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 07acf651e623b76..42e5b769fcc3eaf 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -57,6 +57,7 @@
 #include "pipeline/exec/hive_table_sink_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
+#include "pipeline/exec/memory_scratch_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/multi_cast_data_stream_sink.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
@@ -973,6 +974,15 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
         }
         break;
     }
+    case TDataSinkType::MEMORY_SCRATCH_SINK: {
+        if (!thrift_sink.__isset.memory_scratch_sink) {
+            return Status::InternalError("Missing data buffer sink.");
+        }
+
+        _sink.reset(
+                new MemoryScratchSinkOperatorX(row_desc, next_sink_operator_id(), output_exprs));
+        break;
+    }
     case TDataSinkType::RESULT_FILE_SINK: {
         if (!thrift_sink.__isset.result_file_sink) {
             return Status::InternalError("Missing result file sink.");
diff --git a/be/src/runtime/record_batch_queue.cpp b/be/src/runtime/record_batch_queue.cpp
index 1f76102bb94a4b3..839826888809489 100644
--- a/be/src/runtime/record_batch_queue.cpp
+++ b/be/src/runtime/record_batch_queue.cpp
@@ -17,10 +17,19 @@
 
 #include "runtime/record_batch_queue.h"
 
+#include "pipeline/dependency.h"
 #include "util/spinlock.h"
 
 namespace doris {
 
+bool RecordBatchQueue::blocking_get(std::shared_ptr<arrow::RecordBatch>* result) {
+    auto res = _queue.blocking_get(result);
+    if (_dep && size() <= 10) {
+        _dep->set_ready();
+    }
+    return res;
+}
+
 void RecordBatchQueue::update_status(const Status& status) {
     if (status.ok()) {
         return;
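The two hard-coded `10` thresholds act as a backpressure pair: `MemoryScratchSinkOperatorX::sink()` blocks its `_queue_dependency` once more than ten batches are buffered, and `RecordBatchQueue::blocking_get()` marks the dependency ready again once the consumer has drained the backlog to ten or fewer. Below is a minimal standalone model of that handshake, assuming a plain condition variable in place of the pipeline scheduler; the class and its names are illustrative only, not Doris code.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Toy model of the sink/queue handshake: the producer parks itself once the
// queue grows past kMaxBatches, the consumer un-parks it after draining the
// backlog. In the patch, "park" is Dependency::block(), "un-park" is
// Dependency::set_ready(), and the pipeline scheduler decides when the sink
// task actually runs again.
template <typename T>
class BackpressureQueue {
public:
    static constexpr size_t kMaxBatches = 10;

    void put(T value) {
        std::unique_lock<std::mutex> lock(_mutex);
        _can_produce.wait(lock, [&] { return _producer_ready; });
        _items.push_back(std::move(value));
        _producer_ready = _items.size() <= kMaxBatches; // sink(): block when > 10
        _has_data.notify_one();
    }

    T get() {
        std::unique_lock<std::mutex> lock(_mutex);
        _has_data.wait(lock, [&] { return !_items.empty(); });
        T value = std::move(_items.front());
        _items.pop_front();
        if (_items.size() <= kMaxBatches) { // blocking_get(): set_ready when <= 10
            _producer_ready = true;
            _can_produce.notify_one();
        }
        return value;
    }

private:
    std::mutex _mutex;
    std::condition_variable _can_produce;
    std::condition_variable _has_data;
    std::deque<T> _items;
    bool _producer_ready = true;
};
```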
diff --git a/be/src/runtime/record_batch_queue.h b/be/src/runtime/record_batch_queue.h
index 7528b85f09d69be..e2fbcb01ca08f2b 100644
--- a/be/src/runtime/record_batch_queue.h
+++ b/be/src/runtime/record_batch_queue.h
@@ -33,6 +33,10 @@ class RecordBatch;
 
 namespace doris {
 
+namespace pipeline {
+class Dependency;
+}
+
 // The RecordBatchQueue is created and managed by the ResultQueueMgr to
 // cache external query results, as well as query status. Where both
 // BlockingGet and BlockingPut operations block if the queue is empty or
@@ -48,9 +52,7 @@ class RecordBatchQueue {
 
     void update_status(const Status& status);
 
-    bool blocking_get(std::shared_ptr<arrow::RecordBatch>* result) {
-        return _queue.blocking_get(result);
-    }
+    bool blocking_get(std::shared_ptr<arrow::RecordBatch>* result);
 
     bool blocking_put(const std::shared_ptr<arrow::RecordBatch>& val) {
         return _queue.blocking_put(val);
@@ -60,11 +62,15 @@ class RecordBatchQueue {
     void shutdown();
 
     size_t size() { return _queue.get_size(); }
 
+    void set_dep(std::shared_ptr<pipeline::Dependency> dep) {
+        _dep = dep;
+    }
 private:
     BlockingQueue<std::shared_ptr<arrow::RecordBatch>> _queue;
     SpinLock _status_lock;
     Status _status;
+    std::shared_ptr<pipeline::Dependency> _dep = nullptr;
 };
 
 } // namespace doris
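The queue itself is registered with the `ResultQueueMgr` under the fragment instance id in `MemoryScratchSinkLocalState::init()`, and the external result-fetch path drains it. A hedged sketch of such a drain loop, using only the `RecordBatchQueue` interface shown above (`drain_results` is a hypothetical function written for this example, not an existing Doris API):

```cpp
#include <arrow/record_batch.h>

#include <memory>

#include "runtime/record_batch_queue.h"

// Pull batches until the sink signals end-of-stream. A nullptr batch is the
// sentinel that MemoryScratchSinkLocalState::close() enqueues after the last
// real batch; blocking_get() itself returns false once the queue is shut down.
void drain_results(doris::RecordBatchQueue* queue) {
    std::shared_ptr<arrow::RecordBatch> batch;
    while (queue->blocking_get(&batch)) {
        if (batch == nullptr) {
            break; // end of stream
        }
        // Hand `batch` to the client here, e.g. serialize it into the RPC response.
    }
}
```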
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 39a59dae3e06632..b8b066f92484996 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -768,7 +768,130 @@ private void execInternal() throws Exception {
      * @throws UserException
      */
     private void sendFragment() throws TException, RpcException, UserException {
-        throw new UserException("Pipeline load should be enabled!");
+        lock();
+        try {
+            Multiset<TNetworkAddress> hostCounter = HashMultiset.create();
+            for (FragmentExecParams params : fragmentExecParamsMap.values()) {
+                for (FInstanceExecParam fi : params.instanceExecParams) {
+                    hostCounter.add(fi.host);
+                }
+            }
+
+            int backendIdx = 0;
+            int profileFragmentId = 0;
+            long memoryLimit = queryOptions.getMemLimit();
+            Map<Long, Integer> numSinkOnBackend = Maps.newHashMap();
+            beToExecStates.clear();
+            // If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
+            // else use exec_plan_fragments directly.
+            // we choose #fragments >=2 because in some cases
+            // we need to ensure that A fragment is already prepared to receive data before B fragment sends data.
+            // For example: select * from numbers("number"="10") will generate ExchangeNode and
+            // TableValuedFunctionScanNode, we should ensure TableValuedFunctionScanNode does
+            // not send data until ExchangeNode is ready to receive.
+            boolean twoPhaseExecution = fragments.size() >= 2;
+            for (PlanFragment fragment : fragments) {
+                FragmentExecParams params = fragmentExecParamsMap.get(fragment.getFragmentId());
+
+                // 1. set up exec states
+                int instanceNum = params.instanceExecParams.size();
+                Preconditions.checkState(instanceNum > 0);
+                List<TExecPlanFragmentParams> tParams = params.toThrift(backendIdx);
+
+                // 2. update memory limit for colocate join
+                if (colocateFragmentIds.contains(fragment.getFragmentId().asInt())) {
+                    int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceNum);
+                    long newMemory = memoryLimit / rate;
+                    // TODO(zxy): The meaning of mem limit in query_options has become the real once query mem limit.
+                    // The logic to modify mem_limit here needs to be modified or deleted.
+                    for (TExecPlanFragmentParams tParam : tParams) {
+                        tParam.query_options.setMemLimit(newMemory);
+                    }
+                }
+
+                boolean needCheckBackendState = false;
+                if (queryOptions.getQueryType() == TQueryType.LOAD && profileFragmentId == 0) {
+                    // this is a load process, and it is the first fragment.
+                    // we should add all BackendExecState of this fragment to needCheckBackendExecStates,
+                    // so that we can check these backends' state when joining this Coordinator
+                    needCheckBackendState = true;
+                }
+
+                // 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
+                int instanceId = 0;
+                for (TExecPlanFragmentParams tParam : tParams) {
+                    BackendExecState execState =
+                            new BackendExecState(fragment.getFragmentId(), instanceId++,
+                                    tParam, this.addressToBackendID, executionProfile);
+                    // Each tParam will set the total number of Fragments that need to be executed on the same BE,
+                    // and the BE will determine whether all Fragments have been executed based on this information.
+                    // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
+                    tParam.setFragmentNumOnHost(hostCounter.count(execState.address));
+                    tParam.setBackendId(execState.backend.getId());
+                    tParam.setNeedWaitExecutionTrigger(twoPhaseExecution);
+
+                    backendExecStates.add(execState);
+                    if (needCheckBackendState) {
+                        needCheckBackendExecStates.add(execState);
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("add need check backend {} for fragment, {} job: {}",
+                                    execState.backend.getId(), fragment.getFragmentId().asInt(), jobId);
+                        }
+                    }
+
+                    BackendExecStates states = beToExecStates.get(execState.backend.getId());
+                    if (states == null) {
+                        states = new BackendExecStates(execState.backend.getId(), execState.brpcAddress,
+                                twoPhaseExecution, execState.backend.getProcessEpoch());
+                        beToExecStates.putIfAbsent(execState.backend.getId(), states);
+                    }
+                    states.addState(execState);
+                    if (tParam.getFragment().getOutputSink() != null
+                            && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
+                        numSinkOnBackend.merge(execState.backend.getId(), 1, Integer::sum);
+                    }
+                    ++backendIdx;
+                }
+                int loadStreamPerNode = 1;
+                if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
+                    loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
+                }
+                for (TExecPlanFragmentParams tParam : tParams) {
+                    if (tParam.getFragment().getOutputSink() != null
+                            && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
+                        tParam.setLoadStreamPerNode(loadStreamPerNode);
+                        tParam.setTotalLoadStreams(numSinkOnBackend.size() * loadStreamPerNode);
+                        tParam.setNumLocalSink(numSinkOnBackend.get(tParam.getBackendId()));
+                        LOG.info("num local sink for backend {} is {}", tParam.getBackendId(),
+                                numSinkOnBackend.get(tParam.getBackendId()));
+                    }
+                }
+                profileFragmentId += 1;
+            } // end for fragments
+
+            // 4. send and wait fragments rpc
+            List<Triple<BackendExecStates, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>
+                    futures = Lists.newArrayList();
+
+            for (BackendExecStates states : beToExecStates.values()) {
+                states.unsetFields();
+                BackendServiceProxy proxy = BackendServiceProxy.getInstance();
+                futures.add(ImmutableTriple.of(states, proxy, states.execRemoteFragmentsAsync(proxy)));
+            }
+            waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
+
+            if (twoPhaseExecution) {
+                // 5. send and wait execution start rpc
+                futures.clear();
+                for (BackendExecStates states : beToExecStates.values()) {
+                    BackendServiceProxy proxy = BackendServiceProxy.getInstance();
+                    futures.add(ImmutableTriple.of(states, proxy, states.execPlanFragmentStartAsync(proxy)));
+                }
+                waitRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
+            }
+        } finally {
+            unlock();
+        }
     }
 
     private void sendPipelineCtx() throws TException, RpcException, UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index cb000a034a2a28f..ca3c43470bed118 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -138,6 +138,7 @@
 import org.apache.doris.thrift.TDescribeTablesResult;
 import org.apache.doris.thrift.TDropPlsqlPackageRequest;
 import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TFeResult;
 import org.apache.doris.thrift.TFetchResourceResult;
 import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
@@ -1956,14 +1957,14 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
                 if (Config.enable_pipeline_load) {
                     result.setPipelineParams((TPipelineFragmentParams) streamLoadHandler.getFragmentParams().get(0));
                 } else {
-                    throw new UserException("Pipeline load should be enabled!");
+                    result.setParams((TExecPlanFragmentParams) streamLoadHandler.getFragmentParams().get(0));
                 }
             }
             if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) {
                 if (Config.enable_pipeline_load) {
                     result.pipeline_params.setWorkloadGroups(tWorkloadGroupList);
                 } else {
-                    throw new UserException("Pipeline load should be enabled!");
+                    result.params.setWorkloadGroups(tWorkloadGroupList);
                 }
             }
         } catch (MetaNotFoundException e) {