Skip to content

Commit

Permalink
Sync stats cache while task finished, doesn't need to query column_st…
Browse files Browse the repository at this point in the history
…atistics table. (#30609)
  • Loading branch information
Jibing-Li authored Jan 31, 2024
1 parent 3c7aa0b commit a9b93ed
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColStatsData;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.InvalidateStatsTarget;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.StatisticsCacheKey;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.query.QueryStats;
Expand Down Expand Up @@ -3053,11 +3053,11 @@ private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String c
@Override
public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException {
StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class);
List<ResultRow> rows = request.statsRows.stream()
.map(s -> GsonUtils.GSON.fromJson(s, ResultRow.class))
.collect(Collectors.toList());
ColumnStatistic c = ColumnStatistic.fromResultRow(rows);
if (c != ColumnStatistic.UNKNOWN) {
ColStatsData data = GsonUtils.GSON.fromJson(request.colStatsData, ColStatsData.class);
ColumnStatistic c = data.toColumnStatistic();
if (c == ColumnStatistic.UNKNOWN) {
Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, k.idxId, k.colName);
} else {
Env.getCurrentEnv().getStatisticsCache().updateColStatsCache(k.tableId, k.idxId, k.colName, c);
}
// Return Ok anyway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ protected void writeBuf() {
}
}
updateTaskState(AnalysisState.FINISHED, "");
syncLoadStats();
queryFinished.clear();
buf.clear();
}
Expand Down Expand Up @@ -192,17 +191,4 @@ public void deregisterJob() {
}
}

protected void syncLoadStats() {
long tblId = jobInfo.tblId;
for (BaseAnalysisTask task : queryFinished) {
if (task.info.externalTableLevelTask) {
continue;
}
String colName = task.col.getName();
if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
analysisManager.removeColStatsStatus(tblId, colName);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
boolean success = true;
for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
// Skip master
if (selfNode.equals(frontend.getHost())) {
if (selfNode.getHost().equals(frontend.getHost())) {
continue;
}
success = success && statisticsCache.invalidateStats(frontend, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,7 @@ protected void executeWithRetry() {

public abstract void doExecute() throws Exception;

protected void afterExecution() {
if (killed) {
return;
}
long tblId = tbl.getId();
String colName = col.getName();
if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
Env.getCurrentEnv().getAnalysisManager().removeColStatsStatus(tblId, colName);
}
}
protected void afterExecution() {}

protected void setTaskStateToRunning() {
Env.getCurrentEnv().getAnalysisManager()
Expand Down Expand Up @@ -318,6 +309,7 @@ protected void runQuery(String sql) {
try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) {
stmtExecutor = new StmtExecutor(a.connectContext, sql);
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
Env.getCurrentEnv().getStatisticsCache().syncColStats(colStatsData);
queryId = DebugUtil.printId(stmtExecutor.getContext().queryId());
job.appendBuf(this, Collections.singletonList(colStatsData));
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Column;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
Expand All @@ -43,17 +48,23 @@
* 13: update_time
*/
public class ColStatsData {
private static final Logger LOG = LogManager.getLogger(ColStatsData.class);

@SerializedName("statsId")
public final StatsId statsId;
@SerializedName("count")
public final long count;
@SerializedName("ndv")
public final long ndv;

@SerializedName("nullCount")
public final long nullCount;

@SerializedName("minLit")
public final String minLit;
@SerializedName("maxLit")
public final String maxLit;

@SerializedName("dataSizeInBytes")
public final long dataSizeInBytes;

@SerializedName("updateTime")
public final String updateTime;

@VisibleForTesting
Expand Down Expand Up @@ -106,4 +117,56 @@ public String toSQL(boolean roundByParentheses) {
sj.add(StatisticsUtil.quote(updateTime));
return sj.toString();
}

public ColumnStatistic toColumnStatistic() {
try {
ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder();
columnStatisticBuilder.setCount(count);
columnStatisticBuilder.setNdv(ndv);
columnStatisticBuilder.setNumNulls(nullCount);
columnStatisticBuilder.setDataSize(dataSizeInBytes);
columnStatisticBuilder.setAvgSizeByte(count == 0 ? 0 : dataSizeInBytes / count);
if (statsId == null) {
return ColumnStatistic.UNKNOWN;
}
long catalogId = statsId.catalogId;
long idxId = statsId.idxId;
long dbID = statsId.dbId;
long tblId = statsId.tblId;
String colName = statsId.colId;
Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName);
if (col == null) {
return ColumnStatistic.UNKNOWN;
}
String min = minLit;
String max = maxLit;
if (min != null && !min.equalsIgnoreCase("NULL")) {
try {
columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min));
columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min));
} catch (AnalysisException e) {
LOG.warn("Failed to process column {} min value {}.", col, min, e);
columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
}
} else {
columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
}
if (max != null && !max.equalsIgnoreCase("NULL")) {
try {
columnStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max));
columnStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max));
} catch (AnalysisException e) {
LOG.warn("Failed to process column {} max value {}.", col, max, e);
columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY);
}
} else {
columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY);
}
columnStatisticBuilder.setUpdatedTime(updateTime);
return columnStatisticBuilder.build();
} catch (Exception e) {
LOG.warn("Failed to convert column statistics.", e);
return ColumnStatistic.UNKNOWN;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class ColumnStatistic {
public final LiteralExpr minExpr;
public final LiteralExpr maxExpr;

@SerializedName("updatedTime")
public final String updatedTime;

public ColumnStatistic(double count, double ndv, ColumnStatistic original, double avgSizeByte,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,6 @@ protected Pair<Double, Long> getSampleInfo() {
return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate);
}

@Override
protected void afterExecution() {
// Table level task doesn't need to sync any value to sync stats, it stores the value in metadata.
// Partition only task doesn't need to refresh cached.
if (isTableLevelTask || isPartitionOnly) {
return;
}
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}

/**
* If the size to sample is larger than LIMIT_SIZE (1GB)
* and is much larger (1.2*) than the size user want to sample,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,4 @@ private Map<String, String> buildTableStatsParams(String partId) {
commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
return commonParams;
}

@Override
protected void afterExecution() {
// Table level task doesn't need to sync any value to sync stats, it stores the value in metadata.
if (isTableLevelTask) {
return;
}
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TNetworkAddress;
Expand All @@ -39,12 +39,12 @@
import org.apache.logging.log4j.Logger;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class StatisticsCache {

Expand Down Expand Up @@ -203,37 +203,34 @@ private void doPreHeat() {
}

/**
* Return false if the log of corresponding stats load is failed.
* Refresh stats cache, invalidate cache if the new data is unknown.
*/
public boolean syncLoadColStats(long tableId, long idxId, String colName) {
List<ResultRow> columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName);
final StatisticsCacheKey k =
new StatisticsCacheKey(tableId, idxId, colName);
final ColumnStatistic c = ColumnStatistic.fromResultRow(columnResults);
if (c == ColumnStatistic.UNKNOWN) {
return false;
}
putCache(k, c);
if (ColumnStatistic.UNKNOWN == c) {
return false;
public void syncColStats(ColStatsData data) {
StatsId statsId = data.statsId;
final StatisticsCacheKey k = new StatisticsCacheKey(statsId.tblId, statsId.idxId, statsId.colId);
ColumnStatistic columnStatistic = data.toColumnStatistic();
if (columnStatistic == ColumnStatistic.UNKNOWN) {
invalidate(k.tableId, k.idxId, k.colName);
} else {
putCache(k, columnStatistic);
}
TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest = new TUpdateFollowerStatsCacheRequest();
updateFollowerStatsCacheRequest.key = GsonUtils.GSON.toJson(k);
updateFollowerStatsCacheRequest.statsRows = columnResults.stream().map(GsonUtils.GSON::toJson).collect(
Collectors.toList());
for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) {
if (StatisticsUtil.isMaster(frontend)) {
updateFollowerStatsCacheRequest.colStatsData = GsonUtils.GSON.toJson(data);
// For compatible only, to be deprecated.
updateFollowerStatsCacheRequest.statsRows = new ArrayList<>();
SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
if (selfNode.getHost().equals(frontend.getHost())) {
continue;
}
sendStats(frontend, updateFollowerStatsCacheRequest);
}
return true;
}

@VisibleForTesting
public void sendStats(Frontend frontend, TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest) {
TNetworkAddress address = new TNetworkAddress(frontend.getHost(),
frontend.getRpcPort());
TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
FrontendService.Client client = null;
try {
client = ClientPool.frontendPool.borrowObject(address);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,25 @@
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.annotations.SerializedName;

import java.util.StringJoiner;

public class StatsId {

@SerializedName("id")
public final String id;
@SerializedName("catalogId")
public final long catalogId;
@SerializedName("dbId")
public final long dbId;
@SerializedName("tblId")
public final long tblId;
@SerializedName("idxId")
public final long idxId;

@SerializedName("colId")
public final String colId;

// nullable
@SerializedName("partId")
public final String partId;

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;

import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
Expand Down Expand Up @@ -185,12 +184,6 @@ protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exce
protected void syncLoadStats() {
}
};
new Expectations() {
{
job.syncLoadStats();
times = 1;
}
};
job.writeBuf();

Assertions.assertEquals(0, job.queryFinished.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ private void sendStats(Frontend frontend,
}
};
StatisticsCache statisticsCache = new StatisticsCache();
statisticsCache.syncLoadColStats(1L, 1L, "any");
new Expectations() {
{
statisticsCache.sendStats((Frontend) any, (TUpdateFollowerStatsCacheRequest) any);
Expand Down Expand Up @@ -346,7 +345,6 @@ private void sendStats(Frontend frontend,
}
};
StatisticsCache statisticsCache = new StatisticsCache();
statisticsCache.syncLoadColStats(1L, 1L, "any");
new Expectations() {
{
statisticsCache.sendStats((Frontend) any, (TUpdateFollowerStatsCacheRequest) any);
Expand Down
Loading

0 comments on commit a9b93ed

Please sign in to comment.