diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 355683230d51edb..a8dc8ff84c9e682 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2397,9 +2397,6 @@ public class Config extends ConfigBase { @ConfField(mutable = false) public static boolean allow_analyze_statistics_info_polluting_file_cache = true; - @ConfField - public static int cpu_resource_limit_per_analyze_task = 1; - @ConfField(mutable = true) public static boolean force_sample_analyze = false; // avoid full analyze for performance reason diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java index fc4f99046d5aa9f..7b3338076be6edb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java @@ -36,11 +36,21 @@ public class CreateWorkloadGroupStmt extends DdlStmt implements NotFallbackInPar private final String workloadGroupName; private final Map properties; + private final boolean isInternal; public CreateWorkloadGroupStmt(boolean ifNotExists, String workloadGroupName, Map properties) { this.ifNotExists = ifNotExists; this.workloadGroupName = workloadGroupName; this.properties = properties; + this.isInternal = false; + } + + public CreateWorkloadGroupStmt(boolean ifNotExists, String workloadGroupName, + Map properties, boolean isInternal) { + this.ifNotExists = ifNotExists; + this.workloadGroupName = workloadGroupName; + this.properties = properties; + this.isInternal = isInternal; } public boolean isIfNotExists() { @@ -55,6 +65,10 @@ public Map getProperties() { return properties; } + public boolean isInternal() { + return isInternal; + } + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 87e8a0fc3b0ce85..16853a78b866071 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.CreateWorkloadGroupStmt; import org.apache.doris.analysis.DbName; import org.apache.doris.analysis.DistributionDesc; import org.apache.doris.analysis.DropTableStmt; @@ -39,6 +40,8 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.plugin.audit.AuditLoaderPlugin; +import org.apache.doris.resource.workloadgroup.WorkloadGroup; +import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.util.StatisticsUtil; @@ -59,6 +62,14 @@ public class InternalSchemaInitializer extends Thread { private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class); + // Init statistics work load group settings. Uses could change the setting using alter workload group command. + public static String CPU_SHARE = "1024"; + public static String MEMORY_LIMIT = "10%"; + public static String MAX_CONCURRENCY = "2147483647"; + public static String CPU_HARD_LIMIT = "-1"; + public static String SCAN_THREAD_NUM = "-1"; + public static String READ_BYTES_PER_SECOND = "100000000"; + public InternalSchemaInitializer() { super("InternalSchemaInitializer"); } @@ -79,6 +90,7 @@ public void run() { .join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L); createDb(); createTbl(); + createWorkloadGroup(); } catch (Throwable e) { LOG.warn("Statistics storage initiated failed, will try again later", e); try { @@ -174,6 +186,21 @@ public static void createTbl() throws UserException { Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt()); } + @VisibleForTesting + public static void createWorkloadGroup() throws UserException { + HashMap properties = new HashMap<>(); + properties.put(WorkloadGroup.CPU_SHARE, CPU_SHARE); + properties.put(WorkloadGroup.MEMORY_LIMIT, MEMORY_LIMIT); + properties.put(WorkloadGroup.CPU_HARD_LIMIT, CPU_HARD_LIMIT); + properties.put(WorkloadGroup.MAX_CONCURRENCY, MAX_CONCURRENCY); + properties.put(WorkloadGroup.SCAN_THREAD_NUM, SCAN_THREAD_NUM); + properties.put(WorkloadGroup.READ_BYTES_PER_SECOND, READ_BYTES_PER_SECOND); + WorkloadGroupMgr wgm = Env.getCurrentEnv().getWorkloadGroupMgr(); + CreateWorkloadGroupStmt stmt = new CreateWorkloadGroupStmt(true, + StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME, properties, true); + wgm.createWorkloadGroup(stmt); + } + @VisibleForTesting public static void createDb() { CreateDbStmt createDbStmt = new CreateDbStmt(true, @@ -258,11 +285,13 @@ private boolean created() { Env.getCurrentEnv().getInternalCatalog() .getDb(FeConstants.INTERNAL_DB_NAME); if (!optionalDatabase.isPresent()) { + LOG.info("Database " + FeConstants.INTERNAL_DB_NAME + " not exists."); return false; } Database db = optionalDatabase.get(); Optional optionalStatsTbl = db.getTable(StatisticConstants.TABLE_STATISTIC_TBL_NAME); if (!optionalStatsTbl.isPresent()) { + LOG.info("Table " + StatisticConstants.TABLE_STATISTIC_TBL_NAME + " not exists."); return false; } @@ -282,12 +311,24 @@ private boolean created() { } optionalStatsTbl = db.getTable(StatisticConstants.PARTITION_STATISTIC_TBL_NAME); if (!optionalStatsTbl.isPresent()) { + LOG.info("Table " + StatisticConstants.PARTITION_STATISTIC_TBL_NAME + " not exists."); return false; } // 3. check audit table optionalStatsTbl = db.getTable(AuditLoaderPlugin.AUDIT_LOG_TABLE); - return optionalStatsTbl.isPresent(); + if (!optionalStatsTbl.isPresent()) { + LOG.info("Table " + AuditLoaderPlugin.AUDIT_LOG_TABLE + " not exists."); + return false; + } + + // 4. check statistics work load group + boolean isReady = Env.getCurrentEnv().getWorkloadGroupMgr() + .isWorkloadGroupExists(StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME); + if (!isReady) { + LOG.info("Workload group " + StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME + " not exists."); + } + return isReady; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 5ddc5fb68f85994..7e9384b2cc39c32 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -40,6 +40,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.thrift.TPipelineWorkloadGroup; import org.apache.doris.thrift.TUserIdentity; import org.apache.doris.thrift.TopicInfo; @@ -336,7 +337,12 @@ public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlExceptio } throw new DdlException("workload group " + workloadGroupName + " already exist"); } - if (idToWorkloadGroup.size() >= Config.workload_group_max_num) { + if (!stmt.isInternal() && workloadGroupName.equals(StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME)) { + throw new DdlException("workload group " + StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME + + " is preserved for internal use, try another name."); + } + if (idToWorkloadGroup.size() >= Config.workload_group_max_num + && !workloadGroupName.equals(StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME)) { throw new DdlException( "workload group number can not be exceed " + Config.workload_group_max_num); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java index 67de69c57dac873..6f4eae6c27e6283 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticConstants.java @@ -34,6 +34,7 @@ public class StatisticConstants { public static final String TABLE_STATISTIC_TBL_NAME = "column_statistics"; public static final String PARTITION_STATISTIC_TBL_NAME = "partition_statistics"; public static final String HISTOGRAM_TBL_NAME = "histogram_statistics"; + public static final String STATISTICS_WORKLOAD_GROUP_NAME = "doris_internal_statistics_workload_group"; public static final int MAX_NAME_LEN = 64; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 0f5c81b1cf06929..85552177f6e6557 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -208,7 +208,7 @@ public static AutoCloseConnectContext buildConnectContext(boolean limitScan, boo SessionVariable sessionVariable = connectContext.getSessionVariable(); sessionVariable.internalSession = true; sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes); - sessionVariable.cpuResourceLimit = Config.cpu_resource_limit_per_analyze_task; + sessionVariable.setWorkloadGroup(StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME); sessionVariable.setEnableInsertStrict(true); sessionVariable.setInsertMaxFilterRatio(1.0); sessionVariable.enablePageCache = false; diff --git a/regression-test/suites/statistics/test_statistics_workload_group.groovy b/regression-test/suites/statistics/test_statistics_workload_group.groovy new file mode 100644 index 000000000000000..093ec30f82842d3 --- /dev/null +++ b/regression-test/suites/statistics/test_statistics_workload_group.groovy @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_statistics_workload_group") { + + // Disable for now. + return + + def result = sql """show workload groups like 'doris_internal_statistics_workload_group'""" + assertEquals(result.size(), 1) + assertEquals(result[0][15], "-1") + + sql """alter workload group doris_internal_statistics_workload_group properties ("read_bytes_per_second"="1222");""" + result = sql """show workload groups like 'doris_internal_statistics_workload_group'""" + assertEquals(result.size(), 1) + assertEquals(result[0][15], "1222") + + sql """alter workload group doris_internal_statistics_workload_group properties ("read_bytes_per_second"="-1");""" + result = sql """show workload groups like 'doris_internal_statistics_workload_group'""" + assertEquals(result.size(), 1) + assertEquals(result[0][15], "-1") + +} +