diff --git a/BUILD b/BUILD index b55ca8a569ef2..bde7f174e6a35 100644 --- a/BUILD +++ b/BUILD @@ -3005,6 +3005,7 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/cleanup", "absl/container:flat_hash_set", "absl/container:inlined_vector", "absl/functional:any_invocable", diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index ed07a090030cf..6cfd4c48fcd87 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -29,6 +29,7 @@ #include #include +#include "absl/cleanup/cleanup.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/cord.h" @@ -661,12 +662,6 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface { GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper"); } - void Orphan() override { - // Make sure we release the last ref inside the WorkSerializer, so - // that we can update the channel's subchannel maps. - chand_->work_serializer_->Run([self = WeakRef()]() {}, DEBUG_LOCATION); - } - void WatchConnectivityState( std::unique_ptr watcher) override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { @@ -2775,34 +2770,47 @@ void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() { absl::optional ClientChannel::LoadBalancedCall::PickSubchannel( bool was_queued) { + // We may accumulate multiple pickers here, because if a picker says + // to queue the call, we check again to see if the picker has been + // updated before we queue it. + // We need to unref pickers in the WorkSerializer. + std::vector> pickers; + auto cleanup = absl::MakeCleanup([&]() { + chand_->work_serializer_->Run( + [pickers = std::move(pickers)]() mutable { + for (auto& picker : pickers) { + picker.reset(DEBUG_LOCATION, "PickSubchannel"); + } + }, + DEBUG_LOCATION); + }); // Grab mutex and take a ref to the picker. if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to get picker", chand_, this); } - RefCountedPtr picker; { MutexLock lock(&chand_->lb_mu_); - picker = chand_->picker_; + pickers.emplace_back(chand_->picker_); } while (true) { // Do pick. if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: performing pick with picker=%p", - chand_, this, picker.get()); + chand_, this, pickers.back().get()); } grpc_error_handle error; - bool pick_complete = PickSubchannelImpl(picker.get(), &error); + bool pick_complete = PickSubchannelImpl(pickers.back().get(), &error); if (!pick_complete) { MutexLock lock(&chand_->lb_mu_); // If picker has been swapped out since we grabbed it, try again. - if (picker != chand_->picker_) { + if (chand_->picker_ != pickers.back()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) { gpr_log(GPR_INFO, "chand=%p lb_call=%p: pick not complete, but picker changed", chand_, this); } - picker = chand_->picker_; + pickers.emplace_back(chand_->picker_); continue; } // Otherwise queue the pick to try again later when we get a new picker. diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc index 89c4c1bd847c4..4d3a47c1d5b0c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc +++ b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc @@ -608,7 +608,9 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { // Start timer. WeakRefCountedPtr self = WeakRef(); timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter( - config_->weight_update_period(), [self = std::move(self)]() mutable { + config_->weight_update_period(), + [self = std::move(self), + work_serializer = wrr_->work_serializer()]() mutable { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; { @@ -621,7 +623,8 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { self->BuildSchedulerAndStartTimerLocked(); } } - self.reset(); + // Release the picker ref inside the WorkSerializer. + work_serializer->Run([self = std::move(self)]() {}, DEBUG_LOCATION); }); }