Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
xy720 committed Apr 22, 2024
1 parent 0af0a47 commit b846dbf
Show file tree
Hide file tree
Showing 4 changed files with 4,196 additions and 0 deletions.
27 changes: 27 additions & 0 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,31 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,

SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
if (is_append) {
if (_cur_mutable_block && !_cur_mutable_block->empty()) {
// When is-append is true, the previous block may not have been sent out yet.
// (e.x. The previous block is not load to single tablet, and its row num was
// 4064, which is smaller than the send batch size 8192).
// If we clear the previous block directly here, it will cause data loss.
{
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
std::lock_guard<std::mutex> l(_pending_batches_lock);
// To simplify the add_row logic, postpone adding block into req until the time of sending req
_pending_batches_bytes += _cur_mutable_block->allocated_bytes();
_cur_add_block_request->set_eos(
false); // for multi-add, only when marking close we set it eos.
// Copy the request to tmp request to add to pend block queue
auto tmp_add_block_request = std::make_shared<PTabletWriterAddBlockRequest>();
*tmp_add_block_request = *_cur_add_block_request;
_pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request);
_pending_batches_num++;
VLOG_DEBUG << "VTabletWriter:" << _parent << " VNodeChannel:" << this
<< " pending_batches_bytes:" << _pending_batches_bytes
<< " jobid:" << std::to_string(_state->load_job_id())
<< " loadinfo:" << _load_info;
}
_cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty());
_cur_add_block_request->clear_tablet_ids();
}
// Do not split the data of the block by tablets but append it to a single delta writer.
// This is a faster way to send block than append_to_block_by_selector
// TODO: we could write to local delta writer if single_replica_load is true
Expand All @@ -542,6 +567,8 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
for (auto tablet_id : payload->second) {
_cur_add_block_request->add_tablet_ids(tablet_id);
}
// need to reset to false avoid load data to incorrect tablet.
_cur_add_block_request->set_is_single_tablet_block(false);
}

if (is_append || _cur_mutable_block->rows() >= _batch_size ||
Expand Down
Loading

0 comments on commit b846dbf

Please sign in to comment.