Skip to content

Commit

Permalink
[improvement](executor)Add tag property for workload group #32874
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo authored and Doris-Extras committed Mar 31, 2024
1 parent 71869a1 commit 267b836
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaWorkloadGroupsScanner::_s_tbls_colu
{"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true},
{"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true},
{"TAG", TYPE_VARCHAR, sizeof(StringRef), true},
};

SchemaWorkloadGroupsScanner::SchemaWorkloadGroupsScanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.workloadgroup.WorkloadGroup;

import java.util.Map;

Expand Down Expand Up @@ -69,6 +70,11 @@ public void analyze(Analyzer analyzer) throws UserException {
if (properties == null || properties.isEmpty()) {
throw new AnalysisException("Resource group properties can't be null");
}

String wgTag = properties.get(WorkloadGroup.TAG);
if (wgTag != null) {
FeNameFormat.checkCommonName("workload group tag", wgTag);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ public class SchemaTable extends Table {
.column("MIN_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT))
.column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256))
.column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256))
.column("TAG", ScalarType.createVarchar(256))
.build()))
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,17 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {

public static final String SPILL_THRESHOLD_HIGH_WATERMARK = "spill_threshold_high_watermark";

public static final String TAG = "tag";

// NOTE(wb): all property is not required, some properties default value is set in be
// default value is as followed
// cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
.add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM)
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK).build();
.add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK)
.add(TAG).build();

public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
Expand Down Expand Up @@ -142,6 +145,9 @@ private WorkloadGroup(long id, String name, Map<String, String> properties, long
}
this.properties.put(SPILL_THRESHOLD_HIGH_WATERMARK, highWatermarkStr);
}
if (properties.containsKey(TAG)) {
this.properties.put(TAG, properties.get(TAG).toLowerCase());
}
resetQueueProperty(properties);
}

Expand Down Expand Up @@ -206,11 +212,25 @@ private static void checkProperties(Map<String, String> properties) throws DdlEx

if (properties.containsKey(CPU_HARD_LIMIT)) {
String cpuHardLimit = properties.get(CPU_HARD_LIMIT);
if (cpuHardLimit.endsWith("%")) {
cpuHardLimit = cpuHardLimit.substring(0, cpuHardLimit.length() - 1);
}
if (!StringUtils.isNumeric(cpuHardLimit) || Long.parseLong(cpuHardLimit) <= 0) {
throw new DdlException(CPU_HARD_LIMIT + " " + cpuHardLimit + " requires a positive integer.");
String originValue = cpuHardLimit;
try {
boolean endWithSign = false;
if (cpuHardLimit.endsWith("%")) {
cpuHardLimit = cpuHardLimit.substring(0, cpuHardLimit.length() - 1);
endWithSign = true;
}

int intVal = Integer.parseInt(cpuHardLimit);
if (endWithSign && intVal == -1) {
throw new NumberFormatException();
}
if (!(intVal >= 1 && intVal <= 100) && -1 != intVal) {
throw new NumberFormatException();
}
} catch (NumberFormatException e) {
throw new DdlException(
"workload group's " + WorkloadGroup.CPU_HARD_LIMIT
+ " must be a positive integer[1,100] or -1, but input value is " + originValue);
}
}

Expand Down Expand Up @@ -395,7 +415,9 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
if (CPU_HARD_LIMIT.equals(key)) {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) { // cpu_hard_limit is not required
row.add("0%");
row.add("-1");
} else if ("-1".equals(val)) {
row.add(val);
} else {
row.add(val + "%");
}
Expand Down Expand Up @@ -431,6 +453,13 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) {
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum()));
} else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) {
row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum()));
} else if (TAG.equals(key)) {
String val = properties.get(key);
if (StringUtils.isEmpty(val)) {
row.add("");
} else {
row.add(val);
}
} else {
row.add(properties.get(key));
}
Expand All @@ -442,6 +471,10 @@ public int getCpuHardLimit() {
return cpuHardLimit;
}

public String getTag() {
return properties.get(TAG);
}

@Override
public String toString() {
return GsonUtils.GSON.toJson(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -59,7 +60,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
Expand All @@ -73,6 +73,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
.add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
.add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK)
.add(WorkloadGroup.TAG)
.add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
.build();

Expand Down Expand Up @@ -301,38 +302,44 @@ public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlExceptio
LOG.info("Create workload group success: {}", workloadGroup);
}

private void checkGlobalUnlock(WorkloadGroup workloadGroup, WorkloadGroup old) throws DdlException {
double totalMemoryLimit = idToWorkloadGroup.values().stream().mapToDouble(WorkloadGroup::getMemoryLimitPercent)
.sum() + workloadGroup.getMemoryLimitPercent();
if (!Objects.isNull(old)) {
totalMemoryLimit -= old.getMemoryLimitPercent();
}
if (totalMemoryLimit > 100.0 + 1e-6) {
throw new DdlException(
"The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " cannot be greater than 100.0%.");
}

// 1, check new group
int newGroupCpuHardLimit = workloadGroup.getCpuHardLimit();
if (newGroupCpuHardLimit > 100 || newGroupCpuHardLimit < 0) {
throw new DdlException(
"new group's " + WorkloadGroup.CPU_HARD_LIMIT
+ " value can not be greater than 100% or less than or equal 0%");
}

// 2, check sum of all cpu hard limit
// NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit
// when create/alter workload group with same tag.
// when oldWg is null it means caller is an alter stmt.
private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException {
String wgTag = newWg.getTag();
double sumOfAllMemLimit = 0;
int sumOfAllCpuHardLimit = 0;
for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
if (old != null && entry.getKey() == old.getId()) {
WorkloadGroup wg = entry.getValue();
if (!StringUtils.equals(wgTag, wg.getTag())) {
continue;
}

if (oldWg != null && entry.getKey() == oldWg.getId()) {
continue;
}
sumOfAllCpuHardLimit += entry.getValue().getCpuHardLimit();

if (wg.getCpuHardLimit() > 0) {
sumOfAllCpuHardLimit += wg.getCpuHardLimit();
}
if (wg.getMemoryLimitPercent() > 0) {
sumOfAllMemLimit += wg.getMemoryLimitPercent();
}
}

sumOfAllMemLimit += newWg.getMemoryLimitPercent();
sumOfAllCpuHardLimit += newWg.getCpuHardLimit();

if (sumOfAllMemLimit > 100.0 + 1e-6) {
throw new DdlException(
"The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag
+ " cannot be greater than 100.0%.");
}
sumOfAllCpuHardLimit += newGroupCpuHardLimit;

if (sumOfAllCpuHardLimit > 100) {
throw new DdlException("sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT
+ " can not be greater than 100% ");
throw new DdlException(
"sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag "
+ wgTag + " can not be greater than 100% ");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,11 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TSchemaT
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10))));
// min remote scan thread num
trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11))));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(15)));
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12))); // spill low watermark
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13))); // spill high watermark
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14))); // tag
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(15))); // running query num
trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(16))); // waiting query num
dataBatch.add(trow);
}

Expand Down
24 changes: 16 additions & 8 deletions regression-test/data/workload_manager_p0/test_curd_wlg.out
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
2

-- !show_1 --
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 10% true 2147483647 0 0 0% -1
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 10% true 2147483647 0 0 -1 -1

-- !mem_limit_1 --
2

-- !mem_limit_2 --
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% true 2147483647 0 0 0% -1
test_group 10 11% true 2147483647 0 0 -1 -1

-- !mem_overcommit_1 --
2
Expand All @@ -24,7 +24,7 @@ test_group 10 11% true 2147483647 0 0 0% -1

-- !mem_overcommit_3 --
normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 2147483647 0 0 0% -1
test_group 10 11% false 2147483647 0 0 -1 -1

-- !cpu_hard_limit_1 --
2
Expand All @@ -45,14 +45,22 @@ normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 100 0 0 20% -1

-- !show_spill_1 --
spill_group_test 1024 0% true 2147483647 0 0 0% -1 10% 10%
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 10% 10%

-- !show_spill_1 --
spill_group_test 1024 0% true 2147483647 0 0 0% -1 -1 10%
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 -1 10%

-- !show_spill_2 --
spill_group_test 1024 0% true 2147483647 0 0 0% -1 5% 10%
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 10%

-- !show_spill_3 --
spill_group_test 1024 0% true 2147483647 0 0 0% -1 5% 40%
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 40%

-- !show_wg_tag --
tag1_mem_wg1 50% -1 mem_tag1
tag1_mem_wg2 49% -1 mem_tag1
tag1_mem_wg3 1% -1 mem_tag1
tag1_wg1 0% 10% tag1
tag1_wg2 0% 10% tag1
tag1_wg3 0% 80% tag1

Loading

0 comments on commit 267b836

Please sign in to comment.