diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 1bb55879b410c2..b9eb8bdf6c15c1 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -641,7 +641,8 @@ void DeltaWriter::_build_current_tablet_schema(int64_t index_id, _partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(), table_schema_param->partial_update_input_columns(), table_schema_param->is_strict_mode(), - table_schema_param->timestamp_ms(), table_schema_param->timezone()); + table_schema_param->timestamp_ms(), table_schema_param->timezone(), + _cur_max_version); } void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 8567fa209cbae4..46d921e6e6056e 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -25,10 +25,10 @@ namespace doris { struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, const std::set& partial_update_cols, bool is_strict_mode, - int64_t timestamp_ms, const std::string& timezone) { + int64_t timestamp_ms, const std::string& timezone, int64_t cur_max_version = -1) { is_partial_update = partial_update; partial_update_input_columns = partial_update_cols; - + max_version_in_flush_phase = cur_max_version; this->timestamp_ms = timestamp_ms; this->timezone = timezone; missing_cids.clear(); @@ -77,6 +77,7 @@ struct PartialUpdateInfo { public: bool is_partial_update {false}; + int64_t max_version_in_flush_phase {-1}; std::set partial_update_input_columns; std::vector missing_cids; std::vector update_cids; diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 436b114cae645f..a7617779014132 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -165,6 +165,7 @@ class Rowset : public std::enable_shared_from_this { bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); } KeysType keys_type() { return _schema->keys_type(); } RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); } + bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); } // remove all files in this rowset // TODO should we rename the method to remove_files() to be more specific? diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index a03433865ec2c6..24e7dfbefb73b6 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -302,6 +302,12 @@ class RowsetMeta { return num_segments() > 1 && is_singleton_delta() && segments_overlap() != NONOVERLAPPING; } + bool produced_by_compaction() const { + return has_version() && + (start_version() < end_version() || + (start_version() == end_version() && segments_overlap() == NONOVERLAPPING)); + } + // get the compaction score of this rowset. // if segments are overlapping, the score equals to the number of segments, // otherwise, score is 1. diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 8d8b47580ad61a..ec8689fb3fff49 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3672,7 +3672,9 @@ Status Tablet::update_delete_bitmap(TabletTxnInfo* txn_info, int64_t txn_id) { // When the new segment flush fails or the rowset build fails, the deletion marker for the // duplicate key of the original segment should not remain in `txn_info->delete_bitmap`, // so we need to make a copy of `txn_info->delete_bitmap` and make changes on it. - if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) { + bool is_partial_update = + txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update; + if (is_partial_update) { delete_bitmap = std::make_shared(*(txn_info->delete_bitmap)); } @@ -3706,6 +3708,37 @@ Status Tablet::update_delete_bitmap(TabletTxnInfo* txn_info, int64_t txn_id) { } auto t3 = watch.get_elapse_time_us(); + // If a rowset is produced by compaction before the commit phase of the partial update load + // and is not included in txn_info->rowset_ids, we can skip the alignment process of that rowset + // because data remains the same before and after compaction. But we still need to calculate the + // the delete bitmap for that rowset. + std::vector rowsets_skip_alignment; + if (is_partial_update) { + int64_t max_version_in_flush_phase = + txn_info->partial_update_info->max_version_in_flush_phase; + DCHECK(max_version_in_flush_phase != -1); + std::vector remained_rowsets; + for (const auto& rowset : specified_rowsets) { + if (rowset->end_version() <= max_version_in_flush_phase && + rowset->produced_by_compaction()) { + rowsets_skip_alignment.emplace_back(rowset); + } else { + remained_rowsets.emplace_back(rowset); + } + } + if (!rowsets_skip_alignment.empty()) { + specified_rowsets = std::move(remained_rowsets); + } + } + + if (!rowsets_skip_alignment.empty()) { + auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token(); + // set rowset_writer to nullptr to skip the alignment process + RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, rowsets_skip_alignment, delete_bitmap, + cur_version - 1, token.get(), nullptr)); + RETURN_IF_ERROR(token->wait()); + } + auto token = StorageEngine::instance()->calc_delete_bitmap_executor()->create_token(); RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap, cur_version - 1, token.get(), rowset_writer.get())); diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index e9c91efb6b6947..ed0f258bf85667 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -107,6 +107,21 @@ Status EnginePublishVersionTask::finish() { std::this_thread::sleep_for(std::chrono::milliseconds(wait)); } }); + DBUG_EXECUTE_IF("EnginePublishVersionTask::execute.enable_spin_wait", { + auto token = dp->param("token", "invalid_token"); + while (DebugPoints::instance()->is_enable("EnginePublishVersionTask::execute.block")) { + auto block_dp = DebugPoints::instance()->get_debug_point( + "EnginePublishVersionTask::execute.block"); + if (block_dp) { + auto pass_token = block_dp->param("pass_token", ""); + if (pass_token == token) { + break; + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + }); + std::unique_ptr token = StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT); diff --git a/regression-test/data/fault_injection_p0/concurrency_update1.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update1.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update1.csv diff --git a/regression-test/data/fault_injection_p0/concurrency_update2.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update2.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update2.csv diff --git a/regression-test/data/fault_injection_p0/concurrency_update3.csv b/regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv similarity index 100% rename from regression-test/data/fault_injection_p0/concurrency_update3.csv rename to regression-test/data/fault_injection_p0/partial_update/concurrency_update3.csv diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out new file mode 100644 index 00000000000000..df12f4b08e5706 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 666 666 +2 888 888 2 2 +3 777 777 555 555 + diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out new file mode 100644 index 00000000000000..df12f4b08e5706 --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 666 666 +2 888 888 2 2 +3 777 777 555 555 + diff --git a/regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out similarity index 100% rename from regression-test/data/fault_injection_p0/test_partial_update_publish_conflict_with_error.out rename to regression-test/data/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.out diff --git a/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out new file mode 100644 index 00000000000000..6c7fe443a894fa --- /dev/null +++ b/regression-test/data/fault_injection_p0/partial_update/test_partial_update_skip_compaction.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !sql -- +1 999 999 1 1 +2 888 888 2 2 +3 777 777 3 3 + diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy new file mode 100644 index 00000000000000..878c399c71dbea --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_compaction_with_higher_version.groovy @@ -0,0 +1,222 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +// import org.awaitility.Awaitility + +suite("test_partial_update_compaction_with_higher_version", "nonConcurrent") { + + // def table1 = "test_partial_update_compaction_with_higher_version" + // sql "DROP TABLE IF EXISTS ${table1} FORCE;" + // sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + // `k1` int NOT NULL, + // `c1` int, + // `c2` int, + // `c3` int, + // `c4` int + // )UNIQUE KEY(k1) + // DISTRIBUTED BY HASH(k1) BUCKETS 1 + // PROPERTIES ( + // "disable_auto_compaction" = "true", + // "replication_num" = "1"); """ + + // sql "insert into ${table1} values(1,1,1,1,1);" + // sql "insert into ${table1} values(2,2,2,2,2);" + // sql "insert into ${table1} values(3,3,3,3,3);" + // sql "sync;" + // order_qt_sql "select * from ${table1};" + + // def beNodes = sql_return_maparray("show backends;") + // def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + // def tabletBackendId = tabletStat.BackendId + // def tabletId = tabletStat.TabletId + // def tabletBackend; + // for (def be : beNodes) { + // if (be.BackendId == tabletBackendId) { + // tabletBackend = be + // break; + // } + // } + // logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + // def check_rs_metas = { expected_rs_meta_size, check_func -> + // if (isCloudMode()) { + // return + // } + + // def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + // def (code, out, err) = curl("GET", metaUrl) + // Assert.assertEquals(code, 0) + // def jsonMeta = parseJson(out.trim()) + + // Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + // for (def meta : jsonMeta.rs_metas) { + // int startVersion = meta.start_version + // int endVersion = meta.end_version + // int numSegments = meta.num_segments + // int numRows = meta.num_rows + // String overlapPb = meta.segments_overlap_pb + // logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + // check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + // } + // } + + // check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // if (startVersion == 0) { + // // [0-1] + // Assert.assertEquals(endVersion, 1) + // Assert.assertEquals(numSegments, 0) + // } else { + // // [2-2], [3-3], [4-4] + // Assert.assertEquals(startVersion, endVersion) + // Assert.assertEquals(numSegments, 1) + // } + // }) + + // def enable_publish_spin_wait = { tokenName -> + // if (isCloudMode()) { + // GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait", [token: "${tokenName}"]) + // } else { + // GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait", [token: "${tokenName}"]) + // } + // } + + // def disable_publish_spin_wait = { + // if (isCloudMode()) { + // GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + // } else { + // GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + // } + // } + + // def enable_block_in_publish = { passToken -> + // if (isCloudMode()) { + // GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block", [pass_token: "${passToken}"]) + // } else { + // GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block", [pass_token: "${passToken}"]) + // } + // } + + // def disable_block_in_publish = { + // if (isCloudMode()) { + // GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + // } else { + // GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + // } + // } + + // try { + // GetDebugPoint().clearDebugPointsForAllFEs() + // GetDebugPoint().clearDebugPointsForAllBEs() + + // // block the partial update in publish phase + // enable_publish_spin_wait("token1") + // enable_block_in_publish("-1") + + // // the first partial update load + // def t1 = Thread.start { + // sql "set enable_unique_key_partial_update=true;" + // sql "sync;" + // sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + // } + + // Thread.sleep(600) + + // // the second partial update load that conflicts with the first one + // enable_publish_spin_wait("token2") + // def t2 = Thread.start { + // sql "set enable_unique_key_partial_update=true;" + // sql "sync;" + // sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);" + // } + + // Thread.sleep(400) + + // // let the first partial update load finish + // enable_block_in_publish("token1") + // t1.join() + // Thread.sleep(200) + // check_rs_metas(5, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // if (startVersion == 0) { + // // [0-1] + // Assert.assertEquals(endVersion, 1) + // Assert.assertEquals(numSegments, 0) + // } else { + // // [2-2], [3-3], [4-4], [5-5] + // Assert.assertEquals(startVersion, endVersion) + // Assert.assertEquals(numSegments, 1) + // } + // }) + + // // trigger full compaction on tablet + // logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + // def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + // logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + // Assert.assertEquals(code, 0) + // def compactJson = parseJson(out.trim()) + // Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // // wait for full compaction to complete + // Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + // { + // (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + // logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + // Assert.assertEquals(code, 0) + // def compactionStatus = parseJson(out.trim()) + // Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + // return !compactionStatus.run_status + // } + // ) + + // check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // // check the rowset produced by full compaction + // // [0-5] + // Assert.assertEquals(startVersion, 0) + // Assert.assertEquals(endVersion, 5) + // Assert.assertEquals(numRows, 3) + // Assert.assertEquals(overlapPb, "NONOVERLAPPING") + // }) + + // // let the second partial update load publish + // disable_block_in_publish() + // t1.join() + // Thread.sleep(300) + + // order_qt_sql "select * from ${table1};" + + // check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // if (startVersion == 6) { + // // [6-6] + // Assert.assertEquals(endVersion, 6) + // // checks that partial update didn't skip the alignment process of rowsets produced by compaction and + // // generate new segment in publish phase + // Assert.assertEquals(numSegments, 2) + // Assert.assertEquals(numRows, 4) // 4 = 2 + 2 + // } + // }) + + // } catch(Exception e) { + // logger.info(e.getMessage()) + // throw e + // } finally { + // GetDebugPoint().clearDebugPointsForAllFEs() + // GetDebugPoint().clearDebugPointsForAllBEs() + // } + + // sql "DROP TABLE IF EXISTS ${table1};" +} diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy new file mode 100644 index 00000000000000..ba181dd34f6a74 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_conflict_skip_compaction.groovy @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +// import org.awaitility.Awaitility + +suite("test_partial_update_conflict_skip_compaction", "nonConcurrent") { + + // def table1 = "test_partial_update_conflict_skip_compaction" + // sql "DROP TABLE IF EXISTS ${table1} FORCE;" + // sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + // `k1` int NOT NULL, + // `c1` int, + // `c2` int, + // `c3` int, + // `c4` int + // )UNIQUE KEY(k1) + // DISTRIBUTED BY HASH(k1) BUCKETS 1 + // PROPERTIES ( + // "disable_auto_compaction" = "true", + // "replication_num" = "1"); """ + + // sql "insert into ${table1} values(1,1,1,1,1);" + // sql "insert into ${table1} values(2,2,2,2,2);" + // sql "insert into ${table1} values(3,3,3,3,3);" + // sql "sync;" + // order_qt_sql "select * from ${table1};" + + // def beNodes = sql_return_maparray("show backends;") + // def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + // def tabletBackendId = tabletStat.BackendId + // def tabletId = tabletStat.TabletId + // def tabletBackend; + // for (def be : beNodes) { + // if (be.BackendId == tabletBackendId) { + // tabletBackend = be + // break; + // } + // } + // logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + // def check_rs_metas = { expected_rs_meta_size, check_func -> + // if (isCloudMode()) { + // return + // } + + // def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + // def (code, out, err) = curl("GET", metaUrl) + // Assert.assertEquals(code, 0) + // def jsonMeta = parseJson(out.trim()) + + // Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + // for (def meta : jsonMeta.rs_metas) { + // int startVersion = meta.start_version + // int endVersion = meta.end_version + // int numSegments = meta.num_segments + // int numRows = meta.num_rows + // String overlapPb = meta.segments_overlap_pb + // logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + // check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + // } + // } + + // check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // if (startVersion == 0) { + // // [0-1] + // Assert.assertEquals(endVersion, 1) + // Assert.assertEquals(numSegments, 0) + // } else { + // // [2-2], [3-3], [4-4] + // Assert.assertEquals(startVersion, endVersion) + // Assert.assertEquals(numSegments, 1) + // } + // }) + + // def enable_publish_spin_wait = { + // if (isCloudMode()) { + // GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + // } else { + // GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + // } + // } + + // def disable_publish_spin_wait = { + // if (isCloudMode()) { + // GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + // } else { + // GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + // } + // } + + // def enable_block_in_publish = { + // if (isCloudMode()) { + // GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + // } else { + // GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + // } + // } + + // def disable_block_in_publish = { + // if (isCloudMode()) { + // GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + // } else { + // GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + // } + // } + + // try { + // GetDebugPoint().clearDebugPointsForAllFEs() + // GetDebugPoint().clearDebugPointsForAllBEs() + + // // block the partial update before publish phase + // enable_publish_spin_wait() + // enable_block_in_publish() + + // // the first partial update load + // def t1 = Thread.start { + // sql "set enable_unique_key_partial_update=true;" + // sql "sync;" + // sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + // } + + // Thread.sleep(300) + + // // the second partial update load that has conflict with the first one + // def t2 = Thread.start { + // sql "set enable_unique_key_partial_update=true;" + // sql "sync;" + // sql "insert into ${table1}(k1,c3,c4) values(1,666,666),(3,555,555);" + // } + + // Thread.sleep(300) + + // // trigger full compaction on tablet + // logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + // def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + // logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + // Assert.assertEquals(code, 0) + // def compactJson = parseJson(out.trim()) + // Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // // wait for full compaction to complete + // Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + // { + // (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + // logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + // Assert.assertEquals(code, 0) + // def compactionStatus = parseJson(out.trim()) + // Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + // return !compactionStatus.run_status + // } + // ) + + // check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // // check the rowset produced by full compaction + // // [0-4] + // Assert.assertEquals(startVersion, 0) + // Assert.assertEquals(endVersion, 4) + // Assert.assertEquals(numRows, 3) + // Assert.assertEquals(overlapPb, "NONOVERLAPPING") + // }) + + // disable_block_in_publish() + + // t1.join() + // t2.join() + + // order_qt_sql "select * from ${table1};" + + // check_rs_metas(3, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // if (startVersion == 5) { + // // the first partial update load + // // it should skip the alignment process of rowsets produced by full compaction and + // // should not generate new segment in publish phase + // Assert.assertEquals(endVersion, 5) + // Assert.assertEquals(numSegments, 1) + // Assert.assertEquals(numRows, 3) + // } else if (startVersion == 6) { + // // the second partial update load + // // it should skip the alignment process of rowsets produced by full compaction and + // // should generate new segment in publish phase for conflicting rows with the first partial update load + // Assert.assertEquals(endVersion, 6) + // Assert.assertEquals(numSegments, 2) + // Assert.assertEquals(numRows, 4) // 4 = 2 + 2 + // } + // }) + + // } catch(Exception e) { + // logger.info(e.getMessage()) + // throw e + // } finally { + // GetDebugPoint().clearDebugPointsForAllFEs() + // GetDebugPoint().clearDebugPointsForAllBEs() + // } + + // sql "DROP TABLE IF EXISTS ${table1};" +} diff --git a/regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy similarity index 100% rename from regression-test/suites/fault_injection_p0/test_partial_update_publish_conflict_with_error.groovy rename to regression-test/suites/fault_injection_p0/partial_update/test_partial_update_publish_conflict_with_error.groovy diff --git a/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy new file mode 100644 index 00000000000000..424e35969948f4 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/partial_update/test_partial_update_skip_compaction.groovy @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert +import java.util.concurrent.TimeUnit +// import org.awaitility.Awaitility + +suite("test_partial_update_skip_compaction", "nonConcurrent") { + + // def table1 = "test_partial_update_skip_compaction" + // sql "DROP TABLE IF EXISTS ${table1} FORCE;" + // sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + // `k1` int NOT NULL, + // `c1` int, + // `c2` int, + // `c3` int, + // `c4` int + // )UNIQUE KEY(k1) + // DISTRIBUTED BY HASH(k1) BUCKETS 1 + // PROPERTIES ( + // "disable_auto_compaction" = "true", + // "replication_num" = "1"); """ + + // sql "insert into ${table1} values(1,1,1,1,1);" + // sql "insert into ${table1} values(2,2,2,2,2);" + // sql "insert into ${table1} values(3,3,3,3,3);" + // sql "sync;" + // order_qt_sql "select * from ${table1};" + + // def beNodes = sql_return_maparray("show backends;") + // def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0) + // def tabletBackendId = tabletStat.BackendId + // def tabletId = tabletStat.TabletId + // def tabletBackend; + // for (def be : beNodes) { + // if (be.BackendId == tabletBackendId) { + // tabletBackend = be + // break; + // } + // } + // logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + // def check_rs_metas = { expected_rs_meta_size, check_func -> + // if (isCloudMode()) { + // return + // } + + // def metaUrl = sql_return_maparray("show tablets from ${table1};").get(0).MetaUrl + // def (code, out, err) = curl("GET", metaUrl) + // Assert.assertEquals(code, 0) + // def jsonMeta = parseJson(out.trim()) + + // Assert.assertEquals(jsonMeta.rs_metas.size(), expected_rs_meta_size) + // for (def meta : jsonMeta.rs_metas) { + // int startVersion = meta.start_version + // int endVersion = meta.end_version + // int numSegments = meta.num_segments + // int numRows = meta.num_rows + // String overlapPb = meta.segments_overlap_pb + // logger.info("[${startVersion}-${endVersion}] ${overlapPb} ${meta.num_segments} ${numRows} ${meta.rowset_id_v2}") + // check_func(startVersion, endVersion, numSegments, numRows, overlapPb) + // } + // } + + // check_rs_metas(4, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // if (startVersion == 0) { + // // [0-1] + // Assert.assertEquals(endVersion, 1) + // Assert.assertEquals(numSegments, 0) + // } else { + // // [2-2], [3-3], [4-4] + // Assert.assertEquals(startVersion, endVersion) + // Assert.assertEquals(numSegments, 1) + // } + // }) + + // def enable_publish_spin_wait = { + // if (isCloudMode()) { + // GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + // } else { + // GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + // } + // } + + // def disable_publish_spin_wait = { + // if (isCloudMode()) { + // GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + // } else { + // GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.enable_spin_wait") + // } + // } + + // def enable_block_in_publish = { + // if (isCloudMode()) { + // GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + // } else { + // GetDebugPoint().enableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + // } + // } + + // def disable_block_in_publish = { + // if (isCloudMode()) { + // GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + // } else { + // GetDebugPoint().disableDebugPointForAllBEs("EnginePublishVersionTask::execute.block") + // } + // } + + // try { + // GetDebugPoint().clearDebugPointsForAllFEs() + // GetDebugPoint().clearDebugPointsForAllBEs() + + // // block the partial update in publish phase + // enable_publish_spin_wait() + // enable_block_in_publish() + // def t1 = Thread.start { + // sql "set enable_unique_key_partial_update=true;" + // sql "sync;" + // sql "insert into ${table1}(k1,c1,c2) values(1,999,999),(2,888,888),(3,777,777);" + // } + + // Thread.sleep(500) + + // // trigger full compaction on tablet + // logger.info("trigger compaction on another BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + // def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + // logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + // Assert.assertEquals(code, 0) + // def compactJson = parseJson(out.trim()) + // Assert.assertEquals("success", compactJson.status.toLowerCase()) + + // // wait for full compaction to complete + // Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until( + // { + // (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + // logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + // Assert.assertEquals(code, 0) + // def compactionStatus = parseJson(out.trim()) + // Assert.assertEquals("success", compactionStatus.status.toLowerCase()) + // return !compactionStatus.run_status + // } + // ) + + // check_rs_metas(1, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // // check the rowset produced by full compaction + // // [0-4] + // Assert.assertEquals(startVersion, 0) + // Assert.assertEquals(endVersion, 4) + // Assert.assertEquals(numRows, 3) + // Assert.assertEquals(overlapPb, "NONOVERLAPPING") + // }) + + // // let the partial update load publish + // disable_block_in_publish() + // t1.join() + + // order_qt_sql "select * from ${table1};" + + // check_rs_metas(2, {int startVersion, int endVersion, int numSegments, int numRows, String overlapPb -> + // if (startVersion == 5) { + // // [5-5] + // Assert.assertEquals(endVersion, 5) + // // checks that partial update skips the alignment process of rowsets produced by compaction and + // // doesn't generate new segment in publish phase + // Assert.assertEquals(numSegments, 1) + // Assert.assertEquals(numRows, 3) + // } + // }) + + // } catch(Exception e) { + // logger.info(e.getMessage()) + // throw e + // } finally { + // GetDebugPoint().clearDebugPointsForAllFEs() + // GetDebugPoint().clearDebugPointsForAllBEs() + // } + + // sql "DROP TABLE IF EXISTS ${table1};" +}