Skip to content

Commit

Permalink
[bug](branch-2.0) fix data loss and incorrect in random distribution …
Browse files Browse the repository at this point in the history
…table in 2.0 (#33920)
  • Loading branch information
xy720 authored Apr 23, 2024
1 parent 54f5d13 commit 475b065
Show file tree
Hide file tree
Showing 4 changed files with 4,191 additions and 0 deletions.
22 changes: 22 additions & 0 deletions be/src/vec/sink/vtablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,25 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,

SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
if (is_append) {
if (_cur_mutable_block && !_cur_mutable_block->empty()) {
// When is_append is true, the previous block may not have been sent out yet
// (e.g. the previous block was not loaded to a single tablet, and its row count
// was 4064, which is smaller than the send batch size of 8192).
// If we cleared the previous block directly here, it would cause data loss.
{
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
std::lock_guard<std::mutex> l(_pending_batches_lock);
_pending_batches_bytes += _cur_mutable_block->allocated_bytes();
_pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
_pending_batches_num++;
VLOG_DEBUG << "VOlapTableSink:" << _parent << " VNodeChannel:" << this
<< " pending_batches_bytes:" << _pending_batches_bytes
<< " jobid:" << std::to_string(_state->load_job_id())
<< " loadinfo:" << _load_info;
}
_cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty());
_cur_add_block_request.clear_tablet_ids();
}
// Do not split the data of the block by tablets but append it to a single delta writer.
// This is a faster way to send block than append_block_by_selector
// TODO: we could write to local delta writer if single_replica_load is true
Expand All @@ -624,6 +643,8 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
for (auto tablet_id : payload->second) {
_cur_add_block_request.add_tablet_ids(tablet_id);
}
// Need to reset this flag to false to avoid loading data into the incorrect tablet.
_cur_add_block_request.set_is_single_tablet_block(false);
}

if (is_append || _cur_mutable_block->rows() >= _batch_size ||
Expand Down Expand Up @@ -1397,6 +1418,7 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
uint32_t tablet_index = 0;
RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, tablet_index, stop_processing,
is_continue));

if (is_continue) {
continue;
}
Expand Down
Loading

0 comments on commit 475b065

Please sign in to comment.