Skip to content

Commit

Permalink
not use unused rowsets
Browse files Browse the repository at this point in the history
  • Loading branch information
eldenmoon committed Jul 12, 2023
1 parent 720d564 commit c937224
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 23 deletions.
3 changes: 1 addition & 2 deletions be/src/olap/data_dir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,7 @@ void DataDir::perform_path_gc_by_rowsetid() {
TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
if (tablet != nullptr) {
if (!tablet->check_rowset_id(rowset_id) &&
!StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id,
nullptr)) {
!StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)) {
_process_garbage_path(path);
}
}
Expand Down
31 changes: 22 additions & 9 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,7 @@ void StorageEngine::start_delete_unused_rowset() {
unused_rowsets_copy[it->first] = it->second;
}
// remote rowset data will be reclaimed by `remove_unused_remote_files`
evict_querying_rowset(it->second->rowset_id());
it = _unused_rowsets.erase(it);
} else {
++it;
Expand Down Expand Up @@ -1025,17 +1026,10 @@ Status StorageEngine::execute_task(EngineTask* task) {
}

// check whether any unused rowsets's id equal to rowset_id
bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id,
RowsetSharedPtr* rs) {
bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id) {
std::lock_guard<std::mutex> lock(_gc_mutex);
auto search = _unused_rowsets.find(rowset_id.to_string());
if (search != _unused_rowsets.end()) {
if (rs) {
*rs = search->second;
}
return true;
}
return false;
return search != _unused_rowsets.end();
}

void StorageEngine::create_cumulative_compaction(
Expand Down Expand Up @@ -1149,4 +1143,23 @@ Status StorageEngine::get_compaction_status_json(std::string* result) {
return Status::OK();
}

void StorageEngine::add_quering_rowset(RowsetSharedPtr rs) {
std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
_querying_rowsets.emplace(rs->rowset_id(), rs);
}

RowsetSharedPtr StorageEngine::get_quering_rowset(RowsetId rs_id) {
std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
auto it = _querying_rowsets.find(rs_id);
if (it != _querying_rowsets.end()) {
return it->second;
}
return nullptr;
}

void StorageEngine::evict_querying_rowset(RowsetId rs_id) {
std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
_querying_rowsets.erase(rs_id);
}

} // namespace doris
12 changes: 11 additions & 1 deletion be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class StorageEngine {
TxnManager* txn_manager() { return _txn_manager.get(); }
MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); }

bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id, RowsetSharedPtr* rs);
bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id);

RowsetId next_rowset_id() { return _rowset_id_generator->next_id(); }

Expand Down Expand Up @@ -224,6 +224,12 @@ class StorageEngine {
int64_t transaction_id, bool is_recover);
int64_t get_pending_publish_min_version(int64_t tablet_id);

void add_quering_rowset(RowsetSharedPtr rs);

RowsetSharedPtr get_quering_rowset(RowsetId rs_id);

void evict_querying_rowset(RowsetId rs_id);

private:
// Instance should be inited from `static open()`
// MUST NOT be called in other circumstances.
Expand Down Expand Up @@ -370,6 +376,10 @@ class StorageEngine {
// map<rowset_id(str), RowsetSharedPtr>, if we use RowsetId as the key, we need custom hash func
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;

// Hold reference of quering rowsets
std::mutex _quering_rowsets_mutex;
std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId> _querying_rowsets;

// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
// This mem tracker is only for tracking memory use by segment meta data such as footer or index page.
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ Status IndexBuilder::do_build_inverted_index() {
Status IndexBuilder::modify_rowsets(const Merger::Statistics* stats) {
for (auto rowset_ptr : _output_rowsets) {
auto rowset_id = rowset_ptr->rowset_id();
if (StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id, nullptr)) {
if (StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id)) {
DCHECK(false) << "output rowset: " << rowset_id.to_string() << " in unused rowsets";
}
}
Expand Down
15 changes: 5 additions & 10 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1540,17 +1540,12 @@ Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
if (!tablet) {
continue;
}
// Get Rowset from either tablet or unused rowsets, since this rowset maybe expired and swept.
// But we ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp();
BetaRowsetSharedPtr rowset =
std::static_pointer_cast<BetaRowset>(tablet->get_rowset(rowset_id));
// We ensured it's rowset is not released when init Tablet reader param, rowset->update_delayed_expired_timestamp();
BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(
StorageEngine::instance()->get_quering_rowset(rowset_id));
if (!rowset) {
RowsetSharedPtr rs;
if (!StorageEngine::instance()->check_rowset_id_in_unused_rowsets(rowset_id, &rs)) {
LOG(INFO) << "no such rowset " << rowset_id;
continue;
}
rowset = std::static_pointer_cast<BetaRowset>(rs);
LOG(INFO) << "no such rowset " << rowset_id;
continue;
}
size_t row_size = 0;
Defer _defer([&]() {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ Status NewOlapScanner::_init_tablet_reader_params(
UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() +
delayed_s;
rs_reader->rowset()->update_delayed_expired_timestamp(delayed_expired_timestamp);
StorageEngine::instance()->add_quering_rowset(rs_reader->rowset());
}
}

Expand Down

0 comments on commit c937224

Please sign in to comment.