diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 69d71dc814f850..95793335f1a1b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -78,6 +78,8 @@ import org.apache.doris.qe.QueryState; import org.apache.doris.qe.VariableMgr; import org.apache.doris.statistics.AnalysisManager; +import org.apache.doris.statistics.ColStatsData; +import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.InvalidateStatsTarget; import org.apache.doris.statistics.StatisticsCacheKey; import org.apache.doris.statistics.TableStatsMeta; @@ -3078,14 +3080,15 @@ private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String c @Override public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException { - StatisticsCacheKey key = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class); - /* - TODO: Need to handle minExpr and maxExpr, so that we can generate the columnStatistic - here and use putCache to update cached directly. - ColumnStatistic columnStatistic = GsonUtils.GSON.fromJson(request.colStats, ColumnStatistic.class); - Env.getCurrentEnv().getStatisticsCache().putCache(key, columnStatistic); - */ - Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(key.tableId, key.idxId, key.colName); + StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class); + ColStatsData data = GsonUtils.GSON.fromJson(request.colStatsData, ColStatsData.class); + ColumnStatistic c = data.toColumnStatistic(); + if (c == ColumnStatistic.UNKNOWN) { + Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, k.idxId, k.colName); + } else { + Env.getCurrentEnv().getStatisticsCache().updateColStatsCache(k.tableId, k.idxId, k.colName, c); + } + // Return Ok anyway return new TStatus(TStatusCode.OK); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java index f8f6cda259122c..33db1e63aa0a43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java @@ -136,7 +136,6 @@ protected void writeBuf() { } } updateTaskState(AnalysisState.FINISHED, ""); - syncLoadStats(); queryFinished.clear(); buf.clear(); } @@ -193,17 +192,4 @@ public void deregisterJob() { } } - protected void syncLoadStats() { - long tblId = jobInfo.tblId; - for (BaseAnalysisTask task : queryFinished) { - if (task.info.externalTableLevelTask) { - continue; - } - String colName = task.col.getName(); - if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) { - analysisManager.removeColStatsStatus(tblId, colName); - } - } - } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 1bc6da5a6c2804..febba90cb522aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -709,7 +709,7 @@ public void invalidateRemoteStats(long catalogId, long dbId, long tableId, boolean success = true; for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) { // Skip master - if (selfNode.equals(frontend.getHost())) { + if (selfNode.getHost().equals(frontend.getHost())) { continue; } success = success && statisticsCache.invalidateStats(frontend, request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java index 27101a1d66fa1f..05e3ed27668196 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -204,16 +204,7 @@ protected void executeWithRetry() { public abstract void doExecute() throws Exception; - protected void afterExecution() { - if (killed) { - return; - } - long tblId = tbl.getId(); - String colName = col.getName(); - if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) { - Env.getCurrentEnv().getAnalysisManager().removeColStatsStatus(tblId, colName); - } - } + protected void afterExecution() {} protected void setTaskStateToRunning() { Env.getCurrentEnv().getAnalysisManager() @@ -316,6 +307,7 @@ protected void runQuery(String sql) { try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) { stmtExecutor = new StmtExecutor(a.connectContext, sql); ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0)); + Env.getCurrentEnv().getStatisticsCache().syncColStats(colStatsData); job.appendBuf(this, Collections.singletonList(colStatsData)); } finally { LOG.debug("End cost time in secs: " + (System.currentTimeMillis() - startTime) / 1000); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java index ab551e2d4cfd5c..bdc600987f4d6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColStatsData.java @@ -17,9 +17,14 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; @@ -43,17 +48,23 @@ * 13: update_time */ public class ColStatsData { + private static final Logger LOG = LogManager.getLogger(ColStatsData.class); + + @SerializedName("statsId") public final StatsId statsId; + @SerializedName("count") public final long count; + @SerializedName("ndv") public final long ndv; - + @SerializedName("nullCount") public final long nullCount; - + @SerializedName("minLit") public final String minLit; + @SerializedName("maxLit") public final String maxLit; - + @SerializedName("dataSizeInBytes") public final long dataSizeInBytes; - + @SerializedName("updateTime") public final String updateTime; @VisibleForTesting @@ -106,4 +117,56 @@ public String toSQL(boolean roundByParentheses) { sj.add(StatisticsUtil.quote(updateTime)); return sj.toString(); } + + public ColumnStatistic toColumnStatistic() { + try { + ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder(); + columnStatisticBuilder.setCount(count); + columnStatisticBuilder.setNdv(ndv); + columnStatisticBuilder.setNumNulls(nullCount); + columnStatisticBuilder.setDataSize(dataSizeInBytes); + columnStatisticBuilder.setAvgSizeByte(count == 0 ? 0 : dataSizeInBytes / count); + if (statsId == null) { + return ColumnStatistic.UNKNOWN; + } + long catalogId = statsId.catalogId; + long idxId = statsId.idxId; + long dbID = statsId.dbId; + long tblId = statsId.tblId; + String colName = statsId.colId; + Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName); + if (col == null) { + return ColumnStatistic.UNKNOWN; + } + String min = minLit; + String max = maxLit; + if (min != null && !min.equalsIgnoreCase("NULL")) { + try { + columnStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min)); + columnStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min)); + } catch (AnalysisException e) { + LOG.warn("Failed to process column {} min value {}.", col, min, e); + columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); + } + } else { + columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY); + } + if (max != null && !max.equalsIgnoreCase("NULL")) { + try { + columnStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max)); + columnStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max)); + } catch (AnalysisException e) { + LOG.warn("Failed to process column {} max value {}.", col, max, e); + columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY); + } + } else { + columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY); + } + columnStatisticBuilder.setUpdatedTime(updateTime); + return columnStatisticBuilder.build(); + } catch (Exception e) { + LOG.warn("Failed to convert column statistics.", e); + return ColumnStatistic.UNKNOWN; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java index 5637455b1720c1..dcfefaedd35b1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java @@ -88,6 +88,7 @@ public class ColumnStatistic { public final LiteralExpr minExpr; public final LiteralExpr maxExpr; + @SerializedName("updatedTime") public final String updatedTime; public ColumnStatistic(double count, double ndv, ColumnStatistic original, double avgSizeByte, diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index efd99d1eca953d..c6bd61cc5a8e94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -337,16 +337,6 @@ protected Pair getSampleInfo() { return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate); } - @Override - protected void afterExecution() { - // Table level task doesn't need to sync any value to sync stats, it stores the value in metadata. - // Partition only task doesn't need to refresh cached. - if (isTableLevelTask || isPartitionOnly) { - return; - } - Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName()); - } - /** * If the size to sample is larger than LIMIT_SIZE (1GB) * and is much larger (1.2*) than the size user want to sample, diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java index 2bf72843a71f5e..38ee648cad4be7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/JdbcAnalysisTask.java @@ -137,13 +137,4 @@ private Map buildTableStatsParams(String partId) { commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis())); return commonParams; } - - @Override - protected void afterExecution() { - // Table level task doesn't need to sync any value to sync stats, it stores the value in metadata. - if (isTableLevelTask) { - return; - } - Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName()); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java index 0cf2808222e2b8..73eaaaff1c8584 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsCache.java @@ -21,11 +21,11 @@ import org.apache.doris.common.ClientPool; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.system.Frontend; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest; import org.apache.doris.thrift.TNetworkAddress; @@ -39,12 +39,12 @@ import org.apache.logging.log4j.Logger; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; public class StatisticsCache { @@ -203,37 +203,34 @@ private void doPreHeat() { } /** - * Return false if the log of corresponding stats load is failed. + * Refresh stats cache, invalidate cache if the new data is unknown. */ - public boolean syncLoadColStats(long tableId, long idxId, String colName) { - List columnResults = StatisticsRepository.loadColStats(tableId, idxId, colName); - final StatisticsCacheKey k = - new StatisticsCacheKey(tableId, idxId, colName); - final ColumnStatistic c = ColumnStatistic.fromResultRow(columnResults); - if (c == ColumnStatistic.UNKNOWN) { - return false; - } - putCache(k, c); - if (ColumnStatistic.UNKNOWN == c) { - return false; + public void syncColStats(ColStatsData data) { + StatsId statsId = data.statsId; + final StatisticsCacheKey k = new StatisticsCacheKey(statsId.tblId, statsId.idxId, statsId.colId); + ColumnStatistic columnStatistic = data.toColumnStatistic(); + if (columnStatistic == ColumnStatistic.UNKNOWN) { + invalidate(k.tableId, k.idxId, k.colName); + } else { + putCache(k, columnStatistic); } TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest = new TUpdateFollowerStatsCacheRequest(); updateFollowerStatsCacheRequest.key = GsonUtils.GSON.toJson(k); - updateFollowerStatsCacheRequest.statsRows = columnResults.stream().map(GsonUtils.GSON::toJson).collect( - Collectors.toList()); - for (Frontend frontend : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) { - if (StatisticsUtil.isMaster(frontend)) { + updateFollowerStatsCacheRequest.colStatsData = GsonUtils.GSON.toJson(data); + // For compatible only, to be deprecated. + updateFollowerStatsCacheRequest.statsRows = new ArrayList<>(); + SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode(); + for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) { + if (selfNode.getHost().equals(frontend.getHost())) { continue; } sendStats(frontend, updateFollowerStatsCacheRequest); } - return true; } @VisibleForTesting public void sendStats(Frontend frontend, TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest) { - TNetworkAddress address = new TNetworkAddress(frontend.getHost(), - frontend.getRpcPort()); + TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort()); FrontendService.Client client = null; try { client = ClientPool.frontendPool.borrowObject(address); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java index 21395638cd6ddf..22f2f73ac27b60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatsId.java @@ -20,20 +20,25 @@ import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; +import com.google.gson.annotations.SerializedName; import java.util.StringJoiner; public class StatsId { + @SerializedName("id") public final String id; + @SerializedName("catalogId") public final long catalogId; + @SerializedName("dbId") public final long dbId; + @SerializedName("tblId") public final long tblId; + @SerializedName("idxId") public final long idxId; - + @SerializedName("colId") public final String colId; - - // nullable + @SerializedName("partId") public final String partId; @VisibleForTesting diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index 255ab7106aa2de..1bf2041bb4f12c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -21,7 +21,6 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.util.StatisticsUtil; -import mockit.Expectations; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; @@ -185,12 +184,6 @@ protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exce protected void syncLoadStats() { } }; - new Expectations() { - { - job.syncLoadStats(); - times = 1; - } - }; job.writeBuf(); Assertions.assertEquals(0, job.queryFinished.size()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java index b8e8e8df4331bb..6eca38be951436 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/CacheTest.java @@ -324,7 +324,6 @@ private void sendStats(Frontend frontend, } }; StatisticsCache statisticsCache = new StatisticsCache(); - statisticsCache.syncLoadColStats(1L, 1L, "any"); new Expectations() { { statisticsCache.sendStats((Frontend) any, (TUpdateFollowerStatsCacheRequest) any); @@ -369,7 +368,6 @@ private void sendStats(Frontend frontend, } }; StatisticsCache statisticsCache = new StatisticsCache(); - statisticsCache.syncLoadColStats(1L, 1L, "any"); new Expectations() { { statisticsCache.sendStats((Frontend) any, (TUpdateFollowerStatsCacheRequest) any); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/ColStatsDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/ColStatsDataTest.java index dcbbe6e2f35a0c..8743105a6444f7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/ColStatsDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/ColStatsDataTest.java @@ -17,7 +17,13 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.statistics.util.StatisticsUtil; + import com.google.common.collect.Lists; +import mockit.Expectations; +import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -93,4 +99,69 @@ public void testConstructNull() { Assertions.assertEquals(0, data.dataSizeInBytes); Assertions.assertEquals(null, data.updateTime); } + + @Test + public void testToColumnStatisticUnknown(@Mocked StatisticsUtil mockedClass) { + // Test column is null + new Expectations() { + { + mockedClass.findColumn(anyLong, anyLong, anyLong, anyLong, anyString); + result = null; + } + }; + List values = Lists.newArrayList(); + values.add("id"); + values.add("10000"); + values.add("20000"); + values.add("30000"); + values.add("0"); + values.add("col"); + values.add(null); + values.add("100"); + values.add("200"); + values.add("300"); + values.add("min"); + values.add("max"); + values.add("400"); + values.add("500"); + ResultRow row = new ResultRow(values); + ColStatsData data = new ColStatsData(row); + ColumnStatistic columnStatistic = data.toColumnStatistic(); + Assertions.assertEquals(ColumnStatistic.UNKNOWN, columnStatistic); + } + + @Test + public void testToColumnStatisticNormal(@Mocked StatisticsUtil mockedClass) { + new Expectations() { + { + mockedClass.findColumn(anyLong, anyLong, anyLong, anyLong, anyString); + result = new Column("colName", PrimitiveType.STRING); + } + }; + List values = Lists.newArrayList(); + values.add("id"); + values.add("10000"); + values.add("20000"); + values.add("30000"); + values.add("0"); + values.add("col"); + values.add(null); + values.add("100"); + values.add("200"); + values.add("300"); + values.add("null"); + values.add("null"); + values.add("400"); + values.add("500"); + ResultRow row = new ResultRow(values); + ColStatsData data = new ColStatsData(row); + ColumnStatistic columnStatistic = data.toColumnStatistic(); + Assertions.assertEquals(100, columnStatistic.count); + Assertions.assertEquals(200, columnStatistic.ndv); + Assertions.assertEquals(300, columnStatistic.numNulls); + Assertions.assertEquals(Double.NEGATIVE_INFINITY, columnStatistic.minValue); + Assertions.assertEquals(Double.POSITIVE_INFINITY, columnStatistic.maxValue); + Assertions.assertEquals(400, columnStatistic.dataSize); + Assertions.assertEquals("500", columnStatistic.updatedTime); + } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 3976fe20897de6..bdaa4ea28e981d 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -1121,7 +1121,8 @@ struct TGetBinlogLagResult { struct TUpdateFollowerStatsCacheRequest { 1: optional string key; - 2: list statsRows; + 2: optional list statsRows; + 3: optional string colStatsData; } struct TInvalidateFollowerStatsCacheRequest {