Skip to content

Commit

Permalink
[fix](load) disable num segments check in compatibility mode (#41053)
Browse files Browse the repository at this point in the history
## Proposed changes

When using mixed version of BEs, sink v2 on old BE won't report num
segments to load streams on the new BE.
This will cause false positive segment num mismatch.

This PR addressed this issue by disabling num segments check when any
tablets_to_commit proto has not set num_segments field.
  • Loading branch information
kaijchen authored Sep 23, 2024
1 parent 8c34bfe commit eb7fd6e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
9 changes: 7 additions & 2 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ Status TabletStream::close() {
}

DBUG_EXECUTE_IF("TabletStream.close.segment_num_mismatch", { _num_segments++; });
if (_next_segid.load() != _num_segments) {
if (_check_num_segments && (_next_segid.load() != _num_segments)) {
_status = Status::Corruption(
"segment num mismatch in tablet {}, expected: {}, actual: {}, load_id: {}", _id,
_num_segments, _next_segid.load(), print_id(_load_id));
Expand Down Expand Up @@ -380,9 +380,14 @@ void IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
auto it = _tablet_streams_map.find(tablet.tablet_id());
if (it == _tablet_streams_map.end()) {
_init_tablet_stream(tablet_stream, tablet.tablet_id(), tablet.partition_id());
} else {
tablet_stream = it->second;
}
if (tablet.has_num_segments()) {
tablet_stream->add_num_segments(tablet.num_segments());
} else {
it->second->add_num_segments(tablet.num_segments());
// for compatibility reasons (sink from old version BE)
tablet_stream->disable_num_segments_check();
}
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/load_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class TabletStream {
Status append_data(const PStreamHeader& header, butil::IOBuf* data);
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments += num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
Status close();
int64_t id() const { return _id; }

Expand All @@ -65,6 +66,7 @@ class TabletStream {
std::unordered_map<int64_t, std::unique_ptr<SegIdMapping>> _segids_mapping;
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
bool _check_num_segments = true;
bthread::Mutex _lock;
Status _status;
PUniqueId _load_id;
Expand Down

0 comments on commit eb7fd6e

Please sign in to comment.