Skip to content

Commit

Permalink
Support high priority column stats auto collection.
Browse files Browse the repository at this point in the history
* fix visible column (#33023)
* Collect high priority columns. (#31235)
* High priority queue and map. (#31509)
* Support column level health value. (#31794)
* Support follower sync query columns to master. (#31859)
* Support show auto analyze pending jobs. (#31926)
* Check column health value earlier, show job priority. (#32064)
* support window (#32094)
* Refactor. (#32273)
* refactor2 (#32278)
* Unit test (#32398)
* Support auto analyze mv (#32433)
* Fix bug (#32454)
* Support identical column name in different index. (#32957)
* Fix visible column
* Use future to block auto analyze before job finish. (#33083)
* Fix ut. (#33147)
* Fix ut (#33161)
* fix p0 (#33210)
* Improve failover logic. (#33382)
* Improve waiting empty table logic. (#33472)
* Fix pipeline (#33671)
  • Loading branch information
Jibing-Li committed Apr 25, 2024
1 parent 79890c1 commit 4c8a145
Show file tree
Hide file tree
Showing 51 changed files with 2,422 additions and 940 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1613,7 +1613,7 @@ public class Config extends ConfigBase {
"This parameter controls the time interval for automatic collection jobs to check the health of table"
+ "statistics and trigger automatic collection"
})
public static int auto_check_statistics_in_minutes = 5;
public static int auto_check_statistics_in_minutes = 1;

/**
* If set to TRUE, the compaction slower replica will be skipped when select get queryable replicas
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -4568,6 +4568,10 @@ show_param ::=
{:
RESULT = new ShowAnalyzeStmt(tbl, parser.where, true);
:}
| KW_AUTO KW_JOBS opt_table_name:tbl opt_wild_where
{:
RESULT = new ShowAutoAnalyzeJobsStmt(tbl, parser.where);
:}
| KW_ANALYZE KW_TASK KW_STATUS INTEGER_LITERAL:jobId
{:
RESULT = new ShowAnalyzeTaskStatus(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class AnalyzeProperties {
public static final String PROPERTY_PERIOD_SECONDS = "period.seconds";
public static final String PROPERTY_FORCE_FULL = "force.full";
public static final String PROPERTY_PARTITION_COLUMN_FROM_SQL = "partition.column.from.sql";
public static final String PROPERTY_USE_AUTO_ANALYZER = "use.auto.analyzer";

public static final AnalyzeProperties DEFAULT_PROP = new AnalyzeProperties(new HashMap<String, String>() {
{
Expand Down Expand Up @@ -72,6 +73,7 @@ public class AnalyzeProperties {
.add(PROPERTY_PERIOD_CRON)
.add(PROPERTY_FORCE_FULL)
.add(PROPERTY_PARTITION_COLUMN_FROM_SQL)
.add(PROPERTY_USE_AUTO_ANALYZER)
.build();

public AnalyzeProperties(Map<String, String> properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ShowAnalyzeStmt extends ShowStmt {
.add("schedule_type")
.add("start_time")
.add("end_time")
.add("priority")
.build();

private long jobId;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.statistics.JobPriority;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;

import java.util.Locale;

/**
 * ShowAutoAnalyzeJobsStmt is used to show pending auto analysis jobs.
 *
 * <p>Syntax (matches the {@code KW_AUTO KW_JOBS} alternative of the {@code show_param} grammar rule):
 * <pre>
 * SHOW AUTO JOBS
 *     [TABLE]
 *     [
 *         WHERE
 *         [PRIORITY = ["HIGH"|"MID"|"LOW"]]
 *     ]
 * </pre>
 *
 * <p>The only accepted predicate is an equality between the {@code priority} column and a
 * string literal that names a {@link JobPriority} constant (compared case-insensitively).
 */
public class ShowAutoAnalyzeJobsStmt extends ShowStmt {
    private static final String PRIORITY = "priority";
    private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
            .add("catalog_name")
            .add("db_name")
            .add("tbl_name")
            .add("col_list")
            .add("priority")
            .build();

    private final TableName tableName;
    private final Expr whereClause;

    // Upper-cased priority extracted from the where clause; stays null when no predicate is given.
    private String jobPriority;

    /**
     * @param tableName   optional table to restrict the listing to; may be null
     * @param whereClause optional filter predicate; may be null
     */
    public ShowAutoAnalyzeJobsStmt(TableName tableName, Expr whereClause) {
        this.tableName = tableName;
        this.whereClause = whereClause;
    }

    /**
     * Returns the priority extracted from the where clause, or null if there was none.
     *
     * @throws IllegalArgumentException if the statement has not been analyzed yet
     */
    public String getPriority() {
        Preconditions.checkArgument(isAnalyzed(),
                "The priority must be obtained after the parsing is complete");
        return jobPriority;
    }

    /**
     * Returns the raw where clause expression, or null if there was none.
     *
     * @throws IllegalArgumentException if the statement has not been analyzed yet
     */
    public Expr getWhereClause() {
        Preconditions.checkArgument(isAnalyzed(),
                "The whereClause must be obtained after the parsing is complete");
        return whereClause;
    }

    /**
     * Validates the statement: stats must be enabled in the session, the caller needs SHOW
     * privilege on the table (when one is named), and the where clause (when present) must
     * be a supported priority predicate.
     */
    @Override
    public void analyze(Analyzer analyzer) throws UserException {
        if (!ConnectContext.get().getSessionVariable().enableStats) {
            throw new UserException("Analyze function is forbidden, you should add `enable_stats=true` "
                    + "in your FE conf file");
        }
        super.analyze(analyzer);
        if (tableName != null) {
            tableName.analyze(analyzer);
            checkShowAnalyzePriv(tableName.getCtl(), tableName.getDb(), tableName.getTbl());
        }

        // analyze where clause if not null
        if (whereClause != null) {
            analyzeSubPredicate(whereClause);
        }
    }

    /** Builds the result metadata: every title column is exposed as VARCHAR(128). */
    @Override
    public ShowResultSetMetaData getMetaData() {
        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
        for (String title : TITLE_NAMES) {
            builder.addColumn(new Column(title, ScalarType.createVarchar(128)));
        }
        return builder.build();
    }

    @Override
    public RedirectStatus getRedirectStatus() {
        return RedirectStatus.FORWARD_NO_SYNC;
    }

    /** Reports a table-access-denied analysis error unless the current user has SHOW privilege on the table. */
    private void checkShowAnalyzePriv(String catalogName, String dbName, String tblName) throws AnalysisException {
        if (!Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(ConnectContext.get(), catalogName, dbName, tblName, PrivPredicate.SHOW)) {
            ErrorReport.reportAnalysisException(
                    ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
                    "SHOW ANALYZE",
                    ConnectContext.get().getQualifiedUser(),
                    ConnectContext.get().getRemoteIP(),
                    dbName + ": " + tblName);
        }
    }

    /**
     * Validates the where clause and stores the extracted priority.
     *
     * @throws AnalysisException if the predicate is not of the form {@code PRIORITY = "<JobPriority name>"}
     */
    private void analyzeSubPredicate(Expr subExpr) throws AnalysisException {
        if (subExpr == null) {
            return;
        }

        String parsedPriority = parsePriority(subExpr);
        if (parsedPriority == null) {
            throw new AnalysisException("Where clause should looks like: "
                    + "PRIORITY = \"HIGH|MID|LOW\"");
        }
        // Only record the value once it is known to be valid.
        jobPriority = parsedPriority;
    }

    /**
     * Extracts the upper-cased priority from a {@code priority = "<value>"} predicate.
     *
     * @return the upper-cased priority name, or null if the expression is not a supported predicate
     */
    private static String parsePriority(Expr expr) {
        if (!(expr instanceof BinaryPredicate)
                || ((BinaryPredicate) expr).getOp() != BinaryPredicate.Operator.EQ) {
            return null;
        }

        // left child must reference the priority column
        Expr left = expr.getChild(0);
        if (!(left instanceof SlotRef)
                || !PRIORITY.equalsIgnoreCase(((SlotRef) left).getColumnName())) {
            return null;
        }

        // right child must be a non-empty string literal
        Expr right = expr.getChild(1);
        if (!(right instanceof StringLiteral)) {
            return null;
        }
        String value = right.getStringValue();
        if (Strings.isNullOrEmpty(value)) {
            return null;
        }

        // Locale.ROOT keeps the conversion independent of the JVM default locale
        // (e.g. a Turkish locale would map "high" to "HİGH" and reject valid input).
        String upperCased = value.toUpperCase(Locale.ROOT);
        try {
            JobPriority.valueOf(upperCased);
        } catch (IllegalArgumentException e) {
            // Not a known JobPriority name.
            return null;
        }
        return upperCased;
    }

    /** Renders the statement using the keywords the parser accepts ({@code SHOW AUTO JOBS}). */
    @Override
    public String toSql() {
        StringBuilder sb = new StringBuilder();
        sb.append("SHOW AUTO JOBS");

        if (tableName != null) {
            sb.append(" ");
            sb.append(tableName.toSql());
        }

        if (whereClause != null) {
            sb.append(" ");
            sb.append("WHERE");
            sb.append(" ");
            sb.append(whereClause.toSql());
        }

        return sb.toString();
    }

    @Override
    public String toString() {
        return toSql();
    }

    public TableName getTableName() {
        return tableName;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class ShowColumnStatsStmt extends ShowStmt {
.add("trigger")
.add("query_times")
.add("updated_time")
.add("update_rows")
.add("last_analyze_row_count")
.build();

private final TableName tableName;
Expand Down Expand Up @@ -162,6 +164,8 @@ public ShowResultSet constructResultSet(List<Pair<Pair<String, String>, ColumnSt
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.jobType));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.queriedTimes));
row.add(String.valueOf(p.second.updatedTime));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.updatedRows));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.rowCount));
result.add(row);
});
return new ShowResultSet(getMetaData(), result);
Expand Down
27 changes: 20 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,11 @@
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.FollowerColumnSender;
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
import org.apache.doris.statistics.StatisticsJobAppender;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
Expand Down Expand Up @@ -524,6 +526,10 @@ public class Env {

private StatisticsAutoCollector statisticsAutoCollector;

private StatisticsJobAppender statisticsJobAppender;

private FollowerColumnSender followerColumnSender;

private HiveTransactionMgr hiveTransactionMgr;

private TopicPublisherThread topicPublisherThread;
Expand Down Expand Up @@ -756,6 +762,7 @@ public Env(boolean isCheckpointCatalog) {
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.statisticsJobAppender = new StatisticsJobAppender();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
Expand Down Expand Up @@ -1058,13 +1065,6 @@ public void initialize(String[] args) throws Exception {
// If not using bdb, we need to notify the FE type transfer manually.
notifyNewFETypeTransfer(FrontendNodeType.MASTER);
}
if (statisticsCleaner != null) {
statisticsCleaner.start();
}
if (statisticsAutoCollector != null) {
statisticsAutoCollector.start();
}

queryCancelWorker.start();
}

Expand Down Expand Up @@ -1715,6 +1715,10 @@ protected void startMasterOnlyDaemonThreads() {
topicPublisherThread.addToTopicPublisherList(wpPublisher);
topicPublisherThread.start();

// auto analyze related threads.
statisticsCleaner.start();
statisticsAutoCollector.start();
statisticsJobAppender.start();
}

// start threads that should run on all FE
Expand Down Expand Up @@ -1777,6 +1781,11 @@ private void transferToNonMaster(FrontendNodeType newType) {
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}

if (followerColumnSender == null) {
followerColumnSender = new FollowerColumnSender();
followerColumnSender.start();
}
}

// Set global variable 'lower_case_table_names' only when the cluster is initialized.
Expand Down Expand Up @@ -6113,6 +6122,10 @@ public NereidsSqlCacheManager getSqlCacheManager() {
return sqlCacheManager;
}

/**
 * Accessor for the statistics job appender.
 *
 * @return the {@code StatisticsJobAppender} stored in the {@code statisticsJobAppender} field
 */
public StatisticsJobAppender getStatisticsJobAppender() {
    return this.statisticsJobAppender;
}

public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) {
AlterMTMV alter = new AlterMTMV(info.getMvName(), info.getRefreshInfo(), MTMVAlterOpType.ALTER_REFRESH_INFO);
this.alter.processAlterMTMV(alter, false);
Expand Down
Loading

0 comments on commit 4c8a145

Please sign in to comment.