Skip to content

Commit

Permalink
fix publish version fail but return ok
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 committed Dec 21, 2023
1 parent 0759510 commit 25e90bd
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 9 deletions.
5 changes: 3 additions & 2 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1233,14 +1233,15 @@ void StorageEngine::notify_listeners() {
}

// Wakes up every registered report listener whose task worker type equals
// `task_worker_type`.
//
// The listener list is walked while holding `_report_mtx`. The loop does not
// stop at the first match, so each listener registered for the requested type
// gets its `notify_thread()` call.
//
// Returns true if at least one listener matched, false otherwise.
bool StorageEngine::notify_listener(TaskWorkerPool::TaskWorkerType task_worker_type) {
    bool found = false;
    std::lock_guard<std::mutex> l(_report_mtx);
    for (auto& listener : _report_listeners) {
        if (listener->task_worker_type() == task_worker_type) {
            listener->notify_thread();
            // Keep scanning: returning here would skip any later listener
            // of the same type.
            found = true;
        }
    }
    return found;
}

Status StorageEngine::execute_task(EngineTask* task) {
Expand Down
34 changes: 27 additions & 7 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ Status EnginePublishVersionTask::finish() {
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);
std::unordered_map<int64_t, int64_t> tablet_id_to_num_delta_rows;

#ifndef NDEBUG
if (UNLIKELY(_publish_version_req.partition_version_infos.empty())) {
LOG(WARNING) << "transaction_id: " << transaction_id << " empty partition_version_infos";
}
#endif

std::vector<std::shared_ptr<TabletPublishTxnTask>> tablet_tasks;
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
Expand Down Expand Up @@ -221,12 +229,22 @@ Status EnginePublishVersionTask::finish() {

auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
tablet_tasks.push_back(tablet_publish_txn_ptr);
auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
CHECK(submit_st.ok()) << submit_st;
}
}
token->wait();

if (res.ok()) {
for (const auto& tablet_task : tablet_tasks) {
res = tablet_task->result();
if (!res.ok()) {
break;
}
}
}

_succ_tablets->clear();
// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
Expand Down Expand Up @@ -315,24 +333,24 @@ void TabletPublishTxnTask::handle() {
rowset_update_lock.lock();
}
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_result = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version, &_stats);
if (publish_status != Status::OK()) {
if (!_result.ok()) {
LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << publish_status;
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}

// add visible rowset to tablet
int64_t t1 = MonotonicMicros();
publish_status = _tablet->add_inc_rowset(_rowset);
_result = _tablet->add_inc_rowset(_rowset);
_stats.add_inc_rowset_us = MonotonicMicros() - t1;
if (publish_status != Status::OK() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
if (!_result.ok() && !_result.is<PUSH_VERSION_ALREADY_EXIST>()) {
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << publish_status;
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
Expand All @@ -342,9 +360,11 @@ void TabletPublishTxnTask::handle() {
LOG(INFO) << "publish version successfully on tablet"
<< ", table_id=" << _tablet->table_id() << ", tablet=" << _tablet->full_name()
<< ", transaction_id=" << _transaction_id << ", version=" << _version.first
<< ", num_rows=" << _rowset->num_rows() << ", res=" << publish_status
<< ", num_rows=" << _rowset->num_rows() << ", res=" << _result
<< ", cost: " << cost_us << "(us) "
<< (cost_us > 500 * 1000 ? _stats.to_string() : "");

_result = Status::OK();
}

void AsyncTabletPublishTask::handle() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TabletPublishTxnTask {
~TabletPublishTxnTask() = default;

void handle();
Status result() { return _result; }

private:
EnginePublishVersionTask* _engine_publish_version_task;
Expand All @@ -80,6 +81,7 @@ class TabletPublishTxnTask {
Version _version;
TabletInfo _tablet_info;
TabletPublishStatistics _stats;
Status _result;
};

class EnginePublishVersionTask : public EngineTask {
Expand Down
13 changes: 13 additions & 0 deletions regression-test/data/insert_p0/test_be_inject_publish_txn_fail.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_1 --

-- !select_2 --
100

-- !select_1 --
100

-- !select_2 --
100
200

Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: enables a BE debug point around TxnManager.publish_txn,
// inserts a row, and snapshots the table before and after the debug point is
// lifted and every BE has been asked to report its tasks.
suite('test_be_inject_publish_txn_fail', 'nonConcurrent') {
    def tbl = 'test_be_inject_publish_txn_fail_tbl'
    def failBeforeSaveMeta = 'TxnManager.publish_txn.random_failed_before_save_rs_meta'
    def failAfterSaveMeta = 'TxnManager.publish_txn.random_failed_after_save_rs_meta'

    // Runs one cleanup step and deliberately ignores any failure, so that the
    // remaining cleanup steps still get their turn.
    def ignoreFailure = { Closure step ->
        try {
            step()
        } catch (Throwable ignored) {
        }
    }

    // Calls be_report_task on every backend listed by getBackendIpHttpPort.
    def triggerReportOnEveryBe = { ->
        def ipByBackend = [:]
        def httpPortByBackend = [:]
        getBackendIpHttpPort(ipByBackend, httpPortByBackend)
        for (entry in ipByBackend) {
            int httpPort = httpPortByBackend.get(entry.key) as int
            be_report_task(entry.value, httpPort)
        }
    }

    // One round of the scenario for a single debug point and a single value.
    def insertThenVerify = { debugPoint, value ->
        // Phase 1: with the debug point active on all BEs the insert is
        // expected to succeed without becoming visible.
        GetDebugPoint().enableDebugPointForAllBEs(debugPoint, [percent : 1.0])
        sql "INSERT INTO ${tbl} VALUES (${value})"
        sleep(6000)
        order_qt_select_1 "SELECT * FROM ${tbl}"

        GetDebugPoint().disableDebugPointForAllBEs(debugPoint)

        // Phase 2: the BE reported the publish failure to the FE, so the FE
        // keeps the task. Once the BEs report their tasks, the FE resends the
        // publish version task and the txn becomes visible.
        triggerReportOnEveryBe()
        sleep(8000)
        order_qt_select_2 "SELECT * FROM ${tbl}"
    }

    try {
        sql "DROP TABLE IF EXISTS ${tbl} FORCE"
        sql "CREATE TABLE ${tbl} (k INT) DISTRIBUTED BY HASH(k) BUCKETS 5 PROPERTIES ('replication_num' = '1')"

        sql "ADMIN SET FRONTEND CONFIG ('agent_task_resend_wait_time_ms' = '1000')"
        sql 'SET insert_visible_timeout_ms = 2000'

        insertThenVerify(failBeforeSaveMeta, 100)
        insertThenVerify(failAfterSaveMeta, 200)
    } finally {
        // Best-effort cleanup: each step is isolated so that one failure does
        // not prevent the others from running.
        ignoreFailure {
            sql "ADMIN SET FRONTEND CONFIG ('agent_task_resend_wait_time_ms' = '5000')"
        }
        ignoreFailure {
            GetDebugPoint().disableDebugPointForAllBEs(failBeforeSaveMeta)
        }
        ignoreFailure {
            GetDebugPoint().disableDebugPointForAllBEs(failAfterSaveMeta)
        }

        sql "DROP TABLE IF EXISTS ${tbl} FORCE"
    }
}

0 comments on commit 25e90bd

Please sign in to comment.