From d866be27ed607eb6e6b5350af4c01f28b69bce01 Mon Sep 17 00:00:00 2001 From: wwbmmm Date: Thu, 19 Oct 2023 20:04:49 +0800 Subject: [PATCH] Fix RetryPolicy blocks HealthCheck --- src/brpc/controller.cpp | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 25a5940e01..5392f16c0b 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -662,18 +662,17 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info, // Retry backoff. bthread::TaskGroup* g = bthread::tls_task_group; - if (retry_policy->CanRetryBackoffInPthread() || - (g && !g->is_current_pthread_task())) { - int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L; + int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L; + if (backoff_time_us > 0 && + backoff_time_us < _deadline_us - butil::gettimeofday_us()) { // No need to do retry backoff when the backoff time is longer than the remaining rpc time. - if (backoff_time_us > 0 && - backoff_time_us < _deadline_us - butil::gettimeofday_us()) { + if (retry_policy->CanRetryBackoffInPthread() || + (g && !g->is_current_pthread_task())) { bthread_usleep(backoff_time_us); + } else { + LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, " + "skip retry backoff in pthread."; } - - } else { - LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, " - "skip retry backoff in pthread."; } return IssueRPC(butil::gettimeofday_us()); } @@ -1262,8 +1261,25 @@ int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code, cntl->SetFailed(error_code, "%s @%s", berror(error_code), butil::endpoint2str(cntl->remote_side()).c_str()); } - CompletionInfo info = { id, false }; - cntl->OnVersionedRPCReturned(info, true, saved_error); + + struct OnVersionedRPCReturnedArgs { + bthread_id_t id; + Controller* cntl; + int error; + }; + auto func = [](void* p) -> void* { + std::unique_ptr args(static_cast(p)); + CompletionInfo info = { args->id, false }; + args->cntl->OnVersionedRPCReturned(info, true, args->error); + return NULL; + }; + + auto* args = new OnVersionedRPCReturnedArgs{ id, cntl, saved_error }; + bthread_t tid; + // RetryPolicy may block current bthread, so start a new bthread to run OnVersionedRPCReturned + if (!cntl->_retry_policy || bthread_start_background(&tid, NULL, func, args) != 0) { + func(args); + } return 0; }