[backoff] fix bug and improve API
markdroth committed Aug 28, 2024
1 parent ae5dfa1 commit 7916b88
Showing 14 changed files with 107 additions and 192 deletions.
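
In short: BackOff::NextAttemptTime(), which returned the absolute Timestamp at which the next attempt should start, becomes BackOff::NextAttemptDelay(), which returns a relative Duration; callers that still need an absolute time now read the clock once themselves and add the delay. The commit also fixes a jitter bug visible in the backoff.cc hunk below: the first attempt's backoff was previously returned with no jitter at all, and for later attempts the jitter range was computed from current_backoff_.seconds(), which discards sub-second precision; the new code scales every delay, including the first, by a uniform factor in [1 - jitter, 1 + jitter].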
2 changes: 1 addition & 1 deletion src/core/client_channel/retry_filter_legacy_call_data.cc
@@ -1890,7 +1890,7 @@ void RetryFilter::LegacyCallData::StartRetryTimer(
     next_attempt_timeout = *server_pushback;
     retry_backoff_.Reset();
   } else {
-    next_attempt_timeout = retry_backoff_.NextAttemptTime() - Timestamp::Now();
+    next_attempt_timeout = retry_backoff_.NextAttemptDelay();
   }
   GRPC_TRACE_LOG(retry, INFO)
       << "chand=" << chand_ << " calld=" << this << ": retrying failed call in "
5 changes: 3 additions & 2 deletions src/core/client_channel/subchannel.cc
@@ -764,8 +764,9 @@ void Subchannel::OnRetryTimerLocked() {
 
 void Subchannel::StartConnectingLocked() {
   // Set next attempt time.
-  const Timestamp min_deadline = min_connect_timeout_ + Timestamp::Now();
-  next_attempt_time_ = backoff_.NextAttemptTime();
+  const Timestamp now = Timestamp::Now();
+  const Timestamp min_deadline = now + min_connect_timeout_;
+  next_attempt_time_ = now + backoff_.NextAttemptDelay();
   // Report CONNECTING.
   SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus());
   // Start connection attempt.
2 changes: 1 addition & 1 deletion src/core/client_channel/subchannel_stream_client.cc
@@ -130,7 +130,7 @@ void SubchannelStreamClient::StartRetryTimerLocked() {
   if (event_handler_ != nullptr) {
     event_handler_->OnRetryTimerStartLocked(this);
   }
-  const Duration timeout = retry_backoff_.NextAttemptTime() - Timestamp::Now();
+  const Duration timeout = retry_backoff_.NextAttemptDelay();
   if (GPR_UNLIKELY(tracer_ != nullptr)) {
     LOG(INFO) << tracer_ << " " << this
               << ": SubchannelStreamClient health check call lost...";
17 changes: 7 additions & 10 deletions src/core/lib/backoff/backoff.cc
@@ -20,23 +20,20 @@
 
 #include <algorithm>
 
-#include <grpc/support/port_platform.h>
-
 namespace grpc_core {
 
 BackOff::BackOff(const Options& options) : options_(options) { Reset(); }
 
-Timestamp BackOff::NextAttemptTime() {
+Duration BackOff::NextAttemptDelay() {
   if (initial_) {
     initial_ = false;
-    return current_backoff_ + Timestamp::Now();
+  } else {
+    current_backoff_ = std::min(current_backoff_ * options_.multiplier(),
+                                options_.max_backoff());
   }
-  current_backoff_ = std::min(current_backoff_ * options_.multiplier(),
-                              options_.max_backoff());
-  const Duration jitter = Duration::FromSecondsAsDouble(
-      absl::Uniform(rand_gen_, -options_.jitter() * current_backoff_.seconds(),
-                    options_.jitter() * current_backoff_.seconds()));
-  return Timestamp::Now() + current_backoff_ + jitter;
+  const double jitter =
+      absl::Uniform(rand_gen_, 1 - options_.jitter(), 1 + options_.jitter());
+  return current_backoff_ * jitter;
 }
 
 void BackOff::Reset() {
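
To make the new behavior concrete, here is a minimal standalone model of NextAttemptDelay() in plain C++ (std types and invented names, not the grpc_core machinery):

```cpp
#include <algorithm>
#include <random>

// Toy model of BackOff::NextAttemptDelay() using doubles for seconds;
// the class and member names here are invented for illustration.
class ToyBackoff {
 public:
  ToyBackoff(double initial_s, double multiplier, double jitter, double max_s)
      : current_(initial_s),
        multiplier_(multiplier),
        jitter_(jitter),
        max_(max_s) {}

  double NextAttemptDelaySeconds() {
    if (initial_) {
      initial_ = false;  // First delay skips the multiplier...
    } else {
      current_ = std::min(current_ * multiplier_, max_);
    }
    // ...but, unlike the old NextAttemptTime(), it still gets jitter.
    std::uniform_real_distribution<double> dist(1.0 - jitter_, 1.0 + jitter_);
    return current_ * dist(gen_);
  }

 private:
  bool initial_ = true;
  double current_;
  double multiplier_;
  double jitter_;
  double max_;
  std::mt19937 gen_{std::random_device{}()};
};
```

With, say, initial = 1s, multiplier = 1.6, jitter = 0.2, and max = 120s, the successive un-jittered backoffs are 1, 1.6, 2.56, 4.096, ... seconds (capped at 120), and each returned delay is that value times a draw from U(0.8, 1.2). Under the old code a sub-second backoff produced an empty jitter range, because the bounds were built from current_backoff_.seconds().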
10 changes: 4 additions & 6 deletions src/core/lib/backoff/backoff.h
@@ -21,8 +21,6 @@
 
 #include "absl/random/random.h"
 
-#include <grpc/support/port_platform.h>
-
 #include "src/core/lib/gprpp/time.h"
 
 namespace grpc_core {
@@ -36,11 +34,11 @@ class BackOff {
   /// Initialize backoff machinery - does not need to be destroyed
   explicit BackOff(const Options& options);
 
-  /// Returns the time at which the next attempt should start.
-  Timestamp NextAttemptTime();
+  /// Returns the delay before the next attempt should start.
+  Duration NextAttemptDelay();
 
-  /// Reset the backoff, so the next value returned by NextAttemptTime()
-  /// will be the time of the second attempt (rather than the Nth).
+  /// Reset the backoff, so the next value returned by NextAttemptDelay()
+  /// will be the delay for the second attempt (rather than the Nth).
   void Reset();
 
   class Options {
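
A hedged sketch of how the reworked API reads at call sites (illustrative variable names; the Options setters are the ones BackOff already exposes, and the RunAfter pattern mirrors the grpclb and xDS hunks below):

```cpp
// Sketch, not code from this commit; `event_engine` is assumed to be an
// EventEngine* in scope.
grpc_core::BackOff backoff(
    grpc_core::BackOff::Options()
        .set_initial_backoff(grpc_core::Duration::Seconds(1))
        .set_multiplier(1.6)
        .set_jitter(0.2)
        .set_max_backoff(grpc_core::Duration::Seconds(120)));

// Pattern 1: timer APIs that take a relative delay use the result directly.
event_engine->RunAfter(backoff.NextAttemptDelay(), [] { /* retry */ });

// Pattern 2: callers that need an absolute time read the clock themselves,
// exactly once (cf. Subchannel::StartConnectingLocked() above).
grpc_core::Timestamp next_attempt_time =
    grpc_core::Timestamp::Now() + backoff.NextAttemptDelay();
```

Note that each NextAttemptDelay() call advances the backoff state, so a real caller would use one pattern or the other for a given attempt, not both.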
8 changes: 3 additions & 5 deletions src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
@@ -400,9 +400,7 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
     if (pool_->IsQuiesced()) break;
   } else {
     lifeguard_should_shut_down_->WaitForNotificationWithTimeout(
-        absl::Milliseconds(
-            (backoff_.NextAttemptTime() - grpc_core::Timestamp::Now())
-                .millis()));
+        absl::Milliseconds(backoff_.NextAttemptDelay().millis()));
   }
   MaybeStartNewThread();
 }
@@ -556,8 +554,8 @@ bool WorkStealingThreadPool::ThreadState::Step() {
   // No closures were retrieved from anywhere.
   // Quit the thread if the pool has been shut down.
   if (pool_->IsShutdown()) break;
-  bool timed_out = pool_->work_signal()->WaitWithTimeout(
-      backoff_.NextAttemptTime() - grpc_core::Timestamp::Now());
+  bool timed_out =
+      pool_->work_signal()->WaitWithTimeout(backoff_.NextAttemptDelay());
   if (pool_->IsForking() || pool_->IsShutdown()) break;
   // Quit a thread if the pool has more than it requires, and this thread
   // has been idle long enough.
8 changes: 3 additions & 5 deletions src/core/lib/security/credentials/token_fetcher/token_fetcher_credentials.cc
@@ -60,15 +60,13 @@ void TokenFetcherCredentials::Token::AddTokenToClientInitialMetadata(
 TokenFetcherCredentials::FetchState::BackoffTimer::BackoffTimer(
     RefCountedPtr<FetchState> fetch_state)
     : fetch_state_(std::move(fetch_state)) {
-  const Timestamp next_attempt_time = fetch_state_->backoff_.NextAttemptTime();
-  const Duration duration = next_attempt_time - Timestamp::Now();
+  const Duration delay = fetch_state_->backoff_.NextAttemptDelay();
   GRPC_TRACE_LOG(token_fetcher_credentials, INFO)
       << "[TokenFetcherCredentials " << fetch_state_->creds_.get()
       << "]: fetch_state=" << fetch_state_.get() << " backoff_timer=" << this
-      << ": starting backoff timer for " << next_attempt_time << " ("
-      << duration << " from now)";
+      << ": starting backoff timer for " << delay;
   timer_handle_ = fetch_state_->creds_->event_engine().RunAfter(
-      duration, [self = Ref()]() mutable {
+      delay, [self = Ref()]() mutable {
         ApplicationCallbackExecCtx callback_exec_ctx;
         ExecCtx exec_ctx;
         self->OnTimer();
8 changes: 4 additions & 4 deletions src/core/load_balancing/grpclb/grpclb.cc
@@ -1655,20 +1655,20 @@ void GrpcLb::StartBalancerCallLocked() {
 }
 
 void GrpcLb::StartBalancerCallRetryTimerLocked() {
-  Duration timeout = lb_call_backoff_.NextAttemptTime() - Timestamp::Now();
+  Duration delay = lb_call_backoff_.NextAttemptDelay();
   if (GRPC_TRACE_FLAG_ENABLED(glb)) {
     LOG(INFO) << "[grpclb " << this << "] Connection to LB server lost...";
-    if (timeout > Duration::Zero()) {
+    if (delay > Duration::Zero()) {
       LOG(INFO) << "[grpclb " << this << "] ... retry_timer_active in "
-                << timeout.millis() << "ms.";
+                << delay.millis() << "ms.";
     } else {
       LOG(INFO) << "[grpclb " << this
                 << "] ... retry_timer_active immediately.";
     }
   }
   lb_call_retry_timer_handle_ =
       channel_control_helper()->GetEventEngine()->RunAfter(
-          timeout,
+          delay,
           [self = RefAsSubclass<GrpcLb>(
                DEBUG_LOCATION, "on_balancer_call_retry_timer")]() mutable {
             ApplicationCallbackExecCtx callback_exec_ctx;
16 changes: 8 additions & 8 deletions src/core/load_balancing/rls/rls.cc
@@ -526,7 +526,7 @@ class RlsLb final : public LoadBalancingPolicy {
  private:
   class BackoffTimer final : public InternallyRefCounted<BackoffTimer> {
    public:
-    BackoffTimer(RefCountedPtr<Entry> entry, Timestamp backoff_time);
+    BackoffTimer(RefCountedPtr<Entry> entry, Duration delay);
 
     // Note: We are forced to disable lock analysis here because
     // Orphan() is called by OrphanablePtr<>, which cannot have lock
@@ -1138,12 +1138,11 @@ LoadBalancingPolicy::PickResult RlsLb::Picker::PickFromDefaultTargetOrFail(
 //
 
 RlsLb::Cache::Entry::BackoffTimer::BackoffTimer(RefCountedPtr<Entry> entry,
-                                                Timestamp backoff_time)
+                                                Duration delay)
     : entry_(std::move(entry)) {
   backoff_timer_task_handle_ =
       entry_->lb_policy_->channel_control_helper()->GetEventEngine()->RunAfter(
-          backoff_time - Timestamp::Now(),
-          [self = Ref(DEBUG_LOCATION, "BackoffTimer")]() mutable {
+          delay, [self = Ref(DEBUG_LOCATION, "BackoffTimer")]() mutable {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            auto self_ptr = self.get();
@@ -1311,11 +1310,12 @@ RlsLb::Cache::Entry::OnRlsResponseLocked(
   } else {
     backoff_state_ = MakeCacheEntryBackoff();
   }
-  backoff_time_ = backoff_state_->NextAttemptTime();
-  Timestamp now = Timestamp::Now();
-  backoff_expiration_time_ = now + (backoff_time_ - now) * 2;
+  const Duration delay = backoff_state_->NextAttemptDelay();
+  const Timestamp now = Timestamp::Now();
+  backoff_time_ = now + delay;
+  backoff_expiration_time_ = now + delay * 2;
   backoff_timer_ = MakeOrphanable<BackoffTimer>(
-      Ref(DEBUG_LOCATION, "BackoffTimer"), backoff_time_);
+      Ref(DEBUG_LOCATION, "BackoffTimer"), delay);
   lb_policy_->UpdatePickerAsync();
   return {};
 }
21 changes: 6 additions & 15 deletions src/core/resolver/polling_resolver.cc
@@ -104,10 +104,10 @@ void PollingResolver::ShutdownLocked() {
   request_.reset();
 }
 
-void PollingResolver::ScheduleNextResolutionTimer(const Duration& timeout) {
+void PollingResolver::ScheduleNextResolutionTimer(Duration delay) {
   next_resolution_timer_handle_ =
       channel_args_.GetObject<EventEngine>()->RunAfter(
-          timeout, [self = RefAsSubclass<PollingResolver>()]() mutable {
+          delay, [self = RefAsSubclass<PollingResolver>()]() mutable {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            auto* self_ptr = self.get();
@@ -198,22 +198,13 @@ void PollingResolver::GetResultStatus(absl::Status status) {
     }
   } else {
     // Set up for retry.
-    // InvalidateNow to avoid getting stuck re-initializing this timer
-    // in a loop while draining the currently-held WorkSerializer.
-    // Also see https://github.com/grpc/grpc/issues/26079.
-    ExecCtx::Get()->InvalidateNow();
-    const Timestamp next_try = backoff_.NextAttemptTime();
-    const Duration timeout = next_try - Timestamp::Now();
+    const Duration delay = backoff_.NextAttemptDelay();
     CHECK(!next_resolution_timer_handle_.has_value());
     if (GPR_UNLIKELY(tracer_ != nullptr && tracer_->enabled())) {
-      if (timeout > Duration::Zero()) {
-        LOG(INFO) << "[polling resolver " << this << "] retrying in "
-                  << timeout.millis() << " ms";
-      } else {
-        LOG(INFO) << "[polling resolver " << this << "] retrying immediately";
-      }
+      LOG(INFO) << "[polling resolver " << this << "] retrying in "
+                << delay.millis() << " ms";
     }
-    ScheduleNextResolutionTimer(timeout);
+    ScheduleNextResolutionTimer(delay);
     // Reset result_status_state_. Note that even if re-resolution was
     // requested while the result-health callback was pending, we can
     // ignore it here, because we are in backoff to re-resolve anyway.
2 changes: 1 addition & 1 deletion src/core/resolver/polling_resolver.h
@@ -77,7 +77,7 @@ class PollingResolver : public Resolver {
   void OnRequestCompleteLocked(Result result);
   void GetResultStatus(absl::Status status);
 
-  void ScheduleNextResolutionTimer(const Duration& timeout);
+  void ScheduleNextResolutionTimer(Duration delay);
   void OnNextResolutionLocked();
   void MaybeCancelNextResolutionTimer();
 
8 changes: 3 additions & 5 deletions src/core/xds/xds_client/xds_client.cc
@@ -715,16 +715,14 @@ void XdsClient::XdsChannel::RetryableCall<T>::StartNewCallLocked() {
 template <typename T>
 void XdsClient::XdsChannel::RetryableCall<T>::StartRetryTimerLocked() {
   if (shutting_down_) return;
-  const Timestamp next_attempt_time = backoff_.NextAttemptTime();
-  const Duration timeout =
-      std::max(next_attempt_time - Timestamp::Now(), Duration::Zero());
+  const Duration delay = backoff_.NextAttemptDelay();
   GRPC_TRACE_LOG(xds_client, INFO)
       << "[xds_client " << xds_channel()->xds_client() << "] xds server "
       << xds_channel()->server_.server_uri()
-      << ": call attempt failed; retry timer will fire in " << timeout.millis()
+      << ": call attempt failed; retry timer will fire in " << delay.millis()
       << "ms.";
   timer_handle_ = xds_channel()->xds_client()->engine()->RunAfter(
-      timeout,
+      delay,
       [self = this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start")]() {
         ApplicationCallbackExecCtx callback_exec_ctx;
         ExecCtx exec_ctx;
1 change: 0 additions & 1 deletion test/core/backoff/BUILD
@@ -32,7 +32,6 @@ grpc_cc_test(
     uses_polling = False,
     deps = [
         "//:backoff",
-        "//:exec_ctx",
         "//:grpc",
         "//src/core:time",
        "//test/core/test_util:grpc_test_util",