From 4c775a6a3c920e3fb56f71d81c8f9f39df9d205a Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Tue, 12 Mar 2024 21:17:07 +0800 Subject: [PATCH 1/7] [bugfix](becore) has to use value to capture lambda value to avoid core during callback (#32132) Co-authored-by: yiguolei --- be/src/vec/sink/writer/vtablet_writer.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 1e888f2460e60cc..97e8ffe1e77fd27 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -357,9 +357,10 @@ Status VNodeChannel::init(RuntimeState* state) { _cur_add_block_request->set_eos(false); // add block closure + // Has to using value to capture _task_exec_ctx because tablet writer may destroyed during callback. _send_block_callback = WriteBlockCallback::create_shared(); - _send_block_callback->addFailedHandler([this](bool is_last_rpc) { - auto ctx_lock = _task_exec_ctx.lock(); + _send_block_callback->addFailedHandler([&, task_exec_ctx = _task_exec_ctx](bool is_last_rpc) { + auto ctx_lock = task_exec_ctx.lock(); if (ctx_lock == nullptr) { return; } @@ -367,8 +368,9 @@ Status VNodeChannel::init(RuntimeState* state) { }); _send_block_callback->addSuccessHandler( - [this](const PTabletWriterAddBlockResult& result, bool is_last_rpc) { - auto ctx_lock = _task_exec_ctx.lock(); + [&, task_exec_ctx = _task_exec_ctx](const PTabletWriterAddBlockResult& result, + bool is_last_rpc) { + auto ctx_lock = task_exec_ctx.lock(); if (ctx_lock == nullptr) { return; } From e55d343430d597e08ee1463a8a9a585368d16c72 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Tue, 12 Mar 2024 21:50:51 +0800 Subject: [PATCH 2/7] [Fix](auto-inc) Fix partial update auto inc unique table case failure #32114 --- .../unique/test_unique_table_auto_inc.out | 22 ++++++++++--------- .../unique/test_unique_table_auto_inc.groovy | 3 ++- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc.out b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc.out index 60fe3b765e32bc9..e075ea4d2601f2f 100644 --- a/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc.out +++ b/regression-test/data/data_model_p0/unique/test_unique_table_auto_inc.out @@ -116,16 +116,18 @@ Carter 500 9994 Beata 700 9996 Nereids 900 9998 --- !partial_update_value -- -Bob 100 1 -Alice 200 2 -Tom 300 3 -Test 400 4 -Carter 500 5 -Smith 600 6 -Beata 700 7 -Doris 800 8 -Nereids 900 9 +-- !partial_update_value1 -- +Bob 100 +Alice 200 +Tom 300 +Test 400 +Carter 500 +Smith 600 +Beata 700 +Doris 800 +Nereids 900 + +-- !partial_update_value2 -- -- !partial_update_value -- Bob 9990 1 diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc.groovy index 16ab4f7abb4ef7b..dc5ed554a35e6a1 100644 --- a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc.groovy +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc.groovy @@ -279,7 +279,8 @@ suite("test_unique_table_auto_inc") { time 10000 // limit inflight 10s } sql "sync" - qt_partial_update_value "select * from ${table7} order by id;" + qt_partial_update_value1 "select name, value from ${table7} order by value;" + qt_partial_update_value2 "select id, count(*) from ${table7} group by id having count(*) > 1;" streamLoad { table 
"${table7}" From 47be7c5e1737c084f0919eeee2001583a78a7ccb Mon Sep 17 00:00:00 2001 From: chunping Date: Tue, 12 Mar 2024 22:04:03 +0800 Subject: [PATCH 3/7] [test-framework](cloud) add necessary config and function for run cloud cases (#32127) * [test](cloud) add necessary config and function for run cloud cases * [test-framework](cloud) adjust format of config.groovy --- regression-test/framework/pom.xml | 153 +++++++-- .../org/apache/doris/regression/Config.groovy | 298 +++++++++++++++++- .../doris/regression/ConfigOptions.groovy | 200 +++++++++++- .../doris/regression/suite/Suite.groovy | 249 +++++++++++++++ 4 files changed, 857 insertions(+), 43 deletions(-) diff --git a/regression-test/framework/pom.xml b/regression-test/framework/pom.xml index 4c22fe22f13cd26..41f7ae3ed503123 100644 --- a/regression-test/framework/pom.xml +++ b/regression-test/framework/pom.xml @@ -71,9 +71,12 @@ under the License. 1.8 1.0-SNAPSHOT github - 4.0.19 + 3.0.7 + 3.0.7-01 + 3.7.0 4.9.3 2.8.0 + 1.11.95 15.0.0 @@ -84,19 +87,33 @@ under the License. 1.2.5 - org.codehaus.gmavenplus - gmavenplus-plugin - 3.0.2 - - - - addSources - addTestSources - compile - compileTests - - - + org.apache.maven.plugins + maven-compiler-plugin + 3.9.0 + + groovy-eclipse-compiler + ${maven.compiler.source} + ${maven.compiler.target} + true + + + + org.codehaus.groovy + groovy-eclipse-compiler + ${groovy-eclipse-compiler.version} + + + org.codehaus.groovy + groovy-eclipse-batch + ${groovy-eclipse-batch.version} + + + + + org.codehaus.groovy + groovy-eclipse-compiler + ${groovy-eclipse-compiler.version} + true org.apache.maven.plugins @@ -152,6 +169,35 @@ under the License. + + + + org.codehaus.mojo + flatten-maven-plugin + 1.2.5 + + true + bom + + + + flatten + process-resources + + flatten + + + + flatten.clean + clean + + clean + + + + + + @@ -167,7 +213,7 @@ under the License. 2.10.1 - org.apache.groovy + org.codehaus.groovy groovy-all ${groovy.version} pom @@ -197,11 +243,6 @@ under the License. jodd-core 5.3.0 - - org.apache.kafka - kafka-clients - 2.8.1 - ch.qos.logback logback-classic @@ -256,25 +297,69 @@ under the License. 
org.apache.hadoop hadoop-mapreduce-client-core ${hadoop.version} - - - - jdk.tools - jdk.tools - - org.apache.hive hive-jdbc 2.3.7 - - - - jdk.tools - jdk.tools - - + + + org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + + + com.amazonaws + aws-java-sdk-s3 + ${aws-java-sdk-s3.version} + + + + com.aliyun + aliyun-java-sdk-core + 4.5.10 + + + com.aliyun + aliyun-java-sdk-ram + 3.3.1 + + + com.google.code.gson + gson + 2.2.4 + + + + + com.tencentcloudapi + tencentcloud-sdk-java-cam + 3.1.694 + + + + + + + + software.amazon.awssdk + iam + 2.20.8 + + + + software.amazon.awssdk + s3 + 2.19.8 + + + com.hierynomus + sshj + 0.32.0 org.apache.arrow diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy index e240264060f8950..f678d7e2bc8f33c 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy @@ -51,7 +51,14 @@ class Config { public String feHttpUser public String feHttpPassword + public String feCloudHttpAddress + public String feCloudHttpUser + public String feCloudHttpPassword + + public String instanceId + public String cloudUniqueId public String metaServiceHttpAddress + public String recycleServiceHttpAddress public String suitePath public String dataPath @@ -92,21 +99,91 @@ class Config { public TNetworkAddress feTargetThriftNetworkAddress public TNetworkAddress syncerNetworkAddress public InetSocketAddress feHttpInetSocketAddress + public InetSocketAddress feCloudHttpInetSocketAddress public InetSocketAddress metaServiceHttpInetSocketAddress + public InetSocketAddress recycleServiceHttpInetSocketAddress public Integer parallel public Integer suiteParallel public Integer actionParallel public Integer times public boolean withOutLoadData + public boolean isSmokeTest + public String multiClusterBes + public String metaServiceToken + public String multiClusterInstance + public String upgradeNewBeIp + public String upgradeNewBeHbPort + public String upgradeNewBeHttpPort + public String upgradeNewBeUniqueId + + public String stageIamEndpoint + public String stageIamRegion + public String stageIamBucket + public String stageIamPolicy + public String stageIamRole + public String stageIamArn + public String stageIamAk + public String stageIamSk + public String stageIamUserId + + public String clusterDir + public String kafkaBrokerList + public String cloudVersion Config() {} - Config(String defaultDb, String jdbcUrl, String jdbcUser, String jdbcPassword, - String feSourceThriftAddress, String feTargetThriftAddress, String feSyncerUser, String feSyncerPassword, - String syncerPassword, String feHttpAddress, String feHttpUser, String feHttpPassword, String metaServiceHttpAddress, - String suitePath, String dataPath, String realDataPath, String cacheDataPath, Boolean enableCacheData, - String testGroups, String excludeGroups, String testSuites, String excludeSuites, - String testDirectories, String excludeDirectories, String pluginPath, String sslCertificatePath) { + Config( + String defaultDb, + String jdbcUrl, + String jdbcUser, + String jdbcPassword, + String feSourceThriftAddress, + String feTargetThriftAddress, + String feSyncerUser, + String feSyncerPassword, + String syncerPassword, + String feHttpAddress, + String feHttpUser, + String feHttpPassword, + String feCloudHttpAddress, + String feCloudHttpUser, + String 
feCloudHttpPassword, + String instanceId, + String cloudUniqueId, + String metaServiceHttpAddress, + String recycleServiceHttpAddress, + String suitePath, + String dataPath, + String realDataPath, + String cacheDataPath, + Boolean enableCacheData, + String testGroups, + String excludeGroups, + String testSuites, + String excludeSuites, + String testDirectories, + String excludeDirectories, + String pluginPath, + String sslCertificatePath, + String multiClusterBes, + String metaServiceToken, + String multiClusterInstance, + String upgradeNewBeIp, + String upgradeNewBeHbPort, + String upgradeNewBeHttpPort, + String upgradeNewBeUniqueId, + String stageIamEndpoint, + String stageIamRegion, + String stageIamBucket, + String stageIamPolicy, + String stageIamRole, + String stageIamArn, + String stageIamAk, + String stageIamSk, + String stageIamUserId, + String clusterDir, + String kafkaBrokerList, + String cloudVersion) { this.defaultDb = defaultDb this.jdbcUrl = jdbcUrl this.jdbcUser = jdbcUser @@ -119,7 +196,13 @@ class Config { this.feHttpAddress = feHttpAddress this.feHttpUser = feHttpUser this.feHttpPassword = feHttpPassword + this.feCloudHttpAddress = feCloudHttpAddress + this.feCloudHttpUser = feCloudHttpUser + this.feCloudHttpPassword = feCloudHttpPassword + this.instanceId = instanceId + this.cloudUniqueId = cloudUniqueId this.metaServiceHttpAddress = metaServiceHttpAddress + this.recycleServiceHttpAddress = recycleServiceHttpAddress this.suitePath = suitePath this.dataPath = dataPath this.realDataPath = realDataPath @@ -133,6 +216,25 @@ class Config { this.excludeDirectories = excludeDirectories this.pluginPath = pluginPath this.sslCertificatePath = sslCertificatePath + this.multiClusterBes = multiClusterBes + this.metaServiceToken = metaServiceToken + this.multiClusterInstance = multiClusterInstance + this.upgradeNewBeIp = upgradeNewBeIp + this.upgradeNewBeHbPort = upgradeNewBeHbPort + this.upgradeNewBeHttpPort = upgradeNewBeHttpPort + this.upgradeNewBeUniqueId = upgradeNewBeUniqueId + this.stageIamEndpoint = stageIamEndpoint + this.stageIamRegion = stageIamRegion + this.stageIamBucket = stageIamBucket + this.stageIamPolicy = stageIamPolicy + this.stageIamRole = stageIamRole + this.stageIamArn = stageIamArn + this.stageIamAk = stageIamAk + this.stageIamSk = stageIamSk + this.stageIamUserId = stageIamUserId + this.clusterDir = clusterDir + this.kafkaBrokerList = kafkaBrokerList + this.cloudVersion = cloudVersion } static Config fromCommandLine(CommandLine cmd) { @@ -237,6 +339,22 @@ class Config { throw new IllegalStateException("Can not parse stream load address: ${config.feHttpAddress}", t) } + config.feCloudHttpAddress = cmd.getOptionValue(feCloudHttpAddressOpt, config.feCloudHttpAddress) + try { + Inet4Address host = Inet4Address.getByName(config.feCloudHttpAddress.split(":")[0]) as Inet4Address + int port = Integer.valueOf(config.feCloudHttpAddress.split(":")[1]) + config.feCloudHttpInetSocketAddress = new InetSocketAddress(host, port) + } catch (Throwable t) { + throw new IllegalStateException("Can not parse fe cloud http address: ${config.feCloudHttpAddress}", t) + } + log.info("feCloudHttpAddress : $config.feCloudHttpAddress, socketAddr : $config.feCloudHttpInetSocketAddress") + + config.instanceId = cmd.getOptionValue(instanceIdOpt, config.instanceId) + log.info("instanceId : ${config.instanceId}") + + config.cloudUniqueId = cmd.getOptionValue(cloudUniqueIdOpt, config.cloudUniqueId) + log.info("cloudUniqueId : ${config.cloudUniqueId}") + config.metaServiceHttpAddress = 
cmd.getOptionValue(metaServiceHttpAddressOpt, config.metaServiceHttpAddress) try { Inet4Address host = Inet4Address.getByName(config.metaServiceHttpAddress.split(":")[0]) as Inet4Address @@ -247,6 +365,61 @@ class Config { } log.info("msAddr : $config.metaServiceHttpAddress, socketAddr : $config.metaServiceHttpInetSocketAddress") + config.multiClusterBes = cmd.getOptionValue(multiClusterBesOpt, config.multiClusterBes) + log.info("multiClusterBes is ${config.multiClusterBes}".toString()) + + config.metaServiceToken = cmd.getOptionValue(metaServiceTokenOpt, config.metaServiceToken) + log.info("metaServiceToken is ${config.metaServiceToken}".toString()) + + config.multiClusterInstance = cmd.getOptionValue(multiClusterInstanceOpt, config.multiClusterInstance) + log.info("multiClusterInstance is ${config.multiClusterInstance}".toString()) + + config.upgradeNewBeIp = cmd.getOptionValue(upgradeNewBeIpOpt, config.upgradeNewBeIp) + log.info("upgradeNewBeIp is ${config.upgradeNewBeIp}".toString()) + + config.upgradeNewBeHbPort = cmd.getOptionValue(upgradeNewBeHbPortOpt, config.upgradeNewBeHbPort) + log.info("upgradeNewBeHbPort is ${config.upgradeNewBeHbPort}".toString()) + + config.upgradeNewBeHttpPort = cmd.getOptionValue(upgradeNewBeHttpPortOpt, config.upgradeNewBeHttpPort) + log.info("upgradeNewBeHttpPort is ${config.upgradeNewBeHttpPort}".toString()) + + config.upgradeNewBeUniqueId = cmd.getOptionValue(upgradeNewBeUniqueIdOpt, config.upgradeNewBeUniqueId) + log.info("upgradeNewBeUniqueId is ${config.upgradeNewBeUniqueId}".toString()) + + config.stageIamEndpoint = cmd.getOptionValue(stageIamEndpointOpt, config.stageIamEndpoint) + log.info("stageIamEndpoint is ${config.stageIamEndpoint}".toString()) + config.stageIamRegion = cmd.getOptionValue(stageIamRegionOpt, config.stageIamRegion) + log.info("stageIamRegion is ${config.stageIamRegion}".toString()) + config.stageIamBucket = cmd.getOptionValue(stageIamBucketOpt, config.stageIamBucket) + log.info("stageIamBucket is ${config.stageIamBucket}".toString()) + config.stageIamPolicy = cmd.getOptionValue(stageIamPolicyOpt, config.stageIamPolicy) + log.info("stageIamPolicy is ${config.stageIamPolicy}".toString()) + config.stageIamRole = cmd.getOptionValue(stageIamRoleOpt, config.stageIamRole) + log.info("stageIamRole is ${config.stageIamRole}".toString()) + config.stageIamArn = cmd.getOptionValue(stageIamArnOpt, config.stageIamArn) + log.info("stageIamArn is ${config.stageIamArn}".toString()) + config.stageIamAk = cmd.getOptionValue(stageIamAkOpt, config.stageIamAk) + log.info("stageIamAk is ${config.stageIamAk}".toString()) + config.stageIamSk = cmd.getOptionValue(stageIamSkOpt, config.stageIamSk) + log.info("stageIamSk is ${config.stageIamSk}".toString()) + config.stageIamUserId = cmd.getOptionValue(stageIamUserIdOpt, config.stageIamUserId) + log.info("stageIamUserId is ${config.stageIamUserId}".toString()) + config.cloudVersion = cmd.getOptionValue(cloudVersionOpt, config.cloudVersion) + log.info("cloudVersion is ${config.cloudVersion}".toString()) + + config.kafkaBrokerList = cmd.getOptionValue(kafkaBrokerListOpt, config.kafkaBrokerList) + + config.recycleServiceHttpAddress = cmd.getOptionValue(recycleServiceHttpAddressOpt, config.recycleServiceHttpAddress) + try { + Inet4Address host = Inet4Address.getByName(config.recycleServiceHttpAddress.split(":")[0]) as Inet4Address + int port = Integer.valueOf(config.recycleServiceHttpAddress.split(":")[1]) + config.recycleServiceHttpInetSocketAddress = new InetSocketAddress(host, port) + } catch (Throwable t) 
{ + throw new IllegalStateException("Can not parse recycle service address: ${config.recycleServiceHttpAddress}", t) + } + log.info("recycleAddr : $config.recycleServiceHttpAddress, socketAddr : $config.recycleServiceHttpInetSocketAddress") + + config.defaultDb = cmd.getOptionValue(defaultDbOpt, config.defaultDb) config.jdbcUrl = cmd.getOptionValue(jdbcOpt, config.jdbcUrl) config.jdbcUser = cmd.getOptionValue(userOpt, config.jdbcUser) @@ -255,6 +428,8 @@ class Config { config.feSyncerPassword = cmd.getOptionValue(feSyncerPasswordOpt, config.feSyncerPassword) config.feHttpUser = cmd.getOptionValue(feHttpUserOpt, config.feHttpUser) config.feHttpPassword = cmd.getOptionValue(feHttpPasswordOpt, config.feHttpPassword) + config.feCloudHttpUser = cmd.getOptionValue(feHttpUserOpt, config.feCloudHttpUser) + config.feCloudHttpPassword = cmd.getOptionValue(feHttpPasswordOpt, config.feCloudHttpPassword) config.generateOutputFile = cmd.hasOption(genOutOpt) config.forceGenerateOutputFile = cmd.hasOption(forceGenOutOpt) config.parallel = Integer.parseInt(cmd.getOptionValue(parallelOpt, "10")) @@ -265,6 +440,7 @@ class Config { config.stopWhenFail = cmd.hasOption(stopWhenFailOpt) config.withOutLoadData = cmd.hasOption(withOutLoadDataOpt) config.dryRun = cmd.hasOption(dryRunOpt) + config.isSmokeTest = cmd.hasOption(isSmokeTestOpt) log.info("randomOrder is ${config.randomOrder}".toString()) log.info("stopWhenFail is ${config.stopWhenFail}".toString()) @@ -294,7 +470,13 @@ class Config { configToString(obj.feHttpAddress), configToString(obj.feHttpUser), configToString(obj.feHttpPassword), + configToString(obj.feCloudHttpAddress), + configToString(obj.feCloudHttpUser), + configToString(obj.feCloudHttpPassword), + configToString(obj.instanceId), + configToString(obj.cloudUniqueId), configToString(obj.metaServiceHttpAddress), + configToString(obj.recycleServiceHttpAddress), configToString(obj.suitePath), configToString(obj.dataPath), configToString(obj.realDataPath), @@ -307,7 +489,26 @@ class Config { configToString(obj.testDirectories), configToString(obj.excludeDirectories), configToString(obj.pluginPath), - configToString(obj.sslCertificatePath) + configToString(obj.sslCertificatePath), + configToString(obj.multiClusterBes), + configToString(obj.metaServiceToken), + configToString(obj.multiClusterInstance), + configToString(obj.upgradeNewBeIp), + configToString(obj.upgradeNewBeHbPort), + configToString(obj.upgradeNewBeHttpPort), + configToString(obj.upgradeNewBeUniqueId), + configToString(obj.stageIamEndpoint), + configToString(obj.stageIamRegion), + configToString(obj.stageIamBucket), + configToString(obj.stageIamPolicy), + configToString(obj.stageIamRole), + configToString(obj.stageIamArn), + configToString(obj.stageIamAk), + configToString(obj.stageIamSk), + configToString(obj.stageIamUserId), + configToString(obj.clusterDir), + configToString(obj.kafkaBrokerList), + configToString(obj.cloudVersion) ) config.image = configToString(obj.image) @@ -325,9 +526,55 @@ class Config { config.otherConfigs.put(key, kv.getValue()) } } + + // check smoke config + if (obj.isSmokeTest) { + config.isSmokeTest = true + String env = config.otherConfigs.getOrDefault("smokeEnv", "UNKNOWN") + log.info("Start to check $env config") + def c = config.otherConfigs + c.put("feCloudHttpAddress", obj.feCloudHttpAddress) + checkCloudSmokeEnv(c) + } + return config } + static String getProvider(String endpoint) { + def providers = ["cos", "oss", "s3", "obs", "bos"] + for (final def provider in providers) { + if 
(endpoint.containsIgnoreCase(provider)) { + return provider + } + } + return "" + } + + static void checkCloudSmokeEnv(Properties properties) { + // external stage obj info + String s3Endpoint = properties.getOrDefault("s3Endpoint", "") + String feCloudHttpAddress = properties.getOrDefault("feCloudHttpAddress", "") + String s3Region = properties.getOrDefault("s3Region", "") + String s3BucketName = properties.getOrDefault("s3BucketName", "") + String s3AK = properties.getOrDefault("ak", "") + String s3SK = properties.getOrDefault("sk", "") + + def items = [ + fecloudHttpAddrConf:feCloudHttpAddress, + s3RegionConf:s3Region, + s3EndpointConf:s3Endpoint, + s3BucketConf:s3BucketName, + s3AKConf:s3AK, + s3SKConf:s3SK, + s3ProviderConf:getProvider(s3Endpoint) + ] + for (final def item in items) { + if (item.value == null || item.value.isEmpty()) { + throw new IllegalStateException("cloud smoke conf err, plz check " + item.key) + } + } + } + static void fillDefaultConfig(Config config) { if (config.defaultDb == null) { config.defaultDb = "regression_test" @@ -365,11 +612,26 @@ class Config { log.info("Set feHttpAddress to '${config.feHttpAddress}' because not specify.".toString()) } + if (config.instanceId == null) { + config.instanceId = "instance_xxx" + log.info("Set instanceId to '${config.instanceId}' because not specify.".toString()) + } + + if (config.cloudUniqueId == null) { + config.cloudUniqueId = "cloud_unique_id_xxx" + log.info("Set cloudUniqueId to '${config.cloudUniqueId}' because not specify.".toString()) + } + if (config.metaServiceHttpAddress == null) { config.metaServiceHttpAddress = "127.0.0.1:5000" log.info("Set metaServiceHttpAddress to '${config.metaServiceHttpAddress}' because not specify.".toString()) } + if (config.recycleServiceHttpAddress == null) { + config.recycleServiceHttpAddress = "127.0.0.1:5001" + log.info("Set recycleServiceHttpAddress to '${config.recycleServiceHttpAddress}' because not specify.".toString()) + } + if (config.feSyncerUser == null) { config.feSyncerUser = "root" log.info("Set feSyncerUser to '${config.feSyncerUser}' because not specify.".toString()) @@ -395,6 +657,22 @@ class Config { log.info("Set feHttpPassword to empty because not specify.".toString()) } + + if (config.feCloudHttpAddress == null) { + config.feCloudHttpAddress = "127.0.0.1:8876" + log.info("Set feCloudHttpAddress to '${config.feCloudHttpAddress}' because not specify.".toString()) + } + + if (config.feCloudHttpUser == null) { + config.feCloudHttpUser = "root" + log.info("Set feCloudHttpUser to '${config.feCloudHttpUser}' because not specify.".toString()) + } + + if (config.feCloudHttpPassword == null) { + config.feCloudHttpPassword = "" + log.info("Set feCloudHttpPassword to empty because not specify.".toString()) + } + if (config.suitePath == null) { config.suitePath = "regression-test/suites" log.info("Set suitePath to '${config.suitePath}' because not specify.".toString()) @@ -446,7 +724,11 @@ class Config { } if (config.testGroups == null) { - config.testGroups = "default" + if (config.isSmokeTest){ + config.testGroups = "smoke" + } else { + config.testGroups = "default" + } log.info("Set testGroups to '${config.testGroups}' because not specify.".toString()) } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy index 36648dbe61df7a9..3c455e9ecf1644a 100644 --- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/ConfigOptions.groovy @@ -40,7 +40,13 @@ class ConfigOptions { static Option feHttpAddressOpt static Option feHttpUserOpt static Option feHttpPasswordOpt + static Option feCloudHttpAddressOpt + static Option feCloudHttpUserOpt + static Option feCloudHttpPasswordOpt + static Option instanceIdOpt + static Option cloudUniqueIdOpt static Option metaServiceHttpAddressOpt + static Option recycleServiceHttpAddressOpt static Option pathOpt static Option dataOpt static Option realDataOpt @@ -66,6 +72,26 @@ class ConfigOptions { static Option timesOpt static Option withOutLoadDataOpt static Option dryRunOpt + static Option isSmokeTestOpt + static Option multiClusterBesOpt + static Option metaServiceTokenOpt + static Option multiClusterInstanceOpt + static Option upgradeNewBeIpOpt + static Option upgradeNewBeHbPortOpt + static Option upgradeNewBeHttpPortOpt + static Option upgradeNewBeUniqueIdOpt + static Option stageIamEndpointOpt + static Option stageIamRegionOpt + static Option stageIamBucketOpt + static Option stageIamPolicyOpt + static Option stageIamRoleOpt + static Option stageIamArnOpt + static Option stageIamAkOpt + static Option stageIamSkOpt + static Option stageIamUserIdOpt + static Option clusterDirOpt + static Option kafkaBrokerListOpt + static Option cloudVersionOpt static CommandLine initCommands(String[] args) { helpOption = Option.builder("h") @@ -300,14 +326,62 @@ class ConfigOptions { .longOpt("feHttpPassword") .desc("the password of fe http server") .build() - metaServiceHttpAddressOpt = Option.builder("hm") + feCloudHttpAddressOpt = Option.builder("cha") .argName("address") .required(false) .hasArg(true) .type(String.class) + .longOpt("feCloudHttpAddress") + .desc("the fe cloud http address, format is ip:port") + .build() + feCloudHttpUserOpt = Option.builder("chu") + .argName("userName") + .required(false) + .hasArg(true) + .type(String.class) + .longOpt("feCloudHttpUser") + .desc("the user of fe cloud http server") + .build() + feCloudHttpPasswordOpt = Option.builder("chp") + .argName("password") + .required(false) + .hasArg(true) + .type(String.class) + .longOpt("feCloudHttpPassword") + .desc("the password of fe cloud http server") + .build() + instanceIdOpt = Option.builder("ii") + .argName("instanceId") + .required(false) + .hasArg(true) + .type(String.class) + .longOpt("instanceId") + .desc("the instance id") + .build() + cloudUniqueIdOpt = Option.builder("cui") + .argName("cloudUniqueId") + .required(false) + .hasArg(true) + .type(String.class) + .longOpt("cloudUniqueId") + .desc("the cloudUniqueId") + .build() + metaServiceHttpAddressOpt = Option.builder("hm") + .argName("metaServiceHttpAddress") + .required(false) + .hasArg(true) + .type(String.class) .longOpt("metaServiceHttpAddress") .desc("the meta service http address, format is ip:port") .build() + recycleServiceHttpAddressOpt = Option.builder("hr") + .argName("recycleServiceHttpAddress") + .required(false) + .hasArg(true) + .type(String.class) + .longOpt("recycleServiceHttpAddress") + .desc("the recycle service http address, format is ip:port") + .build() genOutOpt = Option.builder("genOut") .required(false) .hasArg(false) @@ -381,6 +455,106 @@ class ConfigOptions { .hasArg(false) .desc("just print cases and does not run") .build() + isSmokeTestOpt = Option.builder("isSmokeTest") + .required(false) + .hasArg(false) + .desc("is smoke test") + 
.build() + multiClusterBesOpt = Option.builder("multiClusterBes") + .required(false) + .hasArg(false) + .desc("multi cluster backend info") + .build() + metaServiceTokenOpt = Option.builder("metaServiceToken") + .required(false) + .hasArg(false) + .desc("meta service token") + .build() + multiClusterInstanceOpt = Option.builder("multiClusterInstance") + .required(false) + .hasArg(false) + .desc("multi cluster instance") + .build() + upgradeNewBeIpOpt = Option.builder("upgradeNewBeIp") + .required(false) + .hasArg(false) + .desc("new BE ip") + .build() + upgradeNewBeHbPortOpt = Option.builder("upgradeNewBeHbPort") + .required(false) + .hasArg(false) + .desc("new BE heartbeat port") + .build() + upgradeNewBeHttpPortOpt = Option.builder("upgradeNewBeHttpPort") + .required(false) + .hasArg(false) + .desc("new BE http port") + .build() + upgradeNewBeUniqueIdOpt = Option.builder("upgradeNewBeUniqueId") + .required(false) + .hasArg(false) + .desc("new BE cloud unique id") + .build() + stageIamEndpointOpt = Option.builder("stageIamEndpoint") + .required(false) + .hasArg(false) + .desc("stage iam endpoint") + .build() + stageIamRegionOpt = Option.builder("stageIamRegion") + .required(false) + .hasArg(false) + .desc("stage iam region") + .build() + stageIamBucketOpt = Option.builder("stageIamBucket") + .required(false) + .hasArg(false) + .desc("stage iam bucket") + .build() + stageIamPolicyOpt = Option.builder("stageIamPolicy") + .required(false) + .hasArg(false) + .desc("stage iam policy") + .build() + stageIamRoleOpt = Option.builder("stageIamRole") + .required(false) + .hasArg(false) + .desc("stage iam role") + .build() + stageIamArnOpt = Option.builder("stageIamArn") + .required(false) + .hasArg(false) + .desc("stage iam arn") + .build() + stageIamAkOpt = Option.builder("stageIamAk") + .required(false) + .hasArg(false) + .desc("stage iam ak") + .build() + stageIamSkOpt = Option.builder("stageIamSk") + .required(false) + .hasArg(false) + .desc("stage iam sk") + .build() + stageIamUserIdOpt = Option.builder("stageIamUserId") + .required(false) + .hasArg(false) + .desc("stage iam user id") + .build() + clusterDirOpt = Option.builder("clusterDir") + .required(false) + .hasArg(false) + .desc("cloud cluster deploy dir") + .build() + kafkaBrokerListOpt = Option.builder("kafkaBrokerList") + .required(false) + .hasArg(false) + .desc("kafka broker list") + .build() + cloudVersionOpt = Option.builder("cloudVersion") + .required(false) + .hasArg(false) + .desc("selectdb cloud version") + .build() Options options = new Options() .addOption(helpOption) @@ -407,7 +581,11 @@ class ConfigOptions { .addOption(feHttpAddressOpt) .addOption(feHttpUserOpt) .addOption(feHttpPasswordOpt) + .addOption(feCloudHttpAddressOpt) + .addOption(feCloudHttpUserOpt) + .addOption(feCloudHttpPasswordOpt) .addOption(metaServiceHttpAddressOpt) + .addOption(recycleServiceHttpAddressOpt) .addOption(genOutOpt) .addOption(confFileOpt) .addOption(forceGenOutOpt) @@ -419,6 +597,26 @@ class ConfigOptions { .addOption(timesOpt) .addOption(withOutLoadDataOpt) .addOption(dryRunOpt) + .addOption(isSmokeTestOpt) + .addOption(multiClusterBesOpt) + .addOption(metaServiceTokenOpt) + .addOption(multiClusterInstanceOpt) + .addOption(upgradeNewBeIpOpt) + .addOption(upgradeNewBeHbPortOpt) + .addOption(upgradeNewBeHttpPortOpt) + .addOption(upgradeNewBeUniqueIdOpt) + .addOption(stageIamEndpointOpt) + .addOption(stageIamRegionOpt) + .addOption(stageIamBucketOpt) + .addOption(stageIamPolicyOpt) + .addOption(stageIamRoleOpt) + .addOption(stageIamArnOpt) + 
.addOption(stageIamAkOpt) + .addOption(stageIamSkOpt) + .addOption(stageIamUserIdOpt) + .addOption(clusterDirOpt) + .addOption(kafkaBrokerListOpt) + .addOption(cloudVersionOpt) CommandLine cmd = new DefaultParser().parse(options, args, true) if (cmd.hasOption(helpOption)) { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index c3bd25ad99492d4..0b7a1792ac2f8b7 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -17,6 +17,7 @@ package org.apache.doris.regression.suite +import groovy.json.JsonOutput import com.google.common.collect.Maps import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture @@ -37,6 +38,7 @@ import org.apache.doris.regression.action.StreamLoadAction import org.apache.doris.regression.action.SuiteAction import org.apache.doris.regression.action.TestAction import org.apache.doris.regression.action.HttpCliAction +import org.apache.doris.regression.util.DataUtils import org.apache.doris.regression.util.JdbcUtils import org.apache.doris.regression.util.Hdfs import org.apache.doris.regression.util.SuiteUtils @@ -47,6 +49,11 @@ import org.slf4j.LoggerFactory import groovy.util.logging.Slf4j import java.sql.Connection +import java.io.File +import java.math.BigDecimal; +import java.sql.PreparedStatement +import java.sql.ResultSetMetaData +import java.util.Map; import java.util.concurrent.Callable import java.util.concurrent.Future import java.util.concurrent.atomic.AtomicBoolean @@ -1118,4 +1125,246 @@ class Suite implements GroovyInterceptable { notContains("${mv_name}(${mv_name})") } } + + def token = context.config.metaServiceToken + def instance_id = context.config.multiClusterInstance + def get_be_metric = { ip, port, field -> + def metric_api = { request_body, check_func -> + httpTest { + endpoint ip + ":" + port + uri "/metrics?type=json" + body request_body + op "get" + check check_func + } + } + + def jsonOutput = new JsonOutput() + def map = [] + def js = jsonOutput.toJson(map) + log.info("get be metric req: ${js} ".toString()) + + def ret = 0; + metric_api.call(js) { + respCode, body -> + log.info("get be metric resp: ${respCode}".toString()) + def json = parseJson(body) + for (item : json) { + if (item.tags.metric == field) { + ret = item.value + } + } + } + ret + } + + def add_cluster = { be_unique_id, ip, port, cluster_name, cluster_id -> + def jsonOutput = new JsonOutput() + def s3 = [ + type: "COMPUTE", + cluster_name : cluster_name, + cluster_id : cluster_id, + nodes: [ + [ + cloud_unique_id: be_unique_id, + ip: ip, + heartbeat_port: port + ], + ] + ] + def map = [instance_id: "${instance_id}", cluster: s3] + def js = jsonOutput.toJson(map) + log.info("add cluster req: ${js} ".toString()) + + def add_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/add_cluster?token=${token}" + body request_body + check check_func + } + } + + add_cluster_api.call(js) { + respCode, body -> + log.info("add cluster resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK") || json.code.equalsIgnoreCase("ALREADY_EXISTED")) + } + } + + def get_cluster = { be_unique_id -> + def jsonOutput = new JsonOutput() + def map 
= [instance_id: "${instance_id}", cloud_unique_id: "${be_unique_id}" ] + def js = jsonOutput.toJson(map) + log.info("get cluster req: ${js} ".toString()) + + def add_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/get_cluster?token=${token}" + body request_body + check check_func + } + } + + def json + add_cluster_api.call(js) { + respCode, body -> + log.info("get cluster resp: ${body} ${respCode}".toString()) + json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK") || json.code.equalsIgnoreCase("ALREADY_EXISTED")) + } + json.result.cluster + } + + def drop_cluster = { cluster_name, cluster_id -> + def jsonOutput = new JsonOutput() + def reqBody = [ + type: "COMPUTE", + cluster_name : cluster_name, + cluster_id : cluster_id, + nodes: [ + ] + ] + def map = [instance_id: "${instance_id}", cluster: reqBody] + def js = jsonOutput.toJson(map) + log.info("drop cluster req: ${js} ".toString()) + + def drop_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/drop_cluster?token=${token}" + body request_body + check check_func + } + } + + drop_cluster_api.call(js) { + respCode, body -> + log.info("dorp cluster resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK") || json.code.equalsIgnoreCase("ALREADY_EXISTED")) + } + } + + def add_node = { be_unique_id, ip, port, cluster_name, cluster_id -> + def jsonOutput = new JsonOutput() + def clusterInfo = [ + type: "COMPUTE", + cluster_name : cluster_name, + cluster_id : cluster_id, + nodes: [ + [ + cloud_unique_id: be_unique_id, + ip: ip, + heartbeat_port: port + ], + ] + ] + def map = [instance_id: "${instance_id}", cluster: clusterInfo] + def js = jsonOutput.toJson(map) + log.info("add node req: ${js} ".toString()) + + def add_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/add_node?token=${token}" + body request_body + check check_func + } + } + + add_cluster_api.call(js) { + respCode, body -> + log.info("add node resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK") || json.code.equalsIgnoreCase("ALREADY_EXISTED")) + } + } + + def d_node = { be_unique_id, ip, port, cluster_name, cluster_id -> + def jsonOutput = new JsonOutput() + def clusterInfo = [ + type: "COMPUTE", + cluster_name : cluster_name, + cluster_id : cluster_id, + nodes: [ + [ + cloud_unique_id: be_unique_id, + ip: ip, + heartbeat_port: port + ], + ] + ] + def map = [instance_id: "${instance_id}", cluster: clusterInfo] + def js = jsonOutput.toJson(map) + log.info("decommission node req: ${js} ".toString()) + + def d_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/decommission_node?token=${token}" + body request_body + check check_func + } + } + + d_cluster_api.call(js) { + respCode, body -> + log.info("decommission node resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK") || json.code.equalsIgnoreCase("ALREADY_EXISTED")) + } + } + + def checkProfile = { addrSet, fragNum -> + List> profileRes = sql " show query profile '/' " + for (row : profileRes) { + //println row + } + + for (int i = 0; i < fragNum; ++i) { + String exec_sql = "show query profile '/" + profileRes[0][0] + "/" 
+ i.toString() + "'" + List> result = sql exec_sql + for (row : result) { + println row + } + + println result[0][1] + println addrSet + assertTrue(addrSet.contains(result[0][1])); + } + } + + def rename_cloud_cluster = { cluster_name, cluster_id -> + def jsonOutput = new JsonOutput() + def reqBody = [ + cluster_name : cluster_name, + cluster_id : cluster_id + ] + def map = [instance_id: "${instance_id}", cluster: reqBody] + def js = jsonOutput.toJson(map) + log.info("rename cluster req: ${js} ".toString()) + + def rename_cluster_api = { request_body, check_func -> + httpTest { + endpoint context.config.metaServiceHttpAddress + uri "/MetaService/http/rename_cluster?token=${token}" + body request_body + check check_func + } + } + + rename_cluster_api.call(js) { + respCode, body -> + log.info("rename cluster resp: ${body} ${respCode}".toString()) + def json = parseJson(body) + assertTrue(json.code.equalsIgnoreCase("OK")) + } + } + + public void resetConnection() { + context.resetConnection() + } } From dc7fcd0032b7069bbcf656e8cbc934f73e964c5f Mon Sep 17 00:00:00 2001 From: zhengyu Date: Tue, 12 Mar 2024 22:12:45 +0800 Subject: [PATCH 4/7] [fix](cloud) fix auto analyze triggering failure (#32139) * [fix](cloud) fix auto analyze triggering failure StatisticsCollector only collect when replicas are normal. In cloud mode, this check needs setCloudCluster before execution. Otherwise, check will be a dead-end failure. Signed-off-by: freemandealer --- .../doris/statistics/util/StatisticsUtil.java | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 8ee08d57e69a6aa..3df7385d9703636 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -139,6 +139,9 @@ public static List execStatisticQuery(String sql) { return Collections.emptyList(); } try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + if (Config.isCloudMode()) { + r.connectContext.getCloudCluster(); + } StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); r.connectContext.setExecutor(stmtExecutor); return stmtExecutor.executeInternalQuery(); @@ -447,11 +450,25 @@ public static boolean statsTblAvailable() { } catch (Throwable t) { return false; } - for (OlapTable table : statsTbls) { - for (Partition partition : table.getPartitions()) { - if (partition.getBaseIndex().getTablets().stream() - .anyMatch(t -> t.getNormalReplicaBackendIds().isEmpty())) { - return false; + if (Config.isCloudMode()) { + try (AutoCloseConnectContext r = buildConnectContext()) { + r.connectContext.getCloudCluster(); + for (OlapTable table : statsTbls) { + for (Partition partition : table.getPartitions()) { + if (partition.getBaseIndex().getTablets().stream() + .anyMatch(t -> t.getNormalReplicaBackendIds().isEmpty())) { + return false; + } + } + } + } + } else { + for (OlapTable table : statsTbls) { + for (Partition partition : table.getPartitions()) { + if (partition.getBaseIndex().getTablets().stream() + .anyMatch(t -> t.getNormalReplicaBackendIds().isEmpty())) { + return false; + } } } } From 23aaf0f368fd5271a73aabc4e76bbcf071fc0940 Mon Sep 17 00:00:00 2001 From: Pxl Date: Tue, 12 Mar 2024 22:39:58 +0800 Subject: [PATCH 5/7] [Chore](top-n) check runtime predicate inited when scan operator open (#32140) check runtime 
predicate is initialized when the scan operator opens --- be/src/olap/tablet_reader.cpp | 2 ++ be/src/runtime/runtime_predicate.h | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index f0229431b7b932c..1cecd56a82ab7e5 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -578,6 +578,8 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode( for (int id : read_params.topn_filter_source_node_ids) { auto& runtime_predicate = read_params.runtime_state->get_query_ctx()->get_runtime_predicate(id); + DCHECK(runtime_predicate.inited()) + << "runtime predicate not inited, source_node_id=" << id; runtime_predicate.set_tablet_schema(_tablet_schema); } } diff --git a/be/src/runtime/runtime_predicate.h b/be/src/runtime/runtime_predicate.h index ec5b9612cbc9c45..fcfc9db7021235f 100644 --- a/be/src/runtime/runtime_predicate.h +++ b/be/src/runtime/runtime_predicate.h @@ -53,7 +53,7 @@ class RuntimePredicate { void set_tablet_schema(TabletSchemaSPtr tablet_schema) { std::unique_lock wlock(_rwlock); - if (_tablet_schema) { + if (_tablet_schema || !_inited) { return; } _tablet_schema = tablet_schema; From 1aa2d91574f7bd77f1e625e5af7f0f0b5b265d43 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Tue, 12 Mar 2024 22:44:30 +0800 Subject: [PATCH 6/7] [fix](audit-loader) fix invalid token check logic (#32095) The token check should be forwarded to the Master FE, so this patch adds a new RPC method `checkToken()` to FrontendService for that purpose. Otherwise, once the audit loader is enabled, logs from a non-Master FE cannot be loaded into the audit table and fail with an `Invalid token` error. --- .../apache/doris/httpv2/rest/LoadAction.java | 7 ++- .../doris/load/loadv2/TokenManager.java | 61 ++++++++++++++++--- .../doris/service/FrontendServiceImpl.java | 28 ++++++--- gensrc/thrift/FrontendService.thrift | 2 + 4 files changed, 81 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 6952bd37b5c91b2..6be5654a2ea1123 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -23,6 +23,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.LoadException; +import org.apache.doris.common.UserException; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; import org.apache.doris.httpv2.exception.UnauthorizedException; @@ -362,7 +363,11 @@ private TNetworkAddress selectRedirectBackend(boolean groupCommit) throws LoadEx // temporarily addressing the users' needs for audit logs. // So this function is not widely tested under general scenario private boolean checkClusterToken(String token) { - return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); + try { + return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token); + } catch (UserException e) { + throw new UnauthorizedException(e.getMessage()); + } } // NOTE: This function can only be used for AuditlogPlugin stream load for now. 
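The core of this fix is a master-vs-follower dispatch: only the Master FE generates tokens and owns the token queue, so a follower answering `checkAuthToken()` from its own (always empty) queue is what produced the spurious `Invalid token` errors. A minimal sketch of that dispatch follows before the `TokenManager` diff itself; `MasterRpcClient` is a hypothetical stand-in for the pooled thrift `FrontendService.Client`, and the reopen-and-retry plus return/invalidate bookkeeping of the real `checkTokenFromMaster()` is omitted for brevity.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal sketch, not the actual Doris class: shows the master-vs-follower
// dispatch that TokenManager.checkAuthToken() implements in this patch.
public class TokenCheckSketch {
    // Hypothetical stand-in for the thrift FrontendService.Client call; the
    // real code borrows a pooled client, retries once on non-timeout
    // transport errors, and returns or invalidates the client afterwards.
    interface MasterRpcClient {
        boolean checkToken(String token) throws Exception;
    }

    private final boolean isMaster;
    private final BlockingQueue<String> tokenQueue = new ArrayBlockingQueue<>(2);
    private final MasterRpcClient masterClient;

    public TokenCheckSketch(boolean isMaster, MasterRpcClient masterClient) {
        this.isMaster = isMaster;
        this.masterClient = masterClient;
    }

    public boolean checkAuthToken(String token) throws Exception {
        if (isMaster) {
            // Tokens are generated on the master, so only its queue is
            // authoritative.
            return tokenQueue.contains(token);
        }
        // A follower's queue is empty; answering locally is exactly the
        // "Invalid token" bug this patch fixes, so forward to the master.
        return masterClient.checkToken(token);
    }
}
```

The same forwarding pattern already existed for `acquireToken()` (see `acquireTokenFromMaster()`, now made private); the patch simply extends it to token validation.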
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java index 6443e6b23226877..ca714d66b29d29c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java @@ -63,11 +63,6 @@ private String generateNewToken() { return UUID.randomUUID().toString(); } - // this method only will be called in master node, since stream load only send message to master. - public boolean checkAuthToken(String token) { - return tokenQueue.contains(token); - } - public String acquireToken() throws UserException { if (Env.getCurrentEnv().isMaster() || FeConstants.runningUnitTest) { return tokenQueue.peek(); @@ -81,9 +76,8 @@ public String acquireToken() throws UserException { } } - public String acquireTokenFromMaster() throws TException { + private String acquireTokenFromMaster() throws TException { TNetworkAddress thriftAddress = getMasterAddress(); - FrontendService.Client client = getClient(thriftAddress); if (LOG.isDebugEnabled()) { @@ -108,7 +102,7 @@ public String acquireTokenFromMaster() throws TException { } else { TMySqlLoadAcquireTokenResult result = client.acquireToken(); if (result.getStatus().getStatusCode() != TStatusCode.OK) { - throw new TException("commit failed."); + throw new TException("acquire token from master failed. " + result.getStatus()); } isReturnToPool = true; return result.getToken(); @@ -122,6 +116,57 @@ public String acquireTokenFromMaster() throws TException { } } + /** + * Check if the token is valid. + * If this is not Master FE, will send the request to Master FE. + */ + public boolean checkAuthToken(String token) throws UserException { + if (Env.getCurrentEnv().isMaster() || FeConstants.runningUnitTest) { + return tokenQueue.contains(token); + } else { + try { + return checkTokenFromMaster(token); + } catch (TException e) { + LOG.warn("check token error", e); + throw new UserException("Check token from master failed", e); + } + } + } + + private boolean checkTokenFromMaster(String token) throws TException { + TNetworkAddress thriftAddress = getMasterAddress(); + FrontendService.Client client = getClient(thriftAddress); + + if (LOG.isDebugEnabled()) { + LOG.debug("Send check token to Master {}", thriftAddress); + } + + boolean isReturnToPool = false; + try { + boolean result = client.checkToken(token); + isReturnToPool = true; + return result; + } catch (TTransportException e) { + boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); + if (!ok) { + throw e; + } + if (e.getType() == TTransportException.TIMED_OUT) { + throw e; + } else { + boolean result = client.checkToken(token); + isReturnToPool = true; + return result; + } + } finally { + if (isReturnToPool) { + ClientPool.frontendPool.returnObject(thriftAddress, client); + } else { + ClientPool.frontendPool.invalidateObject(thriftAddress, client); + } + } + } + private TNetworkAddress getMasterAddress() throws TException { Env.getCurrentEnv().checkReadyOrThrowTException(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index eb62cd9c75a25e7..eface922ef97ebe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1062,12 +1062,6 @@ private void checkPasswordAndPrivs(String user, 
String passwd, String db, List Date: Tue, 12 Mar 2024 22:56:59 +0800 Subject: [PATCH 7/7] [fix](ui) fix database cannot be choosed bug (#32091) --- ui/src/pages/layout/index.tsx | 108 +++++++++++-------- ui/src/pages/playground/tree/index.tsx | 140 ++++++++++++++----------- ui/src/router/renderRouter.tsx | 37 ++++--- ui/src/utils/utils.ts | 10 +- 4 files changed, 172 insertions(+), 123 deletions(-) diff --git a/ui/src/pages/layout/index.tsx b/ui/src/pages/layout/index.tsx index 0ea4ba6257a0635..5feeb45a0804947 100644 --- a/ui/src/pages/layout/index.tsx +++ b/ui/src/pages/layout/index.tsx @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,23 +16,23 @@ * specific language governing permissions and limitations * under the License. */ - + /** * @file test cron * @author lpx * @since 2020/08/19 */ -import React, {useState} from 'react'; -import {Layout, Menu, Dropdown, notification, Button} from 'antd'; -import { CaretDownOutlined, LogoutOutlined} from '@ant-design/icons'; -import {renderRoutes} from 'react-router-config'; -import {useHistory} from 'react-router-dom'; -import {useTranslation} from 'react-i18next'; +import React, { useState } from 'react'; +import { Layout, Menu, Dropdown, notification, Button } from 'antd'; +import { CaretDownOutlined, LogoutOutlined } from '@ant-design/icons'; +import { renderRoutes } from 'react-router-config'; +import { useHistory } from 'react-router-dom'; +import { useTranslation } from 'react-i18next'; import routes from 'Src/router'; -import {logOut} from 'Src/api/api'; +import { logOut } from 'Src/api/api'; import './index.css'; import styles from './index.less'; -const {Header, Content, Footer} = Layout; +const { Header, Content, Footer } = Layout; function Layouts(props: any) { let { t } = useTranslation(); const [route, setRoute] = useState(props.route.routes); @@ -48,7 +48,7 @@ function Layouts(props: any) { if (location.pathname === e.key) { location.reload(); } - if(location.pathname.includes('Playground')){ + if (location.pathname.includes('Playground')) { history.push(e.key); location.reload(); } @@ -59,52 +59,74 @@ function Layouts(props: any) { } function clearAllCookie() { var keys = document.cookie.match(/[^ =;]+(?=\=)/g); - if(keys) { - for(var i = keys.length; i--;) - document.cookie = keys[i] + '=0;expires=' + new Date(0).toUTCString() + if (keys) { + for (var i = keys.length; i--; ) + document.cookie = + keys[i] + '=0;expires=' + new Date(0).toUTCString(); } } - function onLogout(){ - logOut().then((res)=>{ - localStorage.setItem('username',''); + function onLogout() { + logOut().then((res) => { + localStorage.removeItem('username'); clearAllCookie(); - notification.success({message: t('exitSuccessfully')}) + notification.success({ message: t('exitSuccessfully') }); history.push('/login'); - }) + }); } - function changeLanguage(){ - if (localStorage.getItem('I18N_LANGUAGE') === 'zh-CN'){ - localStorage.setItem('I18N_LANGUAGE','en'); - location.reload() + function changeLanguage() { + if (localStorage.getItem('I18N_LANGUAGE') === 'zh-CN') { + localStorage.setItem('I18N_LANGUAGE', 'en'); + location.reload(); } else { - 
localStorage.setItem('I18N_LANGUAGE','zh-CN'); - location.reload() + localStorage.setItem('I18N_LANGUAGE', 'zh-CN'); + location.reload(); } } const menu = ( - + {t('signOut')} ); return ( -
-
{history.replace('/home');setCurrent('')}}>
- - +
+
{ + history.replace('/home'); + setCurrent(''); + }} + >
+ + {/* */} - {localStorage.getItem('username')} + {localStorage.getItem('username')}{' '} + - - {routes?.routes[1]?.routes?.map(item => { - if (item.title !== 'Login'&&item.title !== 'Home') { + + {routes?.routes[1]?.routes?.map((item) => { + if (item.title !== 'Login' && item.title !== 'Home') { return ( {item.title} @@ -115,19 +137,22 @@ function Layouts(props: any) {
- -
+ +
{renderRoutes(route)}
{/*
xxx
*/} - ); } -export default Layouts;/** +export default Layouts; +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -135,9 +160,9 @@ export default Layouts;/** * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -145,4 +170,3 @@ export default Layouts;/** * specific language governing permissions and limitations * under the License. */ - diff --git a/ui/src/pages/playground/tree/index.tsx b/ui/src/pages/playground/tree/index.tsx index e3f56f98e793c6d..72ccd9852dcedb7 100644 --- a/ui/src/pages/playground/tree/index.tsx +++ b/ui/src/pages/playground/tree/index.tsx @@ -16,15 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -import React, {useEffect, useState} from 'react'; -import {Input, Spin, Tree} from 'antd'; -import {HddOutlined, ReloadOutlined, TableOutlined} from '@ant-design/icons'; -import {AdHocAPI} from 'Src/api/api'; -import {useTranslation} from 'react-i18next'; -import {AdhocContentRouteKeyEnum,} from '../adhoc.data'; +import React, { useEffect, useState } from 'react'; +import { Input, Spin, Tree } from 'antd'; +import { HddOutlined, ReloadOutlined, TableOutlined } from '@ant-design/icons'; +import { AdHocAPI } from 'Src/api/api'; +import { useTranslation } from 'react-i18next'; +import { AdhocContentRouteKeyEnum } from '../adhoc.data'; import './index.css'; -const {Search} = Input; +const { Search } = Input; interface DataNode { title: string; @@ -36,7 +36,7 @@ interface DataNode { const initTreeDate: DataNode[] = []; function updateTreeData(list: DataNode[], key, children) { - return list.map(node => { + return list.map((node) => { if (node.key === key) { return { ...node, @@ -48,7 +48,7 @@ function updateTreeData(list: DataNode[], key, children) { } export function AdHocTree(props: any) { - let {t} = useTranslation(); + let { t } = useTranslation(); const [treeData, setTreeData] = useState(initTreeDate); const [realTree, setRealTree] = useState(initTreeDate); const [loading, setLoading] = useState(true); @@ -62,38 +62,41 @@ export function AdHocTree(props: any) { }, []); function initTreeData(ac?: AbortController) { - AdHocAPI.getDatabaseList({signal: ac?.signal}).then(res => { - if (res.msg === 'success' && Array.isArray(res.data)) { - const num = Math.random() - const treeData = res.data.map((item, index) => { - return { - title: item, - key: `${num}-1-${index}-${item}`, - icon: , - }; - }); - setTreeData(treeData); - getRealTree(treeData); - } - setLoading(false); - }).catch(err => { - }); + AdHocAPI.getDatabaseList({ signal: ac?.signal }) + .then((res) => { + if (res.msg === 'success' && Array.isArray(res.data)) { + const num = Math.random(); + const treeData = res.data.map((item, index) => { + return { + title: item, + keys: [item], + key: `${num}-1-${index}-${item}`, + icon: , + }; + }); + setTreeData(treeData); + getRealTree(treeData); + } + setLoading(false); + }) + .catch((err) => {}); } - function onLoadData({key, children}) { + function onLoadData({ key, children }) { const [, storey, , db_name] = key.split('-'); const param = { 
db_name, // tbl_name, }; - return AdHocAPI.getDatabaseList(param).then(res => { + return AdHocAPI.getDatabaseList(param).then((res) => { if (res.msg == 'success' && Array.isArray(res.data)) { const children = res.data.map((item, index) => { if (storey === '1') { return { title: item, + keys: [param.db_name, item], key: `2-${index}-${param.db_name}-${item}`, - icon: , + icon: , isLeaf: true, }; } @@ -108,23 +111,24 @@ export function AdHocTree(props: any) { function handleTreeSelect( keys: React.ReactText[], info: any, - path: AdhocContentRouteKeyEnum = AdhocContentRouteKeyEnum.Result, + path: AdhocContentRouteKeyEnum = AdhocContentRouteKeyEnum.Result ) { - if (keys.length > 0) { - props.history.push(`/Playground/${path}/${keys[0].split(':')[1]}`); + console.log(info); + const tablePath = info.node.keys.join('-'); + if (info.node.keys.length > 0) { + props.history.push(`/Playground/${path}/${tablePath}`); } } function onSearch(e) { - const {value} = e.target; - const expandedKeys: any[] = treeData - .map((item, index) => { - if (getParentKey(value, treeData[index].children, index)) { - return item.key - } else { - return null; - } - }) + const { value } = e.target; + const expandedKeys: any[] = treeData.map((item, index) => { + if (getParentKey(value, treeData[index].children, index)) { + return item.key; + } else { + return null; + } + }); setExpandedKeys(expandedKeys); setAutoExpandParent(true); getRealTree(treeData, value); @@ -142,9 +146,11 @@ export function AdHocTree(props: any) { for (let i = 0; i < tree.length; i++) { const node = tree[i]; if (node.title.includes(key)) { - return true + return true; } else { - treeData[idx].children ? treeData[idx].children[i].title = node.title : '' + treeData[idx].children + ? (treeData[idx].children[i].title = node.title) + : ''; } } return false; @@ -154,7 +160,7 @@ export function AdHocTree(props: any) { const realTree = inner(treeData); function inner(treeData) { - return treeData.map(item => { + return treeData.map((item) => { const search = value || ''; const index = item.title.indexOf(search); const beforeStr = item.title.substr(0, index); @@ -162,19 +168,21 @@ export function AdHocTree(props: any) { const title = index > -1 ? ( - {beforeStr} - {search} + {beforeStr} + + {search} + {afterStr} - + ) : ( item.title ); if (item.children) { - return {...item, title, children: inner(item.children)}; + return { ...item, title, children: inner(item.children) }; } return { ...item, - title + title, }; }); } @@ -185,29 +193,35 @@ export function AdHocTree(props: any) { function debounce(fn, wait) { let timer = null; return function () { - let context = this - let args = arguments + let context = this; + let args = arguments; if (timer) { clearTimeout(timer); timer = null; } timer = setTimeout(function () { - fn.apply(context, args) - }, wait) - } + fn.apply(context, args); + }, wait); + }; } return ( <> - +
} + enterButton={} onSearch={initTreeData} - onChange={onSearch}/> + onChange={onSearch} + />
handleTreeSelect( selectedKeys, info, - AdhocContentRouteKeyEnum.Structure) + AdhocContentRouteKeyEnum.Structure + ) } /> ); - } - diff --git a/ui/src/router/renderRouter.tsx b/ui/src/router/renderRouter.tsx index 58a3c422789eda2..f80096dc016a80c 100644 --- a/ui/src/router/renderRouter.tsx +++ b/ui/src/router/renderRouter.tsx @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,15 +16,15 @@ * specific language governing permissions and limitations * under the License. */ - + import React from 'react'; import { Route, Redirect, Switch } from 'react-router-dom'; -import {getBasePath} from 'Src/utils/utils'; +import { checkLogin, getBasePath } from 'Src/utils/utils'; -let isLogin = document.cookie; +let isLogin = checkLogin(); const renderRoutes = (routes, authPath = '/login') => { let basepath = getBasePath(); - if(routes){ + if (routes) { return ( {routes.map((route, i) => ( @@ -33,28 +33,27 @@ const renderRoutes = (routes, authPath = '/login') => { path={route.path} exact={route.exact} strict={route.strict} - render= { props =>{ - if(props.location.pathname === basepath+'/'){ - return ; + render={(props) => { + if (props.location.pathname === basepath + '/') { + return ; } if (isLogin) { return route.render ? ( - route.render({ ...props, route: route }) - ) : ( - - ) + route.render({ ...props, route: route }) + ) : ( + + ); } else { isLogin = '1'; return ; - }} - } + } + }} /> ))} - ) + ); } - return null -} + return null; +}; export default renderRoutes; - diff --git a/ui/src/utils/utils.ts b/ui/src/utils/utils.ts index 770987f2743a408..05b64700c6ff2ec 100644 --- a/ui/src/utils/utils.ts +++ b/ui/src/utils/utils.ts @@ -92,4 +92,12 @@ function replaceToTxt(str: string) { return strNoBr; } -export {isSuccess, getDbName, getTimeNow, getBasePath, replaceToTxt}; +function checkLogin() { + const username = localStorage.getItem('username'); + if (username) { + return true; + } + return false; +} + +export {isSuccess, getDbName, getTimeNow, getBasePath, replaceToTxt, checkLogin};
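As a closing illustration, the sketch below shows how the new `checkLogin()` helper is meant to be consumed: login state is now derived from the `username` key in `localStorage` (which `Layouts` now removes on logout via `localStorage.removeItem('username')`) rather than from the raw truthiness of `document.cookie`. The `RequireLogin` wrapper is a hypothetical simplification for illustration only; the actual `renderRoutes()` inlines this guard while mapping over the route table.

```tsx
import React from 'react';
import { Redirect } from 'react-router-dom';
import { checkLogin } from 'Src/utils/utils';

// Hypothetical route guard built on the patch's checkLogin(); renderRoutes()
// performs the same check inline for every route it renders.
function RequireLogin(props: { children: React.ReactNode; authPath?: string }) {
    const { children, authPath = '/login' } = props;
    // checkLogin() is true only while a username is stored, so logging out
    // (which removes the key) immediately redirects to the login page.
    return checkLogin() ? <>{children}</> : <Redirect to={authPath} />;
}

export default RequireLogin;
```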