diff --git a/src/core/BUILD b/src/core/BUILD index 191074eeccc45..cb7ee5e9ff7b6 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -4789,15 +4789,15 @@ grpc_cc_library( language = "c++", deps = [ "channel_args", - "grpc_lb_subchannel_list", "grpc_outlier_detection_header", + "health_check_client", + "iomgr_fwd", "json", "json_args", "json_object_loader", "lb_policy", "lb_policy_factory", "subchannel_interface", - "//:channel_arg_names", "//:config", "//:debug_location", "//:gpr", @@ -4806,7 +4806,6 @@ grpc_cc_library( "//:orphanable", "//:ref_counted_ptr", "//:server_address", - "//:work_serializer", ], ) diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc index 4632a82d74282..9b934a36819e3 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc @@ -16,6 +16,8 @@ #include +#include "src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h" + #include #include @@ -33,19 +35,19 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" -#include #include #include +#include "src/core/ext/filters/client_channel/lb_policy/health_check_client.h" #include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h" -#include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/gprpp/crash.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/work_serializer.h" +#include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/json/json.h" #include "src/core/lib/json/json_args.h" #include "src/core/lib/json/json_object_loader.h" @@ -98,72 +100,140 @@ class PickFirst : public LoadBalancingPolicy { private: ~PickFirst() override; - class PickFirstSubchannelList; - - class PickFirstSubchannelData - : public SubchannelData { + class SubchannelList : public InternallyRefCounted { public: - PickFirstSubchannelData( - SubchannelList* - subchannel_list, - const ServerAddress& address, - RefCountedPtr subchannel) - : SubchannelData(subchannel_list, address, std::move(subchannel)) {} + class SubchannelData { + public: + SubchannelData(SubchannelList* subchannel_list, + RefCountedPtr subchannel); + + SubchannelInterface* subchannel() const { return subchannel_.get(); } + absl::optional connectivity_state() const { + return connectivity_state_; + } + + // Returns the index into the subchannel list of this object. + size_t Index() const { + return static_cast(this - + &subchannel_list_->subchannels_.front()); + } + + // Resets the connection backoff. + void ResetBackoffLocked() { + if (subchannel_ != nullptr) subchannel_->ResetBackoff(); + } + + // Cancels any pending connectivity watch and unrefs the subchannel. + void ShutdownLocked(); + + private: + // Watcher for subchannel connectivity state. + class Watcher + : public SubchannelInterface::ConnectivityStateWatcherInterface { + public: + Watcher(SubchannelData* subchannel_data, + RefCountedPtr subchannel_list) + : subchannel_data_(subchannel_data), + subchannel_list_(std::move(subchannel_list)) {} + + ~Watcher() override { + subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); + } + + void OnConnectivityStateChange(grpc_connectivity_state new_state, + absl::Status status) override { + subchannel_data_->OnConnectivityStateChange(new_state, + std::move(status)); + } + + grpc_pollset_set* interested_parties() override { + return subchannel_list_->policy_->interested_parties(); + } + + private: + SubchannelData* subchannel_data_; + RefCountedPtr subchannel_list_; + }; - void ProcessConnectivityChangeLocked( - absl::optional old_state, - grpc_connectivity_state new_state) override; + // This method will be invoked once soon after instantiation to report + // the current connectivity state, and it will then be invoked again + // whenever the connectivity state changes. + void OnConnectivityStateChange(grpc_connectivity_state new_state, + absl::Status status); + + // Processes the connectivity change to READY for an unselected + // subchannel. + void ProcessUnselectedReadyLocked(); + + // Reacts to the current connectivity state while trying to connect. + void ReactToConnectivityStateLocked(); + + // Backpointer to owning subchannel list. Not owned. + SubchannelList* subchannel_list_; + // The subchannel. + RefCountedPtr subchannel_; + // Will be non-null when the subchannel's state is being watched. + SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ = + nullptr; + // Data updated by the watcher. + absl::optional connectivity_state_; + absl::Status connectivity_status_; + }; + + SubchannelList(RefCountedPtr policy, ServerAddressList addresses, + const ChannelArgs& args); + + ~SubchannelList() override; + + // The number of subchannels in the list. + size_t size() const { return subchannels_.size(); } + + // Resets connection backoff of all subchannels. + void ResetBackoffLocked(); + + void Orphan() override; private: - // Processes the connectivity change to READY for an unselected subchannel. - void ProcessUnselectedReadyLocked(); + // Returns true if all subchannels have seen their initial + // connectivity state notifications. + bool AllSubchannelsSeenInitialState(); + + // Backpointer to owning policy. + RefCountedPtr policy_; + + ChannelArgs args_; + + // The list of subchannels. + std::vector subchannels_; - // Reacts to the current connectivity state while trying to connect. - void ReactToConnectivityStateLocked(); + // Is this list shutting down? This may be true due to the shutdown of the + // policy itself or because a newer update has arrived while this one hadn't + // finished processing. + bool shutting_down_ = false; + + bool in_transient_failure_ = false; + size_t attempting_index_ = 0; }; - class PickFirstSubchannelList - : public SubchannelList { + class HealthWatcher + : public SubchannelInterface::ConnectivityStateWatcherInterface { public: - PickFirstSubchannelList(PickFirst* policy, ServerAddressList addresses, - const ChannelArgs& args) - : SubchannelList(policy, - (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) - ? "PickFirstSubchannelList" - : nullptr), - std::move(addresses), policy->channel_control_helper(), - args) { - // Need to maintain a ref to the LB policy as long as we maintain - // any references to subchannels, since the subchannels' - // pollset_sets will include the LB policy's pollset_set. - policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); - // Note that we do not start trying to connect to any subchannel here, - // since we will wait until we see the initial connectivity state for all - // subchannels before doing that. - } - - ~PickFirstSubchannelList() override { - PickFirst* p = static_cast(policy()); - p->Unref(DEBUG_LOCATION, "subchannel_list"); - } + explicit HealthWatcher(RefCountedPtr policy) + : policy_(std::move(policy)) {} - bool in_transient_failure() const { return in_transient_failure_; } - void set_in_transient_failure(bool in_transient_failure) { - in_transient_failure_ = in_transient_failure; + ~HealthWatcher() override { + policy_.reset(DEBUG_LOCATION, "HealthWatcher dtor"); } - size_t attempting_index() const { return attempting_index_; } - void set_attempting_index(size_t index) { attempting_index_ = index; } + void OnConnectivityStateChange(grpc_connectivity_state new_state, + absl::Status status) override; - private: - std::shared_ptr work_serializer() const override { - return static_cast(policy())->work_serializer(); + grpc_pollset_set* interested_parties() override { + return policy_->interested_parties(); } - bool in_transient_failure_ = false; - size_t attempting_index_ = 0; + private: + RefCountedPtr policy_; }; class Picker : public SubchannelPicker { @@ -181,19 +251,30 @@ class PickFirst : public LoadBalancingPolicy { void ShutdownLocked() override; - void AttemptToConnectUsingLatestUpdateArgsLocked(); - void UpdateState(grpc_connectivity_state state, const absl::Status& status, RefCountedPtr picker); + void AttemptToConnectUsingLatestUpdateArgsLocked(); + + void UnsetSelectedSubchannel(); + + // Whether we should enable health watching. + const bool enable_health_watch_; + // Whether we should omit our status message prefix. + const bool omit_status_message_prefix_; + // Lateset update args. UpdateArgs latest_update_args_; // All our subchannels. - RefCountedPtr subchannel_list_; + OrphanablePtr subchannel_list_; // Latest pending subchannel list. - RefCountedPtr latest_pending_subchannel_list_; - // Selected subchannel in \a subchannel_list_. - PickFirstSubchannelData* selected_ = nullptr; + OrphanablePtr latest_pending_subchannel_list_; + // Selected subchannel in subchannel_list_. + SubchannelList::SubchannelData* selected_ = nullptr; + // Health watcher for the selected subchannel. + SubchannelInterface::ConnectivityStateWatcherInterface* health_watcher_ = + nullptr; + SubchannelInterface::DataWatcherInterface* health_data_watcher_ = nullptr; // Current connectivity state. grpc_connectivity_state state_ = GRPC_CHANNEL_CONNECTING; // Are we shut down? @@ -202,7 +283,16 @@ class PickFirst : public LoadBalancingPolicy { absl::BitGen bit_gen_; }; -PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) { +PickFirst::PickFirst(Args args) + : LoadBalancingPolicy(std::move(args)), + enable_health_watch_( + channel_args() + .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING) + .value_or(false)), + omit_status_message_prefix_( + channel_args() + .GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX) + .value_or(false)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p created.", this); } @@ -221,6 +311,7 @@ void PickFirst::ShutdownLocked() { gpr_log(GPR_INFO, "Pick First %p Shutting down", this); } shutdown_ = true; + UnsetSelectedSubchannel(); subchannel_list_.reset(); latest_pending_subchannel_list_.reset(); } @@ -255,13 +346,11 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { "[PF %p] Shutting down previous pending subchannel list %p", this, latest_pending_subchannel_list_.get()); } - latest_pending_subchannel_list_ = MakeRefCounted( - this, std::move(addresses), latest_update_args_.args); - latest_pending_subchannel_list_->StartWatchingLocked( - latest_update_args_.args); + latest_pending_subchannel_list_ = MakeOrphanable( + Ref(), std::move(addresses), latest_update_args_.args); // Empty update or no valid subchannels. Put the channel in // TRANSIENT_FAILURE and request re-resolution. - if (latest_pending_subchannel_list_->num_subchannels() == 0) { + if (latest_pending_subchannel_list_->size() == 0) { channel_control_helper()->RequestReresolution(); absl::Status status = latest_update_args_.addresses.ok() @@ -273,9 +362,8 @@ void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() { } // If the new update is empty or we don't yet have a selected subchannel in // the current list, replace the current subchannel list immediately. - if (latest_pending_subchannel_list_->num_subchannels() == 0 || - selected_ == nullptr) { - selected_ = nullptr; + if (latest_pending_subchannel_list_->size() == 0 || selected_ == nullptr) { + UnsetSelectedSubchannel(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) && subchannel_list_ != nullptr) { gpr_log(GPR_INFO, "[PF %p] Shutting down previous subchannel list %p", @@ -296,8 +384,6 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) { this, args.addresses.status().ToString().c_str()); } } - // Add GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. - args.args = args.args.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1); // Set return status based on the address list. absl::Status status; if (!args.addresses.ok()) { @@ -345,18 +431,122 @@ void PickFirst::UpdateState(grpc_connectivity_state state, channel_control_helper()->UpdateState(state, status, std::move(picker)); } -void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( - absl::optional old_state, - grpc_connectivity_state new_state) { - PickFirst* p = static_cast(subchannel_list()->policy()); +void PickFirst::UnsetSelectedSubchannel() { + if (selected_ != nullptr && health_data_watcher_ != nullptr) { + selected_->subchannel()->CancelDataWatcher(health_data_watcher_); + } + selected_ = nullptr; + health_watcher_ = nullptr; + health_data_watcher_ = nullptr; +} + +// +// PickFirst::HealthWatcher +// + +void PickFirst::HealthWatcher::OnConnectivityStateChange( + grpc_connectivity_state new_state, absl::Status status) { + if (policy_->health_watcher_ != this) return; + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "[PF %p] health watch state update: %s (%s)", + policy_.get(), ConnectivityStateName(new_state), + status.ToString().c_str()); + } + switch (new_state) { + case GRPC_CHANNEL_READY: + policy_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_READY, absl::OkStatus(), + MakeRefCounted(policy_->selected_->subchannel()->Ref())); + break; + case GRPC_CHANNEL_IDLE: + // If the subchannel becomes disconnected, the health watcher + // might happen to see the change before the raw connectivity + // state watcher does. In this case, ignore it, since the raw + // connectivity state watcher will handle it shortly. + break; + case GRPC_CHANNEL_CONNECTING: + policy_->channel_control_helper()->UpdateState( + new_state, absl::OkStatus(), + MakeRefCounted(policy_->Ref())); + break; + case GRPC_CHANNEL_TRANSIENT_FAILURE: + policy_->channel_control_helper()->UpdateState( + GRPC_CHANNEL_TRANSIENT_FAILURE, status, + MakeRefCounted(status)); + break; + case GRPC_CHANNEL_SHUTDOWN: + Crash("health watcher reported state SHUTDOWN"); + } +} + +// +// PickFirst::SubchannelList::SubchannelData +// + +PickFirst::SubchannelList::SubchannelData::SubchannelData( + SubchannelList* subchannel_list, + RefCountedPtr subchannel) + : subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "[PF %p] subchannel list %p index %" PRIuPTR + " (subchannel %p): starting watch", + subchannel_list_->policy_.get(), subchannel_list_, + subchannel_list_->size(), subchannel_.get()); + } + auto watcher = std::make_unique( + this, subchannel_list_->Ref(DEBUG_LOCATION, "Watcher")); + pending_watcher_ = watcher.get(); + subchannel_->WatchConnectivityState(std::move(watcher)); +} + +void PickFirst::SubchannelList::SubchannelData::ShutdownLocked() { + if (subchannel_ != nullptr) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): cancelling watch and unreffing subchannel", + subchannel_list_->policy_.get(), subchannel_list_, Index(), + subchannel_list_->size(), subchannel_.get()); + } + subchannel_->CancelConnectivityStateWatch(pending_watcher_); + pending_watcher_ = nullptr; + subchannel_.reset(); + } +} + +void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange( + grpc_connectivity_state new_state, absl::Status status) { + PickFirst* p = subchannel_list_->policy_.get(); + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log( + GPR_INFO, + "[PF %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR + " (subchannel %p): connectivity changed: old_state=%s, new_state=%s, " + "status=%s, shutting_down=%d, pending_watcher=%p, " + "p->selected_=%p, p->subchannel_list_=%p, " + "p->latest_pending_subchannel_list_=%p", + p, subchannel_list_, Index(), subchannel_list_->size(), + subchannel_.get(), + (connectivity_state_.has_value() + ? ConnectivityStateName(*connectivity_state_) + : "N/A"), + ConnectivityStateName(new_state), status.ToString().c_str(), + subchannel_list_->shutting_down_, pending_watcher_, p->selected_, + p->subchannel_list_.get(), p->latest_pending_subchannel_list_.get()); + } + if (subchannel_list_->shutting_down_ || pending_watcher_ == nullptr) return; // The notification must be for a subchannel in either the current or // latest pending subchannel lists. - GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || - subchannel_list() == p->latest_pending_subchannel_list_.get()); + GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get() || + subchannel_list_ == p->latest_pending_subchannel_list_.get()); GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); + absl::optional old_state = connectivity_state_; + connectivity_state_ = new_state; + connectivity_status_ = status; // Handle updates for the currently selected subchannel. if (p->selected_ == this) { - GPR_ASSERT(subchannel_list() == p->subchannel_list_.get()); + GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get()); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p selected subchannel connectivity changed to %s", p, @@ -380,17 +570,15 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( p, p->latest_pending_subchannel_list_.get(), p->subchannel_list_.get()); } - p->selected_ = nullptr; + p->UnsetSelectedSubchannel(); p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); // Set our state to that of the pending subchannel list. - if (p->subchannel_list_->in_transient_failure()) { + if (p->subchannel_list_->in_transient_failure_) { absl::Status status = absl::UnavailableError(absl::StrCat( "selected subchannel failed; switching to pending update; " "last failure: ", - p->subchannel_list_ - ->subchannel(p->subchannel_list_->num_subchannels()) - ->connectivity_status() - .ToString())); + p->subchannel_list_->subchannels_.back() + .connectivity_status_.ToString())); p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status, MakeRefCounted(status)); } else if (p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) { @@ -400,7 +588,7 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( return; } // Enter idle. - p->selected_ = nullptr; + p->UnsetSelectedSubchannel(); p->subchannel_list_.reset(); p->UpdateState( GRPC_CHANNEL_IDLE, absl::Status(), @@ -418,32 +606,33 @@ void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked( // select in place of the current one. // If the subchannel is READY, use it. if (new_state == GRPC_CHANNEL_READY) { - subchannel_list()->set_in_transient_failure(false); + subchannel_list_->in_transient_failure_ = false; ProcessUnselectedReadyLocked(); return; } // If we haven't yet seen the initial connectivity state notification // for all subchannels, do nothing. - if (!subchannel_list()->AllSubchannelsSeenInitialState()) return; + if (!subchannel_list_->AllSubchannelsSeenInitialState()) return; // If we're still here and this is the initial connectivity state // notification for this subchannel, that means it was the last one to // see its initial notification. Start trying to connect, starting // with the first subchannel. if (!old_state.has_value()) { - subchannel_list()->subchannel(0)->ReactToConnectivityStateLocked(); + subchannel_list_->subchannels_.front().ReactToConnectivityStateLocked(); return; } // Ignore any other updates for subchannels we're not currently trying to // connect to. - if (Index() != subchannel_list()->attempting_index()) return; + if (Index() != subchannel_list_->attempting_index_) return; // React to the connectivity state. ReactToConnectivityStateLocked(); } -void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() { - PickFirst* p = static_cast(subchannel_list()->policy()); +void PickFirst::SubchannelList::SubchannelData:: + ReactToConnectivityStateLocked() { + PickFirst* p = subchannel_list_->policy_.get(); // Otherwise, process connectivity state. - switch (connectivity_state().value()) { + switch (connectivity_state_.value()) { case GRPC_CHANNEL_READY: // Already handled this case above, so this should not happen. GPR_UNREACHABLE_CODE(break); @@ -451,13 +640,13 @@ void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() { // Find the next subchannel not in state TRANSIENT_FAILURE. // We skip subchannels in state TRANSIENT_FAILURE to avoid a // large recursion that could overflow the stack. - PickFirstSubchannelData* found_subchannel = nullptr; + SubchannelData* found_subchannel = nullptr; for (size_t next_index = Index() + 1; - next_index < subchannel_list()->num_subchannels(); ++next_index) { - PickFirstSubchannelData* sc = subchannel_list()->subchannel(next_index); - GPR_ASSERT(sc->connectivity_state().has_value()); - if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) { - subchannel_list()->set_attempting_index(next_index); + next_index < subchannel_list_->size(); ++next_index) { + SubchannelData* sc = &subchannel_list_->subchannels_[next_index]; + GPR_ASSERT(sc->connectivity_state_.has_value()); + if (sc->connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) { + subchannel_list_->attempting_index_ = next_index; found_subchannel = sc; break; } @@ -475,14 +664,14 @@ void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() { gpr_log(GPR_INFO, "Pick First %p subchannel list %p failed to connect to " "all subchannels", - p, subchannel_list()); + p, subchannel_list_); } - subchannel_list()->set_attempting_index(0); - subchannel_list()->set_in_transient_failure(true); + subchannel_list_->attempting_index_ = 0; + subchannel_list_->in_transient_failure_ = true; // In case 2, swap to the new subchannel list. This means reporting // TRANSIENT_FAILURE and dropping the existing (working) connection, // but we can't ignore what the control plane has told us. - if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { + if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p promoting pending subchannel list %p to " @@ -490,36 +679,38 @@ void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() { p, p->latest_pending_subchannel_list_.get(), p->subchannel_list_.get()); } - p->selected_ = nullptr; // owned by p->subchannel_list_ + p->UnsetSelectedSubchannel(); p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } // If this is the current subchannel list (either because we were // in case 1 or because we were in case 2 and just promoted it to // be the current list), re-resolve and report new state. - if (subchannel_list() == p->subchannel_list_.get()) { + if (subchannel_list_ == p->subchannel_list_.get()) { p->channel_control_helper()->RequestReresolution(); - absl::Status status = absl::UnavailableError( - absl::StrCat("failed to connect to all addresses; last error: ", - connectivity_status().ToString())); + absl::Status status = absl::UnavailableError(absl::StrCat( + (p->omit_status_message_prefix_ + ? "" + : "failed to connect to all addresses; last error: "), + connectivity_status_.ToString())); p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status, MakeRefCounted(status)); } // If the first subchannel is already IDLE, trigger the next connection - // attempt immediately. Otherwise, we'll wait for it to report + // attempt immediately. Otherwise, we'll wait for it to report // its own connectivity state change. - auto* subchannel0 = subchannel_list()->subchannel(0); - if (subchannel0->connectivity_state() == GRPC_CHANNEL_IDLE) { - subchannel0->subchannel()->RequestConnection(); + auto& subchannel0 = subchannel_list_->subchannels_.front(); + if (subchannel0.connectivity_state_ == GRPC_CHANNEL_IDLE) { + subchannel0.subchannel_->RequestConnection(); } break; } case GRPC_CHANNEL_IDLE: - subchannel()->RequestConnection(); + subchannel_->RequestConnection(); break; case GRPC_CHANNEL_CONNECTING: // Only update connectivity state in case 1, and only if we're not // already in TRANSIENT_FAILURE. - if (subchannel_list() == p->subchannel_list_.get() && + if (subchannel_list_ == p->subchannel_list_.get() && p->state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) { p->UpdateState(GRPC_CHANNEL_CONNECTING, absl::Status(), MakeRefCounted(nullptr)); @@ -530,8 +721,8 @@ void PickFirst::PickFirstSubchannelData::ReactToConnectivityStateLocked() { } } -void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { - PickFirst* p = static_cast(subchannel_list()->policy()); +void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() { + PickFirst* p = subchannel_list_->policy_.get(); // If we get here, there are two possible cases: // 1. We do not currently have a selected subchannel, and the update is // for a subchannel in p->subchannel_list_ that we're trying to @@ -541,10 +732,10 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { // for a subchannel in p->latest_pending_subchannel_list_. The // goal here is to find a subchannel from the update that we can // select in place of the current one. - GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() || - subchannel_list() == p->latest_pending_subchannel_list_.get()); + GPR_ASSERT(subchannel_list_ == p->subchannel_list_.get() || + subchannel_list_ == p->latest_pending_subchannel_list_.get()); // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_. - if (subchannel_list() == p->latest_pending_subchannel_list_.get()) { + if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p promoting pending subchannel list %p to " @@ -556,16 +747,113 @@ void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() { } // Cases 1 and 2. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { - gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel()); + gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, + subchannel_.get()); } p->selected_ = this; - p->UpdateState(GRPC_CHANNEL_READY, absl::Status(), - MakeRefCounted(subchannel()->Ref())); - for (size_t i = 0; i < subchannel_list()->num_subchannels(); ++i) { + // If health checking is enabled, start the health watch, but don't + // report a new picker -- we want to stay in CONNECTING while we wait + // for the health status notification. + // If health checking is NOT enabled, report READY. + if (p->enable_health_watch_) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "[PF %p] starting health watch", p); + } + auto watcher = std::make_unique( + p->Ref(DEBUG_LOCATION, "HealthWatcher")); + p->health_watcher_ = watcher.get(); + auto health_data_watcher = MakeHealthCheckWatcher( + p->work_serializer(), subchannel_list_->args_, std::move(watcher)); + p->health_data_watcher_ = health_data_watcher.get(); + subchannel_->AddDataWatcher(std::move(health_data_watcher)); + } else { + p->UpdateState(GRPC_CHANNEL_READY, absl::Status(), + MakeRefCounted(subchannel()->Ref())); + } + // Unref all other subchannels in the list. + for (size_t i = 0; i < subchannel_list_->size(); ++i) { if (i != Index()) { - subchannel_list()->subchannel(i)->ShutdownLocked(); + subchannel_list_->subchannels_[i].ShutdownLocked(); + } + } +} + +// +// PickFirst::SubchannelList +// + +PickFirst::SubchannelList::SubchannelList(RefCountedPtr policy, + ServerAddressList addresses, + const ChannelArgs& args) + : InternallyRefCounted( + GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) ? "SubchannelList" + : nullptr), + policy_(std::move(policy)), + args_(args.Remove(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING) + .Remove( + GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)) { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "[PF %p] Creating subchannel list %p for %" PRIuPTR + " subchannels - channel args: %s", + policy_.get(), this, addresses.size(), args_.ToString().c_str()); + } + subchannels_.reserve(addresses.size()); + // Create a subchannel for each address. + for (const ServerAddress& address : addresses) { + RefCountedPtr subchannel = + policy_->channel_control_helper()->CreateSubchannel(address, args_); + if (subchannel == nullptr) { + // Subchannel could not be created. + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "[PF %p] could not create subchannel for address %s, ignoring", + policy_.get(), address.ToString().c_str()); + } + continue; } + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, + "[PF %p] subchannel list %p index %" PRIuPTR + ": Created subchannel %p for address %s", + policy_.get(), this, subchannels_.size(), subchannel.get(), + address.ToString().c_str()); + } + subchannels_.emplace_back(this, std::move(subchannel)); + } +} + +PickFirst::SubchannelList::~SubchannelList() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "[PF %p] Destroying subchannel_list %p", policy_.get(), + this); + } +} + +void PickFirst::SubchannelList::Orphan() { + if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) { + gpr_log(GPR_INFO, "[PF %p] Shutting down subchannel_list %p", policy_.get(), + this); + } + GPR_ASSERT(!shutting_down_); + shutting_down_ = true; + for (auto& sd : subchannels_) { + sd.ShutdownLocked(); + } + Unref(); +} + +void PickFirst::SubchannelList::ResetBackoffLocked() { + for (auto& sd : subchannels_) { + sd.ResetBackoffLocked(); + } +} + +bool PickFirst::SubchannelList::AllSubchannelsSeenInitialState() { + for (auto& sd : subchannels_) { + if (!sd.connectivity_state().has_value()) return false; } + return true; } // diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h index 38b70e85f9a16..ff5e0e6f2a408 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h @@ -17,4 +17,20 @@ #ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_PICK_FIRST_PICK_FIRST_H #define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_PICK_FIRST_PICK_FIRST_H +#include + +#include "src/core/lib/resolver/server_address.h" + +// Internal channel arg to enable health checking in pick_first. +// Intended to be used by petiole policies (e.g., round_robin) that +// delegate to pick_first. +#define GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING \ + GRPC_ARG_NO_SUBCHANNEL_PREFIX "pick_first_enable_health_checking" + +// Internal channel arg to tell pick_first to omit the prefix it normally +// adds to error status messages. Intended to be used by petiole policies +// (e.g., round_robin) that want to add their own prefixes. +#define GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX \ + GRPC_ARG_NO_SUBCHANNEL_PREFIX "pick_first_omit_status_message_prefix" + #endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_PICK_FIRST_PICK_FIRST_H diff --git a/test/core/client_channel/lb_policy/outlier_detection_test.cc b/test/core/client_channel/lb_policy/outlier_detection_test.cc index 440b117f2cb05..38b90e2f30fc9 100644 --- a/test/core/client_channel/lb_policy/outlier_detection_test.cc +++ b/test/core/client_channel/lb_policy/outlier_detection_test.cc @@ -33,12 +33,10 @@ #include #include -#include #include #include #include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/time.h" @@ -257,8 +255,7 @@ TEST_F(OutlierDetectionTest, DoesNotWorkWithPickFirst) { EXPECT_TRUE(status.ok()) << status; // LB policy should have created a subchannel for the first address with // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. - auto* subchannel = FindSubchannel( - kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel = FindSubchannel(kAddresses[0]); ASSERT_NE(subchannel, nullptr); // When the LB policy receives the subchannel's initial connectivity // state notification (IDLE), it will request a connection. diff --git a/test/core/client_channel/lb_policy/pick_first_test.cc b/test/core/client_channel/lb_policy/pick_first_test.cc index 460d79f47a165..8547e9103683a 100644 --- a/test/core/client_channel/lb_policy/pick_first_test.cc +++ b/test/core/client_channel/lb_policy/pick_first_test.cc @@ -31,10 +31,8 @@ #include "gtest/gtest.h" #include -#include #include -#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" @@ -74,8 +72,7 @@ class PickFirstTest : public LoadBalancingPolicyTest { // We will remove entries as each subchannel starts to connect. std::map subchannels; for (auto address : addresses) { - auto* subchannel = FindSubchannel( - address, ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel = FindSubchannel(address); ASSERT_NE(subchannel, nullptr); subchannels.emplace(subchannel, address); } @@ -136,13 +133,10 @@ TEST_F(PickFirstTest, FirstAddressWorks) { absl::Status status = ApplyUpdate( BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; - // LB policy should have created a subchannel for both addresses with - // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. - auto* subchannel = FindSubchannel( - kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + // LB policy should have created a subchannel for both addresses. + auto* subchannel = FindSubchannel(kAddresses[0]); ASSERT_NE(subchannel, nullptr); - auto* subchannel2 = FindSubchannel( - kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel2 = FindSubchannel(kAddresses[1]); ASSERT_NE(subchannel2, nullptr); // When the LB policy receives the first subchannel's initial connectivity // state notification (IDLE), it will request a connection. @@ -172,13 +166,10 @@ TEST_F(PickFirstTest, FirstAddressFails) { absl::Status status = ApplyUpdate( BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; - // LB policy should have created a subchannel for both addresses with - // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. - auto* subchannel = FindSubchannel( - kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + // LB policy should have created a subchannel for both addresses. + auto* subchannel = FindSubchannel(kAddresses[0]); ASSERT_NE(subchannel, nullptr); - auto* subchannel2 = FindSubchannel( - kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel2 = FindSubchannel(kAddresses[1]); ASSERT_NE(subchannel2, nullptr); // When the LB policy receives the first subchannel's initial connectivity // state notification (IDLE), it will request a connection. @@ -216,28 +207,26 @@ TEST_F(PickFirstTest, FirstTwoAddressesInTransientFailureAtStart) { // LB policy gets the update. constexpr std::array kAddresses = { "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444", "ipv4:127.0.0.1:445"}; - auto* subchannel = CreateSubchannel( - kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel = CreateSubchannel(kAddresses[0]); subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError("failed to connect"), /*validate_state_transition=*/false); - auto* subchannel2 = CreateSubchannel( - kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel2 = CreateSubchannel(kAddresses[1]); subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError("failed to connect"), /*validate_state_transition=*/false); absl::Status status = ApplyUpdate( BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; - // LB policy should have created a subchannel for all addresses with - // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. - auto* subchannel3 = FindSubchannel( - kAddresses[2], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + // LB policy should have created a subchannel for all addresses. + auto* subchannel3 = FindSubchannel(kAddresses[2]); ASSERT_NE(subchannel3, nullptr); // When the LB policy receives the first subchannel's initial connectivity // state notification (TRANSIENT_FAILURE), it will move on to the second - // subchannel. The second subchannel is in state IDLE, so the LB - // policy will request a connection attempt on it. + // subchannel. The second subchannel is also in state TRANSIENT_FAILURE, + // so the LB policy will move on to the third subchannel. That + // subchannel is in state IDLE, so the LB policy will request a connection + // attempt on it. EXPECT_TRUE(subchannel3->ConnectionRequested()); // This causes the subchannel to start to connect, so it reports // CONNECTING. @@ -261,13 +250,11 @@ TEST_F(PickFirstTest, AllAddressesInTransientFailureAtStart) { // when the LB policy gets the update. constexpr std::array kAddresses = { "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; - auto* subchannel = CreateSubchannel( - kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel = CreateSubchannel(kAddresses[0]); subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError("failed to connect"), /*validate_state_transition=*/false); - auto* subchannel2 = CreateSubchannel( - kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel2 = CreateSubchannel(kAddresses[1]); subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError("failed to connect"), /*validate_state_transition=*/false); @@ -309,13 +296,11 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) { // when the LB policy gets the update. constexpr std::array kAddresses = { "ipv4:127.0.0.1:443", "ipv4:127.0.0.1:444"}; - auto* subchannel = CreateSubchannel( - kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel = CreateSubchannel(kAddresses[0]); subchannel->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError("failed to connect"), /*validate_state_transition=*/false); - auto* subchannel2 = CreateSubchannel( - kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel2 = CreateSubchannel(kAddresses[1]); subchannel2->SetConnectivityState(GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError("failed to connect"), /*validate_state_transition=*/false); @@ -342,9 +327,7 @@ TEST_F(PickFirstTest, StaysInTransientFailureAfterAddressListUpdate) { lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; // The LB policy should have created a subchannel for the new address. - auto* subchannel3 = - FindSubchannel(kAddresses2[1], - ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel3 = FindSubchannel(kAddresses2[1]); ASSERT_NE(subchannel3, nullptr); // The policy will ask it to connect. EXPECT_TRUE(subchannel3->ConnectionRequested()); @@ -368,13 +351,10 @@ TEST_F(PickFirstTest, FirstAddressGoesIdleBeforeSecondOneFails) { absl::Status status = ApplyUpdate( BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; - // LB policy should have created a subchannel for both addresses with - // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. - auto* subchannel = FindSubchannel( - kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + // LB policy should have created a subchannel for both addresses. + auto* subchannel = FindSubchannel(kAddresses[0]); ASSERT_NE(subchannel, nullptr); - auto* subchannel2 = FindSubchannel( - kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel2 = FindSubchannel(kAddresses[1]); ASSERT_NE(subchannel2, nullptr); // When the LB policy receives the first subchannel's initial connectivity // state notification (IDLE), it will request a connection. @@ -432,13 +412,10 @@ TEST_F(PickFirstTest, GoesIdleWhenConnectionFailsThenCanReconnect) { absl::Status status = ApplyUpdate( BuildUpdate(kAddresses, MakePickFirstConfig(false)), lb_policy_.get()); EXPECT_TRUE(status.ok()) << status; - // LB policy should have created a subchannel for both addresses with - // the GRPC_ARG_INHIBIT_HEALTH_CHECKING channel arg. - auto* subchannel = FindSubchannel( - kAddresses[0], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + // LB policy should have created a subchannel for both addresses. + auto* subchannel = FindSubchannel(kAddresses[0]); ASSERT_NE(subchannel, nullptr); - auto* subchannel2 = FindSubchannel( - kAddresses[1], ChannelArgs().Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, true)); + auto* subchannel2 = FindSubchannel(kAddresses[1]); ASSERT_NE(subchannel2, nullptr); // When the LB policy receives the first subchannel's initial connectivity // state notification (IDLE), it will request a connection.