Skip to content

Commit

Permalink
Use workload group to limit statistics io.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li committed Aug 30, 2024
1 parent f852385 commit fcab3f8
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2397,9 +2397,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = false)
public static boolean allow_analyze_statistics_info_polluting_file_cache = true;

@ConfField
public static int cpu_resource_limit_per_analyze_task = 1;

@ConfField(mutable = true)
public static boolean force_sample_analyze = false; // avoid full analyze for performance reason

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,21 @@ public class CreateWorkloadGroupStmt extends DdlStmt implements NotFallbackInPar

private final String workloadGroupName;
private final Map<String, String> properties;
private final boolean isInternal;

/**
 * Builds a CREATE WORKLOAD GROUP statement that is not flagged as internal.
 *
 * @param ifNotExists value later reported by {@code isIfNotExists()}
 * @param workloadGroupName name of the workload group to create
 * @param properties properties of the workload group
 */
public CreateWorkloadGroupStmt(boolean ifNotExists, String workloadGroupName, Map<String, String> properties) {
    // Same state as the four-argument constructor with isInternal fixed to false.
    this(ifNotExists, workloadGroupName, properties, false);
}

/**
 * Builds a CREATE WORKLOAD GROUP statement with an explicit internal flag.
 *
 * @param ifNotExists value later reported by {@code isIfNotExists()}
 * @param workloadGroupName name of the workload group to create
 * @param properties properties of the workload group
 * @param isInternal value later reported by {@link #isInternal()}
 */
public CreateWorkloadGroupStmt(boolean ifNotExists, String workloadGroupName,
        Map<String, String> properties, boolean isInternal) {
    this.isInternal = isInternal;
    this.properties = properties;
    this.workloadGroupName = workloadGroupName;
    this.ifNotExists = ifNotExists;
}

public boolean isIfNotExists() {
Expand All @@ -55,6 +65,10 @@ public Map<String, String> getProperties() {
return properties;
}

/**
 * Returns the internal flag given at construction time.
 *
 * @return the {@code isInternal} value; {@code false} when the statement was built with the
 *         three-argument constructor
 */
public boolean isInternal() {
    return this.isInternal;
}

@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.CreateWorkloadGroupStmt;
import org.apache.doris.analysis.DbName;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropTableStmt;
Expand All @@ -39,6 +40,8 @@
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.plugin.audit.AuditLoaderPlugin;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.statistics.util.StatisticsUtil;

Expand All @@ -59,6 +62,14 @@ public class InternalSchemaInitializer extends Thread {

private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class);

// Initial settings of the statistics workload group. They are only applied when the group is created;
// users can change the settings afterwards with the ALTER WORKLOAD GROUP command.
// Declared final so these defaults cannot be reassigned at runtime.
public static final String CPU_SHARE = "1024";
public static final String MEMORY_LIMIT = "10%";
public static final String MAX_CONCURRENCY = "2147483647";
public static final String CPU_HARD_LIMIT = "-1";
public static final String SCAN_THREAD_NUM = "-1";
public static final String READ_BYTES_PER_SECOND = "100000000";

/**
 * Creates the initializer thread and names it "InternalSchemaInitializer".
 */
public InternalSchemaInitializer() {
super("InternalSchemaInitializer");
}
Expand All @@ -79,6 +90,7 @@ public void run() {
.join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L);
createDb();
createTbl();
createWorkloadGroup();
} catch (Throwable e) {
LOG.warn("Statistics storage initiated failed, will try again later", e);
try {
Expand Down Expand Up @@ -174,6 +186,21 @@ public static void createTbl() throws UserException {
Env.getCurrentEnv().getInternalCatalog().createTable(buildAuditTblStmt());
}

/**
 * Asks the workload group manager to create the workload group named
 * {@link StatisticConstants#STATISTICS_WORKLOAD_GROUP_NAME}.
 *
 * <p>The statement is built with {@code ifNotExists = true} and {@code isInternal = true}, and
 * carries the initial property values defined by the constants of this class.
 *
 * @throws UserException declared for callers; propagated from the workload group manager
 */
@VisibleForTesting
public static void createWorkloadGroup() throws UserException {
    // Initial property values of the group, taken from the class-level defaults.
    HashMap<String, String> initialProps = new HashMap<>();
    initialProps.put(WorkloadGroup.CPU_SHARE, CPU_SHARE);
    initialProps.put(WorkloadGroup.MEMORY_LIMIT, MEMORY_LIMIT);
    initialProps.put(WorkloadGroup.CPU_HARD_LIMIT, CPU_HARD_LIMIT);
    initialProps.put(WorkloadGroup.MAX_CONCURRENCY, MAX_CONCURRENCY);
    initialProps.put(WorkloadGroup.SCAN_THREAD_NUM, SCAN_THREAD_NUM);
    initialProps.put(WorkloadGroup.READ_BYTES_PER_SECOND, READ_BYTES_PER_SECOND);

    Env.getCurrentEnv().getWorkloadGroupMgr().createWorkloadGroup(
            new CreateWorkloadGroupStmt(true, StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME,
                    initialProps, true));
}

@VisibleForTesting
public static void createDb() {
CreateDbStmt createDbStmt = new CreateDbStmt(true,
Expand Down Expand Up @@ -258,11 +285,13 @@ private boolean created() {
Env.getCurrentEnv().getInternalCatalog()
.getDb(FeConstants.INTERNAL_DB_NAME);
if (!optionalDatabase.isPresent()) {
LOG.info("Database " + FeConstants.INTERNAL_DB_NAME + " not exists.");
return false;
}
Database db = optionalDatabase.get();
Optional<Table> optionalStatsTbl = db.getTable(StatisticConstants.TABLE_STATISTIC_TBL_NAME);
if (!optionalStatsTbl.isPresent()) {
LOG.info("Table " + StatisticConstants.TABLE_STATISTIC_TBL_NAME + " not exists.");
return false;
}

Expand All @@ -282,12 +311,24 @@ private boolean created() {
}
optionalStatsTbl = db.getTable(StatisticConstants.PARTITION_STATISTIC_TBL_NAME);
if (!optionalStatsTbl.isPresent()) {
LOG.info("Table " + StatisticConstants.PARTITION_STATISTIC_TBL_NAME + " not exists.");
return false;
}

// 3. check audit table
optionalStatsTbl = db.getTable(AuditLoaderPlugin.AUDIT_LOG_TABLE);
return optionalStatsTbl.isPresent();
if (!optionalStatsTbl.isPresent()) {
LOG.info("Table " + AuditLoaderPlugin.AUDIT_LOG_TABLE + " not exists.");
return false;
}

// 4. check statistics work load group
boolean isReady = Env.getCurrentEnv().getWorkloadGroupMgr()
.isWorkloadGroupExists(StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME);
if (!isReady) {
LOG.info("Workload group " + StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME + " not exists.");
}
return isReady;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TUserIdentity;
import org.apache.doris.thrift.TopicInfo;
Expand Down Expand Up @@ -336,7 +337,12 @@ public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlExceptio
}
throw new DdlException("workload group " + workloadGroupName + " already exist");
}
if (idToWorkloadGroup.size() >= Config.workload_group_max_num) {
if (!stmt.isInternal() && workloadGroupName.equals(StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME)) {
throw new DdlException("workload group " + StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME
+ " is preserved for internal use, try another name.");
}
if (idToWorkloadGroup.size() >= Config.workload_group_max_num
&& !workloadGroupName.equals(StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME)) {
throw new DdlException(
"workload group number can not be exceed " + Config.workload_group_max_num);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class StatisticConstants {
public static final String TABLE_STATISTIC_TBL_NAME = "column_statistics";
public static final String PARTITION_STATISTIC_TBL_NAME = "partition_statistics";
public static final String HISTOGRAM_TBL_NAME = "histogram_statistics";
public static final String STATISTICS_WORKLOAD_GROUP_NAME = "doris_internal_statistics_workload_group";

public static final int MAX_NAME_LEN = 64;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public static AutoCloseConnectContext buildConnectContext(boolean limitScan, boo
SessionVariable sessionVariable = connectContext.getSessionVariable();
sessionVariable.internalSession = true;
sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
sessionVariable.cpuResourceLimit = Config.cpu_resource_limit_per_analyze_task;
sessionVariable.setWorkloadGroup(StatisticConstants.STATISTICS_WORKLOAD_GROUP_NAME);
sessionVariable.setEnableInsertStrict(true);
sessionVariable.setInsertMaxFilterRatio(1.0);
sessionVariable.enablePageCache = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Checks that the internal statistics workload group exists and that its read_bytes_per_second
// property can be changed with ALTER WORKLOAD GROUP.
suite("test_statistics_workload_group") {

    // Disable for now.
    return

    // Column 15 of the SHOW WORKLOAD GROUPS result is the one compared against the
    // read_bytes_per_second values set below. InternalSchemaInitializer creates the group with
    // read_bytes_per_second = 100000000, so that is the expected initial value.
    def result = sql """show workload groups like 'doris_internal_statistics_workload_group'"""
    assertEquals(result.size(), 1)
    assertEquals(result[0][15], "100000000")

    sql """alter workload group doris_internal_statistics_workload_group properties ("read_bytes_per_second"="1222");"""
    result = sql """show workload groups like 'doris_internal_statistics_workload_group'"""
    assertEquals(result.size(), 1)
    assertEquals(result[0][15], "1222")

    sql """alter workload group doris_internal_statistics_workload_group properties ("read_bytes_per_second"="-1");"""
    result = sql """show workload groups like 'doris_internal_statistics_workload_group'"""
    assertEquals(result.size(), 1)
    assertEquals(result[0][15], "-1")

    // Restore the initial value so the first assertion still holds when the suite is run again.
    sql """alter workload group doris_internal_statistics_workload_group properties ("read_bytes_per_second"="100000000");"""
    result = sql """show workload groups like 'doris_internal_statistics_workload_group'"""
    assertEquals(result.size(), 1)
    assertEquals(result[0][15], "100000000")

}

0 comments on commit fcab3f8

Please sign in to comment.