Skip to content

Commit

Permalink
Revert "[fix](partial update) Fix missing rowsets during doing alignm…
Browse files Browse the repository at this point in the history
…ent when flushing memtable due to compaction #28062 (#28468)"

This reverts commit 8c8f322.
  • Loading branch information
bobhan1 committed Dec 19, 2023
1 parent 1142942 commit d97b337
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 155 deletions.
26 changes: 0 additions & 26 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "runtime/load_channel_mgr.h"
#include "runtime/thread_context.h"
#include "tablet_meta.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
Expand Down Expand Up @@ -475,31 +474,6 @@ Status MemTable::_generate_delete_bitmap(int32_t segment_id) {
{
std::shared_lock meta_rlock(_tablet->get_header_lock());
specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
{ specified_rowsets.clear(); });
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
// `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction deletes
// rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get here.
// Because we haven't begun calculation for the merge-on-write table, we can safely reset the `_mow_context->rowset_ids`
// to the latest value and re-request the corresponding rowsets.
LOG(INFO) << fmt::format(
"[Memtable Flush] some rowsets have been deleted due to "
"compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
"rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
"transaction_id: {}",
specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
_mow_context->max_version, _mow_context->txn_id);
Status st {Status::OK()};
_mow_context->update_rowset_ids_with_lock([&]() {
_mow_context->rowset_ids.clear();
st = _tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids);
});
if (!st.ok()) {
return st;
}
specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size());
}
}
OlapStopWatch watch;
RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, specified_rowsets,
Expand Down
9 changes: 2 additions & 7 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,18 +468,13 @@ using RowsetIdUnorderedSet = std::unordered_set<RowsetId, HashOfRowsetId>;
class DeleteBitmap;
// merge on write context
struct MowContext {
MowContext(int64_t version, int64_t txnid, RowsetIdUnorderedSet& ids,
MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
std::shared_ptr<DeleteBitmap> db)
: max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {}
void update_rowset_ids_with_lock(std::function<void()> callback) {
std::lock_guard<std::mutex> lock(m);
callback();
}
int64_t max_version;
int64_t txn_id;
RowsetIdUnorderedSet& rowset_ids;
const RowsetIdUnorderedSet& rowset_ids;
std::shared_ptr<DeleteBitmap> delete_bitmap;
std::mutex m; // protection for updating rowset_ids only
};

// used in mow partial update
Expand Down
26 changes: 0 additions & 26 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
#include "service/point_query_executor.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/faststring.h"
#include "util/key_util.h"
#include "vec/columns/column_nullable.h"
Expand Down Expand Up @@ -402,31 +401,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
{
std::shared_lock rlock(_tablet->get_header_lock());
specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
DBUG_EXECUTE_IF("_append_block_with_partial_content.clear_specified_rowsets",
{ specified_rowsets.clear(); });
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
// `get_rowset_by_ids` may fail to find some of the rowsets we request if cumulative compaction deletes
// rowsets from `_rs_version_map` (see `Tablet::modify_rowsets` for details) before we get here.
// Because we haven't begun calculation for the merge-on-write table, we can safely reset the `_mow_context->rowset_ids`
// to the latest value and re-request the corresponding rowsets.
LOG(INFO) << fmt::format(
"[Memtable Flush] some rowsets have been deleted due to "
"compaction(specified_rowsets.size()={}, but rowset_ids.size()={}), reset "
"rowset_ids to the latest value. tablet_id: {}, cur max_version: {}, "
"transaction_id: {}",
specified_rowsets.size(), _mow_context->rowset_ids.size(), _tablet->tablet_id(),
_mow_context->max_version, _mow_context->txn_id);
Status st {Status::OK()};
_mow_context->update_rowset_ids_with_lock([&]() {
_mow_context->rowset_ids.clear();
st = _tablet->all_rs_id(_mow_context->max_version, &_mow_context->rowset_ids);
});
if (!st.ok()) {
return st;
}
specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
DCHECK(specified_rowsets.size() == _mow_context->rowset_ids.size());
}
}
std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
// locate rows in base data
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ suite("test_primary_key_partial_update_publish", "p0") {

file '10000.csv'
time 10000 // limit inflight 10s
}
}
streamLoad {
table "${tableName}"

Expand Down Expand Up @@ -68,5 +68,5 @@ suite("test_primary_key_partial_update_publish", "p0") {
"""

// drop table
sql """ DROP TABLE IF EXISTS ${tableName} """
// sql """ DROP TABLE IF EXISTS ${tableName} """
}

0 comments on commit d97b337

Please sign in to comment.