diff --git a/envoy/config/grpc_mux.h b/envoy/config/grpc_mux.h index a08724fcdf6c..a18c87bc193e 100644 --- a/envoy/config/grpc_mux.h +++ b/envoy/config/grpc_mux.h @@ -134,8 +134,11 @@ template class GrpcStreamCallbacks { /** * For the GrpcStream to prompt the context to take appropriate action in response to * failure to establish the gRPC stream. + * @param next_attempt_may_send_initial_resource_version a flag indicating whether the + * next reconnection attempt will be to the same source that was previously successful + * or not (used to pass primary/failover reconnection information to the GrpcMux). */ - virtual void onEstablishmentFailure() PURE; + virtual void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) PURE; /** * For the GrpcStream to pass received protos to the context. diff --git a/source/common/config/null_grpc_mux_impl.h b/source/common/config/null_grpc_mux_impl.h index 5e82f42e5447..453d723eb32d 100644 --- a/source/common/config/null_grpc_mux_impl.h +++ b/source/common/config/null_grpc_mux_impl.h @@ -31,7 +31,7 @@ class NullGrpcMuxImpl : public GrpcMux, void onWriteable() override {} void onStreamEstablished() override {} - void onEstablishmentFailure() override {} + void onEstablishmentFailure(bool) override {} void onDiscoveryResponse(std::unique_ptr&&, ControlPlaneStats&) override {} }; diff --git a/source/extensions/config_subscription/grpc/delta_subscription_state.cc b/source/extensions/config_subscription/grpc/delta_subscription_state.cc index 65ec1fc99b5c..7a1f2fd35445 100644 --- a/source/extensions/config_subscription/grpc/delta_subscription_state.cc +++ b/source/extensions/config_subscription/grpc/delta_subscription_state.cc @@ -139,6 +139,11 @@ bool DeltaSubscriptionState::subscriptionUpdatePending() const { return must_send_discovery_request_; } +void DeltaSubscriptionState::markStreamFresh(bool should_send_initial_resource_versions) { + any_request_sent_yet_in_current_stream_ = false; + should_send_initial_resource_versions_ = should_send_initial_resource_versions; +} + UpdateAck DeltaSubscriptionState::handleResponse( const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) { // We *always* copy the response's nonce into the next request, even if we're going to make that @@ -286,21 +291,25 @@ DeltaSubscriptionState::getNextRequestAckless() { // Also, since this might be a new server, we must explicitly state *all* of our subscription // interest. for (auto const& [resource_name, resource_state] : requested_resource_state_) { - // Populate initial_resource_versions with the resource versions we currently have. - // Resources we are interested in, but are still waiting to get any version of from the - // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!) - if (!resource_state.isWaitingForServer()) { - (*request.mutable_initial_resource_versions())[resource_name] = resource_state.version(); + if (should_send_initial_resource_versions_) { + // Populate initial_resource_versions with the resource versions we currently have. + // Resources we are interested in, but are still waiting to get any version of from the + // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!) + if (!resource_state.isWaitingForServer()) { + (*request.mutable_initial_resource_versions())[resource_name] = resource_state.version(); + } } // We are going over a list of resources that we are interested in, so add them to // resource_names_subscribe. names_added_.insert(resource_name); } - for (auto const& [resource_name, resource_version] : wildcard_resource_state_) { - (*request.mutable_initial_resource_versions())[resource_name] = resource_version; - } - for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) { - (*request.mutable_initial_resource_versions())[resource_name] = resource_version; + if (should_send_initial_resource_versions_) { + for (auto const& [resource_name, resource_version] : wildcard_resource_state_) { + (*request.mutable_initial_resource_versions())[resource_name] = resource_version; + } + for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) { + (*request.mutable_initial_resource_versions())[resource_name] = resource_version; + } } // If this is a legacy wildcard request, then make sure that the resource_names_subscribe is // empty. diff --git a/source/extensions/config_subscription/grpc/delta_subscription_state.h b/source/extensions/config_subscription/grpc/delta_subscription_state.h index 656285a47935..261ae3282e5d 100644 --- a/source/extensions/config_subscription/grpc/delta_subscription_state.h +++ b/source/extensions/config_subscription/grpc/delta_subscription_state.h @@ -89,7 +89,11 @@ class DeltaSubscriptionState : public Logger::Loggable { // Whether there was a change in our subscription interest we have yet to inform the server of. bool subscriptionUpdatePending() const; - void markStreamFresh() { any_request_sent_yet_in_current_stream_ = false; } + // Marks the stream as fresh for the next reconnection attempt. If + // should_send_initial_resource_versions is true, then the next request will + // also populate the initial_resource_versions field in the first request (if + // there are relevant resources). + void markStreamFresh(bool should_send_initial_resource_versions); UpdateAck handleResponse(const envoy::service::discovery::v3::DeltaDiscoveryResponse& message); @@ -169,6 +173,7 @@ class DeltaSubscriptionState : public Logger::Loggable { bool in_initial_legacy_wildcard_{true}; bool any_request_sent_yet_in_current_stream_{}; + bool should_send_initial_resource_versions_{true}; bool must_send_discovery_request_{}; // Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent. diff --git a/source/extensions/config_subscription/grpc/grpc_mux_failover.h b/source/extensions/config_subscription/grpc/grpc_mux_failover.h index 745e1be9409f..19765df61fc7 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_failover.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_failover.h @@ -70,7 +70,8 @@ class GrpcMuxFailover : public GrpcStreamInterface, Event::Dispatcher& dispatcher) : grpc_mux_callbacks_(grpc_mux_callbacks), primary_callbacks_(*this), primary_grpc_stream_(std::move(primary_stream_creator(&primary_callbacks_))), - connection_state_(ConnectionState::None), ever_connected_to_primary_(false) { + connection_state_(ConnectionState::None), ever_connected_to_primary_(false), + previously_connected_to_(ConnectedTo::None) { ASSERT(primary_grpc_stream_ != nullptr); if (failover_stream_creator.has_value()) { ENVOY_LOG(warn, "Using xDS-Failover. Note that the implementation is currently considered " @@ -202,7 +203,7 @@ class GrpcMuxFailover : public GrpcStreamInterface, parent_.grpc_mux_callbacks_.onStreamEstablished(); } - void onEstablishmentFailure() override { + void onEstablishmentFailure(bool) override { // This will be called when the primary stream fails to establish a connection, or after the // connection was closed. ASSERT(parent_.connectingToOrConnectedToPrimary()); @@ -221,11 +222,11 @@ class GrpcMuxFailover : public GrpcStreamInterface, "in a row. Attempting to connect to the failover stream."); // This will close the stream and prevent the retry timer from // reconnecting to the primary source. - // TODO(adisuissa): need to ensure that when moving between primary and failover, - // the initial_resource_versions that are sent are empty. This will be - // done in a followup PR. parent_.primary_grpc_stream_->closeStream(); - parent_.grpc_mux_callbacks_.onEstablishmentFailure(); + // Next attempt will be to the failover, set the value that + // determines whether to set initial_resource_versions or not. + parent_.grpc_mux_callbacks_.onEstablishmentFailure(parent_.previously_connected_to_ == + ConnectedTo::Failover); parent_.connection_state_ = ConnectionState::ConnectingToFailover; parent_.failover_grpc_stream_->establishNewStream(); return; @@ -237,7 +238,10 @@ class GrpcMuxFailover : public GrpcStreamInterface, ENVOY_LOG_MISC(trace, "Not trying to connect to failover. Will try again to reconnect to the " "primary (upon retry)."); parent_.connection_state_ = ConnectionState::ConnectingToPrimary; - parent_.grpc_mux_callbacks_.onEstablishmentFailure(); + // Next attempt will be to the primary, set the value that + // determines whether to set initial_resource_versions or not. + parent_.grpc_mux_callbacks_.onEstablishmentFailure(parent_.previously_connected_to_ == + ConnectedTo::Primary); } void onDiscoveryResponse(ResponseProtoPtr&& message, @@ -249,6 +253,7 @@ class GrpcMuxFailover : public GrpcStreamInterface, parent_.ever_connected_to_primary_ = true; primary_consecutive_failures_ = 0; parent_.connection_state_ = ConnectionState::ConnectedToPrimary; + parent_.previously_connected_to_ = ConnectedTo::Primary; parent_.grpc_mux_callbacks_.onDiscoveryResponse(std::move(message), control_plane_stats); } @@ -278,7 +283,7 @@ class GrpcMuxFailover : public GrpcStreamInterface, parent_.grpc_mux_callbacks_.onStreamEstablished(); } - void onEstablishmentFailure() override { + void onEstablishmentFailure(bool) override { // This will be called when the failover stream fails to establish a connection, or after the // connection was closed. ASSERT(parent_.connectingToOrConnectedToFailover()); @@ -288,12 +293,13 @@ class GrpcMuxFailover : public GrpcStreamInterface, "before). Attempting to connect to the primary stream."); // This will close the stream and prevent the retry timer from - // reconnecting to the failover source. - // TODO(adisuissa): need to ensure that when moving between primary and failover, - // the initial_resource_versions that are sent are empty. This will be - // done in a followup PR. + // reconnecting to the failover source. The next attempt will be to the + // primary source. parent_.failover_grpc_stream_->closeStream(); - parent_.grpc_mux_callbacks_.onEstablishmentFailure(); + // Next attempt will be to the primary, set the value that + // determines whether to set initial_resource_versions or not. + parent_.grpc_mux_callbacks_.onEstablishmentFailure(parent_.previously_connected_to_ == + ConnectedTo::Primary); // Setting the connection state to None, and when the retry timer will // expire, Envoy will try to connect to the primary source. parent_.connection_state_ = ConnectionState::None; @@ -312,6 +318,7 @@ class GrpcMuxFailover : public GrpcStreamInterface, // Received a response from the failover. The failover is now considered available (no going // back to the primary will be attempted). parent_.connection_state_ = ConnectionState::ConnectedToFailover; + parent_.previously_connected_to_ = ConnectedTo::Failover; parent_.grpc_mux_callbacks_.onDiscoveryResponse(std::move(message), control_plane_stats); } @@ -395,6 +402,10 @@ class GrpcMuxFailover : public GrpcStreamInterface, // primary or failover source. Envoy is considered successfully connected to a source // once it receives a response from it. bool ever_connected_to_primary_{false}; + + enum class ConnectedTo { None, Primary, Failover }; + // Used to track the most recent source that Envoy was connected to. + ConnectedTo previously_connected_to_; }; } // namespace Config diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc index 5da627572280..ba038b140955 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc @@ -535,7 +535,7 @@ void GrpcMuxImpl::onStreamEstablished() { } } -void GrpcMuxImpl::onEstablishmentFailure() { +void GrpcMuxImpl::onEstablishmentFailure(bool) { for (const auto& api_state : api_state_) { for (auto watch : api_state.second->watches_) { watch->callbacks_.onConfigUpdateFailed( diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/grpc_mux_impl.h index 8766ce96935a..c6d6df79dc6d 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.h @@ -78,7 +78,7 @@ class GrpcMuxImpl : public GrpcMux, // Config::GrpcStreamCallbacks void onStreamEstablished() override; - void onEstablishmentFailure() override; + void onEstablishmentFailure(bool) override; void onDiscoveryResponse(std::unique_ptr&& message, ControlPlaneStats& control_plane_stats) override; diff --git a/source/extensions/config_subscription/grpc/grpc_stream.h b/source/extensions/config_subscription/grpc/grpc_stream.h index f8429df2605e..cd04a03c182d 100644 --- a/source/extensions/config_subscription/grpc/grpc_stream.h +++ b/source/extensions/config_subscription/grpc/grpc_stream.h @@ -70,7 +70,7 @@ class GrpcStream : public GrpcStreamInterface, if (stream_ == nullptr) { ENVOY_LOG(debug, "Unable to establish new stream to configuration server {}", async_client_.destination()); - callbacks_->onEstablishmentFailure(); + callbacks_->onEstablishmentFailure(true); setRetryTimer(); return; } @@ -112,7 +112,10 @@ class GrpcStream : public GrpcStreamInterface, logClose(status, message); stream_ = nullptr; control_plane_stats_.connected_state_.set(0); - callbacks_->onEstablishmentFailure(); + // By default Envoy will reconnect to the same server, so pass true here. + // This will be overridden by the mux-failover if Envoy will reconnect to a + // different server. + callbacks_->onEstablishmentFailure(true); // Only retry the timer if not intentionally closed by Envoy. if (!stream_intentionally_closed_) { setRetryTimer(); diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc index 93716197c016..15efac2b82d5 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc @@ -184,13 +184,13 @@ void NewGrpcMuxImpl::onDiscoveryResponse( void NewGrpcMuxImpl::onStreamEstablished() { for (auto& [type_url, subscription] : subscriptions_) { UNREFERENCED_PARAMETER(type_url); - subscription->sub_state_.markStreamFresh(); + subscription->sub_state_.markStreamFresh(should_send_initial_resource_versions_); } pausable_ack_queue_.clear(); trySendDiscoveryRequests(); } -void NewGrpcMuxImpl::onEstablishmentFailure() { +void NewGrpcMuxImpl::onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) { // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a // crash, the iteration needs to dance around a little: collect pointers to all @@ -208,6 +208,7 @@ void NewGrpcMuxImpl::onEstablishmentFailure() { } } } while (all_subscribed.size() != subscriptions_.size()); + should_send_initial_resource_versions_ = next_attempt_may_send_initial_resource_version; } void NewGrpcMuxImpl::onWriteable() { trySendDiscoveryRequests(); } diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h index 26a949ae4dbb..741ea6856e45 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h @@ -69,7 +69,7 @@ class NewGrpcMuxImpl void onStreamEstablished() override; - void onEstablishmentFailure() override; + void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) override; void onWriteable() override; @@ -210,6 +210,9 @@ class NewGrpcMuxImpl XdsConfigTrackerOptRef xds_config_tracker_; EdsResourcesCachePtr eds_resources_cache_; + // Used to track whether initial_resource_versions should be populated on the + // next reconnection. + bool should_send_initial_resource_versions_{true}; bool started_{false}; // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is // true because it may contain dangling pointers. diff --git a/source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.cc b/source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.cc index 31a640801a14..46f1fb04a6f0 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.cc +++ b/source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.cc @@ -120,6 +120,11 @@ bool DeltaSubscriptionState::subscriptionUpdatePending() const { return dynamicContextChanged(); } +void DeltaSubscriptionState::markStreamFresh(bool should_send_initial_resource_versions) { + any_request_sent_yet_in_current_stream_ = false; + should_send_initial_resource_versions_ = should_send_initial_resource_versions; +} + bool DeltaSubscriptionState::isHeartbeatResource( const envoy::service::discovery::v3::Resource& resource) const { if (!supports_heartbeats_) { @@ -244,21 +249,25 @@ DeltaSubscriptionState::getNextRequestInternal() { // Also, since this might be a new server, we must explicitly state *all* of our subscription // interest. for (auto const& [resource_name, resource_state] : requested_resource_state_) { - // Populate initial_resource_versions with the resource versions we currently have. - // Resources we are interested in, but are still waiting to get any version of from the - // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!) - if (!resource_state.isWaitingForServer()) { - (*request->mutable_initial_resource_versions())[resource_name] = resource_state.version(); + if (should_send_initial_resource_versions_) { + // Populate initial_resource_versions with the resource versions we currently have. + // Resources we are interested in, but are still waiting to get any version of from the + // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!) + if (!resource_state.isWaitingForServer()) { + (*request->mutable_initial_resource_versions())[resource_name] = resource_state.version(); + } } // We are going over a list of resources that we are interested in, so add them to // resource_names_subscribe. names_added_.insert(resource_name); } - for (auto const& [resource_name, resource_version] : wildcard_resource_state_) { - (*request->mutable_initial_resource_versions())[resource_name] = resource_version; - } - for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) { - (*request->mutable_initial_resource_versions())[resource_name] = resource_version; + if (should_send_initial_resource_versions_) { + for (auto const& [resource_name, resource_version] : wildcard_resource_state_) { + (*request->mutable_initial_resource_versions())[resource_name] = resource_version; + } + for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) { + (*request->mutable_initial_resource_versions())[resource_name] = resource_version; + } } // If this is a legacy wildcard request, then make sure that the resource_names_subscribe is // empty. diff --git a/source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h b/source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h index 70b9801d5920..c6c62218c148 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h +++ b/source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h @@ -31,7 +31,7 @@ class DeltaSubscriptionState // Whether there was a change in our subscription interest we have yet to inform the server of. bool subscriptionUpdatePending() const override; - void markStreamFresh() override { any_request_sent_yet_in_current_stream_ = false; } + void markStreamFresh(bool should_send_initial_resource_versions) override; void ttlExpiryCallback(const std::vector& expired) override; @@ -100,6 +100,7 @@ class DeltaSubscriptionState bool in_initial_legacy_wildcard_{true}; bool any_request_sent_yet_in_current_stream_{}; + bool should_send_initial_resource_versions_{true}; // Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent. // TODO: Can't use absl::flat_hash_set due to ordering issues in gTest expectation matching. diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc index 7dedcf910abe..9269df1d37df 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.cc @@ -274,7 +274,7 @@ template void GrpcMuxImpl::handleEstablishedStream() { ENVOY_LOG(debug, "GrpcMuxImpl stream successfully established"); for (auto& [type_url, subscription_state] : subscriptions_) { - subscription_state->markStreamFresh(); + subscription_state->markStreamFresh(should_send_initial_resource_versions_); } setAnyRequestSentYetInCurrentStream(false); maybeUpdateQueueSizeStat(0); @@ -283,7 +283,8 @@ void GrpcMuxImpl::handleEstablishedStream() { } template -void GrpcMuxImpl::handleStreamEstablishmentFailure() { +void GrpcMuxImpl::handleStreamEstablishmentFailure( + bool next_attempt_may_send_initial_resource_version) { ENVOY_LOG(debug, "GrpcMuxImpl stream failed to establish"); // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a @@ -302,6 +303,7 @@ void GrpcMuxImpl::handleStreamEstablishmentFailure() { } } } while (all_subscribed.size() != subscriptions_.size()); + should_send_initial_resource_versions_ = next_attempt_may_send_initial_resource_version; } template diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h index 2e0ce7407862..37f0c31f1729 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h @@ -96,7 +96,9 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, // GrpcStreamCallbacks void onStreamEstablished() override { handleEstablishedStream(); } - void onEstablishmentFailure() override { handleStreamEstablishmentFailure(); } + void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) override { + handleStreamEstablishmentFailure(next_attempt_may_send_initial_resource_version); + } void onWriteable() override { trySendDiscoveryRequests(); } void onDiscoveryResponse(std::unique_ptr&& message, ControlPlaneStats& control_plane_stats) override { @@ -152,7 +154,7 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, S& subscriptionStateFor(const std::string& type_url); WatchMap& watchMapFor(const std::string& type_url); void handleEstablishedStream(); - void handleStreamEstablishmentFailure(); + void handleStreamEstablishmentFailure(bool next_attempt_may_send_initial_resource_version); void genericHandleResponse(const std::string& type_url, const RS& response_proto, ControlPlaneStats& control_plane_stats); void trySendDiscoveryRequests(); @@ -227,6 +229,10 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, EdsResourcesCachePtr eds_resources_cache_; const std::string target_xds_authority_; + // Used to track whether initial_resource_versions should be populated on the + // next reconnection. + bool should_send_initial_resource_versions_{true}; + bool started_{false}; // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is // true because it may contain dangling pointers. diff --git a/source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.cc b/source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.cc index 18bafc1a8015..38ca7c980f2e 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.cc +++ b/source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.cc @@ -37,7 +37,7 @@ bool SotwSubscriptionState::subscriptionUpdatePending() const { return update_pending_ || dynamicContextChanged(); } -void SotwSubscriptionState::markStreamFresh() { +void SotwSubscriptionState::markStreamFresh(bool) { last_good_nonce_ = absl::nullopt; update_pending_ = true; clearDynamicContextChanged(); diff --git a/source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h b/source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h index 79d44b1ec73b..a084b03efbcb 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h +++ b/source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h @@ -35,7 +35,7 @@ class SotwSubscriptionState // Whether there was a change in our subscription interest we have yet to inform the server of. bool subscriptionUpdatePending() const override; - void markStreamFresh() override; + void markStreamFresh(bool) override; void ttlExpiryCallback(const std::vector& expired) override; diff --git a/source/extensions/config_subscription/grpc/xds_mux/subscription_state.h b/source/extensions/config_subscription/grpc/xds_mux/subscription_state.h index 3c7ec8d0c542..1602c9196bc6 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/subscription_state.h +++ b/source/extensions/config_subscription/grpc/xds_mux/subscription_state.h @@ -60,7 +60,7 @@ class BaseSubscriptionState : public SubscriptionState, // Whether there was a change in our subscription interest we have yet to inform the server of. virtual bool subscriptionUpdatePending() const PURE; - virtual void markStreamFresh() PURE; + virtual void markStreamFresh(bool should_send_initial_resource_versions) PURE; UpdateAck handleResponse(const RS& response) { // We *always* copy the response's nonce into the next request, even if we're going to make that diff --git a/test/extensions/config_subscription/grpc/delta_subscription_state_test.cc b/test/extensions/config_subscription/grpc/delta_subscription_state_test.cc index 7f29e466dbc4..59e9cb501fa0 100644 --- a/test/extensions/config_subscription/grpc/delta_subscription_state_test.cc +++ b/test/extensions/config_subscription/grpc/delta_subscription_state_test.cc @@ -138,11 +138,11 @@ class DeltaSubscriptionStateTestBase : public testing::TestWithParam(state_)->markStreamFresh(); + absl::get<1>(state_)->markStreamFresh(should_send_initial_resource_versions); } else { - absl::get<0>(state_)->markStreamFresh(); + absl::get<0>(state_)->markStreamFresh(should_send_initial_resource_versions); } } @@ -193,7 +193,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, SubscriptionPendingTest) { // We should send a request after a new stream is established if we are interested in some // resource. EXPECT_FALSE(subscriptionUpdatePending()); - markStreamFresh(); + markStreamFresh(true); EXPECT_TRUE(subscriptionUpdatePending()); getNextRequestAckless(); @@ -206,7 +206,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, SubscriptionPendingTest) { // We should not be sending anything after stream reestablishing, because we are not interested in // anything. - markStreamFresh(); + markStreamFresh(true); EXPECT_FALSE(subscriptionUpdatePending()); } @@ -224,7 +224,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, ResourceTransitionNonWildcardFromRequest EXPECT_TRUE(req->initial_resource_versions().empty()); deliverSimpleDiscoveryResponse({{"foo", "1"}, {"bar", "1"}}, {}, "d1"); - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre("foo", "bar")); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -237,7 +237,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, ResourceTransitionNonWildcardFromRequest EXPECT_THAT(req->resource_names_unsubscribe(), UnorderedElementsAre("foo")); deliverSimpleDiscoveryResponse({}, {"foo"}, "d2"); - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre("bar")); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -256,7 +256,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, ResourceTransitionWithWildcardFromReques deliverSimpleDiscoveryResponse({{"foo", "1"}, {"bar", "1"}, {"wild1", "1"}}, {}, "d1"); // ensure that foo is a part of resource versions - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "foo", "bar")); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -270,7 +270,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, ResourceTransitionWithWildcardFromReques EXPECT_TRUE(req->resource_names_subscribe().empty()); EXPECT_THAT(req->resource_names_unsubscribe(), UnorderedElementsAre("foo")); // didn't receive a reply - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "bar")); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -292,7 +292,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, ResourceTransitionWithWildcardFromReques deliverSimpleDiscoveryResponse({{"foo", "1"}, {"baz", "1"}, {"wild1", "1"}}, {}, "d1"); // ensure that foo is a part of resource versions, bar won't be, because we don't have its version - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "foo", "bar", "baz")); @@ -307,7 +307,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, ResourceTransitionWithWildcardFromReques EXPECT_TRUE(req->resource_names_subscribe().empty()); EXPECT_THAT(req->resource_names_unsubscribe(), UnorderedElementsAre("foo", "bar")); deliverSimpleDiscoveryResponse({}, {"foo"}, "d2"); - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "baz")); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -326,7 +326,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, ResourceTransitionWithWildcardFromWildca deliverSimpleDiscoveryResponse({{"foo", "1"}, {"wild1", "1"}}, {}, "d1"); updateSubscriptionInterest({"foo"}, {}); - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "foo")); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -347,7 +347,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, ResourceTransitionWithWildcardFromAmbigu // make foo ambiguous and request it again updateSubscriptionInterest({}, {"foo"}); updateSubscriptionInterest({"foo"}, {}); - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "foo")); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -366,7 +366,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, LegacyWildcardInitialRequests) { // unsubscribing from unknown resource should keep the legacy // wildcard mode updateSubscriptionInterest({}, {"unknown"}); - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_TRUE(req->resource_names_subscribe().empty()); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -382,7 +382,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, LegacyWildcardInitialRequests) { EXPECT_THAT(req->resource_names_unsubscribe(), UnorderedElementsAre("foo")); deliverSimpleDiscoveryResponse({}, {"foo"}, "d1"); - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_TRUE(req->resource_names_subscribe().empty()); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -404,7 +404,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, ReconnectResourcesVersions) { deliverSimpleDiscoveryResponse({{"foo", "2"}, {"wild", "2"}}, {}, "d2"); // Reconnect, and end validate the initial resources versions. - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "foo", "bar")); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -412,6 +412,29 @@ TEST_P(DeltaSubscriptionStateTestBlank, ReconnectResourcesVersions) { UnorderedElementsAre(Pair("foo", "2"), Pair("bar", "1"), Pair("wild", "2"))); } +// Validates that if the reconnection notes that it should not send initial resource version, +// then that field isn't populated. +TEST_P(DeltaSubscriptionStateTestBlank, ReconnectAvoidResourcesVersions) { + // Subscribe to foo and bar. + updateSubscriptionInterest({WildcardStr, "foo", "bar"}, {}); + auto req = getNextRequestAckless(); + EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "foo", "bar")); + EXPECT_TRUE(req->resource_names_unsubscribe().empty()); + EXPECT_TRUE(req->initial_resource_versions().empty()); + // Deliver foo, bar, and a wild with version 1. + deliverSimpleDiscoveryResponse({{"foo", "1"}, {"bar", "1"}, {"wild", "1"}}, {}, "d1"); + + // Update the versions of foo and wild to 2. + deliverSimpleDiscoveryResponse({{"foo", "2"}, {"wild", "2"}}, {}, "d2"); + + // Reconnect, and end validate the initial resources versions. + markStreamFresh(false); + req = getNextRequestAckless(); + EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "foo", "bar")); + EXPECT_TRUE(req->resource_names_unsubscribe().empty()); + EXPECT_TRUE(req->initial_resource_versions().empty()); +} + // Check that ambiguous resources may also receive a heartbeat message. TEST_P(DeltaSubscriptionStateTestBlank, AmbiguousResourceTTL) { Event::SimulatedTimeSystem time_system; @@ -482,7 +505,7 @@ TEST_P(DeltaSubscriptionStateTestBlank, IgnoreSuperfluousResources) { // Force a reconnection and resending of the "initial" message. If the initial_resource_versions // in the message contains resources like did-not-want or spam, we haven't ignored that as we // should. - markStreamFresh(); + markStreamFresh(true); req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre("foo", "bar")); EXPECT_TRUE(req->resource_names_unsubscribe().empty()); @@ -550,7 +573,7 @@ TEST_P(DeltaSubscriptionStateTest, NewPushDoesntAddUntrackedResources) { } { // On Reconnection, only "name4", "name5", "name6" are sent. - markStreamFresh(); + markStreamFresh(true); auto cur_request = getNextRequestAckless(); EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name4", "name5", "name6")); @@ -571,7 +594,7 @@ TEST_P(DeltaSubscriptionStateTest, NewPushDoesntAddUntrackedResources) { EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); } { // Simulate a stream reconnection, just to see the current resource_state_. - markStreamFresh(); + markStreamFresh(true); auto cur_request = getNextRequestAckless(); EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name4", "name5", "name6")); @@ -725,7 +748,7 @@ TEST_P(DeltaSubscriptionStateTest, ResourceGoneLeadsToBlankInitialVersion) { populateRepeatedResource({{"name1", "version1A"}, {"name2", "version2A"}}); EXPECT_CALL(*ttl_timer_, disableTimer()); deliverDiscoveryResponse(add1_2, {}, "debugversion1"); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection auto cur_request = getNextRequestAckless(); EXPECT_EQ("version1A", cur_request->initial_resource_versions().at("name1")); EXPECT_EQ("version2A", cur_request->initial_resource_versions().at("name2")); @@ -741,7 +764,7 @@ TEST_P(DeltaSubscriptionStateTest, ResourceGoneLeadsToBlankInitialVersion) { *remove2.Add() = "name2"; EXPECT_CALL(*ttl_timer_, disableTimer()).Times(2); deliverDiscoveryResponse(add1_3, remove2, "debugversion2"); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection auto cur_request = getNextRequestAckless(); EXPECT_EQ("version1B", cur_request->initial_resource_versions().at("name1")); EXPECT_EQ(cur_request->initial_resource_versions().end(), @@ -755,7 +778,7 @@ TEST_P(DeltaSubscriptionStateTest, ResourceGoneLeadsToBlankInitialVersion) { *remove1_3.Add() = "name1"; *remove1_3.Add() = "name3"; deliverDiscoveryResponse({}, remove1_3, "debugversion3"); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection auto cur_request = getNextRequestAckless(); EXPECT_TRUE(cur_request->initial_resource_versions().empty()); } @@ -786,7 +809,7 @@ TEST_P(DeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect) { deliverDiscoveryResponse(add1_2, {}, "debugversion1"); updateSubscriptionInterest({"name4"}, {"name1"}); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection auto cur_request = getNextRequestAckless(); // Regarding the resource_names_subscribe field: // name1: do not include: we lost interest. @@ -810,7 +833,7 @@ TEST_P(DeltaSubscriptionStateTest, SwitchIntoWildcardMode) { // switch into wildcard mode updateSubscriptionInterest({"name4", WildcardStr}, {"name1"}); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection auto cur_request = getNextRequestAckless(); // Regarding the resource_names_subscribe field: // name1: do not include: we lost interest. @@ -826,7 +849,7 @@ TEST_P(DeltaSubscriptionStateTest, SwitchIntoWildcardMode) { populateRepeatedResource({{"name4", "version4A"}, {"name5", "version5A"}}); deliverDiscoveryResponse(add4_5, {}, "debugversion1"); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection cur_request = getNextRequestAckless(); // Regarding the resource_names_subscribe field: // name1: do not include: we lost interest. @@ -851,7 +874,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect EXPECT_CALL(*ttl_timer_, disableTimer()); deliverDiscoveryResponse(add1_2, {}, "debugversion1"); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection auto cur_request = getNextRequestAckless(); // Regarding the resource_names_subscribe field: // name1: do not include: we lost interest. @@ -872,7 +895,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, SubscribeAndUnsubscribeAfterReconnect deliverDiscoveryResponse(add1_2, {}, "debugversion1"); updateSubscriptionInterest({"name3"}, {}); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection auto cur_request = getNextRequestAckless(); // Regarding the resource_names_subscribe field: // name1: do not include: see below @@ -896,7 +919,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, CancellingImplicitWildcardSubscriptio auto cur_request = getNextRequestAckless(); EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name3")); EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre(Wildcard)); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection // Regarding the resource_names_subscribe field: // name1: do not include, see below // name2: do not include: it came from wildcard subscription we lost interest in, so we are not @@ -926,7 +949,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, CancellingExplicitWildcardSubscriptio cur_request = getNextRequestAckless(); EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name4")); EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre(Wildcard)); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection // Regarding the resource_names_subscribe field: // name1: do not include: see name2 // name2: do not include: it came as a part of wildcard subscription we cancelled, so we are not @@ -948,7 +971,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, ExplicitInterestOverridesImplicit) { // verify that neither name1 nor name2 appears in the initial request (they are of implicit // interest and initial wildcard request should not contain those). - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection auto cur_request = getNextRequestAckless(); EXPECT_TRUE(cur_request->resource_names_subscribe().empty()); EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty()); @@ -963,7 +986,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, ExplicitInterestOverridesImplicit) { // verify that name1 and * appear in the initial request (name1 is of explicit interest and we are // in explicit wildcard mode). - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection cur_request = getNextRequestAckless(); EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name1", Wildcard)); EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty()); @@ -972,7 +995,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, ExplicitInterestOverridesImplicit) { Protobuf::RepeatedPtrField add1_2_b = populateRepeatedResource({{"name1", "version1B"}, {"name2", "version2B"}}); deliverDiscoveryResponse(add1_2_b, {}, "debugversion1"); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection cur_request = getNextRequestAckless(); EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre("name1", Wildcard)); EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty()); @@ -992,7 +1015,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, ResetToLegacyWildcardBehaviorOnStream cur_request = getNextRequestAckless(); EXPECT_TRUE(cur_request->resource_names_subscribe().empty()); EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre("resource")); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection cur_request = getNextRequestAckless(); EXPECT_TRUE(cur_request->resource_names_subscribe().empty()); EXPECT_TRUE(cur_request->resource_names_unsubscribe().empty()); @@ -1007,7 +1030,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, ResetToLegacyWildcardBehaviorOnStream cur_request = getNextRequestAckless(); EXPECT_TRUE(cur_request->resource_names_subscribe().empty()); EXPECT_THAT(cur_request->resource_names_unsubscribe(), UnorderedElementsAre("resource")); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection updateSubscriptionInterest({}, {}); cur_request = getNextRequestAckless(); EXPECT_TRUE(cur_request->resource_names_subscribe().empty()); @@ -1025,7 +1048,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, AllResourcesFromServerAreTrackedInWil } { // On Reconnection, only "name4", "name5", "name6" and wildcard resource are sent. - markStreamFresh(); + markStreamFresh(true); auto cur_request = getNextRequestAckless(); EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "name4", "name5", "name6")); @@ -1046,7 +1069,7 @@ TEST_P(WildcardDeltaSubscriptionStateTest, AllResourcesFromServerAreTrackedInWil EXPECT_EQ(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); } { // Simulate a stream reconnection, just to see the current resource_state_. - markStreamFresh(); + markStreamFresh(true); auto cur_request = getNextRequestAckless(); EXPECT_THAT(cur_request->resource_names_subscribe(), UnorderedElementsAre(WildcardStr, "name4", "name5", "name6")); @@ -1069,7 +1092,7 @@ TEST_P(DeltaSubscriptionStateTest, InitialVersionMapFirstMessageOnly) { {{"name1", "version1A"}, {"name2", "version2A"}, {"name3", "version3A"}}); EXPECT_CALL(*ttl_timer_, disableTimer()); deliverDiscoveryResponse(add_all, {}, "debugversion1"); - markStreamFresh(); // simulate a stream reconnection + markStreamFresh(true); // simulate a stream reconnection auto cur_request = getNextRequestAckless(); EXPECT_EQ("version1A", cur_request->initial_resource_versions().at("name1")); EXPECT_EQ("version2A", cur_request->initial_resource_versions().at("name2")); @@ -1098,7 +1121,7 @@ TEST_P(DeltaSubscriptionStateTest, CheckUpdatePending) { EXPECT_FALSE(subscriptionUpdatePending()); updateSubscriptionInterest({}, {}); // no change EXPECT_FALSE(subscriptionUpdatePending()); - markStreamFresh(); + markStreamFresh(true); EXPECT_TRUE(subscriptionUpdatePending()); // no change, BUT fresh stream updateSubscriptionInterest({}, {"name3"}); // one removed EXPECT_TRUE(subscriptionUpdatePending()); @@ -1250,7 +1273,7 @@ TEST_P(DeltaSubscriptionStateTest, NoVersionUpdateOnNack) { EXPECT_NE(Grpc::Status::WellKnownGrpcStatus::Ok, ack.error_detail_.code()); } // Verify that a reconnect keeps the old versions. - markStreamFresh(); + markStreamFresh(true); { auto req = getNextRequestAckless(); EXPECT_THAT(req->resource_names_subscribe(), UnorderedElementsAre("name1", "name2", "name3")); diff --git a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc index 383b13a750d6..8a990f134fcb 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc @@ -100,8 +100,8 @@ TEST_F(GrpcMuxFailoverNoFailoverTest, PrimaryOnEstablishmentFailureInvoked) { EXPECT_CALL(primary_stream_, establishNewStream()); grpc_mux_failover_.establishNewStream(); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); - primary_callbacks_->onEstablishmentFailure(); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); + primary_callbacks_->onEstablishmentFailure(false); } // Validates that onDiscoveryResponse callback is invoked on the primary stream @@ -208,16 +208,16 @@ class GrpcMuxFailoverTest : public testing::Test { grpc_mux_failover_->establishNewStream(); // First disconnect. - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); - primary_callbacks_->onEstablishmentFailure(); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); + primary_callbacks_->onEstablishmentFailure(false); // Emulate a retry that ends with a second disconnect. It should close the // primary stream and try to establish the failover stream. EXPECT_CALL(primary_stream_, closeStream()); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); EXPECT_CALL(failover_stream_, establishNewStream()); - primary_callbacks_->onEstablishmentFailure(); + primary_callbacks_->onEstablishmentFailure(false); } // Successfully connect to the failover source. @@ -316,9 +316,9 @@ TEST_F(GrpcMuxFailoverTest, AttemptPrimaryAfterPrimaryInitialFailure) { // First disconnect. EXPECT_CALL(primary_stream_, closeStream()).Times(0); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); - primary_callbacks_->onEstablishmentFailure(); + primary_callbacks_->onEstablishmentFailure(false); } // Validate that upon failure of the second connection to the primary, the @@ -327,16 +327,16 @@ TEST_F(GrpcMuxFailoverTest, AttemptFailoverAfterPrimaryTwoFailures) { connectingToPrimary(); // First disconnect. - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); - primary_callbacks_->onEstablishmentFailure(); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); + primary_callbacks_->onEstablishmentFailure(false); // Emulate a retry that ends with a second disconnect. It should close the // primary stream and try to establish the failover stream. EXPECT_CALL(primary_stream_, closeStream()); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); EXPECT_CALL(failover_stream_, establishNewStream()); - primary_callbacks_->onEstablishmentFailure(); + primary_callbacks_->onEstablishmentFailure(false); } // Validate that starting from the second failure to reach the primary, @@ -345,8 +345,8 @@ TEST_F(GrpcMuxFailoverTest, AlternatingBetweenFailoverAndPrimary) { connectingToPrimary(); // First disconnect. - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); - primary_callbacks_->onEstablishmentFailure(); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); + primary_callbacks_->onEstablishmentFailure(false); // Emulate a 5 times disconnects. for (int attempt = 0; attempt < 5; ++attempt) { @@ -355,19 +355,19 @@ TEST_F(GrpcMuxFailoverTest, AlternatingBetweenFailoverAndPrimary) { // connect to the failover. It should close the primary stream, and // try to establish the failover stream. EXPECT_CALL(primary_stream_, closeStream()); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); EXPECT_CALL(failover_stream_, establishNewStream()); - primary_callbacks_->onEstablishmentFailure(); + primary_callbacks_->onEstablishmentFailure(false); } else { // Emulate a failover source failure that will result in an attempt to // connect to the primary. It should close the failover stream, and // enable the retry timer. EXPECT_CALL(failover_stream_, closeStream()); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); EXPECT_CALL(*timer_, enableTimer(_, _)); - failover_callbacks_->onEstablishmentFailure(); + failover_callbacks_->onEstablishmentFailure(false); // Emulate a timer tick, which should try to reconnect to the primary // stream. EXPECT_CALL(primary_stream_, establishNewStream()); @@ -387,9 +387,9 @@ TEST_F(GrpcMuxFailoverTest, PrimaryOnlyAttemptsAfterPrimaryAvailable) { // connect to the failover. It should not close the primary stream (so // the retry mechanism will kick in). EXPECT_CALL(primary_stream_, closeStream()).Times(0); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(true)); EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); - primary_callbacks_->onEstablishmentFailure(); + primary_callbacks_->onEstablishmentFailure(false); } // Emulate a call to establishNewStream(). @@ -411,10 +411,10 @@ TEST_F(GrpcMuxFailoverTest, AlternatingPrimaryAndFailoverAttemptsAfterFailoverAv // connect to the primary. It should close the failover stream, and // enable the retry timer. EXPECT_CALL(failover_stream_, closeStream()); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); EXPECT_CALL(*timer_, enableTimer(_, _)); - failover_callbacks_->onEstablishmentFailure(); + failover_callbacks_->onEstablishmentFailure(false); // Emulate a timer tick, which should try to reconnect to the primary // stream. EXPECT_CALL(primary_stream_, establishNewStream()); @@ -424,10 +424,12 @@ TEST_F(GrpcMuxFailoverTest, AlternatingPrimaryAndFailoverAttemptsAfterFailoverAv // connect to the failover. It should close the primary stream, and // try to establish the failover stream. EXPECT_CALL(primary_stream_, closeStream()); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + // Expecting "true" to be passed as it was previously connected to the + // failover. + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(true)); EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); EXPECT_CALL(failover_stream_, establishNewStream()); - primary_callbacks_->onEstablishmentFailure(); + primary_callbacks_->onEstablishmentFailure(false); } } @@ -457,10 +459,10 @@ TEST_F(GrpcMuxFailoverTest, TimerDisabledUponExternalReconnect) { // Fail the attempt to connect to the failover. EXPECT_CALL(failover_stream_, closeStream()); - EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure()); + EXPECT_CALL(grpc_mux_callbacks_, onEstablishmentFailure(false)); EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); EXPECT_CALL(*timer_, enableTimer(_, _)); - failover_callbacks_->onEstablishmentFailure(); + failover_callbacks_->onEstablishmentFailure(false); // Attempt to reconnect again. EXPECT_CALL(*timer_, disableTimer()); diff --git a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc index 93e5d13c5457..df541cd6bf98 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc @@ -1390,7 +1390,7 @@ TEST_F(NullGrpcMuxImplTest, OnStreamEstablishedImplemented) { EXPECT_NO_THROW(null_mux_.onStreamEstablished()); } TEST_F(NullGrpcMuxImplTest, OnEstablishmentFailureImplemented) { - EXPECT_NO_THROW(null_mux_.onEstablishmentFailure()); + EXPECT_NO_THROW(null_mux_.onEstablishmentFailure(false)); } TEST_F(NullGrpcMuxImplTest, OnDiscoveryResponseImplemented) { std::unique_ptr response; diff --git a/test/extensions/config_subscription/grpc/grpc_stream_test.cc b/test/extensions/config_subscription/grpc/grpc_stream_test.cc index 9f7645c3b712..2ef55b4b367b 100644 --- a/test/extensions/config_subscription/grpc/grpc_stream_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_stream_test.cc @@ -118,14 +118,14 @@ TEST_F(GrpcStreamTest, LogClose) { EXPECT_FALSE(grpc_stream_->getCloseStatusForTest().has_value()); // Benign status: debug. - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); EXPECT_LOG_CONTAINS("debug", "gRPC config stream to test_destination closed", { grpc_stream_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Ok, "Ok"); }); EXPECT_FALSE(grpc_stream_->getCloseStatusForTest().has_value()); // Non-retriable failure: warn. - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); EXPECT_LOG_CONTAINS("warn", "gRPC config stream to test_destination closed", { grpc_stream_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::NotFound, "Not Found"); }); @@ -134,7 +134,7 @@ TEST_F(GrpcStreamTest, LogClose) { // Repeated failures that warn after enough time. { // Retriable failure: debug. - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); EXPECT_LOG_CONTAINS("debug", "gRPC config stream to test_destination closed", { grpc_stream_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Unavailable, "Unavailable"); }); @@ -143,7 +143,7 @@ TEST_F(GrpcStreamTest, LogClose) { // Different retriable failure: warn. time_system_.advanceTimeWait(std::chrono::seconds(1)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); EXPECT_LOG_CONTAINS("warn", "stream to test_destination closed: 4, Deadline Exceeded (previously 14, " "Unavailable since 1s ago)", @@ -157,7 +157,7 @@ TEST_F(GrpcStreamTest, LogClose) { // Same retriable failure after a short amount of time: debug. time_system_.advanceTimeWait(std::chrono::seconds(1)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); EXPECT_LOG_CONTAINS("debug", "gRPC config stream to test_destination closed", { grpc_stream_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded, "Deadline Exceeded"); @@ -167,7 +167,7 @@ TEST_F(GrpcStreamTest, LogClose) { // Same retriable failure after a long time: warn. time_system_.advanceTimeWait(std::chrono::seconds(100)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); EXPECT_LOG_CONTAINS( "warn", "gRPC config stream to test_destination closed since 101s ago: 4, Deadline Exceeded", { @@ -179,7 +179,7 @@ TEST_F(GrpcStreamTest, LogClose) { // Warn again, using the newest message. time_system_.advanceTimeWait(std::chrono::seconds(1)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); EXPECT_LOG_CONTAINS( "warn", "gRPC config stream to test_destination closed since 102s ago: 4, new message", { grpc_stream_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded, @@ -190,7 +190,7 @@ TEST_F(GrpcStreamTest, LogClose) { // Different retriable failure, using the most recent error message from the previous one. time_system_.advanceTimeWait(std::chrono::seconds(1)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); EXPECT_LOG_CONTAINS("warn", "gRPC config stream to test_destination closed: 14, Unavailable " "(previously 4, new message since 103s ago)", @@ -222,7 +222,7 @@ TEST_F(GrpcStreamTest, LogClose) { // sendMessage would segfault. TEST_F(GrpcStreamTest, FailToEstablishNewStream) { EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(nullptr)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); grpc_stream_->establishNewStream(); EXPECT_FALSE(grpc_stream_->grpcStreamAvailable()); } @@ -296,7 +296,7 @@ TEST_F(GrpcStreamTest, RetryOnEstablishNewStreamFailure) { // simulate that first call to establish GRPC stream fails { EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(nullptr)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); // First backoff interval should be 27%25=2 EXPECT_CALL(*grpc_stream_retry_timer, enableTimer(std::chrono::milliseconds(2), _)); grpc_stream_->establishNewStream(); @@ -306,7 +306,7 @@ TEST_F(GrpcStreamTest, RetryOnEstablishNewStreamFailure) { // assume 2ms have passed, invoke callback, fail 2nd time { EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(nullptr)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); // Second backoff interval will be 27%30=27 EXPECT_CALL(*grpc_stream_retry_timer, enableTimer(std::chrono::milliseconds(27), _)); grpc_stream_retry_timer_cb(); @@ -347,7 +347,7 @@ TEST_F(GrpcStreamTest, RetryOnRemoteClose) { } // simulate that remote closes the stream, this should trigger a retry - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); // First backoff interval will be 27%25=2 EXPECT_CALL(*grpc_stream_retry_timer, enableTimer(std::chrono::milliseconds(2), _)); grpc_stream_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Unavailable, ""); @@ -356,7 +356,7 @@ TEST_F(GrpcStreamTest, RetryOnRemoteClose) { // assume 2ms have passed, invoke callback, fail the first time { EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(nullptr)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); // Second backoff interval will be 27%30=27 EXPECT_CALL(*grpc_stream_retry_timer, enableTimer(std::chrono::milliseconds(27), _)); grpc_stream_retry_timer_cb(); @@ -366,7 +366,7 @@ TEST_F(GrpcStreamTest, RetryOnRemoteClose) { // assume 27ms have passed, invoke callback, fail the second time { EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(nullptr)); - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); // First backoff interval will be 27%30=27 EXPECT_CALL(*grpc_stream_retry_timer, enableTimer(std::chrono::milliseconds(27), _)); grpc_stream_retry_timer_cb(); @@ -417,7 +417,7 @@ TEST_F(GrpcStreamTest, CloseStreamDisablesRetryTimer) { EXPECT_FALSE(grpc_stream_->grpcStreamAvailable()); // Simulate an establishment failure that will not recreate the timer. - EXPECT_CALL(callbacks_, onEstablishmentFailure()); + EXPECT_CALL(callbacks_, onEstablishmentFailure(true)); EXPECT_CALL(*grpc_stream_retry_timer, enableTimer(_, _)).Times(0); grpc_stream_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Unavailable, ""); EXPECT_FALSE(grpc_stream_->grpcStreamAvailable()); diff --git a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc index 71f33b93118b..5c27473e03f0 100644 --- a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc @@ -61,11 +61,12 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam(GetParam())) { // Once "envoy.restart_features.xds_failover_support" is deprecated, the // test should no longer be parameterized on the bool value. - scoped_runtime_.mergeValues({{"envoy.restart_features.xds_failover_support", - std::get<1>(GetParam()) ? "true" : "false"}}); + scoped_runtime_.mergeValues( + {{"envoy.restart_features.xds_failover_support", using_xds_failover_ ? "true" : "false"}}); } void setup() { @@ -186,6 +187,7 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParamresponse_nonce()); EXPECT_EQ("version1", cur_request->version_info()); // Reconnect the stream. - state_->markStreamFresh(); + state_->markStreamFresh(true); cur_request = getNextDiscoveryRequestAckless(); EXPECT_EQ("", cur_request->response_nonce()); EXPECT_EQ("version1", cur_request->version_info()); @@ -313,7 +313,7 @@ TEST_F(SotwSubscriptionStateTest, CheckUpdatePending) { EXPECT_FALSE(state_->subscriptionUpdatePending()); state_->updateSubscriptionInterest({}, {}); // no change EXPECT_FALSE(state_->subscriptionUpdatePending()); - state_->markStreamFresh(); + state_->markStreamFresh(true); EXPECT_TRUE(state_->subscriptionUpdatePending()); // no change, BUT fresh stream state_->updateSubscriptionInterest({}, {"name3"}); // one removed EXPECT_TRUE(state_->subscriptionUpdatePending()); diff --git a/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc b/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc index 9318d0150bf3..de4948c91336 100644 --- a/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc +++ b/test/extensions/config_subscription/grpc/xds_failover_integration_test.cc @@ -664,16 +664,17 @@ TEST_P(XdsFailoverAdsIntegrationTest, PrimaryUseAfterFailoverResponseAndDisconne // Ensure basic flow with primary works. Validate that the // initial_resource_versions for delta-xDS is empty. - // TODO(adisuissa): ensure initial_resource_versions is empty, once this is supported. + const absl::flat_hash_map empty_initial_resource_versions_map; EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true, - Grpc::Status::WellKnownGrpcStatus::Ok, "", - xds_stream_.get())); - EXPECT_TRUE( - compareDiscoveryRequest(EdsTypeUrl, "", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, - false, Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream_.get())); + Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream_.get(), + OptRef(empty_initial_resource_versions_map))); + EXPECT_TRUE(compareDiscoveryRequest(EdsTypeUrl, "", {"failover_cluster_0"}, + {"failover_cluster_0"}, {}, false, + Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream_.get(), + OptRef(empty_initial_resource_versions_map))); EXPECT_TRUE(compareDiscoveryRequest(LdsTypeUrl, "", {}, {}, {}, false, - Grpc::Status::WellKnownGrpcStatus::Ok, "", - xds_stream_.get())); + Grpc::Status::WellKnownGrpcStatus::Ok, "", xds_stream_.get(), + OptRef(empty_initial_resource_versions_map))); sendDiscoveryResponse( CdsTypeUrl, {ConfigHelper::buildCluster("primary_cluster_0")}, {ConfigHelper::buildCluster("primary_cluster_0")}, {}, "primary1", {}, xds_stream_.get()); @@ -778,18 +779,22 @@ TEST_P(XdsFailoverAdsIntegrationTest, FailoverUseAfterFailoverResponseAndDisconn RELEASE_ASSERT(result, result.message()); failover_xds_stream_->startGrpcStream(); - // Ensure basic flow with primary works. Validate that the - // initial_resource_versions for delta-xDS is empty. - // TODO(adisuissa): ensure initial_resource_versions contains the correct versions. - EXPECT_TRUE(compareDiscoveryRequest(CdsTypeUrl, "", {}, {}, {}, true, - Grpc::Status::WellKnownGrpcStatus::Ok, "", - failover_xds_stream_.get())); + // Ensure basic flow with failover after it was connected to failover is + // preserved. The initial resource versions of LDS will be empty, because no + // LDS response was previously sent. + const absl::flat_hash_map cds_eds_initial_resource_versions_map{ + {"failover_cluster_0", "failover1"}}; + const absl::flat_hash_map empty_initial_resource_versions_map; + EXPECT_TRUE(compareDiscoveryRequest( + CdsTypeUrl, "", {}, {}, {}, true, Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get(), OptRef(cds_eds_initial_resource_versions_map))); EXPECT_TRUE(compareDiscoveryRequest( EdsTypeUrl, "", {"failover_cluster_0"}, {"failover_cluster_0"}, {}, false, - Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get())); - EXPECT_TRUE(compareDiscoveryRequest(LdsTypeUrl, "", {}, {}, {}, false, - Grpc::Status::WellKnownGrpcStatus::Ok, "", - failover_xds_stream_.get())); + Grpc::Status::WellKnownGrpcStatus::Ok, "", failover_xds_stream_.get(), + OptRef(cds_eds_initial_resource_versions_map))); + EXPECT_TRUE(compareDiscoveryRequest( + LdsTypeUrl, "", {}, {}, {}, false, Grpc::Status::WellKnownGrpcStatus::Ok, "", + failover_xds_stream_.get(), OptRef(empty_initial_resource_versions_map))); sendDiscoveryResponse( CdsTypeUrl, {ConfigHelper::buildCluster("failover_cluster_1")}, {ConfigHelper::buildCluster("failover_cluster_1")}, {}, "failover2", {}, diff --git a/test/integration/base_integration_test.cc b/test/integration/base_integration_test.cc index 457f03b0a011..6cabd66f3dfb 100644 --- a/test/integration/base_integration_test.cc +++ b/test/integration/base_integration_test.cc @@ -613,16 +613,17 @@ AssertionResult BaseIntegrationTest::compareDiscoveryRequest( const std::vector& expected_resource_names_added, const std::vector& expected_resource_names_removed, bool expect_node, const Protobuf::int32 expected_error_code, const std::string& expected_error_substring, - FakeStream* stream) { + FakeStream* stream, + OptRef> initial_resource_versions) { if (sotw_or_delta_ == Grpc::SotwOrDelta::Sotw || sotw_or_delta_ == Grpc::SotwOrDelta::UnifiedSotw) { return compareSotwDiscoveryRequest(expected_type_url, expected_version, expected_resource_names, expect_node, expected_error_code, expected_error_substring, stream); } else { - return compareDeltaDiscoveryRequest(expected_type_url, expected_resource_names_added, - expected_resource_names_removed, stream, - expected_error_code, expected_error_substring, expect_node); + return compareDeltaDiscoveryRequest( + expected_type_url, expected_resource_names_added, expected_resource_names_removed, stream, + expected_error_code, expected_error_substring, expect_node, initial_resource_versions); } } @@ -730,7 +731,8 @@ AssertionResult BaseIntegrationTest::compareDeltaDiscoveryRequest( const std::vector& expected_resource_subscriptions, const std::vector& expected_resource_unsubscriptions, FakeStream* xds_stream, const Protobuf::int32 expected_error_code, const std::string& expected_error_substring, - bool expect_node) { + bool expect_node, + OptRef> initial_resource_versions) { envoy::service::discovery::v3::DeltaDiscoveryRequest request; if (xds_stream == nullptr) { xds_stream = xds_stream_.get(); @@ -765,7 +767,34 @@ AssertionResult BaseIntegrationTest::compareDeltaDiscoveryRequest( if (!unsub_result) { return unsub_result; } - // (We don't care about response_nonce or initial_resource_versions.) + // Validate initial_resource_versions if given (otherwise, we don't care what + // the request contains). + if (initial_resource_versions.has_value()) { + const auto& req_map = request.initial_resource_versions(); + // Compare size, and that elements in one map appear in the other. + if (req_map.size() != initial_resource_versions->size()) { + return AssertionFailure() << fmt::format( + "Wrong size of initial_resource_versions. Expected: {}, observed: {}.\n{}", + initial_resource_versions->size(), req_map.size(), + absl::StrJoin(req_map, ", ", absl::PairFormatter("="))); + } + EXPECT_EQ(req_map.size(), initial_resource_versions->size()); + for (const auto& [resource_name, resource_version] : *initial_resource_versions) { + auto it = req_map.find(resource_name); + if (it == req_map.end()) { + return AssertionFailure() << fmt::format( + "Could not find resource {} in received initial_resource_versions map: {}", + resource_name, absl::StrJoin(req_map, ", ", absl::PairFormatter("="))); + } + if (resource_version != it->second) { + return AssertionFailure() << fmt::format( + "Incorrect resource version {} in received initial_resource_versions map. " + "Expected: {}, observed: {}", + resource_name, resource_version, it->second); + } + } + } + // (We don't care about response_nonce.) if (request.error_detail().code() != expected_error_code) { return AssertionFailure() << fmt::format( diff --git a/test/integration/base_integration_test.h b/test/integration/base_integration_test.h index 6278768d6d2d..a1b571699b60 100644 --- a/test/integration/base_integration_test.h +++ b/test/integration/base_integration_test.h @@ -185,7 +185,9 @@ class BaseIntegrationTest : protected Logger::Loggable { const std::vector& expected_resource_names_added, const std::vector& expected_resource_names_removed, bool expect_node = false, const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, - const std::string& expected_error_message = "", FakeStream* stream = nullptr); + const std::string& expected_error_message = "", FakeStream* stream = nullptr, + OptRef> initial_resource_versions = + absl::nullopt); template void @@ -219,7 +221,9 @@ class BaseIntegrationTest : protected Logger::Loggable { const std::vector& expected_resource_subscriptions, const std::vector& expected_resource_unsubscriptions, FakeStream* stream, const Protobuf::int32 expected_error_code = Grpc::Status::WellKnownGrpcStatus::Ok, - const std::string& expected_error_message = "", bool expect_node = true); + const std::string& expected_error_message = "", bool expect_node = true, + OptRef> initial_resource_versions = + absl::nullopt); AssertionResult compareSotwDiscoveryRequest( const std::string& expected_type_url, const std::string& expected_version, diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index f24e28806b51..f153a22aa229 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -140,7 +140,7 @@ class MockGrpcStreamCallbacks ~MockGrpcStreamCallbacks() override; MOCK_METHOD(void, onStreamEstablished, ()); - MOCK_METHOD(void, onEstablishmentFailure, ()); + MOCK_METHOD(void, onEstablishmentFailure, (bool)); MOCK_METHOD(void, onDiscoveryResponse, (std::unique_ptr && message, ControlPlaneStats& control_plane_stats));