diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 22940b40206de4..141e302af8c420 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -1201,7 +1201,9 @@ Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInf std::unique_ptr<RowsetWriter> transient_rs_writer; DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap; - if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) { + bool is_partial_update = + txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update; + if (is_partial_update) { transient_rs_writer = DORIS_TRY(self->create_transient_rowset_writer( *rowset, txn_info->partial_update_info, txn_expiration)); // Partial update might generate new segments when there is conflicts while publish, and mark @@ -1242,6 +1244,37 @@ } auto t3 = watch.get_elapse_time_us(); + // If a rowset is produced by compaction before the commit phase of the partial update load + // and is not included in txn_info->rowset_ids, we can skip the alignment process of that rowset + // because data remains the same before and after compaction. But we still need to calculate the + // delete bitmap for that rowset. + std::vector<RowsetSharedPtr> rowsets_skip_alignment; + if (is_partial_update) { + int64_t max_version_in_flush_phase = + txn_info->partial_update_info->max_version_in_flush_phase; + DCHECK(max_version_in_flush_phase != -1); + std::vector<RowsetSharedPtr> remained_rowsets; + for (const auto& rowset : specified_rowsets) { + if (rowset->end_version() <= max_version_in_flush_phase && + rowset->produced_by_compaction()) { + rowsets_skip_alignment.emplace_back(rowset); + } else { + remained_rowsets.emplace_back(rowset); + } + } + if (!rowsets_skip_alignment.empty()) { + specified_rowsets = std::move(remained_rowsets); + } + } + + if (!rowsets_skip_alignment.empty()) { + auto token = self->calc_delete_bitmap_executor()->create_token(); + // set rowset_writer to nullptr to skip the alignment process + RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, rowsets_skip_alignment, + delete_bitmap, cur_version - 1, token.get(), nullptr)); + RETURN_IF_ERROR(token->wait()); + } + + // When there is only one segment, it will be calculated in the current thread. // Otherwise, it will be submitted to the thread pool for calculation.
if (segments.size() <= 1) { @@ -1433,7 +1466,8 @@ Status BaseTablet::update_delete_bitmap_without_lock( return Status::InternalError( "debug tablet update delete bitmap without lock random failed"); } else { - LOG(INFO) << "BaseTablet.update_delete_bitmap_without_lock.random_failed not triggered" + LOG(INFO) << "BaseTablet.update_delete_bitmap_without_lock.random_failed not " + "triggered" << ", rnd:" << rnd << ", percent: " << percent; } }); diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index f20f9680b0b57a..4b62cb8f0ffb31 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -25,10 +25,10 @@ struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, const std::set<std::string>& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, const std::string& timezone, - const std::string& auto_increment_column) { + const std::string& auto_increment_column, int64_t cur_max_version = -1) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; - + max_version_in_flush_phase = cur_max_version; this->timestamp_ms = timestamp_ms; this->timezone = timezone; missing_cids.clear(); @@ -91,6 +91,7 @@ struct PartialUpdateInfo { public: bool is_partial_update {false}; + int64_t max_version_in_flush_phase {-1}; std::set<std::string> partial_update_input_columns; std::vector<uint32_t> missing_cids; std::vector<uint32_t> update_cids; diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 310d0901b2a751..6050a33bfc2f5d 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -169,6 +169,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> { bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); } KeysType keys_type() { return _schema->keys_type(); } RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); } + bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); } // remove all files in this rowset // TODO should we rename the method to remove_files() to be more specific? diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index aa20b5b1ef13ac..c5a573d760c305 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -255,6 +255,12 @@ class RowsetMeta { return num_segments() > 1 && is_singleton_delta() && segments_overlap() != NONOVERLAPPING; } + bool produced_by_compaction() const { + return has_version() && + (start_version() < end_version() || + (start_version() == end_version() && segments_overlap() == NONOVERLAPPING)); + } + // get the compaction score of this rowset. // if segments are overlapping, the score equals to the number of segments, // otherwise, score is 1.
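
Note: the snippet below is a minimal standalone sketch (illustrative names such as FakeRowset and split_for_alignment, not the Doris classes) of how the produced_by_compaction() heuristic above combines with the new alignment-skipping split added to BaseTablet::update_delete_bitmap(). It assumes only the simplified semantics visible in this diff and omits has_version() and the delete-bitmap recalculation that the real code still performs for skipped rowsets.

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Hypothetical stand-in for the version info RowsetMeta exposes; names are illustrative.
struct FakeRowset {
    int64_t start_version;
    int64_t end_version;
    bool overlapping;  // roughly: segments_overlap() != NONOVERLAPPING

    // Mirrors RowsetMeta::produced_by_compaction(): a multi-version rowset is compaction
    // output; a single-version rowset counts only if its segments are NONOVERLAPPING.
    bool produced_by_compaction() const {
        return start_version < end_version ||
               (start_version == end_version && !overlapping);
    }
};

// Mirrors the partition step in update_delete_bitmap(): rowsets that already existed when
// the load flushed (end_version <= max_version_in_flush_phase) and were produced by
// compaction carry the same data as the rowsets recorded in txn_info->rowset_ids, so the
// alignment pass can be skipped for them; everything else stays in the aligned set.
std::pair<std::vector<FakeRowset>, std::vector<FakeRowset>> split_for_alignment(
        const std::vector<FakeRowset>& specified, int64_t max_version_in_flush_phase) {
    std::vector<FakeRowset> skip_alignment, remained;
    for (const auto& rs : specified) {
        if (rs.end_version <= max_version_in_flush_phase && rs.produced_by_compaction()) {
            skip_alignment.push_back(rs);
        } else {
            remained.push_back(rs);
        }
    }
    return {skip_alignment, remained};
}

int main() {
    // Scenario: the load flushed while the tablet was at version 4; full compaction then
    // replaced [2-2],[3-3],[4-4] with [2-4]. The [5-5] rowset was published after the flush
    // (e.g. a conflicting load), so it must still go through alignment.
    std::vector<FakeRowset> specified = {{2, 4, false}, {5, 5, true}};
    auto [skip, align] = split_for_alignment(specified, /*max_version_in_flush_phase=*/4);
    std::cout << "skip=" << skip.size() << " align=" << align.size() << '\n';  // skip=1 align=1
}

In the real path, the skipped rowsets are still passed to calc_delete_bitmap() with a null rowset_writer, so their delete bitmaps are updated without the row-alignment pass.
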
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 93058c05be332f..85006cc183a79d 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -36,6 +36,7 @@ #include "io/fs/file_writer.h" // IWYU pragma: keep #include "olap/calc_delete_bitmap_executor.h" #include "olap/olap_define.h" +#include "olap/partial_update_info.h" #include "olap/rowset/beta_rowset.h" #include "olap/rowset/beta_rowset_writer.h" #include "olap/rowset/pending_rowset_helper.h" @@ -123,7 +124,7 @@ void RowsetBuilder::_garbage_collection() { Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context) { std::lock_guard<std::mutex> lck(tablet()->get_header_lock()); - int64_t cur_max_version = tablet()->max_version_unlocked(); + _max_version_in_flush_phase = tablet()->max_version_unlocked(); std::vector<RowsetSharedPtr> rowset_ptrs; // tablet is under alter process. The delete bitmap will be calculated after conversion. if (tablet()->tablet_state() == TABLET_NOTREADY) { @@ -135,12 +136,13 @@ } _rowset_ids.clear(); } else { - RETURN_IF_ERROR(tablet()->get_all_rs_id_unlocked(cur_max_version, &_rowset_ids)); + RETURN_IF_ERROR( + tablet()->get_all_rs_id_unlocked(_max_version_in_flush_phase, &_rowset_ids)); rowset_ptrs = tablet()->get_rowset_by_ids(&_rowset_ids); } _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id()); - mow_context = std::make_shared<MowContext>(cur_max_version, _req.txn_id, _rowset_ids, - rowset_ptrs, _delete_bitmap); + mow_context = std::make_shared<MowContext>(_max_version_in_flush_phase, _req.txn_id, + _rowset_ids, rowset_ptrs, _delete_bitmap); return Status::OK(); } @@ -408,7 +410,8 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn()); + table_schema_param->auto_increment_coulumn(), + _max_version_in_flush_phase); } } // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index e54faee3435c79..7fd578037363a0 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -106,6 +106,7 @@ class BaseRowsetBuilder { std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token; // current rowset_ids, used to do diff in publish_version RowsetIdUnorderedSet _rowset_ids; + int64_t _max_version_in_flush_phase {-1}; std::shared_ptr<PartialUpdateInfo> _partial_update_info; diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index acdcebae165c6f..ae7b694b6dfab4 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -111,6 +111,20 @@ Status EnginePublishVersionTask::execute() { std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); + DBUG_EXECUTE_IF("EnginePublishVersionTask::execute.enable_spin_wait", { + auto token = dp->param("token", "invalid_token"); + while (DebugPoints::instance()->is_enable("EnginePublishVersionTask::execute.block")) { + auto block_dp = DebugPoints::instance()->get_debug_point( + "EnginePublishVersionTask::execute.block"); + if (block_dp) { + auto pass_token = block_dp->param("pass_token", ""); + if (pass_token == token) { + break; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }); std::unique_ptr<ThreadPoolToken> token = _engine.tablet_publish_txn_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT);
std::unordered_map tablet_id_to_num_delta_rows; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 862c72fe742709..b153fd006c4b99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -715,6 +715,26 @@ private void getDeleteBitmapUpdateLock(Map> tableToParttions, lo LOG.info("error ", e); } } + if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait")) { + LOG.info("debug point: block at CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait"); + DebugPoint debugPoint = DebugPointUtil.getDebugPoint( + "CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait"); + String token = debugPoint.param("token", "invalid_token"); + while (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block")) { + DebugPoint blockDebugPoint = DebugPointUtil.getDebugPoint( + "CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block"); + String passToken = blockDebugPoint.param("pass_token", ""); + if (token.equals(passToken)) { + break; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOG.info("error ", e); + } + } + LOG.info("debug point: leave CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait"); + } StopWatch stopWatch = new StopWatch(); stopWatch.start(); int totalRetryTime = 0; diff --git a/regression-test/data/fault_injection_p0/concurrency_update1.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update1.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv diff --git a/regression-test/data/fault_injection_p0/concurrency_update2.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update2.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv diff --git a/regression-test/data/fault_injection_p0/concurrency_update3.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update3.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv diff --git a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.out similarity index 100% rename from regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out rename to regression-test/data/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.out diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out new file mode 100644 index 00000000000000..df12f4b08e5706 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out @@ -0,0 +1,11 @@ +-- This 
file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 666 666 +2 888 888 2 2 +3 777 777 555 555 + diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out new file mode 100644 index 00000000000000..df12f4b08e5706 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 666 666 +2 888 888 2 2 +3 777 777 555 555 + diff --git a/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out similarity index 100% rename from regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out rename to regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out new file mode 100644 index 00000000000000..6c7fe443a894fa --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 1 1 +2 888 888 2 2 +3 777 777 3 3 + diff --git a/regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.groovy similarity index 100% rename from regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy rename to regression-test/suites/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.groovy diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy new file mode 100644 index 00000000000000..7af53662dd2d21 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") { + + def table1 = "test_partial_update_compaction_with_higher_version" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2,2);" + sql "insert into ${table1} values(3,3,3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + def check_rs_metas = { expected_rs_meta_size, check_func -> + if (isCloudMode()) { + return + } + + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + def (code, out, err) = curl("GET", metaUrl) + Assert.assertEquals(code, 0) + def jsonMeta = parseJson(out.trim()) + + Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + for (def meta : jsonMeta.rs_metas) { + int startVersion = meta.start_version + int endVersion = meta.end_version + int numSegments = meta.num_segments + int numRows = meta.num_rows + String overlapPb = meta.segments_overlap_pb + logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + } + } + + check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + def enable_publish_spin_wait = { tokenName -> + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait", [token: "${tokenName}"]) + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait", [token: "${tokenName}"]) + } + } + + def disable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def enable_block_in_publish = { passToken -> + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block", [pass_token: "${passToken}"]) + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block", [pass_token: "${passToken}"]) + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + 
GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update in publish phase + enable_publish_spin_wait("token1") + enable_block_in_publish("-1") + + // the first partial update load + def t1 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + } + + Thread.sleep(600) + + // the second partial update load that conflicts with the first one + enable_publish_spin_wait("token2") + def t2 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);" + } + + Thread.sleep(400) + + // let the first partial update load finish + enable_block_in_publish("token1") + t1.join() + Thread.sleep(200) + check_rs_metas(5, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4], [5-5] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + + check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // check the rowset produced by full compaction + // [0-5] + Assert.assertEquals(startVersion, 0) + Assert.assertEquals(endVersion, 5) + Assert.assertEquals(numRows, 3) + Assert.assertEquals(overlapPb, "NONOVERLAPPING") + }) + + // let the second partial update load publish + disable_block_in_publish() + t1.join() + Thread.sleep(300) + + order_qt_sql "select * from ${table1};" + + check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 6) { + // [6-6] + Assert.assertEquals(endVersion, 6) + // checks that partial update didn't skip the alignment process of rowsets produced by compaction and + // generate new segment in publish phase + Assert.assertEquals(numSegments, 2) + Assert.assertEquals(numRows, 4) // 4 = 2 + 2 + } + }) + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + 
GetDebugPoint().clearDebugPointsForAllBEs() + } + + // sql "DROP TABLE IF EXISTS ${table1};" +} diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy new file mode 100644 index 00000000000000..08eba337af3327 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { + + def table1 = "test_partial_update_conflict_skip_compaction" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2,2);" + sql "insert into ${table1} values(3,3,3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + def check_rs_metas = { expected_rs_meta_size, check_func -> + if (isCloudMode()) { + return + } + + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + def (code, out, err) = curl("GET", metaUrl) + Assert.assertEquals(code, 0) + def jsonMeta = parseJson(out.trim()) + + Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + for (def meta : jsonMeta.rs_metas) { + int startVersion = meta.start_version + int endVersion = meta.end_version + int numSegments = meta.num_segments + int numRows = meta.num_rows + String overlapPb = meta.segments_overlap_pb + logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + } + } + + check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + 
Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + def enable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def disable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def enable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update before publish phase + enable_publish_spin_wait() + enable_block_in_publish() + + // the first partial update load + def t1 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + } + + Thread.sleep(300) + + // the second partial update load that has conflict with the first one + def t2 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);" + } + + Thread.sleep(300) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + + check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // check the rowset produced by full compaction + // [0-4] + Assert.assertEquals(startVersion, 0) + Assert.assertEquals(endVersion, 4) + Assert.assertEquals(numRows, 3) + Assert.assertEquals(overlapPb, "NONOVERLAPPING") + }) + + disable_block_in_publish() + + t1.join() + t2.join() + + order_qt_sql "select * from ${table1};" + + check_rs_metas(3, {int startVersion, int endVersion, int 
numSegments, int numRows, String overlapPb -> + if (startVersion == 5) { + // the first partial update load + // it should skip the alignment process of rowsets produced by full compaction and + // should not generate new segment in publish phase + Assert.assertEquals(endVersion, 5) + Assert.assertEquals(numSegments, 1) + Assert.assertEquals(numRows, 3) + } else if (startVersion == 6) { + // the second partial update load + // it should skip the alignment process of rowsets produced by full compaction and + // should generate new segment in publish phase for conflicting rows with the first partial update load + Assert.assertEquals(endVersion, 6) + Assert.assertEquals(numSegments, 2) + Assert.assertEquals(numRows, 4) // 4 = 2 + 2 + } + }) + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${table1};" +} diff --git a/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy similarity index 99% rename from regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy rename to regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy index 9e61dd4eb0de9b..a3e18194318747 100644 --- a/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy @@ -35,6 +35,10 @@ import org.apache.http.client.methods.CloseableHttpResponse import org.apache.http.util.EntityUtils suite("test_partial_update_publish_conflict_with_error", "nonConcurrent") { + if (isCloudMode()) { + return + } + def dbName = context.config.getDbNameByFile(context.file) def tableName = "test_partial_update_publish_conflict_with_error" diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy new file mode 100644 index 00000000000000..d816c30f7e9bd8 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_partial_update_skip_compaction", "nonConcurrent") { + + def table1 = "test_partial_update_skip_compaction" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2,2);" + sql "insert into ${table1} values(3,3,3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + def check_rs_metas = { expected_rs_meta_size, check_func -> + if (isCloudMode()) { + return + } + + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + def (code, out, err) = curl("GET", metaUrl) + Assert.assertEquals(code, 0) + def jsonMeta = parseJson(out.trim()) + + Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + for (def meta : jsonMeta.rs_metas) { + int startVersion = meta.start_version + int endVersion = meta.end_version + int numSegments = meta.num_segments + int numRows = meta.num_rows + String overlapPb = meta.segments_overlap_pb + logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + } + } + + check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + def enable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def disable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def enable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + 
GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update in publish phase + enable_publish_spin_wait() + enable_block_in_publish() + def t1 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + } + + Thread.sleep(500) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + + check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // check the rowset produced by full compaction + // [0-4] + Assert.assertEquals(startVersion, 0) + Assert.assertEquals(endVersion, 4) + Assert.assertEquals(numRows, 3) + Assert.assertEquals(overlapPb, "NONOVERLAPPING") + }) + + // let the partial update load publish + disable_block_in_publish() + t1.join() + + order_qt_sql "select * from ${table1};" + + check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 5) { + // [5-5] + Assert.assertEquals(endVersion, 5) + // checks that partial update skips the alignment process of rowsets produced by compaction and + // doesn't generate new segment in publish phase + Assert.assertEquals(numSegments, 1) + Assert.assertEquals(numRows, 3) + } + }) + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${table1};" +}
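
For readers unfamiliar with the token-gated debug points used by these suites (EnginePublishVersionTask::execute.enable_spin_wait / .block on the BE, and the CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock counterparts on the FE), the sketch below is a toy, self-contained model of that handshake. ToyDebugPoints, publish.block, and spin_until_released are made-up names; only the control flow (spin while the block point is enabled, escape early once its pass_token matches the waiter's own token) mirrors what the diff adds.

#include <chrono>
#include <iostream>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <thread>

// Toy stand-in for the debug-point registry; not the Doris DebugPoints API.
class ToyDebugPoints {
public:
    void enable(const std::string& name, const std::string& pass_token = "") {
        std::lock_guard<std::mutex> g(_mu);
        _points[name] = pass_token;
    }
    void disable(const std::string& name) {
        std::lock_guard<std::mutex> g(_mu);
        _points.erase(name);
    }
    // Returns the pass_token if the point is enabled, nullopt otherwise.
    std::optional<std::string> pass_token(const std::string& name) {
        std::lock_guard<std::mutex> g(_mu);
        auto it = _points.find(name);
        if (it == _points.end()) return std::nullopt;
        return it->second;
    }
private:
    std::mutex _mu;
    std::map<std::string, std::string> _points;
};

// Mirrors the added spin-wait: the publish path carrying `token` loops while the
// ".block" point is enabled, and is released early when pass_token equals its token.
void spin_until_released(ToyDebugPoints& dps, const std::string& token) {
    while (auto pass = dps.pass_token("publish.block")) {
        if (*pass == token) break;  // selectively release this one publish
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main() {
    ToyDebugPoints dps;
    dps.enable("publish.block", "-1");  // block everyone: no real load uses token "-1"
    std::thread publish1([&] { spin_until_released(dps, "token1"); std::cout << "publish1 done\n"; });
    std::thread publish2([&] { spin_until_released(dps, "token2"); std::cout << "publish2 done\n"; });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    dps.enable("publish.block", "token1");  // let only the first publish through
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    dps.disable("publish.block");           // release everything else
    publish1.join();
    publish2.join();
}

This is the same pattern test_partial_update_compaction_with_higher_version relies on: it releases the first load with pass_token "token1" while the second load stays parked until the block point is cleared, which gives the test a window to run full compaction between the two publishes.
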