From 62c075bf7e82b97c8a103718c901f138c5845a1a Mon Sep 17 00:00:00 2001 From: TengJianPing <18241664+jacktengg@users.noreply.github.com> Date: Thu, 31 Aug 2023 14:44:17 +0800 Subject: [PATCH] [improvement](Block) Replace Block(const PBlock&) with deserialize because it has heavy operations in ctor (#23672) --- be/src/exec/rowid_fetcher.cpp | 3 ++- be/src/runtime/tablets_channel.cpp | 5 ++-- .../aggregate_function_sort.h | 3 ++- be/src/vec/core/block.cpp | 10 +++++--- be/src/vec/core/block.h | 3 ++- be/src/vec/core/block_spill_reader.cpp | 3 ++- be/src/vec/runtime/vdata_stream_mgr.cpp | 5 ++-- be/src/vec/runtime/vdata_stream_recvr.cpp | 24 ++++++++++--------- be/src/vec/runtime/vdata_stream_recvr.h | 8 +++---- be/test/vec/core/block_test.cpp | 24 ++++++++++++------- be/test/vec/exec/vtablet_sink_test.cpp | 3 ++- 11 files changed, 55 insertions(+), 36 deletions(-) diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index af8e71eed00626..302fc34fcc583f 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -159,7 +159,8 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request, return Status::OK(); } // Merge partial blocks - vectorized::Block partial_block(resp.block()); + vectorized::Block partial_block; + RETURN_IF_ERROR(partial_block.deserialize(resp.block())); if (partial_block.is_empty_column()) { return Status::OK(); } diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 66adca79487ae1..2e21ec92c349c3 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -436,9 +436,8 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request, } } - auto get_send_data = [&]() { return vectorized::Block(request.block()); }; - - auto send_data = get_send_data(); + vectorized::Block send_data; + RETURN_IF_ERROR(send_data.deserialize(request.block())); CHECK(send_data.rows() == request.tablet_ids_size()) << "block rows: " << send_data.rows() << ", tablet_ids_size: " << request.tablet_ids_size(); diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h index f82061e071a8f4..39d7fd184f5688 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_sort.h +++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h @@ -87,7 +87,8 @@ struct AggregateFunctionSortData { PBlock pblock; pblock.ParseFromString(data); - block = Block(pblock); + auto st = block.deserialize(pblock); + CHECK(st.ok()); } void add(const IColumn** columns, size_t columns_num, size_t row_num) { diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index da63244f6ef920..0fea95a90ec28d 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -84,7 +84,8 @@ Block::Block(const std::vector& slots, size_t block_size, } } -Block::Block(const PBlock& pblock) { +Status Block::deserialize(const PBlock& pblock) { + swap(Block()); int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0; CHECK(BeExecVersionManager::check_be_exec_version(be_exec_version)); @@ -98,11 +99,12 @@ Block::Block(const PBlock& pblock) { size_t uncompressed_size = 0; if (pblock.has_compression_type() && pblock.has_uncompressed_size()) { BlockCompressionCodec* codec; - get_block_compression_codec(pblock.compression_type(), &codec); + RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec)); uncompressed_size = pblock.uncompressed_size(); compression_scratch.resize(uncompressed_size); Slice decompressed_slice(compression_scratch); - codec->decompress(Slice(compressed_data, compressed_size), &decompressed_slice); + RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size), + &decompressed_slice)); DCHECK(uncompressed_size == decompressed_slice.size); } else { bool success = snappy::GetUncompressedLength(compressed_data, compressed_size, @@ -126,6 +128,8 @@ Block::Block(const PBlock& pblock) { data.emplace_back(data_column->get_ptr(), type, pcol_meta.name()); } initialize_index_by_name(); + + return Status::OK(); } void Block::reserve(size_t count) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 900969bda351e6..cad45ac2379192 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -86,7 +86,6 @@ class Block { Block() = default; Block(std::initializer_list il); Block(const ColumnsWithTypeAndName& data_); - Block(const PBlock& pblock); Block(const std::vector& slots, size_t block_size, bool ignore_trivial_slot = false); @@ -308,6 +307,8 @@ class Block { size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type, bool allow_transfer_large_data = false) const; + Status deserialize(const PBlock& pblock); + std::unique_ptr create_same_struct_block(size_t size) const; /** Compares (*this) n-th row and rhs m-th row. diff --git a/be/src/vec/core/block_spill_reader.cpp b/be/src/vec/core/block_spill_reader.cpp index 8d2d4812296405..d0cebd3043bf68 100644 --- a/be/src/vec/core/block_spill_reader.cpp +++ b/be/src/vec/core/block_spill_reader.cpp @@ -134,7 +134,8 @@ Status BlockSpillReader::read(Block* block, bool* eos) { if (!pb_block.ParseFromArray(result.data, result.size)) { return Status::InternalError("Failed to read spilled block"); } - new_block = Block::create_unique(pb_block); + new_block = Block::create_unique(); + RETURN_IF_ERROR(new_block->deserialize(pb_block)); } block->swap(*new_block); } else { diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index eeee0723717627..ad161828f905a6 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -120,8 +120,9 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, bool eos = request->eos(); if (request->has_block()) { - recvr->add_block(request->block(), request->sender_id(), request->be_number(), - request->packet_seq(), eos ? nullptr : done); + RETURN_IF_ERROR(recvr->add_block(request->block(), request->sender_id(), + request->be_number(), request->packet_seq(), + eos ? nullptr : done)); } if (eos) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 0c76df5b9ab148..cc908d47e08e92 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -108,13 +108,13 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block return Status::OK(); } -void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number, - int64_t packet_seq, - ::google::protobuf::Closure** done) { +Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number, + int64_t packet_seq, + ::google::protobuf::Closure** done) { { std::lock_guard l(_lock); if (_is_cancelled) { - return; + return Status::OK(); } auto iter = _packet_seq_map.find(be_number); if (iter != _packet_seq_map.end()) { @@ -122,7 +122,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe LOG(WARNING) << fmt::format( "packet already exist [cur_packet_id= {} receive_packet_id={}]", iter->second, packet_seq); - return; + return Status::OK(); } iter->second = packet_seq; } else { @@ -134,7 +134,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe DCHECK(_num_remaining_senders >= 0); if (_num_remaining_senders == 0) { DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number)); - return; + return Status::OK(); } } @@ -142,7 +142,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe int64_t deserialize_time = 0; { SCOPED_RAW_TIMER(&deserialize_time); - block = Block::create_unique(pblock); + block = Block::create_unique(); + RETURN_IF_ERROR(block->deserialize(pblock)); } auto block_byte_size = block->allocated_bytes(); @@ -150,7 +151,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe std::lock_guard l(_lock); if (_is_cancelled) { - return; + return Status::OK(); } COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time); @@ -176,6 +177,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe if (!empty) { _data_arrival_cv.notify_one(); } + return Status::OK(); } void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { @@ -376,11 +378,11 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr, return Status::OK(); } -void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number, - int64_t packet_seq, ::google::protobuf::Closure** done) { +Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number, + int64_t packet_seq, ::google::protobuf::Closure** done) { SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id, _fragment_instance_id); int use_sender_id = _is_merging ? sender_id : 0; - _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done); + return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done); } void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index eb57c57b0d26d7..fe9910492bf5a9 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -75,8 +75,8 @@ class VDataStreamRecvr { const std::vector& nulls_first, size_t batch_size, int64_t limit, size_t offset); - void add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq, - ::google::protobuf::Closure** done); + Status add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq, + ::google::protobuf::Closure** done); void add_block(Block* block, int sender_id, bool use_move); @@ -185,8 +185,8 @@ class VDataStreamRecvr::SenderQueue { virtual Status get_batch(Block* next_block, bool* eos); - void add_block(const PBlock& pblock, int be_number, int64_t packet_seq, - ::google::protobuf::Closure** done); + Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq, + ::google::protobuf::Closure** done); virtual void add_block(Block* block, bool use_move); diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 456d4fc4807c8e..61903e588f2fba 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -129,7 +129,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); - vectorized::Block block2(pblock); + vectorized::Block block2; + block2.deserialize(pblock); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -150,7 +151,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); - vectorized::Block block2(pblock); + vectorized::Block block2; + block2.deserialize(pblock); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -174,7 +176,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); - vectorized::Block block2(pblock); + vectorized::Block block2; + block2.deserialize(pblock); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -200,7 +203,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); - vectorized::Block block2(pblock); + vectorized::Block block2; + block2.deserialize(pblock); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -220,7 +224,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); - vectorized::Block block2(pblock); + vectorized::Block block2; + block2.deserialize(pblock); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -242,7 +247,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty EXPECT_TRUE(pblock.column_metas()[0].has_decimal_param()); std::string s1 = pblock.DebugString(); - vectorized::Block block2(pblock); + vectorized::Block block2; + block2.deserialize(pblock); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -264,7 +270,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); - vectorized::Block block2(pblock); + vectorized::Block block2; + block2.deserialize(pblock); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); @@ -279,7 +286,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty block_to_pb(block, &pblock, compression_type); std::string s1 = pblock.DebugString(); - vectorized::Block block2(pblock); + vectorized::Block block2; + block2.deserialize(pblock); PBlock pblock2; block_to_pb(block2, &pblock2, compression_type); std::string s2 = pblock2.DebugString(); diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp index 5e60181c8fb80e..75ed927fcb2acc 100644 --- a/be/test/vec/exec/vtablet_sink_test.cpp +++ b/be/test/vec/exec/vtablet_sink_test.cpp @@ -316,7 +316,8 @@ class VTestInternalService : public PBackendService { k_add_batch_status.to_protobuf(response->mutable_status()); if (request->has_block() && _row_desc != nullptr) { - vectorized::Block block(request->block()); + vectorized::Block block; + block.deserialize(request->block()); for (size_t row_num = 0; row_num < block.rows(); ++row_num) { std::stringstream out;