Skip to content

Commit

Permalink
[enhancement](statistics) collect table level loaded rows on BE to make RPC light weight (#24609)
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 authored Sep 28, 2023
1 parent 42207df commit 188d9ab
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 44 deletions.
7 changes: 4 additions & 3 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1529,16 +1529,17 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
std::map<TTabletId, TVersion> succ_tablets;
// partition_id, tablet_id, publish_version
std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
std::map<TTabletId, int64_t> tablet_id_to_num_delta_rows;
std::map<TTableId, int64_t> table_id_to_num_delta_rows;
uint32_t retry_time = 0;
Status status;
bool is_task_timeout = false;
while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
succ_tablets.clear();
error_tablet_ids.clear();
table_id_to_num_delta_rows.clear();
EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
&succ_tablets, &discontinuous_version_tablets,
&tablet_id_to_num_delta_rows);
&table_id_to_num_delta_rows);
status = StorageEngine::instance()->execute_task(&engine_task);
if (status.ok()) {
break;
Expand Down Expand Up @@ -1623,7 +1624,7 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
finish_task_request.__set_succ_tablets(succ_tablets);
finish_task_request.__set_error_tablet_ids(
std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));
finish_task_request.__set_tablet_id_to_delta_num_rows(tablet_id_to_num_delta_rows);
finish_task_request.__set_table_id_to_delta_num_rows(table_id_to_num_delta_rows);
_finish_task(finish_task_request);
_remove_task_info(agent_task_req.task_type, agent_task_req.signature);
}
Expand Down
22 changes: 17 additions & 5 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>

#include "common/logging.h"
Expand Down Expand Up @@ -73,12 +74,12 @@ EnginePublishVersionTask::EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req, std::set<TTabletId>* error_tablet_ids,
std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets,
std::map<TTabletId, int64_t>* tablet_id_to_num_delta_rows)
std::map<TTableId, int64_t>* table_id_to_num_delta_rows)
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets),
_tablet_id_to_num_delta_rows(tablet_id_to_num_delta_rows) {}
_table_id_to_num_delta_rows(table_id_to_num_delta_rows) {}

void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
Expand All @@ -93,7 +94,7 @@ Status EnginePublishVersionTask::finish() {
std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);

std::unordered_map<int64_t, int64_t> tablet_id_to_num_delta_rows;
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
Expand Down Expand Up @@ -189,9 +190,11 @@ Status EnginePublishVersionTask::finish() {
continue;
}
}

auto rowset_meta_ptr = rowset->rowset_meta();
_tablet_id_to_num_delta_rows->insert(
tablet_id_to_num_delta_rows.insert(
{rowset_meta_ptr->tablet_id(), rowset_meta_ptr->num_rows()});

auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
Expand All @@ -208,7 +211,6 @@ Status EnginePublishVersionTask::finish() {
std::set<TabletInfo> partition_related_tablet_infos;
StorageEngine::instance()->tablet_manager()->get_partition_related_tablets(
partition_id, &partition_related_tablet_infos);

Version version(par_ver_info.version, par_ver_info.version);
for (auto& tablet_info : partition_related_tablet_infos) {
TabletSharedPtr tablet =
Expand Down Expand Up @@ -245,6 +247,7 @@ Status EnginePublishVersionTask::finish() {
}
}
}
_calculate_tbl_num_delta_rows(tablet_id_to_num_delta_rows);

if (!res.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
LOG(INFO) << "finish to publish version on transaction."
Expand All @@ -256,6 +259,15 @@ Status EnginePublishVersionTask::finish() {
return res;
}

// Folds per-tablet delta row counts into per-table totals.
//
// For each (tablet_id, num_rows) entry, resolves the tablet through the storage
// engine's tablet manager, reads its table id, and adds num_rows to the entry for
// that table in *_table_id_to_num_delta_rows (entries are created on first use).
//
// @param tablet_id_to_num_delta_rows  tablet id -> number of rows in the rowset
//                                     collected for that tablet in finish().
//
// A tablet that the tablet manager can no longer resolve is skipped, so its rows
// are not counted, instead of dereferencing a null tablet handle.
// NOTE(review): assumes get_tablet() signals "not found" with a null handle --
// confirm against TabletManager.
void EnginePublishVersionTask::_calculate_tbl_num_delta_rows(
        const std::unordered_map<int64_t, int64_t>& tablet_id_to_num_delta_rows) {
    for (const auto& kv : tablet_id_to_num_delta_rows) {
        auto tablet = StorageEngine::instance()->tablet_manager()->get_tablet(kv.first);
        if (!tablet) {
            // Tablet is gone from this backend; nothing to attribute its rows to.
            continue;
        }
        (*_table_id_to_num_delta_rows)[tablet->get_table_id()] += kv.second;
    }
}

TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task,
TabletSharedPtr tablet, RowsetSharedPtr rowset,
int64_t partition_id, int64_t transaction_id,
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class EnginePublishVersionTask : public EngineTask {
const TPublishVersionRequest& publish_version_req,
std::set<TTabletId>* error_tablet_ids, std::map<TTabletId, TVersion>* succ_tablets,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets,
std::map<TTabletId, int64_t>* tablet_id_to_num_delta_rows);
std::map<TTableId, int64_t>* table_id_to_num_delta_rows);
~EnginePublishVersionTask() override = default;

Status finish() override;
Expand All @@ -98,12 +98,15 @@ class EnginePublishVersionTask : public EngineTask {
int64_t finish_task();

private:
void _calculate_tbl_num_delta_rows(
const std::unordered_map<int64_t, int64_t>& tablet_id_to_num_delta_rows);

const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
std::set<TTabletId>* _error_tablet_ids;
std::map<TTabletId, TVersion>* _succ_tablets;
std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;
std::map<TTabletId, int64_t>* _tablet_id_to_num_delta_rows;
std::map<TTableId, int64_t>* _table_id_to_num_delta_rows;
};

class AsyncTabletPublishTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,8 @@ private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
// not remove the task from queue and be will retry
return;
}
if (request.isSetTabletIdToDeltaNumRows()) {
publishVersionTask.setTabletIdToDeltaNumRows(request.getTabletIdToDeltaNumRows());
if (request.isSetTableIdToDeltaNumRows()) {
publishVersionTask.setTableIdToDeltaNumRows(request.getTableIdToDeltaNumRows());
}
AgentTaskQueue.removeTask(publishVersionTask.getBackendId(),
publishVersionTask.getTaskType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public class PublishVersionTask extends AgentTask {
private Map<Long, Long> succTablets;

/**
* To collect loaded rows for each tablet from each BE
* To collect loaded rows for each table from each BE
*/
private final Map<Long, Long> tabletIdToDeltaNumRows = Maps.newHashMap();
private final Map<Long, Long> tableIdToDeltaNumRows = Maps.newHashMap();

public PublishVersionTask(long backendId, long transactionId, long dbId,
List<TPartitionVersionInfo> partitionVersionInfos, long createTime) {
Expand Down Expand Up @@ -88,11 +88,11 @@ public synchronized void addErrorTablets(List<Long> errorTablets) {
this.errorTablets.addAll(errorTablets);
}

public void setTabletIdToDeltaNumRows(Map<Long, Long> tabletIdToDeltaNumRows) {
this.tabletIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows);
public void setTableIdToDeltaNumRows(Map<Long, Long> tabletIdToDeltaNumRows) {
this.tableIdToDeltaNumRows.putAll(tabletIdToDeltaNumRows);
}

public Map<Long, Long> getTabletIdToDeltaNumRows() {
return tabletIdToDeltaNumRows;
public Map<Long, Long> getTableIdToDeltaNumRows() {
return tableIdToDeltaNumRows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1799,8 +1799,17 @@ private boolean updateCatalogAfterVisible(TransactionState transactionState, Dat
}
}
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
LOG.debug("table id to loaded rows:{}", transactionState.getTableIdToNumDeltaRows());
transactionState.getTableIdToNumDeltaRows().forEach(analysisManager::updateUpdatedRows);
Map<Long, Long> tableIdToTotalNumDeltaRows = transactionState.getTableIdToTotalNumDeltaRows();
LOG.debug("table id to loaded rows:{}", tableIdToTotalNumDeltaRows);
Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
tableIdToTotalNumDeltaRows
.forEach((tableId, numRows) -> {
OlapTable table = (OlapTable) db.getTableNullable(tableId);
if (table != null) {
tableIdToNumDeltaRows.put(tableId, numRows / table.getReplicaCount());
}
});
tableIdToNumDeltaRows.forEach(analysisManager::updateUpdatedRows);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.doris.transaction;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
Expand Down Expand Up @@ -127,8 +125,6 @@ private void publishVersion() {
AgentTaskExecutor.submit(batchTask);
}

TabletInvertedIndex tabletInvertedIndex = Env.getCurrentEnv().getTabletInvertedIndex();
Set<Long> tabletIdFilter = Sets.newHashSet();
Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
// try to finish the transaction, if failed just retry in next loop
for (TransactionState transactionState : readyTransactionStates) {
Expand All @@ -138,26 +134,18 @@ private void publishVersion() {
.stream()
.peek(task -> {
if (task.isFinished() && CollectionUtils.isEmpty(task.getErrorTablets())) {
Map<Long, Long> tabletIdToDeltaNumRows =
task.getTabletIdToDeltaNumRows();
tabletIdToDeltaNumRows.forEach((tabletId, numRows) -> {
if (!tabletIdFilter.add(tabletId)) {
// means the delta num rows for this tablet id has been collected
return;
}
TabletMeta tabletMeta = tabletInvertedIndex.getTabletMeta(tabletId);
if (tabletMeta == null) {
// for delete, drop, schema change etc. here may be a null value
return;
}
long tableId = tabletMeta.getTableId();
tableIdToNumDeltaRows.computeIfPresent(tableId, (tblId, orgNum) -> orgNum + numRows);
Map<Long, Long> tableIdToDeltaNumRows =
task.getTableIdToDeltaNumRows();
tableIdToDeltaNumRows.forEach((tableId, numRows) -> {
tableIdToDeltaNumRows
.computeIfPresent(tableId, (id, orgNumRows) -> orgNumRows + numRows);
tableIdToNumDeltaRows.putIfAbsent(tableId, numRows);
});
}
});
boolean hasBackendAliveAndUnfinishedTask = publishVersionTaskStream
.anyMatch(task -> !task.isFinished() && infoService.checkBackendAlive(task.getBackendId()));
transactionState.setTableIdToTotalNumDeltaRows(tableIdToNumDeltaRows);

boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask || transactionState.isPublishTimeout();
if (shouldFinishTxn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ public String toString() {
// tbl id -> (index ids)
private Map<Long, Set<Long>> loadedTblIndexes = Maps.newHashMap();

private Map<Long, Long> tableIdToNumDeltaRows = Maps.newHashMap();
/**
* the value is the num delta rows of all replicas in each table
*/
private final Map<Long, Long> tableIdToTotalNumDeltaRows = Maps.newHashMap();

private String errorLogUrl = null;

Expand Down Expand Up @@ -703,12 +706,12 @@ public void readFields(DataInput in) throws IOException {
}
}

public Map<Long, Long> getTableIdToNumDeltaRows() {
return tableIdToNumDeltaRows;
public Map<Long, Long> getTableIdToTotalNumDeltaRows() {
return tableIdToTotalNumDeltaRows;
}

public void setTableIdToNumDeltaRows(Map<Long, Long> tableIdToNumDeltaRows) {
this.tableIdToNumDeltaRows.putAll(tableIdToNumDeltaRows);
public void setTableIdToTotalNumDeltaRows(Map<Long, Long> tableIdToTotalNumDeltaRows) {
this.tableIdToTotalNumDeltaRows.putAll(tableIdToTotalNumDeltaRows);
}

public void setErrorMsg(String errMsg) {
Expand Down
2 changes: 1 addition & 1 deletion gensrc/thrift/MasterService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ struct TFinishTaskRequest {
15: optional i64 copy_size
16: optional i64 copy_time_ms
17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
18: optional map<i64, i64> tablet_id_to_delta_num_rows
18: optional map<i64, i64> table_id_to_delta_num_rows
}

struct TTablet {
Expand Down

0 comments on commit 188d9ab

Please sign in to comment.