Skip to content

Commit

Permalink
Refactor statistics buildConnectContext: collapse the no-arg and two-flag (limitScan, useFileCacheForStat) overloads into a single buildConnectContext(boolean useFileCacheForStat), dropping the unused limitScan/enableScanRunSerial option and updating all callers.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li committed Oct 8, 2024
1 parent 0e5fd2f commit a1d8610
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public long fetchRowCount() {
params.put("tblName", name);
switch (((JdbcExternalCatalog) catalog).getDatabaseTypeName()) {
case JdbcResource.MYSQL:
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false, false)) {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(MYSQL_ROW_COUNT_SQL);
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected void flushBuffer() {
values.add(data.toSQL(true));
}
insertStmt += values.toString();
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
executeWithExceptionOnFail(stmtExecutor);
} catch (Exception t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ protected String castToNumeric(String colName) {
protected void runQuery(String sql) {
long startTime = System.currentTimeMillis();
String queryId = "";
try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) {
try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext(false)) {
stmtExecutor = new StmtExecutor(a.connectContext, sql);
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
// Update index row count after analyze.
Expand Down Expand Up @@ -509,7 +509,7 @@ protected void runQuery(String sql) {
}

protected void runInsert(String sql) throws Exception {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
stmtExecutor = new StmtExecutor(r.connectContext, sql);
try {
stmtExecutor.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -113,63 +112,60 @@ protected void doSample() {
String tabletStr = tabletIds.stream()
.map(Object::toString)
.collect(Collectors.joining(", "));
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(
info.jobType.equals(JobType.SYSTEM), false)) {
// Get basic stats, including min and max.
ResultRow basicStats = collectBasicStat(r);
String min = StatisticsUtil.escapeSQL(basicStats != null && basicStats.getValues().size() > 0
? basicStats.get(0) : null);
String max = StatisticsUtil.escapeSQL(basicStats != null && basicStats.getValues().size() > 1
? basicStats.get(1) : null);
// Get basic stats, including min and max.
ResultRow basicStats = collectBasicStat();
String min = StatisticsUtil.escapeSQL(basicStats != null && basicStats.getValues().size() > 0
? basicStats.get(0) : null);
String max = StatisticsUtil.escapeSQL(basicStats != null && basicStats.getValues().size() > 1
? basicStats.get(1) : null);

boolean limitFlag = false;
long rowsToSample = pair.second;
Map<String, String> params = buildSqlParams();
params.put("scaleFactor", String.valueOf(scaleFactor));
params.put("sampleHints", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr));
params.put("ndvFunction", getNdvFunction(String.valueOf(totalRowCount)));
params.put("min", StatisticsUtil.quote(min));
params.put("max", StatisticsUtil.quote(max));
params.put("rowCount", String.valueOf(totalRowCount));
params.put("type", col.getType().toString());
params.put("limit", "");
if (needLimit()) {
// If the tablets to be sampled are too large, use limit to control the rows to read, and re-calculate
// the scaleFactor.
rowsToSample = Math.min(getSampleRows(), pair.second);
// Empty table doesn't need to limit.
if (rowsToSample > 0) {
limitFlag = true;
params.put("limit", "limit " + rowsToSample);
params.put("scaleFactor", String.valueOf(scaleFactor * (double) pair.second / rowsToSample));
}
boolean limitFlag = false;
long rowsToSample = pair.second;
Map<String, String> params = buildSqlParams();
params.put("scaleFactor", String.valueOf(scaleFactor));
params.put("sampleHints", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr));
params.put("ndvFunction", getNdvFunction(String.valueOf(totalRowCount)));
params.put("min", StatisticsUtil.quote(min));
params.put("max", StatisticsUtil.quote(max));
params.put("rowCount", String.valueOf(totalRowCount));
params.put("type", col.getType().toString());
params.put("limit", "");
if (needLimit()) {
// If the tablets to be sampled are too large, use limit to control the rows to read, and re-calculate
// the scaleFactor.
rowsToSample = Math.min(getSampleRows(), pair.second);
// Empty table doesn't need to limit.
if (rowsToSample > 0) {
limitFlag = true;
params.put("limit", "limit " + rowsToSample);
params.put("scaleFactor", String.valueOf(scaleFactor * (double) pair.second / rowsToSample));
}
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql;
if (useLinearAnalyzeTemplate()) {
// For single unique key, use count as ndv.
if (isSingleUniqueKey()) {
params.put("ndvFunction", String.valueOf(totalRowCount));
} else {
params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})");
}
sql = stringSubstitutor.replace(LINEAR_ANALYZE_TEMPLATE);
}
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql;
if (useLinearAnalyzeTemplate()) {
// For single unique key, use count as ndv.
if (isSingleUniqueKey()) {
params.put("ndvFunction", String.valueOf(totalRowCount));
} else {
params.put("dataSizeFunction", getDataSizeFunction(col, true));
params.put("subStringColName", getStringTypeColName(col));
sql = stringSubstitutor.replace(DUJ1_ANALYZE_TEMPLATE);
params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})");
}
LOG.info("Sample for column [{}]. Total rows [{}], rows to sample [{}], scale factor [{}], "
+ "limited [{}], distribute column [{}], partition column [{}], key column [{}], "
+ "is single unique key [{}]",
col.getName(), params.get("rowCount"), rowsToSample, params.get("scaleFactor"),
limitFlag, tbl.isDistributionColumn(col.getName()),
tbl.isPartitionColumn(col.getName()), col.isKey(), isSingleUniqueKey());
runQuery(sql);
sql = stringSubstitutor.replace(LINEAR_ANALYZE_TEMPLATE);
} else {
params.put("dataSizeFunction", getDataSizeFunction(col, true));
params.put("subStringColName", getStringTypeColName(col));
sql = stringSubstitutor.replace(DUJ1_ANALYZE_TEMPLATE);
}
LOG.info("Sample for column [{}]. Total rows [{}], rows to sample [{}], scale factor [{}], "
+ "limited [{}], distribute column [{}], partition column [{}], key column [{}], "
+ "is single unique key [{}]",
col.getName(), params.get("rowCount"), rowsToSample, params.get("scaleFactor"),
limitFlag, tbl.isDistributionColumn(col.getName()),
tbl.isPartitionColumn(col.getName()), col.isKey(), isSingleUniqueKey());
runQuery(sql);
}

protected ResultRow collectBasicStat(AutoCloseConnectContext context) {
protected ResultRow collectBasicStat() {
// Agg table value columns has no zone map.
// For these columns, skip collecting min and max value to avoid scan whole table.
if (((OlapTable) tbl).getKeysType().equals(KeysType.AGG_KEYS) && !col.isKey()) {
Expand All @@ -181,14 +177,17 @@ protected ResultRow collectBasicStat(AutoCloseConnectContext context) {
Map<String, String> params = buildSqlParams();
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(BASIC_STATS_TEMPLATE);
stmtExecutor = new StmtExecutor(context.connectContext, sql);
ResultRow resultRow = stmtExecutor.executeInternalQuery().get(0);
if (LOG.isDebugEnabled()) {
LOG.debug("Cost time in millisec: " + (System.currentTimeMillis() - startTime)
+ " Min max SQL: " + sql + " QueryId: " + DebugUtil.printId(stmtExecutor.getContext().queryId()));
ResultRow resultRow = null;
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
stmtExecutor = new StmtExecutor(r.connectContext, sql);
resultRow = stmtExecutor.executeInternalQuery().get(0);
if (LOG.isDebugEnabled()) {
LOG.debug("Cost time in millisec: " + (System.currentTimeMillis() - startTime) + " Min max SQL: "
+ sql + " QueryId: " + DebugUtil.printId(stmtExecutor.getContext().queryId()));
}
// Release the reference to stmtExecutor, reduce memory usage.
stmtExecutor = null;
}
// Release the reference to stmtExecutor, reduce memory usage.
stmtExecutor = null;
return resultRow;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public static List<ResultRow> execStatisticQuery(String sql, boolean enableFileC
return Collections.emptyList();
}
boolean useFileCacheForStat = (enableFileCache && Config.allow_analyze_statistics_info_polluting_file_cache);
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false, useFileCacheForStat)) {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(useFileCacheForStat)) {
if (Config.isCloudMode()) {
try {
r.connectContext.getCloudCluster();
Expand All @@ -164,7 +164,7 @@ public static List<ResultRow> execStatisticQuery(String sql, boolean enableFileC

public static QueryState execUpdate(String sql) throws Exception {
StmtExecutor stmtExecutor = null;
AutoCloseConnectContext r = StatisticsUtil.buildConnectContext();
AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false);
try {
stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
Expand Down Expand Up @@ -202,11 +202,7 @@ public static PartitionColumnStatistic deserializeToPartitionStatistics(List<Res
return PartitionColumnStatistic.fromResultRow(resultBatches);
}

/**
 * Convenience overload: builds a ConnectContext for internal statistics queries
 * with default options. Delegates to the two-argument overload
 * buildConnectContext(limitScan, useFileCacheForStat) with both flags false,
 * i.e. no serial-scan limiting and no file cache for statistics queries.
 *
 * NOTE(review): this no-arg overload is the one removed by this commit; callers
 * were migrated to buildConnectContext(false).
 *
 * @return an AutoCloseConnectContext wrapping the configured context; intended
 *         for use in a try-with-resources block so the context is restored on close
 */
public static AutoCloseConnectContext buildConnectContext() {
return buildConnectContext(false, false);
}

public static AutoCloseConnectContext buildConnectContext(boolean limitScan, boolean useFileCacheForStat) {
public static AutoCloseConnectContext buildConnectContext(boolean useFileCacheForStat) {
ConnectContext connectContext = new ConnectContext();
SessionVariable sessionVariable = connectContext.getSessionVariable();
sessionVariable.internalSession = true;
Expand All @@ -218,7 +214,6 @@ public static AutoCloseConnectContext buildConnectContext(boolean limitScan, boo
sessionVariable.enableProfile = Config.enable_profile_when_analyze;
sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;
sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
sessionVariable.enableScanRunSerial = limitScan;
sessionVariable.setQueryTimeoutS(StatisticsUtil.getAnalyzeTimeout());
sessionVariable.insertTimeoutS = StatisticsUtil.getAnalyzeTimeout();
sessionVariable.enableFileCache = false;
Expand Down Expand Up @@ -248,7 +243,7 @@ public static AutoCloseConnectContext buildConnectContext(boolean limitScan, boo
}

public static void analyze(StatementBase statementBase) throws UserException {
try (AutoCloseConnectContext r = buildConnectContext()) {
try (AutoCloseConnectContext r = buildConnectContext(false)) {
Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), r.connectContext);
statementBase.analyze(analyzer);
}
Expand Down Expand Up @@ -496,7 +491,7 @@ public static boolean statsTblAvailable() {
LOG.info("there are no available backends");
return false;
}
try (AutoCloseConnectContext r = buildConnectContext()) {
try (AutoCloseConnectContext r = buildConnectContext(false)) {
try {
r.connectContext.getCloudCluster();
} catch (ComputeGroupException e) {
Expand Down

0 comments on commit a1d8610

Please sign in to comment.