From a73a745201befc3ae386a00248b3708e2210605f Mon Sep 17 00:00:00 2001 From: IssaAbuKalbein <86603440+IssaAbuKalbein@users.noreply.github.com> Date: Thu, 18 Jul 2024 19:40:33 +0300 Subject: [PATCH] Bugfix: Flush UdpTunnelUpstreamConnected access log only after receiving valid response headers (#35226) Commit Message: Flush UdpTunnelUpstreamConnected access log only after receiving valid response headers from upstream Additional Description: Risk Level: low Testing: integration tests Docs Changes: Release Notes: Platform Specific Features: --------- Signed-off-by: Issa Abu Kalbein --- .../filters/udp/udp_proxy/udp_proxy_filter.cc | 38 ++++++------- .../filters/udp/udp_proxy/udp_proxy_filter.h | 20 ++----- .../udp/udp_proxy/udp_proxy_filter_test.cc | 13 ++--- .../udp_tunneling_integration_test.cc | 55 +++++++++++++++++++ 4 files changed, 83 insertions(+), 43 deletions(-) diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc index 539bb9818635..1f0fa6d8c236 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc @@ -810,12 +810,9 @@ void HttpUpstreamImpl::resetEncoder(Network::ConnectionEvent event, bool by_down TunnelingConnectionPoolImpl::TunnelingConnectionPoolImpl( Upstream::ThreadLocalCluster& thread_local_cluster, Upstream::LoadBalancerContext* context, const UdpTunnelingConfig& tunnel_config, UpstreamTunnelCallbacks& upstream_callbacks, - StreamInfo::StreamInfo& downstream_info, bool flush_access_log_on_tunnel_connected, - const std::vector& session_access_logs) + StreamInfo::StreamInfo& downstream_info) : upstream_callbacks_(upstream_callbacks), tunnel_config_(tunnel_config), - downstream_info_(downstream_info), - flush_access_log_on_tunnel_connected_(flush_access_log_on_tunnel_connected), - session_access_logs_(session_access_logs) { + downstream_info_(downstream_info) { // TODO(ohadvano): support upstream HTTP/3. absl::optional protocol = Http::Protocol::Http2; conn_pool_data_ = @@ -864,24 +861,14 @@ void TunnelingConnectionPoolImpl::onPoolReady(Http::RequestEncoder& request_enco upstream_->setTunnelCreationCallbacks(*this); downstream_info_.upstreamInfo()->setUpstreamHost(upstream_host); callbacks_->resetIdleTimer(); - - if (flush_access_log_on_tunnel_connected_) { - const Formatter::HttpFormatterContext log_context{ - nullptr, nullptr, nullptr, {}, AccessLog::AccessLogType::UdpTunnelUpstreamConnected}; - for (const auto& access_log : session_access_logs_) { - access_log->log(log_context, downstream_info_); - } - } } TunnelingConnectionPoolPtr TunnelingConnectionPoolFactory::createConnPool( Upstream::ThreadLocalCluster& thread_local_cluster, Upstream::LoadBalancerContext* context, const UdpTunnelingConfig& tunnel_config, UpstreamTunnelCallbacks& upstream_callbacks, - StreamInfo::StreamInfo& downstream_info, bool flush_access_log_on_tunnel_connected, - const std::vector& session_access_logs) const { + StreamInfo::StreamInfo& downstream_info) const { auto pool = std::make_unique( - thread_local_cluster, context, tunnel_config, upstream_callbacks, downstream_info, - flush_access_log_on_tunnel_connected, session_access_logs); + thread_local_cluster, context, tunnel_config, upstream_callbacks, downstream_info); return (pool->valid() ? std::move(pool) : nullptr); } @@ -935,10 +922,9 @@ bool UdpProxyFilter::TunnelingActiveSession::createConnectionPool() { cluster_.cluster_.info()->trafficStats()->upstream_rq_retry_.inc(); } - conn_pool_ = conn_pool_factory_->createConnPool( - cluster_.cluster_, load_balancer_context_.get(), *cluster_.filter_.config_->tunnelingConfig(), - *this, udp_session_info_, cluster_.filter_.config_->flushAccessLogOnTunnelConnected(), - cluster_.filter_.config_->sessionAccessLogs()); + conn_pool_ = conn_pool_factory_->createConnPool(cluster_.cluster_, load_balancer_context_.get(), + *cluster_.filter_.config_->tunnelingConfig(), + *this, udp_session_info_); if (conn_pool_) { connecting_ = true; @@ -993,6 +979,16 @@ void UdpProxyFilter::TunnelingActiveSession::onStreamReady(StreamInfo::StreamInf connecting_ = false; can_send_upstream_ = true; cluster_.cluster_stats_.sess_tunnel_success_.inc(); + + if (cluster_.filter_.config_->flushAccessLogOnTunnelConnected()) { + fillSessionStreamInfo(); + const Formatter::HttpFormatterContext log_context{ + nullptr, nullptr, nullptr, {}, AccessLog::AccessLogType::UdpTunnelUpstreamConnected}; + for (const auto& access_log : cluster_.filter_.config_->sessionAccessLogs()) { + access_log->log(log_context, udp_session_info_); + } + } + flushBuffer(); } diff --git a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h index f3a401d43d37..9ebcd93b5a45 100644 --- a/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h +++ b/source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h @@ -402,9 +402,7 @@ class TunnelingConnectionPoolImpl : public TunnelingConnectionPool, Upstream::LoadBalancerContext* context, const UdpTunnelingConfig& tunnel_config, UpstreamTunnelCallbacks& upstream_callbacks, - StreamInfo::StreamInfo& downstream_info, - bool flush_access_log_on_tunnel_connected, - const std::vector& session_access_logs); + StreamInfo::StreamInfo& downstream_info); ~TunnelingConnectionPoolImpl() override = default; bool valid() const { return conn_pool_data_.has_value(); } @@ -445,8 +443,6 @@ class TunnelingConnectionPoolImpl : public TunnelingConnectionPool, Http::ConnectionPool::Cancellable* upstream_handle_{}; const UdpTunnelingConfig& tunnel_config_; StreamInfo::StreamInfo& downstream_info_; - const bool flush_access_log_on_tunnel_connected_; - const std::vector& session_access_logs_; Upstream::HostDescriptionConstSharedPtr upstream_host_; Ssl::ConnectionInfoConstSharedPtr ssl_info_; StreamInfo::StreamInfo* upstream_info_; @@ -462,17 +458,13 @@ class TunnelingConnectionPoolFactory { * @param tunnel_config the tunneling config. * @param upstream_callbacks the callbacks to provide to the connection if successfully created. * @param stream_info is the downstream session stream info. - * @param flush_access_log_on_tunnel_connected indicates whether to flush access log on tunnel - * connected. - * @param session_access_logs is the list of access logs for the session. * @return may be null if pool creation failed. */ - TunnelingConnectionPoolPtr - createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, - Upstream::LoadBalancerContext* context, const UdpTunnelingConfig& tunnel_config, - UpstreamTunnelCallbacks& upstream_callbacks, StreamInfo::StreamInfo& stream_info, - bool flush_access_log_on_tunnel_connected, - const std::vector& session_access_logs) const; + TunnelingConnectionPoolPtr createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, + Upstream::LoadBalancerContext* context, + const UdpTunnelingConfig& tunnel_config, + UpstreamTunnelCallbacks& upstream_callbacks, + StreamInfo::StreamInfo& stream_info) const; }; using TunnelingConnectionPoolFactoryPtr = std::unique_ptr; diff --git a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc index b2c9372f83a3..d219f425da4a 100644 --- a/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc +++ b/test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc @@ -1949,9 +1949,8 @@ class TunnelingConnectionPoolImplTest : public testing::Test { header_evaluator_ = Envoy::Router::HeaderParser::configure(headers_to_add).value(); config_ = std::make_unique>(*header_evaluator_); stream_info_.downstream_connection_info_provider_->setConnectionID(0); - session_access_logs_ = std::make_unique>(); - pool_ = std::make_unique( - cluster_, &context_, *config_, callbacks_, stream_info_, false, *session_access_logs_); + pool_ = std::make_unique(cluster_, &context_, *config_, callbacks_, + stream_info_); } void createNewStream() { pool_->newStream(stream_callbacks_); } @@ -1962,7 +1961,6 @@ class TunnelingConnectionPoolImplTest : public testing::Test { std::unique_ptr> config_; NiceMock callbacks_; NiceMock stream_info_; - std::unique_ptr> session_access_logs_; NiceMock stream_callbacks_; NiceMock request_encoder_; std::shared_ptr> upstream_host_{ @@ -2042,13 +2040,12 @@ TEST_F(TunnelingConnectionPoolImplTest, FactoryTest) { setup(); TunnelingConnectionPoolFactory factory; - auto valid_pool = factory.createConnPool(cluster_, &context_, *config_, callbacks_, stream_info_, - false, *session_access_logs_); + auto valid_pool = factory.createConnPool(cluster_, &context_, *config_, callbacks_, stream_info_); EXPECT_FALSE(valid_pool == nullptr); EXPECT_CALL(cluster_, httpConnPool(_, _, _)).WillOnce(Return(absl::nullopt)); - auto invalid_pool = factory.createConnPool(cluster_, &context_, *config_, callbacks_, - stream_info_, false, *session_access_logs_); + auto invalid_pool = + factory.createConnPool(cluster_, &context_, *config_, callbacks_, stream_info_); EXPECT_TRUE(invalid_pool == nullptr); } diff --git a/test/integration/udp_tunneling_integration_test.cc b/test/integration/udp_tunneling_integration_test.cc index 040e9292ad1a..63550d78770f 100644 --- a/test/integration/udp_tunneling_integration_test.cc +++ b/test/integration/udp_tunneling_integration_test.cc @@ -1144,6 +1144,61 @@ TEST_P(UdpTunnelingIntegrationTest, FlushAccessLogOnTunnelConnected) { test_server_->waitForGaugeEq("udp.foo.downstream_sess_active", 0); } +TEST_P(UdpTunnelingIntegrationTest, DontFlushTunnelConnectedAccessLogWithInvalidResponseHeaders) { + const std::string access_log_filename = + TestEnvironment::temporaryPath(TestUtility::uniqueFilename()); + + const std::string session_access_log_config = fmt::format(R"EOF( + access_log: + - name: envoy.access_loggers.file + typed_config: + '@type': type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: {} + log_format: + text_format_source: + inline_string: "%ACCESS_LOG_TYPE%\n" +)EOF", + access_log_filename); + + const std::string access_log_options = R"EOF( + access_log_options: + flush_access_log_on_tunnel_connected: true +)EOF"; + + const TestConfig config{"host.com", + "target.com", + 1, + 30, + false, + "", + BufferOptions{1, 30}, + absl::nullopt, + session_access_log_config, + access_log_options}; + setup(config); + + client_->write("hello", *listener_address_); + ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_)); + ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_)); + ASSERT_TRUE(upstream_request_->waitForHeadersComplete()); + expectRequestHeaders(upstream_request_->headers()); + + Http::TestResponseHeaderMapImpl response_headers{{":status", "404"}}; + upstream_request_->encodeHeaders(response_headers, true); + + test_server_->waitForCounterEq("cluster.cluster_0.upstream_cx_connect_attempts_exceeded", 1); + test_server_->waitForCounterEq("cluster.cluster_0.udp.sess_tunnel_failure", 1); + test_server_->waitForCounterEq("cluster.cluster_0.udp.sess_tunnel_success", 0); + test_server_->waitForGaugeEq("udp.foo.downstream_sess_active", 0); + + // Verify that UdpTunnelUpstreamConnected access log wasn't flushed. + const std::string access_log = waitForAccessLog(access_log_filename); + EXPECT_THAT(access_log, + testing::HasSubstr(AccessLogType_Name(AccessLog::AccessLogType::UdpSessionEnd))); + EXPECT_THAT(access_log, testing::Not(testing::HasSubstr(AccessLogType_Name( + AccessLog::AccessLogType::UdpTunnelUpstreamConnected)))); +} + TEST_P(UdpTunnelingIntegrationTest, FlushAccessLogPeriodically) { const std::string access_log_filename = TestEnvironment::temporaryPath(TestUtility::uniqueFilename());