From 53fdd9ed51bdc370f58c3fc0ce778c9670f37c70 Mon Sep 17 00:00:00 2001 From: huanghaibin <284824253@qq.com> Date: Thu, 19 Sep 2024 21:07:15 +0800 Subject: [PATCH] [fix](cloud-mow) Add retry when calculating delete bitmap timeout when loading data (#40562) Add retry when calculating delete bitmap timeout on broker load , like stream load doing. --- .../cloud_engine_calc_delete_bitmap_task.cpp | 1 + .../java/org/apache/doris/common/Config.java | 2 +- .../CloudGlobalTransactionMgr.java | 23 +- .../doris/common/util/MetaLockUtils.java | 10 +- .../doris/load/loadv2/BrokerLoadJob.java | 88 +++--- .../doris/load/loadv2/SparkLoadJob.java | 61 +++-- .../insert/AbstractInsertExecutor.java | 21 +- .../test_cloud_mow_broker_load_with_retry.out | 7 + .../test_cloud_mow_insert_with_retry.out | 15 ++ ...st_cloud_mow_broker_load_with_retry.groovy | 251 ++++++++++++++++++ .../test_cloud_mow_insert_timeout.groovy | 2 +- .../test_cloud_mow_insert_with_retry.groovy | 86 ++++++ 12 files changed, 492 insertions(+), 75 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out create mode 100644 regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index d22af1c1f39c9e..5c369af38a6c4e 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -95,6 +95,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() { } // wait for all finished token->wait(); + DBUG_EXECUTE_IF("CloudEngineCalcDeleteBitmapTask.execute.enable_wait", { sleep(3); }); LOG(INFO) << "finish to calculate delete bitmap on transaction." << "transaction_id=" << transaction_id << ", cost(us): " << watch.get_elapse_time_us() diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index be312582a6c2ab..9e9ce4637d33b8 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2984,7 +2984,7 @@ public static int metaServiceRpcRetryTimes() { public static String security_checker_class_name = ""; @ConfField(mutable = true) - public static int mow_insert_into_commit_retry_times = 10; + public static int mow_calculate_delete_bitmap_retry_times = 10; @ConfField(mutable = true, description = {"指定S3 Load endpoint白名单, 举例: s3_load_endpoint_white_list=a,b,c", "the white list for the s3 load endpoint, if it is empty, no white list will be set," diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 9890eaa0b3a275..eee2faff6f40b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -930,7 +930,28 @@ private void debugCalcDeleteBitmapRandomTimeout() throws UserException { public boolean commitAndPublishTransaction(DatabaseIf db, List tableList, long transactionId, List tabletCommitInfos, long timeoutMillis) throws UserException { - return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null); + int retryTimes = 0; + boolean res = false; + while (true) { + try { + res = commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null); + break; + } catch (UserException e) { + LOG.warn("failed to commit txn, txnId={},retryTimes={},exception={}", + transactionId, retryTimes, e); + // only mow table will catch DELETE_BITMAP_LOCK_ERR and need to retry + if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) { + retryTimes++; + if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) { + // should throw exception after running out of retry times + throw e; + } + } else { + throw e; + } + } + } + return res; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java index 16afbcecdaea5c..ffd411d0cf3ea1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java @@ -127,8 +127,14 @@ public static void writeUnlockTables(List tableList) { } public static void commitLockTables(List
tableList) { - for (Table table : tableList) { - table.commitLock(); + for (int i = 0; i < tableList.size(); i++) { + try { + tableList.get(i).commitLock(); + } catch (Exception e) { + for (int j = i - 1; j >= 0; j--) { + tableList.get(i).commitUnlock(); + } + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index fb5f06fced570b..5e1b085b239474 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -29,6 +29,7 @@ import org.apache.doris.common.DataQualityException; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.QuotaExceedException; @@ -335,42 +336,59 @@ private void onLoadingTaskFinished(BrokerLoadingTaskAttachment attachment) { } Database db = null; List
tableList = null; - try { - db = getDb(); - tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(fileGroupAggInfo.getAllTableIds())); - if (Config.isCloudMode()) { - MetaLockUtils.commitLockTables(tableList); - } else { - MetaLockUtils.writeLockTablesOrMetaException(tableList); + int retryTimes = 0; + while (true) { + try { + db = getDb(); + tableList = db.getTablesOnIdOrderOrThrowException( + Lists.newArrayList(fileGroupAggInfo.getAllTableIds())); + if (Config.isCloudMode()) { + MetaLockUtils.commitLockTables(tableList); + } else { + MetaLockUtils.writeLockTablesOrMetaException(tableList); + } + } catch (MetaNotFoundException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("database_id", dbId) + .add("error_msg", "db has been deleted when job is loading") + .build(), e); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true); + return; } - } catch (MetaNotFoundException e) { - LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) - .add("database_id", dbId) - .add("error_msg", "db has been deleted when job is loading") - .build(), e); - cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true); - return; - } - try { - LOG.info(new LogBuilder(LogKey.LOAD_JOB, id) - .add("txn_id", transactionId) - .add("msg", "Load job try to commit txn") - .build()); - Env.getCurrentGlobalTransactionMgr().commitTransaction( - dbId, tableList, transactionId, commitInfos, getLoadJobFinalOperation()); - afterLoadingTaskCommitTransaction(tableList); - afterCommit(); - } catch (UserException e) { - LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) - .add("database_id", dbId) - .add("error_msg", "Failed to commit txn with error:" + e.getMessage()) - .build(), e); - cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true); - } finally { - if (Config.isCloudMode()) { - MetaLockUtils.commitUnlockTables(tableList); - } else { - MetaLockUtils.writeUnlockTables(tableList); + try { + LOG.info(new LogBuilder(LogKey.LOAD_JOB, id) + .add("txn_id", transactionId) + .add("msg", "Load job try to commit txn") + .build()); + Env.getCurrentGlobalTransactionMgr().commitTransaction( + dbId, tableList, transactionId, commitInfos, getLoadJobFinalOperation()); + afterLoadingTaskCommitTransaction(tableList); + afterCommit(); + return; + } catch (UserException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("database_id", dbId) + .add("retry_times", retryTimes) + .add("error_msg", "Failed to commit txn with error:" + e.getMessage()) + .build(), e); + if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) { + retryTimes++; + if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) { + LOG.warn("cancelJob {} because up to max retry time,exception {}", id, e); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, + true); + return; + } + } else { + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true); + return; + } + } finally { + if (Config.isCloudMode()) { + MetaLockUtils.commitUnlockTables(tableList); + } else { + MetaLockUtils.writeUnlockTables(tableList); + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 1f1c71d7a903d2..f01f205e96dc0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -45,9 +45,11 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.DataQualityException; import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; @@ -656,21 +658,50 @@ public void updateLoadingStatus() throws UserException { } private void tryCommitJob() throws UserException { - LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId) - .add("msg", "Load job try to commit txn").build()); - Database db = getDb(); - List
tableList = db.getTablesOnIdOrderOrThrowException( - Lists.newArrayList(tableToLoadPartitions.keySet())); - MetaLockUtils.writeLockTablesOrMetaException(tableList); - try { - Env.getCurrentGlobalTransactionMgr().commitTransaction( - dbId, tableList, transactionId, commitInfos, - new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, - finishTimestamp, state, failMsg)); - } catch (TabletQuorumFailedException e) { - // retry in next loop - } finally { - MetaLockUtils.writeUnlockTables(tableList); + int retryTimes = 0; + while (true) { + Database db = getDb(); + List
tableList = db.getTablesOnIdOrderOrThrowException( + Lists.newArrayList(tableToLoadPartitions.keySet())); + if (Config.isCloudMode()) { + MetaLockUtils.commitLockTables(tableList); + } else { + MetaLockUtils.writeLockTablesOrMetaException(tableList); + } + try { + LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId) + .add("msg", "Load job try to commit txn").build()); + Env.getCurrentGlobalTransactionMgr().commitTransaction( + dbId, tableList, transactionId, commitInfos, + new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp, + finishTimestamp, state, failMsg)); + return; + } catch (TabletQuorumFailedException e) { + // retry in next loop + return; + } catch (UserException e) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("txn_id", transactionId) + .add("database_id", dbId) + .add("retry_times", retryTimes) + .add("error_msg", "Failed to commit txn with error:" + e.getMessage()) + .build(), e); + if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) { + retryTimes++; + if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) { + LOG.warn("cancelJob {} because up to max retry time, exception {}", id, e); + throw e; + } + } else { + throw e; + } + } finally { + if (Config.isCloudMode()) { + MetaLockUtils.commitUnlockTables(tableList); + } else { + MetaLockUtils.writeUnlockTables(tableList); + } + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index cdf74f5e9aca3a..cafffab295eba1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -24,7 +24,6 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; -import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.nereids.NereidsPlanner; @@ -193,25 +192,7 @@ public void executeSingleInsert(StmtExecutor executor, long jobId) throws Except executor.updateProfile(false); execImpl(executor, jobId); checkStrictModeAndFilterRatio(); - int retryTimes = 0; - while (true) { - try { - onComplete(); - break; - } catch (UserException e) { - LOG.warn("failed to commit txn, txnId={}, jobId={}, retryTimes={}", - getTxnId(), jobId, retryTimes, e); - if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) { - retryTimes++; - if (retryTimes >= Config.mow_insert_into_commit_retry_times) { - // should throw exception after running out of retry times - throw e; - } - } else { - throw e; - } - } - } + onComplete(); } catch (Throwable t) { onFail(t); // retry insert into from select when meet E-230 in cloud diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out new file mode 100644 index 00000000000000..9369fd5ae32f4f --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +19 + +-- !select -- +19 + diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out new file mode 100644 index 00000000000000..979483692d31dc --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 +2 2 2 + +-- !sql -- +1 1 1 + +-- !sql -- +1 1 1 +2 2 2 + +-- !sql -- +1 1 1 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy new file mode 100644 index 00000000000000..035a6307d46e20 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_mow_broker_load_with_retry", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + def s3BucketName = getS3BucketName() + def s3Endpoint = getS3Endpoint() + def s3Region = getS3Region() + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def customFeConfig = [ + calculate_delete_bitmap_task_timeout_seconds: 2 + ] + + def table = "tbl_basic" + setFeConfigTemporary(customFeConfig) { + + def attributesList = [ + + ] + + /* ========================================================== normal ========================================================== */ + attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv", + "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED BY \"|\"", "FORMAT AS \"CSV\"", "(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18)", + "", "", "", "", "")) + + attributesList.add(new LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv", + "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED BY \"|\"", "FORMAT AS \"CSV\"", "(K00,K01,K02,K03,K04,K05,K06,K07,K08,K09,K10,K11,K12,K13,K14,K15,K16,K17,K18)", + "", "", "", "", "")) + def ak = getS3AK() + def sk = getS3SK() + try { + sql """ DROP TABLE IF EXISTS ${table} """ + sql """ + CREATE TABLE ${table} + ( + k00 INT NOT NULL, + k01 DATE NOT NULL, + k02 BOOLEAN NULL, + k03 TINYINT NULL, + k04 SMALLINT NULL, + k05 INT NULL, + k06 BIGINT NULL, + k07 LARGEINT NULL, + k08 FLOAT NULL, + k09 DOUBLE NULL, + k10 DECIMAL(9,1) NULL, + k11 DECIMALV3(9,1) NULL, + k12 DATETIME NULL, + k13 DATEV2 NULL, + k14 DATETIMEV2 NULL, + k15 CHAR NULL, + k16 VARCHAR NULL, + k17 STRING NULL, + k18 JSON NULL, + kd01 BOOLEAN NOT NULL DEFAULT "TRUE", + kd02 TINYINT NOT NULL DEFAULT "1", + kd03 SMALLINT NOT NULL DEFAULT "2", + kd04 INT NOT NULL DEFAULT "3", + kd05 BIGINT NOT NULL DEFAULT "4", + kd06 LARGEINT NOT NULL DEFAULT "5", + kd07 FLOAT NOT NULL DEFAULT "6.0", + kd08 DOUBLE NOT NULL DEFAULT "7.0", + kd09 DECIMAL NOT NULL DEFAULT "888888888", + kd10 DECIMALV3 NOT NULL DEFAULT "999999999", + kd11 DATE NOT NULL DEFAULT "2023-08-24", + kd12 DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + kd13 DATEV2 NOT NULL DEFAULT "2023-08-24", + kd14 DATETIMEV2 NOT NULL DEFAULT CURRENT_TIMESTAMP, + kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体", + kd18 JSON NULL, + + INDEX idx_inverted_k104 (`k05`) USING INVERTED, + INDEX idx_inverted_k110 (`k11`) USING INVERTED, + INDEX idx_inverted_k113 (`k13`) USING INVERTED, + INDEX idx_inverted_k114 (`k14`) USING INVERTED, + INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"), + INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), + + INDEX idx_bitmap_k104 (`k02`) USING BITMAP, + INDEX idx_bitmap_k110 (`kd01`) USING BITMAP + + ) + UNIQUE KEY(k00) + DISTRIBUTED BY HASH(k00) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "bloom_filter_columns"="k05", + "replication_num" = "1" + ); + """ + GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + def i = 0 + for (LoadAttributes attributes : attributesList) { + def label = "test_s3_load_" + UUID.randomUUID().toString().replace("-", "_") + "_" + i + attributes.label = label + def prop = attributes.getPropertiesStr() + + def sql_str = """ + LOAD LABEL $label ( + $attributes.dataDesc.mergeType + DATA INFILE("$attributes.dataDesc.path") + INTO TABLE $attributes.dataDesc.tableName + $attributes.dataDesc.columnTermClause + $attributes.dataDesc.lineTermClause + $attributes.dataDesc.formatClause + $attributes.dataDesc.columns + $attributes.dataDesc.columnsFromPathClause + $attributes.dataDesc.columnMappingClause + $attributes.dataDesc.precedingFilterClause + $attributes.dataDesc.orderByClause + $attributes.dataDesc.whereExpr + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "${s3Endpoint}", + "AWS_REGION" = "${s3Region}", + "use_path_style" = "$attributes.usePathStyle", + "provider" = "${getS3Provider()}" + ) + ${prop} + """ + logger.info("submit sql: ${sql_str}"); + sql """${sql_str}""" + logger.info("Submit load with lable: $label, table: $attributes.dataDesc.tableName, path: $attributes.dataDesc.path") + + def max_try_milli_secs = 600000 + while (max_try_milli_secs > 0) { + String[][] result = sql """ show load where label="$attributes.label" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + if (attributes.isExceptFailed) { + assertTrue(false, "load should be failed but was success: $result") + } + logger.info("Load FINISHED " + attributes.label + ": $result") + break + } + if (result[0][2].equals("CANCELLED")) { + if (attributes.isExceptFailed) { + logger.info("Load FINISHED " + attributes.label) + break + } + assertTrue(false, "load failed: $result") + break + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if (max_try_milli_secs <= 0) { + assertTrue(false, "load Timeout: $attributes.label") + } + } + qt_select """ select count(*) from $attributes.dataDesc.tableName """ + ++i + } + } finally { + GetDebugPoint().disableDebugPointForAllFEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + sql "DROP TABLE IF EXISTS ${table};" + GetDebugPoint().clearDebugPointsForAllBEs() + } + } + +} + +class DataDesc { + public String mergeType = "" + public String path + public String tableName + public String lineTermClause + public String columnTermClause + public String formatClause + public String columns + public String columnsFromPathClause + public String precedingFilterClause + public String columnMappingClause + public String whereExpr + public String orderByClause +} + +class LoadAttributes { + LoadAttributes(String path, String tableName, String lineTermClause, String columnTermClause, String formatClause, + String columns, String columnsFromPathClause, String precedingFilterClause, String columnMappingClause, String whereExpr, String orderByClause, boolean isExceptFailed = false) { + this.dataDesc = new DataDesc() + this.dataDesc.path = path + this.dataDesc.tableName = tableName + this.dataDesc.lineTermClause = lineTermClause + this.dataDesc.columnTermClause = columnTermClause + this.dataDesc.formatClause = formatClause + this.dataDesc.columns = columns + this.dataDesc.columnsFromPathClause = columnsFromPathClause + this.dataDesc.precedingFilterClause = precedingFilterClause + this.dataDesc.columnMappingClause = columnMappingClause + this.dataDesc.whereExpr = whereExpr + this.dataDesc.orderByClause = orderByClause + + this.isExceptFailed = isExceptFailed + + properties = new HashMap<>() + } + + LoadAttributes addProperties(String k, String v) { + properties.put(k, v) + return this + } + + String getPropertiesStr() { + if (properties.isEmpty()) { + return "" + } + String prop = "PROPERTIES (" + properties.forEach (k, v) -> { + prop += "\"${k}\" = \"${v}\"," + } + prop = prop.substring(0, prop.size() - 1) + prop += ")" + return prop + } + + LoadAttributes withPathStyle() { + usePathStyle = "true" + return this + } + + public DataDesc dataDesc + public Map properties + public String label + public String usePathStyle = "false" + public boolean isExceptFailed +} \ No newline at end of file diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy index 23d92f31e5ad8e..7baf18c772290f 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy @@ -50,7 +50,7 @@ suite("test_cloud_mow_insert_timeout", "nonConcurrent") { def customFeConfig = [ delete_bitmap_lock_expiration_seconds : 5, calculate_delete_bitmap_task_timeout_seconds : 2, - mow_insert_into_commit_retry_times : 2 + mow_calculate_delete_bitmap_retry_times : 2 ] setFeConfigTemporary(customFeConfig) { diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy new file mode 100644 index 00000000000000..f7038b80e426fc --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_mow_insert_with_retry", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def customFeConfig = [ + calculate_delete_bitmap_task_timeout_seconds: 2 + ] + def dbName = "regression_test_fault_injection_p0_cloud" + def table1 = dbName + ".test_cloud_mow_insert_with_retry" + setFeConfigTemporary(customFeConfig) { + for (item in ["legacy", "nereids"]) { + try { + GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl + "&useLocalSessionState=true") { + if (item == "nereids") { + sql """ set enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_planner = false; """ + } + def timeout = 2000 + def now = System.currentTimeMillis() + sql "insert into ${table1} values(1,1,1);" + def time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff > timeout, "insert or delete should take over " + timeout + " ms") + + now = System.currentTimeMillis() + sql "insert into ${table1} values(2,2,2);" + time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff > timeout, "insert or delete should take over " + timeout + " ms") + order_qt_sql "select * from ${table1};" + + now = System.currentTimeMillis() + sql "delete from ${table1} where k1=2;" + time_diff = System.currentTimeMillis() - now + logger.info("time_diff:" + time_diff) + assertTrue(time_diff > timeout, "insert or delete should take over " + timeout + " ms") + order_qt_sql "select * from ${table1};" + } + } catch (Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().disableDebugPointForAllFEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait") + sql "DROP TABLE IF EXISTS ${table1};" + GetDebugPoint().clearDebugPointsForAllBEs() + } + } + + } + +} \ No newline at end of file