From 267b8363c2a7a6f4790ef7c52e36e0b90d41f4da Mon Sep 17 00:00:00 2001 From: wangbo Date: Thu, 28 Mar 2024 17:31:47 +0800 Subject: [PATCH] [improvement](executor)Add tag property for workload group #32874 --- .../schema_workload_groups_scanner.cpp | 1 + .../analysis/CreateWorkloadGroupStmt.java | 6 ++ .../org/apache/doris/catalog/SchemaTable.java | 1 + .../resource/workloadgroup/WorkloadGroup.java | 47 ++++++++-- .../workloadgroup/WorkloadGroupMgr.java | 59 ++++++------ .../tablefunction/MetadataGenerator.java | 9 +- .../workload_manager_p0/test_curd_wlg.out | 24 +++-- .../workload_manager_p0/test_curd_wlg.groovy | 92 ++++++++++++++++++- 8 files changed, 190 insertions(+), 49 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp index 24e23a3e336ae8..03bf9782dcd814 100644 --- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp @@ -41,6 +41,7 @@ std::vector SchemaWorkloadGroupsScanner::_s_tbls_colu {"MIN_REMOTE_SCAN_THREAD_NUM", TYPE_BIGINT, sizeof(int64_t), true}, {"SPILL_THRESHOLD_LOW_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true}, {"SPILL_THRESHOLD_HIGH_WATERMARK", TYPE_VARCHAR, sizeof(StringRef), true}, + {"TAG", TYPE_VARCHAR, sizeof(StringRef), true}, }; SchemaWorkloadGroupsScanner::SchemaWorkloadGroupsScanner() diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java index 3d48cad5cd6e50..92a60a94e55289 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateWorkloadGroupStmt.java @@ -26,6 +26,7 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.resource.workloadgroup.WorkloadGroup; import java.util.Map; @@ -69,6 +70,11 @@ public void analyze(Analyzer analyzer) throws UserException { if (properties == null || properties.isEmpty()) { throw new AnalysisException("Resource group properties can't be null"); } + + String wgTag = properties.get(WorkloadGroup.TAG); + if (wgTag != null) { + FeNameFormat.checkCommonName("workload group tag", wgTag); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 610a84b4740549..71354750d84157 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -486,6 +486,7 @@ public class SchemaTable extends Table { .column("MIN_REMOTE_SCAN_THREAD_NUM", ScalarType.createType(PrimitiveType.BIGINT)) .column("SPILL_THRESHOLD_LOW_WATERMARK", ScalarType.createVarchar(256)) .column("SPILL_THRESHOLD_HIGH_WATERMARK", ScalarType.createVarchar(256)) + .column("TAG", ScalarType.createVarchar(256)) .build())) .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 4a220252afe17a..482d2f6f11a301 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -71,6 +71,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { public static final String SPILL_THRESHOLD_HIGH_WATERMARK = "spill_threshold_high_watermark"; + public static final String TAG = "tag"; + // NOTE(wb): all property is not required, some properties default value is set in be // default value is as followed // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true @@ -78,7 +80,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { .add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY) .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM) .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM) - .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK).build(); + .add(SPILL_THRESHOLD_LOW_WATERMARK).add(SPILL_THRESHOLD_HIGH_WATERMARK) + .add(TAG).build(); public static final int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50; public static final int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80; @@ -142,6 +145,9 @@ private WorkloadGroup(long id, String name, Map properties, long } this.properties.put(SPILL_THRESHOLD_HIGH_WATERMARK, highWatermarkStr); } + if (properties.containsKey(TAG)) { + this.properties.put(TAG, properties.get(TAG).toLowerCase()); + } resetQueueProperty(properties); } @@ -206,11 +212,25 @@ private static void checkProperties(Map properties) throws DdlEx if (properties.containsKey(CPU_HARD_LIMIT)) { String cpuHardLimit = properties.get(CPU_HARD_LIMIT); - if (cpuHardLimit.endsWith("%")) { - cpuHardLimit = cpuHardLimit.substring(0, cpuHardLimit.length() - 1); - } - if (!StringUtils.isNumeric(cpuHardLimit) || Long.parseLong(cpuHardLimit) <= 0) { - throw new DdlException(CPU_HARD_LIMIT + " " + cpuHardLimit + " requires a positive integer."); + String originValue = cpuHardLimit; + try { + boolean endWithSign = false; + if (cpuHardLimit.endsWith("%")) { + cpuHardLimit = cpuHardLimit.substring(0, cpuHardLimit.length() - 1); + endWithSign = true; + } + + int intVal = Integer.parseInt(cpuHardLimit); + if (endWithSign && intVal == -1) { + throw new NumberFormatException(); + } + if (!(intVal >= 1 && intVal <= 100) && -1 != intVal) { + throw new NumberFormatException(); + } + } catch (NumberFormatException e) { + throw new DdlException( + "workload group's " + WorkloadGroup.CPU_HARD_LIMIT + + " must be a positive integer[1,100] or -1, but input value is " + originValue); } } @@ -395,7 +415,9 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) { if (CPU_HARD_LIMIT.equals(key)) { String val = properties.get(key); if (StringUtils.isEmpty(val)) { // cpu_hard_limit is not required - row.add("0%"); + row.add("-1"); + } else if ("-1".equals(val)) { + row.add(val); } else { row.add(val + "%"); } @@ -431,6 +453,13 @@ public void getProcNodeData(BaseProcResult result, QueryQueue qq) { row.add(qq == null ? "0" : String.valueOf(qq.getCurrentRunningQueryNum())); } else if (QueryQueue.WAITING_QUERY_NUM.equals(key)) { row.add(qq == null ? "0" : String.valueOf(qq.getCurrentWaitingQueryNum())); + } else if (TAG.equals(key)) { + String val = properties.get(key); + if (StringUtils.isEmpty(val)) { + row.add(""); + } else { + row.add(val); + } } else { row.add(properties.get(key)); } @@ -442,6 +471,10 @@ public int getCpuHardLimit() { return cpuHardLimit; } + public String getTag() { + return properties.get(TAG); + } + @Override public String toString() { return GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 08de0ce338a61f..53269b15049985 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -48,6 +48,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,7 +60,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; public class WorkloadGroupMgr implements Writable, GsonPostProcessable { @@ -73,6 +73,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { .add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM) .add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM) .add(WorkloadGroup.SPILL_THRESHOLD_LOW_WATERMARK).add(WorkloadGroup.SPILL_THRESHOLD_HIGH_WATERMARK) + .add(WorkloadGroup.TAG) .add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM) .build(); @@ -301,38 +302,44 @@ public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlExceptio LOG.info("Create workload group success: {}", workloadGroup); } - private void checkGlobalUnlock(WorkloadGroup workloadGroup, WorkloadGroup old) throws DdlException { - double totalMemoryLimit = idToWorkloadGroup.values().stream().mapToDouble(WorkloadGroup::getMemoryLimitPercent) - .sum() + workloadGroup.getMemoryLimitPercent(); - if (!Objects.isNull(old)) { - totalMemoryLimit -= old.getMemoryLimitPercent(); - } - if (totalMemoryLimit > 100.0 + 1e-6) { - throw new DdlException( - "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " cannot be greater than 100.0%."); - } - - // 1, check new group - int newGroupCpuHardLimit = workloadGroup.getCpuHardLimit(); - if (newGroupCpuHardLimit > 100 || newGroupCpuHardLimit < 0) { - throw new DdlException( - "new group's " + WorkloadGroup.CPU_HARD_LIMIT - + " value can not be greater than 100% or less than or equal 0%"); - } - - // 2, check sum of all cpu hard limit + // NOTE: used for checking sum value of 100% for cpu_hard_limit and memory_limit + // when create/alter workload group with same tag. + // when oldWg is null it means caller is an alter stmt. + private void checkGlobalUnlock(WorkloadGroup newWg, WorkloadGroup oldWg) throws DdlException { + String wgTag = newWg.getTag(); + double sumOfAllMemLimit = 0; int sumOfAllCpuHardLimit = 0; for (Map.Entry entry : idToWorkloadGroup.entrySet()) { - if (old != null && entry.getKey() == old.getId()) { + WorkloadGroup wg = entry.getValue(); + if (!StringUtils.equals(wgTag, wg.getTag())) { + continue; + } + + if (oldWg != null && entry.getKey() == oldWg.getId()) { continue; } - sumOfAllCpuHardLimit += entry.getValue().getCpuHardLimit(); + + if (wg.getCpuHardLimit() > 0) { + sumOfAllCpuHardLimit += wg.getCpuHardLimit(); + } + if (wg.getMemoryLimitPercent() > 0) { + sumOfAllMemLimit += wg.getMemoryLimitPercent(); + } + } + + sumOfAllMemLimit += newWg.getMemoryLimitPercent(); + sumOfAllCpuHardLimit += newWg.getCpuHardLimit(); + + if (sumOfAllMemLimit > 100.0 + 1e-6) { + throw new DdlException( + "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " within tag " + wgTag + + " cannot be greater than 100.0%."); } - sumOfAllCpuHardLimit += newGroupCpuHardLimit; if (sumOfAllCpuHardLimit > 100) { - throw new DdlException("sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT - + " can not be greater than 100% "); + throw new DdlException( + "sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + " within tag " + + wgTag + " can not be greater than 100% "); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java index c84939ffbe63ae..4ac216d5334e72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java @@ -431,10 +431,11 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TSchemaT trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // min remote scan thread num trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11)))); - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12))); - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13))); - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14))); - trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(15))); + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(12))); // spill low watermark + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(13))); // spill high watermark + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(14))); // tag + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(15))); // running query num + trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(16))); // waiting query num dataBatch.add(trow); } diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out b/regression-test/data/workload_manager_p0/test_curd_wlg.out index 13b33fd9b839a7..fca16d077e4095 100644 --- a/regression-test/data/workload_manager_p0/test_curd_wlg.out +++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out @@ -6,15 +6,15 @@ 2 -- !show_1 -- -normal 20 50% true 2147483647 0 0 1% 16 -test_group 10 10% true 2147483647 0 0 0% -1 +normal 20 50% true 2147483647 0 0 1% 16 +test_group 10 10% true 2147483647 0 0 -1 -1 -- !mem_limit_1 -- 2 -- !mem_limit_2 -- normal 20 50% true 2147483647 0 0 1% 16 -test_group 10 11% true 2147483647 0 0 0% -1 +test_group 10 11% true 2147483647 0 0 -1 -1 -- !mem_overcommit_1 -- 2 @@ -24,7 +24,7 @@ test_group 10 11% true 2147483647 0 0 0% -1 -- !mem_overcommit_3 -- normal 20 50% true 2147483647 0 0 1% 16 -test_group 10 11% false 2147483647 0 0 0% -1 +test_group 10 11% false 2147483647 0 0 -1 -1 -- !cpu_hard_limit_1 -- 2 @@ -45,14 +45,22 @@ normal 20 50% true 2147483647 0 0 1% 16 test_group 10 11% false 100 0 0 20% -1 -- !show_spill_1 -- -spill_group_test 1024 0% true 2147483647 0 0 0% -1 10% 10% +spill_group_test 1024 0% true 2147483647 0 0 -1 -1 10% 10% -- !show_spill_1 -- -spill_group_test 1024 0% true 2147483647 0 0 0% -1 -1 10% +spill_group_test 1024 0% true 2147483647 0 0 -1 -1 -1 10% -- !show_spill_2 -- -spill_group_test 1024 0% true 2147483647 0 0 0% -1 5% 10% +spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 10% -- !show_spill_3 -- -spill_group_test 1024 0% true 2147483647 0 0 0% -1 5% 40% +spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 40% + +-- !show_wg_tag -- +tag1_mem_wg1 50% -1 mem_tag1 +tag1_mem_wg2 49% -1 mem_tag1 +tag1_mem_wg3 1% -1 mem_tag1 +tag1_wg1 0% 10% tag1 +tag1_wg2 0% 10% tag1 +tag1_wg3 0% 80% tag1 diff --git a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy index a73801f8743c5e..b68aab16fff9b7 100644 --- a/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy +++ b/regression-test/suites/workload_manager_p0/test_curd_wlg.groovy @@ -24,6 +24,14 @@ suite("test_crud_wlg") { sql "drop table if exists ${table_name2}" sql "drop table if exists ${table_name3}" + sql "drop workload group if exists tag1_wg1;" + sql "drop workload group if exists tag1_wg2;" + sql "drop workload group if exists tag2_wg1;" + sql "drop workload group if exists tag1_wg3;" + sql "drop workload group if exists tag1_mem_wg1;" + sql "drop workload group if exists tag1_mem_wg2;" + sql "drop workload group if exists tag1_mem_wg3;" + sql """ CREATE TABLE IF NOT EXISTS `${table_name}` ( `siteid` int(11) NOT NULL COMMENT "", @@ -117,7 +125,7 @@ suite("test_crud_wlg") { ");" sql "set workload_group=test_group;" - qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num from information_schema.workload_groups where name in ('normal','test_group') order by name;" + qt_show_1 "select name,cpu_share,memory_limit,enable_memory_overcommit,max_concurrency,max_queue_size,queue_timeout,cpu_hard_limit,scan_thread_num,tag from information_schema.workload_groups where name in ('normal','test_group') order by name;" // test memory_limit test { @@ -147,7 +155,7 @@ suite("test_crud_wlg") { test { sql "alter workload group test_group properties ( 'cpu_hard_limit'='101%' );" - exception "can not be greater than 100%" + exception "must be a positive integer" } sql "alter workload group test_group properties ( 'cpu_hard_limit'='99%' );" @@ -245,7 +253,7 @@ suite("test_crud_wlg") { " 'cpu_hard_limit'='120%' " + ");" - exception "can not be greater than" + exception "must be a positive integer" } test { @@ -257,7 +265,7 @@ suite("test_crud_wlg") { " 'cpu_hard_limit'='99%' " + ");" - exception "can not be greater than" + exception "can not be greater than 100%" } // test show workload groups @@ -402,4 +410,80 @@ suite("test_crud_wlg") { sql "drop workload group test_group;" sql "drop workload group spill_group_test;" + + + // test workload group's tag property, cpu_hard_limit + test { + sql "create workload group if not exists tag1_wg1 properties ( 'cpu_hard_limit'='101%', 'tag'='tag1')" + exception "must be a positive integer" + } + + test { + sql "create workload group if not exists tag1_wg1 properties ( 'cpu_hard_limit'='-2%', 'tag'='tag1')" + exception "must be a positive integer" + } + + test { + sql "create workload group if not exists tag1_wg1 properties ( 'cpu_hard_limit'='-1%', 'tag'='tag1')" + exception "must be a positive integer" + } + + sql "create workload group if not exists tag1_wg1 properties ( 'cpu_hard_limit'='10%', 'tag'='tag1');" + + test { + sql "create workload group if not exists tag1_wg2 properties ( 'cpu_hard_limit'='91%', 'tag'='tag1');" + exception "can not be greater than 100%" + } + + sql "create workload group if not exists tag1_wg2 properties ( 'cpu_hard_limit'='10%', 'tag'='tag1');" + + sql "create workload group if not exists tag2_wg1 properties ( 'cpu_hard_limit'='91%', 'tag'='tag2');" + + test { + sql "alter workload group tag2_wg1 properties ( 'tag'='tag1' );" + exception "can not be greater than 100% " + } + + sql "alter workload group tag2_wg1 properties ( 'cpu_hard_limit'='10%' );" + sql "alter workload group tag2_wg1 properties ( 'tag'='tag1' );" + + test { + sql "create workload group if not exists tag1_wg3 properties ( 'cpu_hard_limit'='80%', 'tag'='tag1');" + exception "can not be greater than 100% " + } + + sql "drop workload group tag2_wg1;" + sql "create workload group if not exists tag1_wg3 properties ( 'cpu_hard_limit'='80%', 'tag'='tag1');" + + // test workload group's tag property, memory_limit + sql "create workload group if not exists tag1_mem_wg1 properties ( 'memory_limit'='50%', 'tag'='mem_tag1');" + + test { + sql "create workload group if not exists tag1_mem_wg2 properties ( 'memory_limit'='60%', 'tag'='mem_tag1');" + exception "cannot be greater than 100.0%" + } + + sql "create workload group if not exists tag1_mem_wg2 properties ('memory_limit'='49%', 'tag'='mem_tag1');" + + sql "create workload group if not exists tag1_mem_wg3 properties ( 'memory_limit'='2%');" + + test { + sql "alter workload group tag1_mem_wg3 properties ( 'tag'='mem_tag1' );" + exception "cannot be greater than 100.0%" + } + + sql "alter workload group tag1_mem_wg3 properties ( 'memory_limit'='1%' );" + + sql "alter workload group tag1_mem_wg3 properties ( 'tag'='mem_tag1' );" + + qt_show_wg_tag "select name,MEMORY_LIMIT,CPU_HARD_LIMIT,TAG from information_schema.workload_groups where name in('tag1_wg1','tag1_wg2','tag2_wg1','tag1_wg3','tag1_mem_wg1','tag1_mem_wg2','tag1_mem_wg3') order by tag,name;" + + sql "drop workload group tag1_wg1;" + sql "drop workload group tag1_wg2;" + sql "drop workload group if exists tag2_wg1;" + sql "drop workload group tag1_wg3;" + sql "drop workload group tag1_mem_wg1;" + sql "drop workload group tag1_mem_wg2;" + sql "drop workload group tag1_mem_wg3;" + }