From 64b69ed1bafee30fd5f03e40a31f151cea8c4f66 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 2 Aug 2024 20:05:31 +0800 Subject: [PATCH] [branch-2.1] Picks "[opt](merge-on-write) Skip the alignment process of some rowsets in partial update #38487" (#38682) ## Proposed changes Picks https://github.com/apache/doris/pull/38487. In the publish phase of a partial-update load on a merge-on-write table, a rowset that was produced by compaction and whose end version does not exceed the max version observed in the load's flush phase can now skip the alignment process, because compaction does not change data; the delete bitmap for such a rowset is still calculated. --- be/src/olap/partial_update_info.h | 5 +- be/src/olap/rowset/rowset.h | 1 + be/src/olap/rowset/rowset_meta.h | 6 + be/src/olap/rowset_builder.cpp | 11 +- be/src/olap/rowset_builder.h | 1 + be/src/olap/tablet.cpp | 35 ++- .../olap/task/engine_publish_version_task.cpp | 14 ++ .../concurrency_update1.csv | 0 .../concurrency_update2.csv | 0 .../concurrency_update3.csv | 0 ...tial_update_column_num_fault_injection.out | 0 ..._update_compaction_with_higher_version.out | 11 + ...artial_update_conflict_skip_compaction.out | 11 + ...ial_update_publish_conflict_with_error.out | 0 .../test_partial_update_skip_compaction.out | 11 + ...l_update_column_num_fault_injection.groovy | 0 ...date_compaction_with_higher_version.groovy | 222 ++++++++++++++++++ ...ial_update_conflict_skip_compaction.groovy | 212 +++++++++++++++++ ..._update_publish_conflict_with_error.groovy | 0 ...test_partial_update_skip_compaction.groovy | 193 +++++++++++++++ 20 files changed, 725 insertions(+), 8 deletions(-) rename regression-test/data/fault_injection_p0/{ => partial_update}/concurrency_update1.csv (100%) rename regression-test/data/fault_injection_p0/{ => partial_update}/concurrency_update2.csv (100%) rename regression-test/data/fault_injection_p0/{ => partial_update}/concurrency_update3.csv (100%) rename regression-test/data/fault_injection_p0/{ => partial_update}/test_partial_update_column_num_fault_injection.out (100%) create mode 100644 regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out create mode 100644 regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out rename regression-test/data/fault_injection_p0/{ => partial_update}/test_partial_update_publish_conflict_with_error.out (100%) create mode 100644 regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out rename regression-test/suites/fault_injection_p0/{ => partial_update}/test_partial_update_column_num_fault_injection.groovy (100%) create mode 100644 regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy create mode 100644 regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy rename regression-test/suites/fault_injection_p0/{ => partial_update}/test_partial_update_publish_conflict_with_error.groovy (100%) create mode 100644 regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index f20f9680b0b57a..4b62cb8f0ffb31 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -25,10 +25,10 @@ struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, const std::set<std::string>& partial_update_cols, bool is_strict_mode, int64_t timestamp_ms, const std::string& timezone, - const std::string& auto_increment_column) { + const std::string& auto_increment_column, int64_t cur_max_version = -1) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; - + max_version_in_flush_phase = 
cur_max_version; this->timestamp_ms = timestamp_ms; this->timezone = timezone; missing_cids.clear(); @@ -91,6 +91,7 @@ struct PartialUpdateInfo { public: bool is_partial_update {false}; + int64_t max_version_in_flush_phase {-1}; std::set<std::string> partial_update_input_columns; std::vector<uint32_t> missing_cids; std::vector<uint32_t> update_cids; diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 72c6c2fa29bec8..7677015f2e0d5c 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -166,6 +166,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> { bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); } KeysType keys_type() { return _schema->keys_type(); } RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); } + bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); } // remove all files in this rowset // TODO should we rename the method to remove_files() to be more specific? diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 30457d30bc65da..5284deb461b5a3 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -242,6 +242,12 @@ class RowsetMeta { return num_segments() > 1 && is_singleton_delta() && segments_overlap() != NONOVERLAPPING; } + bool produced_by_compaction() const { + return has_version() && + (start_version() < end_version() || + (start_version() == end_version() && segments_overlap() == NONOVERLAPPING)); + } + // get the compaction score of this rowset. // if segments are overlapping, the score equals to the number of segments, // otherwise, score is 1. diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index a1edc61e4784a1..32bbdb246a37af 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -118,7 +118,7 @@ void RowsetBuilder::_garbage_collection() { Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context) { std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock()); - int64_t cur_max_version = tablet()->max_version_unlocked().second; + _max_version_in_flush_phase = tablet()->max_version_unlocked().second; std::vector<RowsetSharedPtr> rowset_ptrs; // tablet is under alter process. The delete bitmap will be calculated after conversion. 
if (tablet()->tablet_state() == TABLET_NOTREADY) { @@ -130,12 +130,12 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context) } _rowset_ids.clear(); } else { - RETURN_IF_ERROR(tablet()->all_rs_id(cur_max_version, &_rowset_ids)); + RETURN_IF_ERROR(tablet()->all_rs_id(_max_version_in_flush_phase, &_rowset_ids)); rowset_ptrs = tablet()->get_rowset_by_ids(&_rowset_ids); } _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id()); - mow_context = std::make_shared<MowContext>(cur_max_version, _req.txn_id, _rowset_ids, - rowset_ptrs, _delete_bitmap); + mow_context = std::make_shared<MowContext>(_max_version_in_flush_phase, _req.txn_id, + _rowset_ids, rowset_ptrs, _delete_bitmap); return Status::OK(); } @@ -402,7 +402,8 @@ void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id, table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), table_schema_param->timezone(), - table_schema_param->auto_increment_coulumn()); + table_schema_param->auto_increment_coulumn(), + _max_version_in_flush_phase); } } // namespace doris diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h index 8f254074c3716d..362e976da71976 100644 --- a/be/src/olap/rowset_builder.h +++ b/be/src/olap/rowset_builder.h @@ -107,6 +107,7 @@ class BaseRowsetBuilder { std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token; // current rowset_ids, used to do diff in publish_version RowsetIdUnorderedSet _rowset_ids; + int64_t _max_version_in_flush_phase {-1}; std::shared_ptr<PartialUpdateInfo> _partial_update_info; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 7ff4b508c97d98..11cb7055c7f732 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3506,7 +3506,9 @@ Status Tablet::update_delete_bitmap(TabletTxnInfo* txn_info, int64_t txn_id) { // When the new segment flush fails or the rowset build fails, the deletion marker for the // duplicate key of the original segment should not remain in `txn_info->delete_bitmap`, // so we need to make a copy of `txn_info->delete_bitmap` and make changes on it. - if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) { + bool is_partial_update = + txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update; + if (is_partial_update) { delete_bitmap = std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap)); } @@ -3540,6 +3542,37 @@ Status Tablet::update_delete_bitmap(TabletTxnInfo* txn_info, int64_t txn_id) { } auto t3 = watch.get_elapse_time_us(); + // If a rowset is produced by compaction before the commit phase of the partial update load + // and is not included in txn_info->rowset_ids, we can skip the alignment process of that rowset + // because data remains the same before and after compaction. But we still need to calculate + // the delete bitmap for that rowset. 
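+ // Illustration (versions as in the regression tests added below): if the flush phase saw + // rowsets [0-1],[2-2],[3-3],[4-4] (max_version_in_flush_phase = 4) and full compaction then + // replaced them with [0-4], that rowset satisfies end_version() <= 4 && produced_by_compaction() + // and is moved into rowsets_skip_alignment below.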
+ std::vector<RowsetSharedPtr> rowsets_skip_alignment; + if (is_partial_update) { + int64_t max_version_in_flush_phase = + txn_info->partial_update_info->max_version_in_flush_phase; + DCHECK(max_version_in_flush_phase != -1); + std::vector<RowsetSharedPtr> remained_rowsets; + for (const auto& rowset : specified_rowsets) { + if (rowset->end_version() <= max_version_in_flush_phase && + rowset->produced_by_compaction()) { + rowsets_skip_alignment.emplace_back(rowset); + } else { + remained_rowsets.emplace_back(rowset); + } + } + if (!rowsets_skip_alignment.empty()) { + specified_rowsets = std::move(remained_rowsets); + } + } + + if (!rowsets_skip_alignment.empty()) { + auto token = _engine.calc_delete_bitmap_executor()->create_token(); + // set rowset_writer to nullptr to skip the alignment process + RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, rowsets_skip_alignment, delete_bitmap, + cur_version - 1, token.get(), nullptr)); + RETURN_IF_ERROR(token->wait()); + } + auto token = _engine.calc_delete_bitmap_executor()->create_token(); RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, cur_version - 1, token.get(), rowset_writer.get())); diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 96cad7f934d1b6..6108e81bae3b59 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -110,6 +110,20 @@ Status EnginePublishVersionTask::execute() { std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); + DBUG_EXECUTE_IF("EnginePublishVersionTask::execute.enable_spin_wait", { + auto token = dp->param("token", "invalid_token"); + while (DebugPoints::instance()->is_enable("EnginePublishVersionTask::execute.block")) { + auto block_dp = DebugPoints::instance()->get_debug_point( + "EnginePublishVersionTask::execute.block"); + if (block_dp) { + auto pass_token = block_dp->param("pass_token", ""); + if (pass_token == token) { + break; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }); std::unique_ptr<ThreadPoolToken> token = StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT); diff --git a/regression-test/data/fault_injection_p0/concurrency_update1.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update1.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv diff --git a/regression-test/data/fault_injection_p0/concurrency_update2.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update2.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv diff --git a/regression-test/data/fault_injection_p0/concurrency_update3.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update3.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv diff --git a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.out similarity index 100% rename from 
regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out rename to regression-test/data/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.out diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out new file mode 100644 index 00000000000000..df12f4b08e5706 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 666 666 +2 888 888 2 2 +3 777 777 555 555 + diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out new file mode 100644 index 00000000000000..df12f4b08e5706 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 666 666 +2 888 888 2 2 +3 777 777 555 555 + diff --git a/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out similarity index 100% rename from regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out rename to regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out new file mode 100644 index 00000000000000..6c7fe443a894fa --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 1 1 +2 888 888 2 2 +3 777 777 3 3 + diff --git a/regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.groovy similarity index 100% rename from regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy rename to regression-test/suites/fault_injection_p0/partial_update/test_partial_update_column_num_fault_injection.groovy diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy new file mode 100644 index 00000000000000..cd3d03330d5bee --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
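+ // Scenario sketch: two conflicting partial update loads are held in the publish phase; after the + // first one publishes (version 5), full compaction produces rowset [0-5], whose end version is + // higher than the second load's flush-phase max version (4), so the second load must NOT skip the + // alignment process for that rowset and is expected to write a new segment in the publish phase.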
+ +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") { + + def table1 = "test_partial_update_compaction_with_higher_version" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2,2);" + sql "insert into ${table1} values(3,3,3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + def check_rs_metas = { expected_rs_meta_size, check_func -> + if (isCloudMode()) { + return + } + + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + def (code, out, err) = curl("GET", metaUrl) + Assert.assertEquals(code, 0) + def jsonMeta = parseJson(out.trim()) + + Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + for (def meta : jsonMeta.rs_metas) { + int startVersion = meta.start_version + int endVersion = meta.end_version + int numSegments = meta.num_segments + int numRows = meta.num_rows + String overlapPb = meta.segments_overlap_pb + logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + } + } + + check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + def enable_publish_spin_wait = { tokenName -> + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait", [token: "${tokenName}"]) + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait", [token: "${tokenName}"]) + } + } + + def disable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def enable_block_in_publish = { passToken -> + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block", [pass_token: "${passToken}"]) + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block", [pass_token: "${passToken}"]) + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + 
GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update in publish phase + enable_publish_spin_wait("token1") + enable_block_in_publish("-1") + + // the first partial update load + def t1 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + } + + Thread.sleep(600) + + // the second partial update load that conflicts with the first one + enable_publish_spin_wait("token2") + def t2 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);" + } + + Thread.sleep(400) + + // let the first partial update load finish + enable_block_in_publish("token1") + t1.join() + Thread.sleep(200) + check_rs_metas(5, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4], [5-5] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + + check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // check the rowset produced by full compaction + // [0-5] + Assert.assertEquals(startVersion, 0) + Assert.assertEquals(endVersion, 5) + Assert.assertEquals(numRows, 3) + Assert.assertEquals(overlapPb, "NONOVERLAPPING") + }) + + // let the second partial update load publish + disable_block_in_publish() + t2.join() + Thread.sleep(300) + + order_qt_sql "select * from ${table1};" + + check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 6) { + // [6-6] + Assert.assertEquals(endVersion, 6) + // checks that the partial update didn't skip the alignment process of the rowset produced by compaction and + // generated a new segment in the publish phase + Assert.assertEquals(numSegments, 2) + Assert.assertEquals(numRows, 4) // 4 = 2 + 2 + } + }) + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + // sql "DROP TABLE IF EXISTS ${table1};" +} diff --git 
a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy new file mode 100644 index 00000000000000..51018d38288dec --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { + + def table1 = "test_partial_update_conflict_skip_compaction" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2,2);" + sql "insert into ${table1} values(3,3,3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + def check_rs_metas = { expected_rs_meta_size, check_func -> + if (isCloudMode()) { + return + } + + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + def (code, out, err) = curl("GET", metaUrl) + Assert.assertEquals(code, 0) + def jsonMeta = parseJson(out.trim()) + + Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + for (def meta : jsonMeta.rs_metas) { + int startVersion = meta.start_version + int endVersion = meta.end_version + int numSegments = meta.num_segments + int numRows = meta.num_rows + String overlapPb = meta.segments_overlap_pb + logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + } + } + + check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4] + Assert.assertEquals(startVersion, endVersion) + 
Assert.assertEquals(numSegments, 1) + } + }) + + def enable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def disable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def enable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update before publish phase + enable_publish_spin_wait() + enable_block_in_publish() + + // the first partial update load + def t1 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + } + + Thread.sleep(300) + + // the second partial update load that conflicts with the first one + def t2 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);" + } + + Thread.sleep(300) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + + check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // check the rowset produced by full compaction + // [0-4] + Assert.assertEquals(startVersion, 0) + Assert.assertEquals(endVersion, 4) + Assert.assertEquals(numRows, 3) + Assert.assertEquals(overlapPb, "NONOVERLAPPING") + }) + + disable_block_in_publish() + + t1.join() + t2.join() + + order_qt_sql "select * from ${table1};" + + check_rs_metas(3, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 5) { + // the first partial update load + // it should 
skip the alignment process of the rowset produced by full compaction and + // should not generate a new segment in the publish phase + Assert.assertEquals(endVersion, 5) + Assert.assertEquals(numSegments, 1) + Assert.assertEquals(numRows, 3) + } else if (startVersion == 6) { + // the second partial update load + // it should skip the alignment process of the rowset produced by full compaction and + // should generate a new segment in the publish phase for the rows that conflict with the first partial update load + Assert.assertEquals(endVersion, 6) + Assert.assertEquals(numSegments, 2) + Assert.assertEquals(numRows, 4) // 4 = 2 + 2 + } + }) + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${table1};" +} diff --git a/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy similarity index 100% rename from regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy rename to regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy new file mode 100644 index 00000000000000..b665ae4c19b539 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
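+ // Scenario sketch: a single partial update load is held in the publish phase while full compaction + // replaces rowsets [0-1],[2-2],[3-3],[4-4] with [0-4]; since [0-4] ends at the version the load + // already saw in its flush phase and was produced by compaction, publish should skip the alignment + // process for it and write no new segment.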
+ +import org.junit.Assert +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_partial_update_skip_compaction", "nonConcurrent") { + + def table1 = "test_partial_update_skip_compaction" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int, + `c4` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2,2);" + sql "insert into ${table1} values(3,3,3,3,3);" + sql "sync;" + order_qt_sql "select * from ${table1};" + + def beNodes = sql_return_maparray("show backends;") + def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + def tabletBackendId = tabletStat.BackendId + def tabletId = tabletStat.TabletId + def tabletBackend; + for (def be : beNodes) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + def check_rs_metas = { expected_rs_meta_size, check_func -> + if (isCloudMode()) { + return + } + + def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + def (code, out, err) = curl("GET", metaUrl) + Assert.assertEquals(code, 0) + def jsonMeta = parseJson(out.trim()) + + Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + for (def meta : jsonMeta.rs_metas) { + int startVersion = meta.start_version + int endVersion = meta.end_version + int numSegments = meta.num_segments + int numRows = meta.num_rows + String overlapPb = meta.segments_overlap_pb + logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + } + } + + check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 0) { + // [0-1] + Assert.assertEquals(endVersion, 1) + Assert.assertEquals(numSegments, 0) + } else { + // [2-2], [3-3], [4-4] + Assert.assertEquals(startVersion, endVersion) + Assert.assertEquals(numSegments, 1) + } + }) + + def enable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def disable_publish_spin_wait = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + } + } + + def enable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + def disable_block_in_publish = { + if (isCloudMode()) { + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + } else { + GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + } + } + + try { + GetDebugPoint().clearDebugPointsForAllFEs() + 
GetDebugPoint().clearDebugPointsForAllBEs() + + // block the partial update in publish phase + enable_publish_spin_wait() + enable_block_in_publish() + def t1 = Thread.start { + sql "set enable_unique_key_partial_update=true;" + sql "sync;" + sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + } + + Thread.sleep(500) + + // trigger full compaction on tablet + logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // wait for full compaction to complete + Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + { + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + Assert.assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + return !compactionStatus.run_status + } + ) + + check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // check the rowset produced by full compaction + // [0-4] + Assert.assertEquals(startVersion, 0) + Assert.assertEquals(endVersion, 4) + Assert.assertEquals(numRows, 3) + Assert.assertEquals(overlapPb, "NONOVERLAPPING") + }) + + // let the partial update load publish + disable_block_in_publish() + t1.join() + + order_qt_sql "select * from ${table1};" + + check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + if (startVersion == 5) { + // [5-5] + Assert.assertEquals(endVersion, 5) + // checks that the partial update skips the alignment process of the rowset produced by compaction and + // doesn't generate a new segment in the publish phase + Assert.assertEquals(numSegments, 1) + Assert.assertEquals(numRows, 3) + } + }) + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + } + + sql "DROP TABLE IF EXISTS ${table1};" +}
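Reviewer note: a minimal, self-contained sketch of the skip rule this pick introduces, distilled from `RowsetMeta::produced_by_compaction()` and the partition step added to `Tablet::update_delete_bitmap()` above. `RowsetStub` and `split_skip_alignment` are illustrative stand-ins, not the Doris types or the real implementation:

```cpp
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for doris::Rowset, reduced to the fields the skip rule reads.
struct RowsetStub {
    int64_t start_version;
    int64_t end_version;
    bool segments_nonoverlapping;

    // Mirrors RowsetMeta::produced_by_compaction(): the rowset either covers
    // multiple versions, or is a single-version rowset whose segments were
    // already merged into non-overlapping order.
    bool produced_by_compaction() const {
        return start_version < end_version ||
               (start_version == end_version && segments_nonoverlapping);
    }
};

using RowsetStubSharedPtr = std::shared_ptr<RowsetStub>;

// Mirrors the partition step in Tablet::update_delete_bitmap(): rowsets that
// were produced by compaction and are fully covered by the max version the
// load saw in its flush phase skip alignment; everything else stays in
// `specified` and goes through the normal alignment path.
// e.g. with [0-1],[2-2],[3-3],[4-4] compacted into [0-4] and
// max_version_in_flush_phase = 4, the [0-4] rowset lands in the skip list.
std::vector<RowsetStubSharedPtr> split_skip_alignment(
        std::vector<RowsetStubSharedPtr>& specified, int64_t max_version_in_flush_phase) {
    std::vector<RowsetStubSharedPtr> skip_alignment;
    std::vector<RowsetStubSharedPtr> remained;
    for (const auto& rs : specified) {
        if (rs->end_version <= max_version_in_flush_phase && rs->produced_by_compaction()) {
            skip_alignment.emplace_back(rs);
        } else {
            remained.emplace_back(rs);
        }
    }
    if (!skip_alignment.empty()) {
        specified = std::move(remained);
    }
    return skip_alignment;
}
```

In the patch itself, the skipped rowsets still get their delete bitmaps calculated: `calc_delete_bitmap()` is invoked for them with a null `rowset_writer`, so no alignment segments are written for them in the publish phase.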