Skip to content

Commit

Permalink
Revert "Revert "[client_channel] SubchannelWrapper hops into WorkSeri…
Browse files Browse the repository at this point in the history
…alizer before destruction" (grpc#34318)"

This reverts commit 7c428b6.
  • Loading branch information
markdroth committed Sep 12, 2023
1 parent 7c428b6 commit 718c726
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 26 deletions.
1 change: 0 additions & 1 deletion BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3005,7 +3005,6 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/cleanup",
"absl/container:flat_hash_set",
"absl/container:inlined_vector",
"absl/functional:any_invocable",
Expand Down
32 changes: 12 additions & 20 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
#include <utility>
#include <vector>

#include "absl/cleanup/cleanup.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
Expand Down Expand Up @@ -662,6 +661,12 @@ class ClientChannel::SubchannelWrapper : public SubchannelInterface {
GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
}

void Orphan() override {
// Make sure we release the last ref inside the WorkSerializer, so
// that we can update the channel's subchannel maps.
chand_->work_serializer_->Run([self = WeakRef()]() {}, DEBUG_LOCATION);
}

void WatchConnectivityState(
std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
Expand Down Expand Up @@ -2770,47 +2775,34 @@ void ClientChannel::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {

absl::optional<absl::Status> ClientChannel::LoadBalancedCall::PickSubchannel(
bool was_queued) {
// We may accumulate multiple pickers here, because if a picker says
// to queue the call, we check again to see if the picker has been
// updated before we queue it.
// We need to unref pickers in the WorkSerializer.
std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
auto cleanup = absl::MakeCleanup([&]() {
chand_->work_serializer_->Run(
[pickers = std::move(pickers)]() mutable {
for (auto& picker : pickers) {
picker.reset(DEBUG_LOCATION, "PickSubchannel");
}
},
DEBUG_LOCATION);
});
// Grab mutex and take a ref to the picker.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: grabbing LB mutex to get picker",
chand_, this);
}
RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
{
MutexLock lock(&chand_->lb_mu_);
pickers.emplace_back(chand_->picker_);
picker = chand_->picker_;
}
while (true) {
// Do pick.
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO, "chand=%p lb_call=%p: performing pick with picker=%p",
chand_, this, pickers.back().get());
chand_, this, picker.get());
}
grpc_error_handle error;
bool pick_complete = PickSubchannelImpl(pickers.back().get(), &error);
bool pick_complete = PickSubchannelImpl(picker.get(), &error);
if (!pick_complete) {
MutexLock lock(&chand_->lb_mu_);
// If picker has been swapped out since we grabbed it, try again.
if (chand_->picker_ != pickers.back()) {
if (picker != chand_->picker_) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_lb_call_trace)) {
gpr_log(GPR_INFO,
"chand=%p lb_call=%p: pick not complete, but picker changed",
chand_, this);
}
pickers.emplace_back(chand_->picker_);
picker = chand_->picker_;
continue;
}
// Otherwise queue the pick to try again later when we get a new picker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,9 +608,7 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
// Start timer.
WeakRefCountedPtr<Picker> self = WeakRef();
timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter(
config_->weight_update_period(),
[self = std::move(self),
work_serializer = wrr_->work_serializer()]() mutable {
config_->weight_update_period(), [self = std::move(self)]() mutable {
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
{
Expand All @@ -623,8 +621,7 @@ void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() {
self->BuildSchedulerAndStartTimerLocked();
}
}
// Release the picker ref inside the WorkSerializer.
work_serializer->Run([self = std::move(self)]() {}, DEBUG_LOCATION);
self.reset();
});
}

Expand Down

0 comments on commit 718c726

Please sign in to comment.