Skip to content

Commit

Permalink
[improvement](Block) Replace Block(const PBlock&) with deserialize because it has heavy operations in ctor (#23672)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg authored Aug 31, 2023
1 parent 409640a commit 62c075b
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 36 deletions.
3 changes: 2 additions & 1 deletion be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
return Status::OK();
}
// Merge partial blocks
vectorized::Block partial_block(resp.block());
vectorized::Block partial_block;
RETURN_IF_ERROR(partial_block.deserialize(resp.block()));
if (partial_block.is_empty_column()) {
return Status::OK();
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,8 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
}
}

auto get_send_data = [&]() { return vectorized::Block(request.block()); };

auto send_data = get_send_data();
vectorized::Block send_data;
RETURN_IF_ERROR(send_data.deserialize(request.block()));
CHECK(send_data.rows() == request.tablet_ids_size())
<< "block rows: " << send_data.rows()
<< ", tablet_ids_size: " << request.tablet_ids_size();
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/aggregate_functions/aggregate_function_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ struct AggregateFunctionSortData {

PBlock pblock;
pblock.ParseFromString(data);
block = Block(pblock);
auto st = block.deserialize(pblock);
CHECK(st.ok());
}

void add(const IColumn** columns, size_t columns_num, size_t row_num) {
Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size,
}
}

Block::Block(const PBlock& pblock) {
Status Block::deserialize(const PBlock& pblock) {
swap(Block());
int be_exec_version = pblock.has_be_exec_version() ? pblock.be_exec_version() : 0;
CHECK(BeExecVersionManager::check_be_exec_version(be_exec_version));

Expand All @@ -98,11 +99,12 @@ Block::Block(const PBlock& pblock) {
size_t uncompressed_size = 0;
if (pblock.has_compression_type() && pblock.has_uncompressed_size()) {
BlockCompressionCodec* codec;
get_block_compression_codec(pblock.compression_type(), &codec);
RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec));
uncompressed_size = pblock.uncompressed_size();
compression_scratch.resize(uncompressed_size);
Slice decompressed_slice(compression_scratch);
codec->decompress(Slice(compressed_data, compressed_size), &decompressed_slice);
RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size),
&decompressed_slice));
DCHECK(uncompressed_size == decompressed_slice.size);
} else {
bool success = snappy::GetUncompressedLength(compressed_data, compressed_size,
Expand All @@ -126,6 +128,8 @@ Block::Block(const PBlock& pblock) {
data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
}
initialize_index_by_name();

return Status::OK();
}

void Block::reserve(size_t count) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ class Block {
Block() = default;
Block(std::initializer_list<ColumnWithTypeAndName> il);
Block(const ColumnsWithTypeAndName& data_);
Block(const PBlock& pblock);
Block(const std::vector<SlotDescriptor*>& slots, size_t block_size,
bool ignore_trivial_slot = false);

Expand Down Expand Up @@ -308,6 +307,8 @@ class Block {
size_t* compressed_bytes, segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data = false) const;

Status deserialize(const PBlock& pblock);

std::unique_ptr<Block> create_same_struct_block(size_t size) const;

/** Compares (*this) n-th row and rhs m-th row.
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/core/block_spill_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ Status BlockSpillReader::read(Block* block, bool* eos) {
if (!pb_block.ParseFromArray(result.data, result.size)) {
return Status::InternalError("Failed to read spilled block");
}
new_block = Block::create_unique(pb_block);
new_block = Block::create_unique();
RETURN_IF_ERROR(new_block->deserialize(pb_block));
}
block->swap(*new_block);
} else {
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/runtime/vdata_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,

bool eos = request->eos();
if (request->has_block()) {
recvr->add_block(request->block(), request->sender_id(), request->be_number(),
request->packet_seq(), eos ? nullptr : done);
RETURN_IF_ERROR(recvr->add_block(request->block(), request->sender_id(),
request->be_number(), request->packet_seq(),
eos ? nullptr : done));
}

if (eos) {
Expand Down
24 changes: 13 additions & 11 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,21 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
return Status::OK();
}

void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
int64_t packet_seq,
::google::protobuf::Closure** done) {
Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
int64_t packet_seq,
::google::protobuf::Closure** done) {
{
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return;
return Status::OK();
}
auto iter = _packet_seq_map.find(be_number);
if (iter != _packet_seq_map.end()) {
if (iter->second >= packet_seq) {
LOG(WARNING) << fmt::format(
"packet already exist [cur_packet_id= {} receive_packet_id={}]",
iter->second, packet_seq);
return;
return Status::OK();
}
iter->second = packet_seq;
} else {
Expand All @@ -134,23 +134,24 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
DCHECK(_num_remaining_senders >= 0);
if (_num_remaining_senders == 0) {
DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
return;
return Status::OK();
}
}

BlockUPtr block = nullptr;
int64_t deserialize_time = 0;
{
SCOPED_RAW_TIMER(&deserialize_time);
block = Block::create_unique(pblock);
block = Block::create_unique();
RETURN_IF_ERROR(block->deserialize(pblock));
}

auto block_byte_size = block->allocated_bytes();
VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << block_byte_size << "\n";

std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return;
return Status::OK();
}

COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
Expand All @@ -176,6 +177,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
if (!empty) {
_data_arrival_cv.notify_one();
}
return Status::OK();
}

void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
Expand Down Expand Up @@ -376,11 +378,11 @@ Status VDataStreamRecvr::create_merger(const VExprContextSPtrs& ordering_expr,
return Status::OK();
}

void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
int64_t packet_seq, ::google::protobuf::Closure** done) {
Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
int64_t packet_seq, ::google::protobuf::Closure** done) {
SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id, _fragment_instance_id);
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done);
return _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done);
}

void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
Expand Down
8 changes: 4 additions & 4 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class VDataStreamRecvr {
const std::vector<bool>& nulls_first, size_t batch_size, int64_t limit,
size_t offset);

void add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done);
Status add_block(const PBlock& pblock, int sender_id, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done);

void add_block(Block* block, int sender_id, bool use_move);

Expand Down Expand Up @@ -185,8 +185,8 @@ class VDataStreamRecvr::SenderQueue {

virtual Status get_batch(Block* next_block, bool* eos);

void add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done);
Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done);

virtual void add_block(Block* block, bool use_move);

Expand Down
24 changes: 16 additions & 8 deletions be/test/vec/core/block_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
vectorized::Block block2;
block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
Expand All @@ -150,7 +151,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
vectorized::Block block2;
block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
Expand All @@ -174,7 +176,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
vectorized::Block block2;
block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
Expand All @@ -200,7 +203,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
vectorized::Block block2;
block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
Expand All @@ -220,7 +224,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
vectorized::Block block2;
block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
Expand All @@ -242,7 +247,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
EXPECT_TRUE(pblock.column_metas()[0].has_decimal_param());
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
vectorized::Block block2;
block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
Expand All @@ -264,7 +270,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
vectorized::Block block2;
block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
Expand All @@ -279,7 +286,8 @@ void serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();

vectorized::Block block2(pblock);
vectorized::Block block2;
block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
Expand Down
3 changes: 2 additions & 1 deletion be/test/vec/exec/vtablet_sink_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ class VTestInternalService : public PBackendService {
k_add_batch_status.to_protobuf(response->mutable_status());

if (request->has_block() && _row_desc != nullptr) {
vectorized::Block block(request->block());
vectorized::Block block;
block.deserialize(request->block());

for (size_t row_num = 0; row_num < block.rows(); ++row_num) {
std::stringstream out;
Expand Down

0 comments on commit 62c075b

Please sign in to comment.