From ce176052524b89e3e5d7d0a461bc03742342e84a Mon Sep 17 00:00:00 2001 From: Rama Chavali Date: Tue, 1 Oct 2024 23:10:41 +0530 Subject: [PATCH 1/6] add log line to indicate the admin server address (#36371) Commit Message: add log line to indicate the admin server address Risk Level: Low Testing: n/a Docs Changes: n/a Release Notes: n/a Signed-off-by: Rama Chavali --- source/server/server.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/server/server.cc b/source/server/server.cc index 826c6dc4910c..d29713c65aad 100644 --- a/source/server/server.cc +++ b/source/server/server.cc @@ -720,6 +720,8 @@ absl::Status InstanceBase::initializeOrThrow(Network::Address::InstanceConstShar auto typed_admin = dynamic_cast(admin_.get()); RELEASE_ASSERT(typed_admin != nullptr, "Admin implementation is not an AdminImpl."); initial_config.initAdminAccessLog(bootstrap_, typed_admin->factoryContext()); + ENVOY_LOG(info, "Starting admin HTTP server at {}", + initial_config.admin().address()->asString()); admin_->startHttpListener(initial_config.admin().accessLogs(), initial_config.admin().address(), initial_config.admin().socketOptions()); #else From 966eba3987f0d3f20dd3aca813cad8a54e9e01b7 Mon Sep 17 00:00:00 2001 From: danzh Date: Tue, 1 Oct 2024 15:27:07 -0400 Subject: [PATCH 2/6] quic: report stream reset direction in reset callback failure string (#36368) Commit Message: add `FROM_PEER`/`FROM_SELF` to the transport_failure_reason string when a QUIC stream gets reset. Risk Level: low, error reporting format change Testing: existing tests Docs Changes: N/A Release Notes: N/A Platform Specific Features: N/A --------- Signed-off-by: Dan Zhang Co-authored-by: Dan Zhang --- source/common/quic/envoy_quic_client_stream.cc | 7 ++++--- source/common/quic/envoy_quic_server_stream.cc | 8 +++++--- test/common/quic/envoy_quic_client_session_test.cc | 4 ++-- test/integration/protocol_integration_test.cc | 8 ++++---- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/source/common/quic/envoy_quic_client_stream.cc b/source/common/quic/envoy_quic_client_stream.cc index 88b1a96d767a..cdfb83d2553f 100644 --- a/source/common/quic/envoy_quic_client_stream.cc +++ b/source/common/quic/envoy_quic_client_stream.cc @@ -258,7 +258,8 @@ bool EnvoyQuicClientStream::OnStopSending(quic::QuicResetStreamError error) { runResetCallbacks( quicRstErrorToEnvoyRemoteResetReason(error.internal_code()), Runtime::runtimeFeatureEnabled("envoy.reloadable_features.report_stream_reset_error_code") - ? quic::QuicRstStreamErrorCodeToString(error.internal_code()) + ? absl::StrCat(quic::QuicRstStreamErrorCodeToString(error.internal_code()), + "|FROM_PEER") : absl::string_view()); } return true; @@ -360,7 +361,7 @@ void EnvoyQuicClientStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) runResetCallbacks( quicRstErrorToEnvoyRemoteResetReason(frame.error_code), Runtime::runtimeFeatureEnabled("envoy.reloadable_features.report_stream_reset_error_code") - ? quic::QuicRstStreamErrorCodeToString(frame.error_code) + ? absl::StrCat(quic::QuicRstStreamErrorCodeToString(frame.error_code), "|FROM_PEER") : absl::string_view()); } } @@ -374,7 +375,7 @@ void EnvoyQuicClientStream::ResetWithError(quic::QuicResetStreamError error) { runResetCallbacks( quicRstErrorToEnvoyLocalResetReason(error.internal_code()), Runtime::runtimeFeatureEnabled("envoy.reloadable_features.report_stream_reset_error_code") - ? quic::QuicRstStreamErrorCodeToString(error.internal_code()) + ? absl::StrCat(quic::QuicRstStreamErrorCodeToString(error.internal_code()), "|FROM_SELF") : absl::string_view()); if (session()->connection()->connected()) { quic::QuicSpdyClientStream::ResetWithError(error); diff --git a/source/common/quic/envoy_quic_server_stream.cc b/source/common/quic/envoy_quic_server_stream.cc index 331495df292b..5da95f50d26c 100644 --- a/source/common/quic/envoy_quic_server_stream.cc +++ b/source/common/quic/envoy_quic_server_stream.cc @@ -340,7 +340,8 @@ bool EnvoyQuicServerStream::OnStopSending(quic::QuicResetStreamError error) { runResetCallbacks( quicRstErrorToEnvoyRemoteResetReason(error.internal_code()), Runtime::runtimeFeatureEnabled("envoy.reloadable_features.report_stream_reset_error_code") - ? quic::QuicRstStreamErrorCodeToString(error.internal_code()) + ? absl::StrCat(quic::QuicRstStreamErrorCodeToString(error.internal_code()), + "|FROM_PEER") : absl::string_view()); } return true; @@ -360,7 +361,7 @@ void EnvoyQuicServerStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) runResetCallbacks( quicRstErrorToEnvoyRemoteResetReason(frame.error_code), Runtime::runtimeFeatureEnabled("envoy.reloadable_features.report_stream_reset_error_code") - ? quic::QuicRstStreamErrorCodeToString(frame.error_code) + ? absl::StrCat(quic::QuicRstStreamErrorCodeToString(frame.error_code), "|FROM_PEER") : absl::string_view()); } } @@ -375,7 +376,8 @@ void EnvoyQuicServerStream::ResetWithError(quic::QuicResetStreamError error) { runResetCallbacks( quicRstErrorToEnvoyLocalResetReason(error.internal_code()), Runtime::runtimeFeatureEnabled("envoy.reloadable_features.report_stream_reset_error_code") - ? quic::QuicRstStreamErrorCodeToString(error.internal_code()) + ? absl::StrCat(quic::QuicRstStreamErrorCodeToString(error.internal_code()), + "|FROM_SELF") : absl::string_view()); } quic::QuicSpdyServerStreamBase::ResetWithError(error); diff --git a/test/common/quic/envoy_quic_client_session_test.cc b/test/common/quic/envoy_quic_client_session_test.cc index decd040b5bfe..8e100e5d77fe 100644 --- a/test/common/quic/envoy_quic_client_session_test.cc +++ b/test/common/quic/envoy_quic_client_session_test.cc @@ -676,8 +676,8 @@ TEST_P(EnvoyQuicClientSessionTest, WriteBlockedAndUnblock) { EnvoyQuicClientConnectionPeer::onFileEvent(*quic_connection_, Event::FileReadyType::Write, *quic_connection_->connectionSocket()); EXPECT_FALSE(quic_connection_->writer()->IsWriteBlocked()); - EXPECT_CALL(stream_callbacks, - onResetStream(Http::StreamResetReason::LocalReset, "QUIC_STREAM_REQUEST_REJECTED")); + EXPECT_CALL(stream_callbacks, onResetStream(Http::StreamResetReason::LocalReset, + "QUIC_STREAM_REQUEST_REJECTED|FROM_SELF")); EXPECT_CALL(*quic_connection_, SendControlFrame(_)); stream.resetStream(Http::StreamResetReason::LocalReset); } diff --git a/test/integration/protocol_integration_test.cc b/test/integration/protocol_integration_test.cc index 921e5ba80925..510cd6924d9c 100644 --- a/test/integration/protocol_integration_test.cc +++ b/test/integration/protocol_integration_test.cc @@ -4975,10 +4975,10 @@ TEST_P(ProtocolIntegrationTest, InvalidResponseHeaderNameStreamError) { EXPECT_EQ("502", response->headers().getStatusValue()); test_server_->waitForCounterGe("http.config_test.downstream_rq_5xx", 1); - std::string error_message = - upstreamProtocol() == Http::CodecType::HTTP3 - ? "upstream_reset_before_response_started{protocol_error|QUIC_BAD_APPLICATION_PAYLOAD}" - : "upstream_reset_before_response_started{protocol_error}"; + std::string error_message = upstreamProtocol() == Http::CodecType::HTTP3 + ? "upstream_reset_before_response_started{protocol_error|QUIC_" + "BAD_APPLICATION_PAYLOAD|FROM_SELF}" + : "upstream_reset_before_response_started{protocol_error}"; EXPECT_EQ(waitForAccessLog(access_log_name_), error_message); // Upstream connection should stay up From a40acb85897037ad87138b24fe7993c6654bb164 Mon Sep 17 00:00:00 2001 From: "Adi (Suissa) Peleg" Date: Tue, 1 Oct 2024 16:05:32 -0400 Subject: [PATCH 3/6] ads-replacement: adding gRPC-mux support for ADS config replacement (#36155) Commit Message: ads-replacement: adding gRPC-mux support for ADS config replacement Additional Description: Adds support for replacing the ADS config in runtime. This PR includes the changes for the gRPC-Mux (Sotw and Delta). First part of #35956. Risk Level: low - requires another PR that enables this Testing: Added unit and integration tests. Docs Changes: N/A Release Notes: N/A - not as part of this PR. Platform Specific Features: N/A --------- Signed-off-by: Adi Suissa-Peleg --- envoy/config/grpc_mux.h | 13 ++ source/common/config/null_grpc_mux_impl.h | 6 + .../grpc/grpc_mux_failover.h | 12 +- .../config_subscription/grpc/grpc_mux_impl.cc | 73 +++++-- .../config_subscription/grpc/grpc_mux_impl.h | 17 +- .../grpc/new_grpc_mux_impl.cc | 79 ++++++-- .../grpc/new_grpc_mux_impl.h | 19 +- .../config_subscription/grpc/watch_map.cc | 4 +- .../config_subscription/grpc/watch_map.h | 8 +- .../grpc/xds_mux/grpc_mux_impl.cc | 99 +++++++--- .../grpc/xds_mux/grpc_mux_impl.h | 32 ++- .../extensions/config_subscription/grpc/BUILD | 3 + .../grpc/grpc_mux_failover_test.cc | 50 +++-- .../grpc/grpc_mux_impl_test.cc | 184 +++++++++++++++++- .../grpc/new_grpc_mux_impl_test.cc | 179 ++++++++++++++++- .../grpc/watch_map_test.cc | 24 +-- .../grpc/xds_grpc_mux_impl_test.cc | 171 +++++++++++++++- test/mocks/config/mocks.h | 7 + 18 files changed, 864 insertions(+), 116 deletions(-) diff --git a/envoy/config/grpc_mux.h b/envoy/config/grpc_mux.h index a18c87bc193e..096c439b0bd0 100644 --- a/envoy/config/grpc_mux.h +++ b/envoy/config/grpc_mux.h @@ -2,10 +2,13 @@ #include +#include "envoy/common/backoff_strategy.h" #include "envoy/common/exception.h" #include "envoy/common/pure.h" +#include "envoy/config/custom_config_validators.h" #include "envoy/config/eds_resources_cache.h" #include "envoy/config/subscription.h" +#include "envoy/grpc/async_client.h" #include "envoy/stats/stats_macros.h" #include "source/common/common/cleanup.h" @@ -112,6 +115,16 @@ class GrpcMux { * @return EdsResourcesCacheOptRef optional eds resources cache for the gRPC-mux. */ virtual EdsResourcesCacheOptRef edsResourcesCache() PURE; + + /** + * Updates the current gRPC-Mux object to use a new gRPC client, and config. + */ + virtual absl::Status + updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) PURE; }; using GrpcMuxPtr = std::unique_ptr; diff --git a/source/common/config/null_grpc_mux_impl.h b/source/common/config/null_grpc_mux_impl.h index 453d723eb32d..ce45640bfa20 100644 --- a/source/common/config/null_grpc_mux_impl.h +++ b/source/common/config/null_grpc_mux_impl.h @@ -27,6 +27,12 @@ class NullGrpcMuxImpl : public GrpcMux, ENVOY_BUG(false, "unexpected request for on demand update"); } + absl::Status updateMuxSource(Grpc::RawAsyncClientPtr&&, Grpc::RawAsyncClientPtr&&, + CustomConfigValidatorsPtr&&, Stats::Scope&, BackOffStrategyPtr&&, + const envoy::config::core::v3::ApiConfigSource&) override { + return absl::UnimplementedError(""); + } + EdsResourcesCacheOptRef edsResourcesCache() override { return absl::nullopt; } void onWriteable() override {} diff --git a/source/extensions/config_subscription/grpc/grpc_mux_failover.h b/source/extensions/config_subscription/grpc/grpc_mux_failover.h index 19765df61fc7..1b7daa8b0219 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_failover.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_failover.h @@ -188,6 +188,8 @@ class GrpcMuxFailover : public GrpcStreamInterface, } private: + friend class GrpcMuxFailoverTest; + // A helper class that proxies the callbacks of GrpcStreamCallbacks for the primary service. class PrimaryGrpcStreamCallbacks : public GrpcStreamCallbacks { public: @@ -356,7 +358,15 @@ class GrpcMuxFailover : public GrpcStreamInterface, void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override { PANIC("not implemented"); } - void closeStream() override { PANIC("not implemented"); } + void closeStream() override { + if (connectingToOrConnectedToPrimary()) { + ENVOY_LOG_MISC(debug, "Intentionally closing the primary gRPC stream"); + primary_grpc_stream_->closeStream(); + } else if (connectingToOrConnectedToFailover()) { + ENVOY_LOG_MISC(debug, "Intentionally closing the failover gRPC stream"); + failover_grpc_stream_->closeStream(); + } + } // The stream callbacks that will be invoked on the GrpcMux object, to notify // about the state of the underlying primary/failover stream. diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc index 4317cdca1a2c..f036cea34e64 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.cc @@ -60,14 +60,18 @@ std::string convertToWildcard(const std::string& resource_name) { } // namespace GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node) - : grpc_stream_(createGrpcStreamObject(grpc_mux_context)), + : dispatcher_(grpc_mux_context.dispatcher_), + grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_), + std::move(grpc_mux_context.failover_async_client_), + grpc_mux_context.service_method_, grpc_mux_context.scope_, + std::move(grpc_mux_context.backoff_strategy_), + grpc_mux_context.rate_limit_settings_)), local_info_(grpc_mux_context.local_info_), skip_subsequent_node_(skip_subsequent_node), config_validators_(std::move(grpc_mux_context.config_validators_)), xds_config_tracker_(grpc_mux_context.xds_config_tracker_), xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_), eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)), target_xds_authority_(grpc_mux_context.target_xds_authority_), - dispatcher_(grpc_mux_context.dispatcher_), dynamic_update_callback_handle_( grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback( [this](absl::string_view resource_type_url) { @@ -80,29 +84,33 @@ GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_ std::unique_ptr> -GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { +GrpcMuxImpl::createGrpcStreamObject(Grpc::RawAsyncClientPtr&& async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + const Protobuf::MethodDescriptor& service_method, + Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy, + const RateLimitSettings& rate_limit_settings) { if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { return std::make_unique>( /*primary_stream_creator=*/ - [&grpc_mux_context]( + [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy, + &rate_limit_settings]( GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { return std::make_unique>( - callbacks, std::move(grpc_mux_context.async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_), - grpc_mux_context.rate_limit_settings_, + callbacks, std::move(async_client), service_method, dispatcher, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream::ConnectedStateValue:: FIRST_ENTRY); }, /*failover_stream_creator=*/ - grpc_mux_context.failover_async_client_ + failover_async_client ? absl::make_optional( - [&grpc_mux_context]( + [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope, + &rate_limit_settings]( GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr>( - callbacks, std::move(grpc_mux_context.failover_async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, + callbacks, std::move(failover_async_client), service_method, dispatcher, + scope, // TODO(adisuissa): the backoff strategy for the failover should // be the same as the primary source. std::make_unique( GrpcMuxFailover:: DefaultFailoverBackoffMilliseconds), - grpc_mux_context.rate_limit_settings_, + rate_limit_settings, GrpcStream:: ConnectedStateValue::SECOND_ENTRY); }) : absl::nullopt, /*grpc_mux_callbacks=*/*this, - /*dispatch=*/grpc_mux_context.dispatcher_); + /*dispatch=*/dispatcher_); } return std::make_unique>( - this, std::move(grpc_mux_context.async_client_), grpc_mux_context.service_method_, - grpc_mux_context.dispatcher_, grpc_mux_context.scope_, - std::move(grpc_mux_context.backoff_strategy_), grpc_mux_context.rate_limit_settings_, + this, std::move(async_client), service_method, dispatcher_, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream< envoy::service::discovery::v3::DiscoveryRequest, envoy::service::discovery::v3::DiscoveryResponse>::ConnectedStateValue::FIRST_ENTRY); @@ -292,6 +298,37 @@ GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url, return watch; } +absl::Status +GrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + CustomConfigValidatorsPtr&& custom_config_validators, + Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) { + // Process the rate limit settings. + absl::StatusOr rate_limit_settings_or_error = + Utility::parseRateLimitSettings(ads_config_source); + RETURN_IF_NOT_OK_REF(rate_limit_settings_or_error.status()); + + const Protobuf::MethodDescriptor& service_method = + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"); + + // Disconnect from current xDS servers. + ENVOY_LOG_MISC(info, "Replacing xDS gRPC mux source"); + grpc_stream_->closeStream(); + grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client), + std::move(failover_async_client), service_method, scope, + std::move(backoff_strategy), *rate_limit_settings_or_error); + + // Update the config validators. + config_validators_ = std::move(custom_config_validators); + + // Start the subscriptions over the grpc_stream. + grpc_stream_->establishNewStream(); + + return absl::OkStatus(); +} + ScopedResume GrpcMuxImpl::pause(const std::string& type_url) { return pause(std::vector{type_url}); } diff --git a/source/extensions/config_subscription/grpc/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/grpc_mux_impl.h index 885942f3fb2a..c53d51cd0979 100644 --- a/source/extensions/config_subscription/grpc/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/grpc_mux_impl.h @@ -73,6 +73,13 @@ class GrpcMuxImpl : public GrpcMux, return makeOptRefFromPtr(eds_resources_cache_.get()); } + absl::Status + updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) override; + void handleDiscoveryResponse( std::unique_ptr&& message); @@ -100,11 +107,13 @@ class GrpcMuxImpl : public GrpcMux, private: // Helper function to create the grpc_stream_ object. - // TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support - // is deprecated. std::unique_ptr> - createGrpcStreamObject(GrpcMuxContext& grpc_mux_context); + createGrpcStreamObject(Grpc::RawAsyncClientPtr&& async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, + const RateLimitSettings& rate_limit_settings); void drainRequests(); void setRetryTimer(); @@ -272,6 +281,7 @@ class GrpcMuxImpl : public GrpcMux, ApiState& api_state, const std::string& type_url, const std::string& version_info, bool call_delegate); + Event::Dispatcher& dispatcher_; // Multiplexes the stream to the primary and failover sources. // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated, // convert from unique_ptr to GrpcMuxFailover directly. @@ -301,7 +311,6 @@ class GrpcMuxImpl : public GrpcMux, // URL. std::unique_ptr> request_queue_; - Event::Dispatcher& dispatcher_; Common::CallbackHandlePtr dynamic_update_callback_handle_; bool started_{false}; diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc index 1a47a63572c7..6315853829ef 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.cc @@ -36,7 +36,12 @@ using AllMuxes = ThreadSafeSingleton; } // namespace NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context) - : grpc_stream_(createGrpcStreamObject(grpc_mux_context)), + : dispatcher_(grpc_mux_context.dispatcher_), + grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_), + std::move(grpc_mux_context.failover_async_client_), + grpc_mux_context.service_method_, grpc_mux_context.scope_, + std::move(grpc_mux_context.backoff_strategy_), + grpc_mux_context.rate_limit_settings_)), local_info_(grpc_mux_context.local_info_), config_validators_(std::move(grpc_mux_context.config_validators_)), dynamic_update_callback_handle_( @@ -45,7 +50,6 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context) onDynamicContextUpdate(resource_type_url); return absl::OkStatus(); })), - dispatcher_(grpc_mux_context.dispatcher_), xds_config_tracker_(grpc_mux_context.xds_config_tracker_), eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)) { AllMuxes::get().insert(this); @@ -53,30 +57,34 @@ NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context) std::unique_ptr> -NewGrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { +NewGrpcMuxImpl::createGrpcStreamObject(Grpc::RawAsyncClientPtr&& async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + const Protobuf::MethodDescriptor& service_method, + Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy, + const RateLimitSettings& rate_limit_settings) { if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { return std::make_unique>( /*primary_stream_creator=*/ - [&grpc_mux_context]( + [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy, + &rate_limit_settings]( GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { return std::make_unique< GrpcStream>( - callbacks, std::move(grpc_mux_context.async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_), - grpc_mux_context.rate_limit_settings_, + callbacks, std::move(async_client), service_method, dispatcher, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream:: ConnectedStateValue::FIRST_ENTRY); }, /*failover_stream_creator=*/ - grpc_mux_context.failover_async_client_ + failover_async_client ? absl::make_optional( - [&grpc_mux_context]( + [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope, + &rate_limit_settings]( GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr< @@ -85,29 +93,27 @@ NewGrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { return std::make_unique< GrpcStream>( - callbacks, std::move(grpc_mux_context.failover_async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, + callbacks, std::move(failover_async_client), service_method, dispatcher, + scope, // TODO(adisuissa): the backoff strategy for the failover should // be the same as the primary source. std::make_unique( GrpcMuxFailover:: DefaultFailoverBackoffMilliseconds), - grpc_mux_context.rate_limit_settings_, + rate_limit_settings, GrpcStream:: ConnectedStateValue::SECOND_ENTRY); }) : absl::nullopt, /*grpc_mux_callbacks=*/*this, - /*dispatch=*/grpc_mux_context.dispatcher_); + /*dispatch=*/dispatcher_); } return std::make_unique>( - this, std::move(grpc_mux_context.async_client_), grpc_mux_context.service_method_, - grpc_mux_context.dispatcher_, grpc_mux_context.scope_, - std::move(grpc_mux_context.backoff_strategy_), grpc_mux_context.rate_limit_settings_, + this, std::move(async_client), service_method, dispatcher_, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream< envoy::service::discovery::v3::DeltaDiscoveryRequest, envoy::service::discovery::v3::DeltaDiscoveryResponse>::ConnectedStateValue::FIRST_ENTRY); @@ -246,6 +252,41 @@ GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url, return std::make_unique(type_url, watch, *this, options); } +absl::Status +NewGrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + CustomConfigValidatorsPtr&& custom_config_validators, + Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) { + // Process the rate limit settings. + absl::StatusOr rate_limit_settings_or_error = + Utility::parseRateLimitSettings(ads_config_source); + RETURN_IF_NOT_OK_REF(rate_limit_settings_or_error.status()); + + const Protobuf::MethodDescriptor& service_method = + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"); + + // Disconnect from current xDS servers. + ENVOY_LOG_MISC(info, "Replacing the xDS gRPC mux source"); + grpc_stream_->closeStream(); + grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client), + std::move(failover_async_client), service_method, scope, + std::move(backoff_strategy), *rate_limit_settings_or_error); + + // Update the config validators. + config_validators_ = std::move(custom_config_validators); + // Update the watch map's config validators. + for (auto& [type_url, subscription] : subscriptions_) { + subscription->watch_map_.setConfigValidators(config_validators_.get()); + } + + // Start the subscriptions over the grpc_stream. + grpc_stream_->establishNewStream(); + + return absl::OkStatus(); +} + // Updates the list of resource names watched by the given watch. If an added name is new across // the whole subscription, or if a removed name has no other watch interested in it, then the // subscription will enqueue and attempt to send an appropriate discovery request. @@ -320,7 +361,7 @@ void NewGrpcMuxImpl::addSubscription(const std::string& type_url, } subscriptions_.emplace( type_url, std::make_unique(type_url, local_info_, use_namespace_matching, - dispatcher_, *config_validators_.get(), + dispatcher_, config_validators_.get(), xds_config_tracker_, resources_cache)); subscription_ordering_.emplace_back(type_url); } diff --git a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h index 741ea6856e45..5c35dd6e177b 100644 --- a/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/new_grpc_mux_impl.h @@ -78,6 +78,13 @@ class NewGrpcMuxImpl // TODO(fredlas) remove this from the GrpcMux interface. void start() override; + absl::Status + updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) override; + GrpcStreamInterface& grpcStreamForTest() { @@ -95,7 +102,7 @@ class NewGrpcMuxImpl struct SubscriptionStuff { SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info, const bool use_namespace_matching, Event::Dispatcher& dispatcher, - CustomConfigValidators& config_validators, + CustomConfigValidators* config_validators, XdsConfigTrackerOptRef xds_config_tracker, EdsResourcesCacheOptRef eds_resources_cache) : watch_map_(use_namespace_matching, type_url, config_validators, eds_resources_cache), @@ -149,11 +156,13 @@ class NewGrpcMuxImpl }; // Helper function to create the grpc_stream_ object. - // TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support - // is deprecated. std::unique_ptr> - createGrpcStreamObject(GrpcMuxContext& grpc_mux_context); + createGrpcStreamObject(Grpc::RawAsyncClientPtr&& async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, + const RateLimitSettings& rate_limit_settings); void removeWatch(const std::string& type_url, Watch* watch); @@ -196,6 +205,7 @@ class NewGrpcMuxImpl // the order of Envoy's dependency ordering). std::list subscription_ordering_; + Event::Dispatcher& dispatcher_; // Multiplexes the stream to the primary and failover sources. // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated, // convert from unique_ptr to GrpcMuxFailover directly. @@ -206,7 +216,6 @@ class NewGrpcMuxImpl const LocalInfo::LocalInfo& local_info_; CustomConfigValidatorsPtr config_validators_; Common::CallbackHandlePtr dynamic_update_callback_handle_; - Event::Dispatcher& dispatcher_; XdsConfigTrackerOptRef xds_config_tracker_; EdsResourcesCachePtr eds_resources_cache_; diff --git a/source/extensions/config_subscription/grpc/watch_map.cc b/source/extensions/config_subscription/grpc/watch_map.cc index 815bcca48581..ffa57508eed7 100644 --- a/source/extensions/config_subscription/grpc/watch_map.cc +++ b/source/extensions/config_subscription/grpc/watch_map.cc @@ -154,7 +154,7 @@ void WatchMap::onConfigUpdate(const std::vector& resources, } // Execute external config validators. - config_validators_.executeValidators(type_url_, resources); + config_validators_->executeValidators(type_url_, resources); const bool map_is_single_wildcard = (watches_.size() == 1 && wildcard_watches_.size() == 1); // We just bundled up the updates into nice per-watch packages. Now, deliver them. @@ -254,7 +254,7 @@ void WatchMap::onConfigUpdate( } // Execute external config validators. - config_validators_.executeValidators(type_url_, decoded_resources, removed_resources); + config_validators_->executeValidators(type_url_, decoded_resources, removed_resources); // We just bundled up the updates into nice per-watch packages. Now, deliver them. for (const auto& [cur_watch, resource_to_add] : per_watch_added) { diff --git a/source/extensions/config_subscription/grpc/watch_map.h b/source/extensions/config_subscription/grpc/watch_map.h index 47cd43b4581f..07140804edee 100644 --- a/source/extensions/config_subscription/grpc/watch_map.h +++ b/source/extensions/config_subscription/grpc/watch_map.h @@ -73,7 +73,7 @@ struct Watch { class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable { public: WatchMap(const bool use_namespace_matching, const std::string& type_url, - CustomConfigValidators& config_validators, EdsResourcesCacheOptRef eds_resources_cache) + CustomConfigValidators* config_validators, EdsResourcesCacheOptRef eds_resources_cache) : use_namespace_matching_(use_namespace_matching), type_url_(type_url), config_validators_(config_validators), eds_resources_cache_(eds_resources_cache) { // If eds resources cache is provided, then the type must be ClusterLoadAssignment. @@ -114,6 +114,10 @@ class WatchMap : public UntypedConfigUpdateCallbacks, public Logger::Loggable; template GrpcMuxImpl::GrpcMuxImpl(std::unique_ptr subscription_state_factory, GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node) - : grpc_stream_(createGrpcStreamObject(grpc_mux_context)), + : dispatcher_(grpc_mux_context.dispatcher_), + grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_), + std::move(grpc_mux_context.failover_async_client_), + grpc_mux_context.service_method_, grpc_mux_context.scope_, + std::move(grpc_mux_context.backoff_strategy_), + grpc_mux_context.rate_limit_settings_)), subscription_state_factory_(std::move(subscription_state_factory)), skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_context.local_info_), dynamic_update_callback_handle_( @@ -59,44 +64,46 @@ GrpcMuxImpl::GrpcMuxImpl(std::unique_ptr subscription_state_fac } template -std::unique_ptr> -GrpcMuxImpl::createGrpcStreamObject(GrpcMuxContext& grpc_mux_context) { +std::unique_ptr> GrpcMuxImpl::createGrpcStreamObject( + Grpc::RawAsyncClientPtr&& async_client, Grpc::RawAsyncClientPtr&& failover_async_client, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, const RateLimitSettings& rate_limit_settings) { if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) { return std::make_unique>( /*primary_stream_creator=*/ - [&grpc_mux_context](GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { + [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy, + &rate_limit_settings]( + GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { return std::make_unique>( - callbacks, std::move(grpc_mux_context.async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_), - grpc_mux_context.rate_limit_settings_, + callbacks, std::move(async_client), service_method, dispatcher, scope, + std::move(backoff_strategy), rate_limit_settings, GrpcStream::ConnectedStateValue::FIRST_ENTRY); }, /*failover_stream_creator=*/ - grpc_mux_context.failover_async_client_ - ? absl::make_optional([&grpc_mux_context](GrpcStreamCallbacks* callbacks) - -> GrpcStreamInterfacePtr { - return std::make_unique>( - callbacks, std::move(grpc_mux_context.failover_async_client_), - grpc_mux_context.service_method_, grpc_mux_context.dispatcher_, - grpc_mux_context.scope_, - // TODO(adisuissa): the backoff strategy for the failover should - // be the same as the primary source. - std::make_unique( - GrpcMuxFailover::DefaultFailoverBackoffMilliseconds), - grpc_mux_context.rate_limit_settings_, - GrpcStream::ConnectedStateValue::SECOND_ENTRY); - }) + failover_async_client + ? absl::make_optional( + [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope, + &rate_limit_settings]( + GrpcStreamCallbacks* callbacks) -> GrpcStreamInterfacePtr { + return std::make_unique>( + callbacks, std::move(failover_async_client), service_method, dispatcher, + scope, + // TODO(adisuissa): the backoff strategy for the failover should + // be the same as the primary source. + std::make_unique( + GrpcMuxFailover::DefaultFailoverBackoffMilliseconds), + rate_limit_settings, GrpcStream::ConnectedStateValue::SECOND_ENTRY); + }) : absl::nullopt, /*grpc_mux_callbacks=*/*this, - /*dispatch=*/grpc_mux_context.dispatcher_); + /*dispatch=*/dispatcher_); } - return std::make_unique>( - this, std::move(grpc_mux_context.async_client_), grpc_mux_context.service_method_, - grpc_mux_context.dispatcher_, grpc_mux_context.scope_, - std::move(grpc_mux_context.backoff_strategy_), grpc_mux_context.rate_limit_settings_, - GrpcStream::ConnectedStateValue::FIRST_ENTRY); + return std::make_unique>(this, std::move(async_client), service_method, + dispatcher_, scope, std::move(backoff_strategy), + rate_limit_settings, + GrpcStream::ConnectedStateValue::FIRST_ENTRY); } + template GrpcMuxImpl::~GrpcMuxImpl() { AllMuxes::get().erase(this); } @@ -134,7 +141,7 @@ Config::GrpcMuxWatchPtr GrpcMuxImpl::addWatch( watch_map = watch_maps_ .emplace(type_url, std::make_unique(options.use_namespace_matching_, type_url, - *config_validators_.get(), resources_cache)) + config_validators_.get(), resources_cache)) .first; subscriptions_.emplace(type_url, subscription_state_factory_->makeSubscriptionState( type_url, *watch_maps_[type_url], resource_decoder, @@ -221,6 +228,40 @@ ScopedResume GrpcMuxImpl::pause(const std::vector typ }); } +template +absl::Status GrpcMuxImpl::updateMuxSource( + Grpc::RawAsyncClientPtr&& primary_async_client, Grpc::RawAsyncClientPtr&& failover_async_client, + CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) { + // Process the rate limit settings. + absl::StatusOr rate_limit_settings_or_error = + Utility::parseRateLimitSettings(ads_config_source); + RETURN_IF_NOT_OK_REF(rate_limit_settings_or_error.status()); + + const Protobuf::MethodDescriptor& service_method = + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(methodName()); + + // Disconnect from current xDS servers. + ENVOY_LOG_MISC(info, "Replacing the xDS gRPC mux source"); + grpc_stream_->closeStream(); + grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client), + std::move(failover_async_client), service_method, scope, + std::move(backoff_strategy), *rate_limit_settings_or_error); + + // Update the config validators. + config_validators_ = std::move(custom_config_validators); + // Update the watch map's config validators. + for (auto& [type_url, watch_map] : watch_maps_) { + watch_map->setConfigValidators(config_validators_.get()); + } + + // Start the subscriptions over the grpc_stream. + grpc_stream_->establishNewStream(); + + return absl::OkStatus(); +} + template void GrpcMuxImpl::sendGrpcMessage(RQ& msg_proto, S& sub_state) { if (sub_state.dynamicContextChanged() || !anyRequestSentYetInCurrentStream() || diff --git a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h index 37f0c31f1729..46397351407b 100644 --- a/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h +++ b/source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h @@ -105,6 +105,13 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, genericHandleResponse(message->type_url(), *message, control_plane_stats); } + absl::Status + updateMuxSource(Grpc::RawAsyncClientPtr&& primary_async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source) override; + EdsResourcesCacheOptRef edsResourcesCache() override { return makeOptRefFromPtr(eds_resources_cache_.get()); } @@ -165,12 +172,16 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, } const LocalInfo::LocalInfo& localInfo() const { return local_info_; } + virtual absl::string_view methodName() const PURE; + private: // Helper function to create the grpc_stream_ object. // TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support // is deprecated. - std::unique_ptr> - createGrpcStreamObject(GrpcMuxContext& grpc_mux_context); + std::unique_ptr> createGrpcStreamObject( + Grpc::RawAsyncClientPtr&& async_client, Grpc::RawAsyncClientPtr&& failover_async_client, + const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, const RateLimitSettings& rate_limit_settings); // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check // whether we *want* to send a (Delta)DiscoveryRequest). @@ -187,6 +198,7 @@ class GrpcMuxImpl : public GrpcStreamCallbacks, // Invoked when dynamic context parameters change for a resource type. void onDynamicContextUpdate(absl::string_view resource_type_url); + Event::Dispatcher& dispatcher_; // Multiplexes the stream to the primary and failover sources. // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated, // convert from unique_ptr to GrpcMuxFailover directly. @@ -248,6 +260,11 @@ class GrpcMuxDelta : public GrpcMuxImpl& for_update) override; + +private: + absl::string_view methodName() const override { + return "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"; + } }; class GrpcMuxSotw : public GrpcMuxImpl&) override { ENVOY_BUG(false, "unexpected request for on demand update"); } + +private: + absl::string_view methodName() const override { + return "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"; + } }; class NullGrpcMuxImpl : public GrpcMux { @@ -277,6 +299,12 @@ class NullGrpcMuxImpl : public GrpcMux { SubscriptionCallbacks&, OpaqueResourceDecoderSharedPtr, const SubscriptionOptions&) override; + absl::Status updateMuxSource(Grpc::RawAsyncClientPtr&&, Grpc::RawAsyncClientPtr&&, + CustomConfigValidatorsPtr&&, Stats::Scope&, BackOffStrategyPtr&&, + const envoy::config::core::v3::ApiConfigSource&) override { + return absl::UnimplementedError(""); + } + void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set&) override { ENVOY_BUG(false, "unexpected request for on demand update"); } diff --git a/test/extensions/config_subscription/grpc/BUILD b/test/extensions/config_subscription/grpc/BUILD index b0f9ce75ed79..466a14269bb0 100644 --- a/test/extensions/config_subscription/grpc/BUILD +++ b/test/extensions/config_subscription/grpc/BUILD @@ -32,6 +32,7 @@ envoy_cc_test( "//test/test_common:logging_lib", "//test/test_common:resources_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:status_utility_lib", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", @@ -64,6 +65,7 @@ envoy_cc_test( "//test/test_common:logging_lib", "//test/test_common:resources_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:status_utility_lib", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/config/endpoint/v3:pkg_cc_proto", @@ -180,6 +182,7 @@ envoy_cc_test( "//test/test_common:logging_lib", "//test/test_common:resources_lib", "//test/test_common:simulated_time_system_lib", + "//test/test_common:status_utility_lib", "//test/test_common:test_runtime_lib", "//test/test_common:utility_lib", "@envoy_api//envoy/api/v2:pkg_cc_proto", diff --git a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc index 8a990f134fcb..2b611e1c7b51 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_failover_test.cc @@ -15,7 +15,6 @@ using testing::Return; namespace Envoy { namespace Config { -namespace { // Validates that if no failover is set, then all actions are essentially a pass // through. @@ -237,6 +236,13 @@ class GrpcMuxFailoverTest : public testing::Test { failover_callbacks_->onDiscoveryResponse(std::move(response), cp_stats); } + void invokeCloseStream() { + // A wrapper that invokes closeStream(). It is needed because closeStream() + // is a private method, and while this class is a friend for GrpcMuxFailover, + // the tests cannot invoke the method directly. + grpc_mux_failover_->closeStream(); + } + // Override a timer to emulate its expiration without waiting for it to expire. NiceMock dispatcher_; Event::MockTimer* timer_; @@ -626,26 +632,38 @@ TEST_F(GrpcMuxFailoverTest, OnWriteableConnectedToPrimaryInvoked) { // Validates that when connected to primary, a subsequent call to establishNewStream // will not try to recreate the stream. TEST_F(GrpcMuxFailoverTest, NoRecreateStreamWhenConnectedToPrimary) { - // Validate connected to primary. - { - connectToPrimary(); - EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); - EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); - grpc_mux_failover_->establishNewStream(); - } + connectToPrimary(); + EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); + EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); + grpc_mux_failover_->establishNewStream(); } // Validates that when connected to failover, a subsequent call to establishNewStream // will not try to recreate the stream. TEST_F(GrpcMuxFailoverTest, NoRecreateStreamWhenConnectedToFailover) { - // Validate connected to failover. - { - connectToFailover(); - EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); - EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); - grpc_mux_failover_->establishNewStream(); - } + connectToFailover(); + EXPECT_CALL(primary_stream_, establishNewStream()).Times(0); + EXPECT_CALL(failover_stream_, establishNewStream()).Times(0); + grpc_mux_failover_->establishNewStream(); +} + +// Validates that closing the stream when connected to primary closes the +// primary stream. +TEST_F(GrpcMuxFailoverTest, CloseStreamWhenConnectedToPrimary) { + connectToPrimary(); + EXPECT_CALL(primary_stream_, closeStream()); + EXPECT_CALL(failover_stream_, closeStream()).Times(0); + invokeCloseStream(); } -} // namespace + +// Validates that closing the stream when connected to failover closes the +// failover stream. +TEST_F(GrpcMuxFailoverTest, CloseStreamWhenConnectedToFailover) { + connectToFailover(); + EXPECT_CALL(primary_stream_, closeStream()).Times(0); + EXPECT_CALL(failover_stream_, closeStream()); + invokeCloseStream(); +} + } // namespace Config } // namespace Envoy diff --git a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc index df541cd6bf98..5108a0502afd 100644 --- a/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/grpc_mux_impl_test.cc @@ -27,6 +27,7 @@ #include "test/test_common/logging.h" #include "test/test_common/resources.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/status_utility.h" #include "test/test_common/test_runtime.h" #include "test/test_common/test_time.h" #include "test/test_common/utility.h" @@ -95,7 +96,8 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { const std::vector& resource_names, const std::string& version, bool first = false, const std::string& nonce = "", const Protobuf::int32 error_code = Grpc::Status::WellKnownGrpcStatus::Ok, - const std::string& error_message = "") { + const std::string& error_message = "", + Grpc::MockAsyncStream* async_stream = nullptr) { envoy::service::discovery::v3::DiscoveryRequest expected_request; if (first) { expected_request.mutable_node()->CopyFrom(local_info_.node()); @@ -113,7 +115,8 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { error_detail->set_code(error_code); error_detail->set_message(error_message); } - EXPECT_CALL(async_stream_, sendMessageRaw_(Grpc::ProtoBufferEq(expected_request), false)); + EXPECT_CALL(async_stream ? *async_stream : async_stream_, + sendMessageRaw_(Grpc::ProtoBufferEq(expected_request), false)); } TestScopedRuntime scoped_runtime_; @@ -122,6 +125,9 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { NiceMock local_info_; Grpc::MockAsyncClient* async_client_; Grpc::MockAsyncStream async_stream_; + // Used for tests invoking updateMuxSource(). + Grpc::MockAsyncClient* replaced_async_client_; + Grpc::MockAsyncStream replaced_async_stream_; CustomConfigValidatorsPtr config_validators_; GrpcMuxImplPtr grpc_mux_; NiceMock callbacks_; @@ -1346,6 +1352,172 @@ TEST_P(GrpcMuxImplTest, RemoveCachedResourceOnLastSubscription) { EXPECT_CALL(*eds_resources_cache_, removeResource("x")); } +// Updating the mux object while being connected sends the correct requests. +TEST_P(GrpcMuxImplTest, MuxDynamicReplacementWhenConnected) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + auto foo_sub = grpc_mux_->addWatch("type_url_foo", {"x", "y"}, callbacks_, resource_decoder_, {}); + auto bar_sub = grpc_mux_->addWatch("type_url_bar", {}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + expectSendMessage("type_url_bar", {}, ""); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial messages to be sent to the new stream. + expectSendMessage("type_url_foo", {"x", "y"}, "", true, "", Grpc::Status::WellKnownGrpcStatus::Ok, + "", &replaced_async_stream_); + expectSendMessage("type_url_bar", {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/nullptr, + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + // Ending test, removing subscriptions for type_url_foo. + expectSendMessage("type_url_foo", {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); +} + +// Updating the mux object after receiving a response, sends the correct requests. +TEST_P(GrpcMuxImplTest, MuxDynamicReplacementFetchingResources) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + OpaqueResourceDecoderSharedPtr resource_decoder( + std::make_shared>("cluster_name")); + auto foo_sub = grpc_mux_->addWatch(type_url, {"x", "y"}, callbacks_, resource_decoder, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x", "y"}, "", true); + grpc_mux_->start(); + + // Send back a response for one of the resources. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {"x", "y"}, "1"); + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial message to be sent to the new stream. + expectSendMessage(type_url, {"x", "y"}, "1", true, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/std::make_unique>(), + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + + // Send a response to resource "y" on the replaced mux. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "2")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {"x", "y"}, "2", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, + "", &replaced_async_stream_); + grpc_mux_->grpcStreamForTest().onReceiveMessage(std::move(response)); + } + + // Ending test, removing subscriptions for the subscription. + expectSendMessage(type_url, {}, "2", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); +} + +// Updating the mux object with wrong rate limit settings is rejected. +TEST_P(GrpcMuxImplTest, RejectMuxDynamicReplacementRateLimitSettingsError) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + auto foo_sub = grpc_mux_->addWatch("type_url_foo", {"x", "y"}, callbacks_, resource_decoder_, {}); + auto bar_sub = grpc_mux_->addWatch("type_url_bar", {}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + expectSendMessage("type_url_bar", {}, ""); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource ads_config_wrong_settings; + envoy::config::core::v3::RateLimitSettings* rate_limits = + ads_config_wrong_settings.mutable_rate_limit_settings(); + rate_limits->mutable_max_tokens()->set_value(500); + rate_limits->mutable_fill_rate()->set_value(std::numeric_limits::quiet_NaN()); + // No disconnect and replacement of the original async_client. + EXPECT_CALL(async_stream_, resetStream()).Times(0); + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)).Times(0); + EXPECT_FALSE(grpc_mux_ + ->updateMuxSource( + /*primary_async_client=*/std::unique_ptr( + replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/nullptr, + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, + SubscriptionFactory::RetryMaxDelayMs, random_), + ads_config_wrong_settings) + .ok()); + // Ending test, removing subscriptions for type_url_foo. + expectSendMessage("type_url_foo", {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &async_stream_); +} + /** * Tests the NullGrpcMuxImpl object to increase code-coverage. */ @@ -1399,6 +1571,14 @@ TEST_F(NullGrpcMuxImplTest, OnDiscoveryResponseImplemented) { EXPECT_NO_THROW(null_mux_.onDiscoveryResponse(std::move(response), cp_stats)); } +TEST_F(NullGrpcMuxImplTest, UpdateMuxSourceError) { + Stats::TestUtil::TestStore stats; + const envoy::config::core::v3::ApiConfigSource empty_config; + const absl::Status status = null_mux_.updateMuxSource(nullptr, nullptr, nullptr, + *stats.rootScope(), nullptr, empty_config); + EXPECT_EQ(status.code(), absl::StatusCode::kUnimplemented); +} + TEST(GrpcMuxFactoryTest, InvalidRateLimit) { auto* factory = Config::Utility::getFactoryByName("envoy.config_mux.grpc_mux_factory"); diff --git a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc index 5c27473e03f0..b787557025c3 100644 --- a/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/new_grpc_mux_impl_test.cc @@ -28,6 +28,7 @@ #include "test/test_common/logging.h" #include "test/test_common/resources.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/status_utility.h" #include "test/test_common/test_runtime.h" #include "test/test_common/test_time.h" #include "test/test_common/utility.h" @@ -101,7 +102,8 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam& initial_resource_versions = {}) { + const std::map& initial_resource_versions = {}, + Grpc::MockAsyncStream* async_stream = nullptr) { API_NO_BOOST(envoy::service::discovery::v3::DeltaDiscoveryRequest) expected_request; expected_request.mutable_node()->CopyFrom(local_info_.node()); for (const auto& resource : resource_names_subscribe) { @@ -120,7 +122,8 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParamset_code(error_code); error_detail->set_message(error_message); } - EXPECT_CALL(async_stream_, sendMessageRaw_(Grpc::ProtoBufferEq(expected_request), false)); + EXPECT_CALL(async_stream ? *async_stream : async_stream_, + sendMessageRaw_(Grpc::ProtoBufferEq(expected_request), false)); } void remoteClose() { @@ -176,6 +179,9 @@ class NewGrpcMuxImplTestBase : public testing::TestWithParam random_; Grpc::MockAsyncClient* async_client_; NiceMock async_stream_; + // Used for tests invoking updateMuxSource(). + Grpc::MockAsyncClient* replaced_async_client_; + Grpc::MockAsyncStream replaced_async_stream_; CustomConfigValidatorsPtr config_validators_; NiceMock local_info_; std::unique_ptr grpc_mux_; @@ -795,6 +801,175 @@ TEST_P(NewGrpcMuxImplTest, AddRemoveSubscriptions) { } } +// Updating the mux object while being connected sends the correct requests. +TEST_P(NewGrpcMuxImplTest, MuxDynamicReplacementWhenConnected) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + auto foo_sub = grpc_mux_->addWatch("type_url_foo", {"x", "y"}, callbacks_, resource_decoder_, {}); + auto bar_sub = grpc_mux_->addWatch("type_url_bar", {}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, {}); + expectSendMessage("type_url_bar", {}, {}); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial messages to be sent to the new stream. + expectSendMessage("type_url_foo", {"x", "y"}, {}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + {}, &replaced_async_stream_); + expectSendMessage("type_url_bar", {}, {}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", {}, + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/nullptr, + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + // Ending test, removing subscriptions for type_url_foo. + expectSendMessage("type_url_foo", {}, {"x", "y"}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + {}, &replaced_async_stream_); +} + +// Updating the mux object after receiving a response, sends the correct requests. +TEST_P(NewGrpcMuxImplTest, MuxDynamicReplacementFetchingResources) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto foo_sub = grpc_mux_->addWatch(type_url, {"x", "y"}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x", "y"}, {}); + grpc_mux_->start(); + + // Send back a response for one of the resources. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + response->set_nonce("n1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + auto* resource = response->add_resources(); + resource->set_name("x"); + resource->mutable_resource()->PackFrom(load_assignment); + resource->set_version("x1"); + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, + const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {}, {}, "n1"); + onDiscoveryResponse(std::move(response)); + } + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial message to be sent to the new stream. + // It will include "x" in its initial_resource_versions. + expectSendMessage(type_url, {"x", "y"}, {}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + {{"x", "x1"}}, &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/std::make_unique>(), + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + + // Send a response to resource "y" on the replaced mux. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("2"); + response->set_nonce("n2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + auto* resource = response->add_resources(); + resource->set_name("y"); + resource->mutable_resource()->PackFrom(load_assignment); + resource->set_version("y1"); + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, "2")) + .WillOnce(Invoke([&load_assignment](const std::vector& added_resources, + const Protobuf::RepeatedPtrField&, + const std::string&) { + EXPECT_EQ(1, added_resources.size()); + EXPECT_TRUE( + TestUtility::protoEqual(added_resources[0].get().resource(), load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {}, {}, "n2", Grpc::Status::WellKnownGrpcStatus::Ok, "", {}, + &replaced_async_stream_); + onDiscoveryResponse(std::move(response)); + } + + // Ending test, removing subscriptions for the subscription. + expectSendMessage(type_url, {}, {"x", "y"}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", {}, + &replaced_async_stream_); +} + +// Updating the mux object with wrong rate limit settings is rejected. +TEST_P(NewGrpcMuxImplTest, RejectMuxDynamicReplacementRateLimitSettingsError) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto foo_sub = grpc_mux_->addWatch(type_url, {"x", "y"}, callbacks_, resource_decoder_, {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x", "y"}, {}); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource ads_config_wrong_settings; + envoy::config::core::v3::RateLimitSettings* rate_limits = + ads_config_wrong_settings.mutable_rate_limit_settings(); + rate_limits->mutable_max_tokens()->set_value(500); + rate_limits->mutable_fill_rate()->set_value(std::numeric_limits::quiet_NaN()); + // No disconnect and replacement of the original async_client. + EXPECT_CALL(async_stream_, resetStream()).Times(0); + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)).Times(0); + EXPECT_FALSE(grpc_mux_ + ->updateMuxSource( + /*primary_async_client=*/std::unique_ptr( + replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/nullptr, + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, + SubscriptionFactory::RetryMaxDelayMs, random_), + ads_config_wrong_settings) + .ok()); + // Ending test, removing subscriptions for type_url_foo. + expectSendMessage(type_url, {}, {"x", "y"}, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", {}, + &async_stream_); +} + TEST(NewGrpcMuxFactoryTest, InvalidRateLimit) { auto* factory = Config::Utility::getFactoryByName( "envoy.config_mux.new_grpc_mux_factory"); diff --git a/test/extensions/config_subscription/grpc/watch_map_test.cc b/test/extensions/config_subscription/grpc/watch_map_test.cc index 4186b3d8b8d7..014a1221b0c4 100644 --- a/test/extensions/config_subscription/grpc/watch_map_test.cc +++ b/test/extensions/config_subscription/grpc/watch_map_test.cc @@ -133,7 +133,7 @@ TEST(WatchMapTest, Basic) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); { @@ -207,7 +207,7 @@ TEST(WatchMapTest, Overlap) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -276,7 +276,7 @@ TEST(WatchMapTest, CacheResourceAddResource) { NiceMock eds_resources_cache; const std::string eds_type_url = Config::getTypeUrl(); - WatchMap watch_map(false, eds_type_url, config_validators, + WatchMap watch_map(false, eds_type_url, &config_validators, makeOptRef(eds_resources_cache)); // The test uses 2 watchers to ensure that interest is kept regardless of // which watcher was the first to add a watch for the assignment. @@ -357,7 +357,7 @@ TEST(WatchMapTest, CacheResourceAddResource) { // WatchMap defers deletes and doesn't crash. class SameWatchRemoval : public testing::Test { public: - SameWatchRemoval() : watch_map_(false, "ClusterLoadAssignmentType", config_validators, {}) {} + SameWatchRemoval() : watch_map_(false, "ClusterLoadAssignmentType", &config_validators, {}) {} void SetUp() override { envoy::config::endpoint::v3::ClusterLoadAssignment alice; @@ -437,7 +437,7 @@ TEST(WatchMapTest, AddRemoveAdd) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); @@ -494,7 +494,7 @@ TEST(WatchMapTest, UninterestingUpdate) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"alice"}); @@ -539,7 +539,7 @@ TEST(WatchMapTest, WatchingEverything) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); /*Watch* watch1 = */ watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); // watch1 never specifies any names, and so is treated as interested in everything. @@ -576,7 +576,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); @@ -610,7 +610,7 @@ TEST(WatchMapTest, DeltaOnConfigUpdate) { TEST(WatchMapTest, OnConfigUpdateFailed) { NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); // calling on empty map doesn't break watch_map.onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr); @@ -632,7 +632,7 @@ TEST(WatchMapTest, OnConfigUpdateXdsTpGlobCollections) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"xdstp://foo/bar/baz/*?some=thing&thing=some"}); @@ -677,7 +677,7 @@ TEST(WatchMapTest, OnConfigUpdateXdsTpSingletons) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(false, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(false, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch = watch_map.addWatch(callbacks, resource_decoder); watch_map.updateWatchInterest(watch, {"xdstp://foo/bar/baz?some=thing&thing=some"}); @@ -718,7 +718,7 @@ TEST(WatchMapTest, OnConfigUpdateUsingNamespaces) { TestUtility::TestOpaqueResourceDecoderImpl resource_decoder("cluster_name"); NiceMock config_validators; - WatchMap watch_map(true, "ClusterLoadAssignmentType", config_validators, {}); + WatchMap watch_map(true, "ClusterLoadAssignmentType", &config_validators, {}); Watch* watch1 = watch_map.addWatch(callbacks1, resource_decoder); Watch* watch2 = watch_map.addWatch(callbacks2, resource_decoder); Watch* watch3 = watch_map.addWatch(callbacks3, resource_decoder); diff --git a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc index 123ffd52a56d..f81b1da3ff94 100644 --- a/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc +++ b/test/extensions/config_subscription/grpc/xds_grpc_mux_impl_test.cc @@ -26,6 +26,7 @@ #include "test/test_common/logging.h" #include "test/test_common/resources.h" #include "test/test_common/simulated_time_system.h" +#include "test/test_common/status_utility.h" #include "test/test_common/test_runtime.h" #include "test/test_common/test_time.h" #include "test/test_common/utility.h" @@ -94,7 +95,8 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { const std::vector& resource_names, const std::string& version, bool first = false, const std::string& nonce = "", const Protobuf::int32 error_code = Grpc::Status::WellKnownGrpcStatus::Ok, - const std::string& error_message = "") { + const std::string& error_message = "", + Grpc::MockAsyncStream* async_stream = nullptr) { envoy::service::discovery::v3::DiscoveryRequest expected_request; if (first) { expected_request.mutable_node()->CopyFrom(local_info_.node()); @@ -113,7 +115,7 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { error_detail->set_message(error_message); } EXPECT_CALL( - async_stream_, + async_stream ? *async_stream : async_stream_, sendMessageRaw_(Grpc::ProtoBufferEqIgnoreRepeatedFieldOrdering(expected_request), false)); } @@ -134,6 +136,9 @@ class GrpcMuxImplTestBase : public testing::TestWithParam { NiceMock random_; Grpc::MockAsyncClient* async_client_; Grpc::MockAsyncStream async_stream_; + // Used for tests invoking updateMuxSource(). + Grpc::MockAsyncClient* replaced_async_client_; + Grpc::MockAsyncStream replaced_async_stream_; NiceMock local_info_; CustomConfigValidatorsPtr config_validators_; std::unique_ptr grpc_mux_; @@ -1279,6 +1284,168 @@ TEST_P(GrpcMuxImplTest, AddRemoveSubscriptions) { } } +// Updating the mux object while being connected sends the correct requests. +TEST_P(GrpcMuxImplTest, MuxDynamicReplacementWhenConnected) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + auto foo_sub = makeWatch("type_url_foo", {"x", "y"}); + auto bar_sub = makeWatch("type_url_bar", {}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage("type_url_foo", {"x", "y"}, "", true); + expectSendMessage("type_url_bar", {}, ""); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial messages to be sent to the new stream. + expectSendMessage("type_url_foo", {"x", "y"}, "", true, "", Grpc::Status::WellKnownGrpcStatus::Ok, + "", &replaced_async_stream_); + expectSendMessage("type_url_bar", {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/nullptr, + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + // Ending test, removing subscriptions for type_url_foo. + expectSendMessage("type_url_foo", {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); +} + +// Updating the mux object after receiving a response, sends the correct requests. +TEST_P(GrpcMuxImplTest, MuxDynamicReplacementFetchingResources) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto foo_sub = makeWatch(type_url, {"x", "y"}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x", "y"}, "", true); + grpc_mux_->start(); + + // Send back a response for one of the resources. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("1"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("x"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "1")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {"x", "y"}, "1"); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource empty_ads_config; + // Expect a disconnect from the original async_client and stream. + EXPECT_CALL(async_stream_, resetStream()); + // Expect establishing connection to the new client and stream. + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)) + .WillOnce(Return(&replaced_async_stream_)); + // Expect the initial message to be sent to the new stream. + expectSendMessage(type_url, {"x", "y"}, "1", true, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); + EXPECT_OK(grpc_mux_->updateMuxSource( + /*primary_async_client=*/std::unique_ptr(replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/std::make_unique>(), + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, SubscriptionFactory::RetryMaxDelayMs, random_), + empty_ads_config)); + + // Send a response to resource "y" on the replaced mux. + { + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_version_info("2"); + envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment; + load_assignment.set_cluster_name("y"); + response->add_resources()->PackFrom(load_assignment); + EXPECT_CALL(callbacks_, onConfigUpdate(_, "2")) + .WillOnce(Invoke([&load_assignment](const std::vector& resources, + const std::string&) { + EXPECT_EQ(1, resources.size()); + const auto& expected_assignment = + dynamic_cast( + resources[0].get().resource()); + EXPECT_TRUE(TestUtility::protoEqual(expected_assignment, load_assignment)); + return absl::OkStatus(); + })); + expectSendMessage(type_url, {"x", "y"}, "2", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, + "", &replaced_async_stream_); + grpc_mux_->onDiscoveryResponse(std::move(response), control_plane_stats_); + } + + // Ending test, removing subscriptions for the subscription. + expectSendMessage(type_url, {}, "2", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &replaced_async_stream_); +} + +// Updating the mux object with wrong rate limit settings is rejected. +TEST_P(GrpcMuxImplTest, RejectMuxDynamicReplacementRateLimitSettingsError) { + replaced_async_client_ = new Grpc::MockAsyncClient(); + setup(); + InSequence s; + + const std::string& type_url = Config::TypeUrl::get().ClusterLoadAssignment; + auto foo_sub = makeWatch(type_url, {"x", "y"}); + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + expectSendMessage(type_url, {"x", "y"}, "", true); + grpc_mux_->start(); + EXPECT_EQ(1, control_plane_connected_state_.value()); + + // Switch the mux. + envoy::config::core::v3::ApiConfigSource ads_config_wrong_settings; + envoy::config::core::v3::RateLimitSettings* rate_limits = + ads_config_wrong_settings.mutable_rate_limit_settings(); + rate_limits->mutable_max_tokens()->set_value(500); + rate_limits->mutable_fill_rate()->set_value(std::numeric_limits::quiet_NaN()); + // No disconnect and replacement of the original async_client. + EXPECT_CALL(async_stream_, resetStream()).Times(0); + EXPECT_CALL(*replaced_async_client_, startRaw(_, _, _, _)).Times(0); + EXPECT_FALSE(grpc_mux_ + ->updateMuxSource( + /*primary_async_client=*/std::unique_ptr( + replaced_async_client_), + /*failover_async_client=*/nullptr, + /*custom_config_validators=*/nullptr, + /*scope=*/*stats_.rootScope(), + /*backoff_strategy=*/ + std::make_unique( + SubscriptionFactory::RetryInitialDelayMs, + SubscriptionFactory::RetryMaxDelayMs, random_), + ads_config_wrong_settings) + .ok()); + // Ending test, removing subscriptions for type_url_foo. + expectSendMessage(type_url, {}, "", false, "", Grpc::Status::WellKnownGrpcStatus::Ok, "", + &async_stream_); +} + class NullGrpcMuxImplTest : public testing::Test { public: NullGrpcMuxImplTest() : null_mux_(std::make_unique()) {} diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index f153a22aa229..2d23f3179429 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -131,6 +131,13 @@ class MockGrpcMux : public GrpcMux { MOCK_METHOD(bool, paused, (const std::string& type_url), (const)); MOCK_METHOD(EdsResourcesCacheOptRef, edsResourcesCache, ()); + + MOCK_METHOD(absl::Status, updateMuxSource, + (Grpc::RawAsyncClientPtr && primary_async_client, + Grpc::RawAsyncClientPtr&& failover_async_client, + CustomConfigValidatorsPtr&& custom_config_validators, Stats::Scope& scope, + BackOffStrategyPtr&& backoff_strategy, + const envoy::config::core::v3::ApiConfigSource& ads_config_source)); }; class MockGrpcStreamCallbacks From 6b501730e2f9aad57f1fa01eeabbfceadbb412f8 Mon Sep 17 00:00:00 2001 From: phlax Date: Tue, 1 Oct 2024 21:34:53 +0100 Subject: [PATCH 4/6] github/ci: Upload coverage/docs to GCP (#36410) Fix #36356 Signed-off-by: Ryan Northey --- .github/workflows/_check_coverage.yml | 6 ++++++ .github/workflows/_precheck_publish.yml | 12 ++++++++++++ .github/workflows/_run.yml | 7 ++++++- .github/workflows/envoy-checks.yml | 2 +- .github/workflows/envoy-prechecks.yml | 2 ++ 5 files changed, 27 insertions(+), 2 deletions(-) diff --git a/.github/workflows/_check_coverage.yml b/.github/workflows/_check_coverage.yml index 1de9fecb0b5d..7ce81c590e7f 100644 --- a/.github/workflows/_check_coverage.yml +++ b/.github/workflows/_check_coverage.yml @@ -42,6 +42,12 @@ jobs: lower than limit rbe: true request: ${{ inputs.request }} + steps-post: | + - run: ci/run_envoy_docker.sh 'ci/do_ci.sh ${{ matrix.target }}-upload' + shell: bash + env: + GCS_ARTIFACT_BUCKET: ${{ inputs.trusted && 'envoy-postsubmit' || 'envoy-pr' }} + target: ${{ matrix.target }} timeout-minutes: 180 trusted: ${{ inputs.trusted }} diff --git a/.github/workflows/_precheck_publish.yml b/.github/workflows/_precheck_publish.yml index 2c4f0c456abc..3d708f5d8e3c 100644 --- a/.github/workflows/_precheck_publish.yml +++ b/.github/workflows/_precheck_publish.yml @@ -5,6 +5,9 @@ permissions: on: workflow_call: + secrets: + gcp-key: + required: true inputs: request: type: string @@ -20,6 +23,8 @@ concurrency: jobs: publish: + secrets: + gcp-key: ${{ secrets.gcp-key }} permissions: contents: read packages: read @@ -30,6 +35,7 @@ jobs: cache-build-image: ${{ fromJSON(inputs.request).request.build-image.default }} cache-build-image-key-suffix: ${{ matrix.arch == 'arm64' && '-arm64' || '' }} concurrency-suffix: -${{ matrix.target }}${{ matrix.arch && format('-{0}', matrix.arch) || '' }} + gcs-only: "true" rbe: ${{ matrix.rbe }} request: ${{ inputs.request }} runs-on: ${{ matrix.runs-on || 'ubuntu-24.04' }} @@ -38,6 +44,7 @@ jobs: ERROR error: Error: + steps-post: ${{ matrix.steps-post }} target: ${{ matrix.target }} target-suffix: ${{ matrix.target-suffix }} trusted: ${{ inputs.trusted }} @@ -67,3 +74,8 @@ jobs: --config=remote-envoy-engflow --config=docs-ci rbe: true + steps-post: | + - run: ci/run_envoy_docker.sh 'ci/do_ci.sh docs-upload' + shell: bash + env: + GCS_ARTIFACT_BUCKET: ${{ inputs.trusted && 'envoy-postsubmit' || 'envoy-pr' }} diff --git a/.github/workflows/_run.yml b/.github/workflows/_run.yml index 1c9766d19dc2..612b8f241001 100644 --- a/.github/workflows/_run.yml +++ b/.github/workflows/_run.yml @@ -69,6 +69,8 @@ on: Error: fail-match: type: string + gcs-only: + type: string import-gpg: type: boolean default: false @@ -277,9 +279,12 @@ jobs: GCP_SERVICE_ACCOUNT_KEY_PATH=$(mktemp -p "${{ runner.temp }}" -t gcp_service_account.XXXXXX.json) echo "${{ secrets.gcp-key }}" | base64 --decode > "${GCP_SERVICE_ACCOUNT_KEY_PATH}" GCP_SERVICE_ACCOUNT_KEY_FILE="$(basename "${GCP_SERVICE_ACCOUNT_KEY_PATH}")" + echo "GCP_SERVICE_ACCOUNT_KEY_PATH=/build/${GCP_SERVICE_ACCOUNT_KEY_FILE}" >> "$GITHUB_ENV" + if [[ "${{ inputs.gcs-only }}" != "" ]]; then + exit 0 + fi BAZEL_BUILD_EXTRA_OPTIONS="--google_credentials=/build/${GCP_SERVICE_ACCOUNT_KEY_FILE} --config=remote-ci --config=rbe-google" echo "BAZEL_BUILD_EXTRA_OPTIONS=${BAZEL_BUILD_EXTRA_OPTIONS}" >> "$GITHUB_ENV" - echo "GCP_SERVICE_ACCOUNT_KEY_PATH=${GCP_SERVICE_ACCOUNT_KEY_PATH}" >> "$GITHUB_ENV" - uses: envoyproxy/toolshed/gh-actions/github/run@actions-v0.2.36 name: Run CI ${{ inputs.command }} ${{ inputs.target }} diff --git a/.github/workflows/envoy-checks.yml b/.github/workflows/envoy-checks.yml index ee6f9d94d5b3..08422f5ad5c1 100644 --- a/.github/workflows/envoy-checks.yml +++ b/.github/workflows/envoy-checks.yml @@ -59,7 +59,7 @@ jobs: coverage: secrets: - gcp-key: ${{ secrets.GCP_SERVICE_ACCOUNT_KEY }} + gcp-key: ${{ fromJSON(needs.load.outputs.trusted) && secrets.GCP_SERVICE_ACCOUNT_KEY_TRUSTED || secrets.GCP_SERVICE_ACCOUNT_KEY }} permissions: actions: read contents: read diff --git a/.github/workflows/envoy-prechecks.yml b/.github/workflows/envoy-prechecks.yml index 860856081368..35e9eed6fe2b 100644 --- a/.github/workflows/envoy-prechecks.yml +++ b/.github/workflows/envoy-prechecks.yml @@ -67,6 +67,8 @@ jobs: trusted: ${{ fromJSON(needs.load.outputs.trusted) }} publish: + secrets: + gcp-key: ${{ fromJSON(needs.load.outputs.trusted) && secrets.GCP_SERVICE_ACCOUNT_KEY_TRUSTED || secrets.GCP_SERVICE_ACCOUNT_KEY }} permissions: actions: read contents: read From 7fb03187145cc6411a4190d3a045052b1b5ce752 Mon Sep 17 00:00:00 2001 From: RenjieTang Date: Tue, 1 Oct 2024 13:53:45 -0700 Subject: [PATCH 5/6] [mobile]add QUIC idle timeout setting to EngineBuilder (#36388) Commit Message: add QUIC idle timeout setting to EngineBuilder Risk Level: Low Testing: n/a Docs Changes: n/a Release Notes: n/a Platform Specific Features: mobile only Signed-off-by: Renjie Tang --- mobile/library/cc/engine_builder.cc | 8 +++++++- mobile/library/cc/engine_builder.h | 5 +++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/mobile/library/cc/engine_builder.cc b/mobile/library/cc/engine_builder.cc index 53518b8cd984..a73ef0f089dc 100644 --- a/mobile/library/cc/engine_builder.cc +++ b/mobile/library/cc/engine_builder.cc @@ -267,6 +267,12 @@ EngineBuilder& EngineBuilder::setUpstreamTlsSni(std::string sni) { return *this; } +EngineBuilder& +EngineBuilder::setQuicConnectionIdleTimeoutSeconds(int quic_connection_idle_timeout_seconds) { + quic_connection_idle_timeout_seconds_ = quic_connection_idle_timeout_seconds; + return *this; +} + EngineBuilder& EngineBuilder::enablePlatformCertificatesValidation(bool platform_certificates_validation_on) { platform_certificates_validation_on_ = platform_certificates_validation_on; @@ -739,7 +745,7 @@ std::unique_ptr EngineBuilder::generate ->mutable_http3_protocol_options() ->mutable_quic_protocol_options() ->mutable_idle_network_timeout() - ->set_seconds(30); + ->set_seconds(quic_connection_idle_timeout_seconds_); base_cluster->mutable_transport_socket()->mutable_typed_config()->PackFrom(h3_proxy_socket); (*base_cluster->mutable_typed_extension_protocol_options()) diff --git a/mobile/library/cc/engine_builder.h b/mobile/library/cc/engine_builder.h index 7636eab146a5..c5c82e3383ef 100644 --- a/mobile/library/cc/engine_builder.h +++ b/mobile/library/cc/engine_builder.h @@ -104,6 +104,9 @@ class EngineBuilder { // outside of this range will be ignored. EngineBuilder& setNetworkThreadPriority(int thread_priority); + // Sets the QUIC connection idle timeout in seconds. + EngineBuilder& setQuicConnectionIdleTimeoutSeconds(int quic_connection_idle_timeout_seconds); + #if defined(__APPLE__) // Right now, this API is only used by Apple (iOS) to register the Apple proxy resolver API for // use in reading and using the system proxy settings. @@ -201,6 +204,8 @@ class EngineBuilder { // https://source.chromium.org/chromium/chromium/src/+/main:net/quic/quic_session_pool.cc;l=790-793;drc=7f04a8e033c23dede6beae129cd212e6d4473d72 // https://source.chromium.org/chromium/chromium/src/+/main:net/third_party/quiche/src/quiche/quic/core/quic_constants.h;l=43-47;drc=34ad7f3844f882baf3d31a6bc6e300acaa0e3fc8 int32_t udp_socket_send_buffer_size_ = 1452 * 20; + + int quic_connection_idle_timeout_seconds_ = 30; }; using EngineBuilderSharedPtr = std::shared_ptr; From 907c37059a093e65a7ee9e012355c4b0a267d0a1 Mon Sep 17 00:00:00 2001 From: Fredy Wijaya Date: Tue, 1 Oct 2024 16:33:02 -0500 Subject: [PATCH 6/6] mobile: Deflake BidirectionalStreamTest (#36412) I still don't fully understand how removing the `shutdown` fixes the issue. Although it doesn't completely fix the flakiness but it reduces the flakiness by a lot. Risk Level: low (tests only) Testing: `bazel test --runs_per_test=100 //test/java/org/chromium/net:bidirectional_stream_test` Docs Changes: n/a Release Notes: n/a Platform Specific Features: n/a Signed-off-by: Fredy Wijaya --- mobile/test/java/org/chromium/net/BUILD | 2 +- mobile/test/java/org/chromium/net/BidirectionalStreamTest.java | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/mobile/test/java/org/chromium/net/BUILD b/mobile/test/java/org/chromium/net/BUILD index 3b4b49a444f0..43b1efe7d76a 100644 --- a/mobile/test/java/org/chromium/net/BUILD +++ b/mobile/test/java/org/chromium/net/BUILD @@ -305,7 +305,7 @@ envoy_mobile_android_test( srcs = [ "BidirectionalStreamTest.java", ], - flaky = True, # TODO(fredyw): Debug the reason for it being flaky. + flaky = True, native_deps = [ "//test/jni:libenvoy_jni_with_test_extensions.so", ] + select({ diff --git a/mobile/test/java/org/chromium/net/BidirectionalStreamTest.java b/mobile/test/java/org/chromium/net/BidirectionalStreamTest.java index 7f56f1c55bb3..53d53a23f92c 100644 --- a/mobile/test/java/org/chromium/net/BidirectionalStreamTest.java +++ b/mobile/test/java/org/chromium/net/BidirectionalStreamTest.java @@ -78,9 +78,6 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { assertTrue(Http2TestServer.shutdownHttp2TestServer()); - if (mCronetEngine != null) { - mCronetEngine.shutdown(); - } } private static void checkResponseInfo(UrlResponseInfo responseInfo, String expectedUrl,