diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 335a4450052d42..e22cb94040f49b 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1233,14 +1233,15 @@ void StorageEngine::notify_listeners() { } bool StorageEngine::notify_listener(TaskWorkerPool::TaskWorkerType task_worker_type) { + bool found = false; std::lock_guard l(_report_mtx); for (auto& listener : _report_listeners) { if (listener->task_worker_type() == task_worker_type) { listener->notify_thread(); - return true; + found = true; } } - return false; + return found; } Status StorageEngine::execute_task(EngineTask* task) { diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index 3504d8a92805b6..83042afcc63907 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -108,6 +108,14 @@ Status EnginePublishVersionTask::finish() { StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token( ThreadPool::ExecutionMode::CONCURRENT); std::unordered_map tablet_id_to_num_delta_rows; + +#ifndef NDEBUG + if (UNLIKELY(_publish_version_req.partition_version_infos.empty())) { + LOG(WARNING) << "transaction_id: " << transaction_id << " empty partition_version_infos"; + } +#endif + + std::vector> tablet_tasks; // each partition for (auto& par_ver_info : _publish_version_req.partition_version_infos) { int64_t partition_id = par_ver_info.partition_id; @@ -221,12 +229,22 @@ Status EnginePublishVersionTask::finish() { auto tablet_publish_txn_ptr = std::make_shared( this, tablet, rowset, partition_id, transaction_id, version, tablet_info); + tablet_tasks.push_back(tablet_publish_txn_ptr); auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); }); CHECK(submit_st.ok()) << submit_st; } } token->wait(); + if (res.ok()) { + for (const auto& tablet_task : tablet_tasks) { + res = tablet_task->result(); + if 
(!res.ok()) { + break; + } + } + } + _succ_tablets->clear(); // check if the related tablet remained all have the version for (auto& par_ver_info : _publish_version_req.partition_version_infos) { @@ -315,24 +333,24 @@ void TabletPublishTxnTask::handle() { rowset_update_lock.lock(); } _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us; - auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn( + _result = StorageEngine::instance()->txn_manager()->publish_txn( _partition_id, _tablet, _transaction_id, _version, &_stats); - if (publish_status != Status::OK()) { + if (!_result.ok()) { LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id() << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id - << ", res=" << publish_status; + << ", res=" << _result; _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); return; } // add visible rowset to tablet int64_t t1 = MonotonicMicros(); - publish_status = _tablet->add_inc_rowset(_rowset); + _result = _tablet->add_inc_rowset(_rowset); _stats.add_inc_rowset_us = MonotonicMicros() - t1; - if (publish_status != Status::OK() && !publish_status.is()) { + if (!_result.ok() && !_result.is()) { LOG(WARNING) << "fail to add visible rowset to tablet. 
rowset_id=" << _rowset->rowset_id() << ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id - << ", res=" << publish_status; + << ", res=" << _result; _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id); return; } @@ -342,9 +360,11 @@ void TabletPublishTxnTask::handle() { LOG(INFO) << "publish version successfully on tablet" << ", table_id=" << _tablet->table_id() << ", tablet=" << _tablet->full_name() << ", transaction_id=" << _transaction_id << ", version=" << _version.first - << ", num_rows=" << _rowset->num_rows() << ", res=" << publish_status + << ", num_rows=" << _rowset->num_rows() << ", res=" << _result << ", cost: " << cost_us << "(us) " << (cost_us > 500 * 1000 ? _stats.to_string() : ""); + + _result = Status::OK(); } void AsyncTabletPublishTask::handle() { diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h index 8f3790574a25d4..dc84dd73923eae 100644 --- a/be/src/olap/task/engine_publish_version_task.h +++ b/be/src/olap/task/engine_publish_version_task.h @@ -69,6 +69,7 @@ class TabletPublishTxnTask { ~TabletPublishTxnTask() = default; void handle(); + Status result() { return _result; } private: EnginePublishVersionTask* _engine_publish_version_task; @@ -80,6 +81,7 @@ class TabletPublishTxnTask { Version _version; TabletInfo _tablet_info; TabletPublishStatistics _stats; + Status _result; }; class EnginePublishVersionTask : public EngineTask { diff --git a/regression-test/data/insert_p0/test_be_inject_publish_txn_fail.out b/regression-test/data/insert_p0/test_be_inject_publish_txn_fail.out new file mode 100644 index 00000000000000..35378c9914e657 --- /dev/null +++ b/regression-test/data/insert_p0/test_be_inject_publish_txn_fail.out @@ -0,0 +1,13 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select_1 -- + +-- !select_2 -- +100 + +-- !select_1 -- +100 + +-- !select_2 -- +100 +200 + diff --git a/regression-test/suites/insert_p0/test_be_inject_publish_txn_fail.groovy b/regression-test/suites/insert_p0/test_be_inject_publish_txn_fail.groovy new file mode 100644 index 00000000000000..bd7fe9d8a2fdba --- /dev/null +++ b/regression-test/suites/insert_p0/test_be_inject_publish_txn_fail.groovy @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite('test_be_inject_publish_txn_fail', 'nonConcurrent') { + def tbl = 'test_be_inject_publish_txn_fail_tbl' + def dbug1 = 'TxnManager.publish_txn.random_failed_before_save_rs_meta' + def dbug2 = 'TxnManager.publish_txn.random_failed_after_save_rs_meta' + + def allBeReportTask = { -> + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort) + backendId_to_backendIP.each { beId, beIp -> + def port = backendId_to_backendHttpPort.get(beId) as int + be_report_task(beIp, port) + } + } + + def testInsertValue = { dbug, value -> + // the insert succeeds, but the txn is not yet visible + GetDebugPoint().enableDebugPointForAllBEs(dbug, [percent : 1.0]) + sql "INSERT INTO ${tbl} VALUES (${value})" + sleep(6000) + order_qt_select_1 "SELECT * FROM ${tbl}" + + GetDebugPoint().disableDebugPointForAllBEs(dbug) + + // BE reports the publish failure to FE, so FE does not remove the task. + // After BE reports its tasks, FE resends the publish version task to BE, + // and the txn then becomes visible. + allBeReportTask() + sleep(8000) + order_qt_select_2 "SELECT * FROM ${tbl}" + } + + try { + sql "DROP TABLE IF EXISTS ${tbl} FORCE" + sql "CREATE TABLE ${tbl} (k INT) DISTRIBUTED BY HASH(k) BUCKETS 5 PROPERTIES ('replication_num' = '1')" + + sql "ADMIN SET FRONTEND CONFIG ('agent_task_resend_wait_time_ms' = '1000')" + sql 'SET insert_visible_timeout_ms = 2000' + + testInsertValue dbug1, 100 + testInsertValue dbug2, 200 + } finally { + try { + sql "ADMIN SET FRONTEND CONFIG ('agent_task_resend_wait_time_ms' = '5000')" + } catch (Throwable e) { + } + + try { + GetDebugPoint().disableDebugPointForAllBEs(dbug1) + } catch (Throwable e) { + } + + try { + GetDebugPoint().disableDebugPointForAllBEs(dbug2) + } catch (Throwable e) { + } + + sql "DROP TABLE IF EXISTS ${tbl} FORCE" + } +}