Skip to content

Commit

Permalink
Bugfix: Flush UdpTunnelUpstreamConnected access log only after receiv…
Browse files Browse the repository at this point in the history
…ing valid response headers (#35226)

Commit Message: Flush UdpTunnelUpstreamConnected access log only after
receiving valid response headers from upstream
Additional Description:
Risk Level: low
Testing: integration tests
Docs Changes:
Release Notes:
Platform Specific Features:

---------

Signed-off-by: Issa Abu Kalbein <[email protected]>
  • Loading branch information
IssaAbuKalbein authored Jul 18, 2024
1 parent 1af0029 commit a73a745
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 43 deletions.
38 changes: 17 additions & 21 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -810,12 +810,9 @@ void HttpUpstreamImpl::resetEncoder(Network::ConnectionEvent event, bool by_down
TunnelingConnectionPoolImpl::TunnelingConnectionPoolImpl(
Upstream::ThreadLocalCluster& thread_local_cluster, Upstream::LoadBalancerContext* context,
const UdpTunnelingConfig& tunnel_config, UpstreamTunnelCallbacks& upstream_callbacks,
StreamInfo::StreamInfo& downstream_info, bool flush_access_log_on_tunnel_connected,
const std::vector<AccessLog::InstanceSharedPtr>& session_access_logs)
StreamInfo::StreamInfo& downstream_info)
: upstream_callbacks_(upstream_callbacks), tunnel_config_(tunnel_config),
downstream_info_(downstream_info),
flush_access_log_on_tunnel_connected_(flush_access_log_on_tunnel_connected),
session_access_logs_(session_access_logs) {
downstream_info_(downstream_info) {
// TODO(ohadvano): support upstream HTTP/3.
absl::optional<Http::Protocol> protocol = Http::Protocol::Http2;
conn_pool_data_ =
Expand Down Expand Up @@ -864,24 +861,14 @@ void TunnelingConnectionPoolImpl::onPoolReady(Http::RequestEncoder& request_enco
upstream_->setTunnelCreationCallbacks(*this);
downstream_info_.upstreamInfo()->setUpstreamHost(upstream_host);
callbacks_->resetIdleTimer();

if (flush_access_log_on_tunnel_connected_) {
const Formatter::HttpFormatterContext log_context{
nullptr, nullptr, nullptr, {}, AccessLog::AccessLogType::UdpTunnelUpstreamConnected};
for (const auto& access_log : session_access_logs_) {
access_log->log(log_context, downstream_info_);
}
}
}

TunnelingConnectionPoolPtr TunnelingConnectionPoolFactory::createConnPool(
Upstream::ThreadLocalCluster& thread_local_cluster, Upstream::LoadBalancerContext* context,
const UdpTunnelingConfig& tunnel_config, UpstreamTunnelCallbacks& upstream_callbacks,
StreamInfo::StreamInfo& downstream_info, bool flush_access_log_on_tunnel_connected,
const std::vector<AccessLog::InstanceSharedPtr>& session_access_logs) const {
StreamInfo::StreamInfo& downstream_info) const {
auto pool = std::make_unique<TunnelingConnectionPoolImpl>(
thread_local_cluster, context, tunnel_config, upstream_callbacks, downstream_info,
flush_access_log_on_tunnel_connected, session_access_logs);
thread_local_cluster, context, tunnel_config, upstream_callbacks, downstream_info);
return (pool->valid() ? std::move(pool) : nullptr);
}

Expand Down Expand Up @@ -935,10 +922,9 @@ bool UdpProxyFilter::TunnelingActiveSession::createConnectionPool() {
cluster_.cluster_.info()->trafficStats()->upstream_rq_retry_.inc();
}

conn_pool_ = conn_pool_factory_->createConnPool(
cluster_.cluster_, load_balancer_context_.get(), *cluster_.filter_.config_->tunnelingConfig(),
*this, udp_session_info_, cluster_.filter_.config_->flushAccessLogOnTunnelConnected(),
cluster_.filter_.config_->sessionAccessLogs());
conn_pool_ = conn_pool_factory_->createConnPool(cluster_.cluster_, load_balancer_context_.get(),
*cluster_.filter_.config_->tunnelingConfig(),
*this, udp_session_info_);

if (conn_pool_) {
connecting_ = true;
Expand Down Expand Up @@ -993,6 +979,16 @@ void UdpProxyFilter::TunnelingActiveSession::onStreamReady(StreamInfo::StreamInf
connecting_ = false;
can_send_upstream_ = true;
cluster_.cluster_stats_.sess_tunnel_success_.inc();

if (cluster_.filter_.config_->flushAccessLogOnTunnelConnected()) {
fillSessionStreamInfo();
const Formatter::HttpFormatterContext log_context{
nullptr, nullptr, nullptr, {}, AccessLog::AccessLogType::UdpTunnelUpstreamConnected};
for (const auto& access_log : cluster_.filter_.config_->sessionAccessLogs()) {
access_log->log(log_context, udp_session_info_);
}
}

flushBuffer();
}

Expand Down
20 changes: 6 additions & 14 deletions source/extensions/filters/udp/udp_proxy/udp_proxy_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,7 @@ class TunnelingConnectionPoolImpl : public TunnelingConnectionPool,
Upstream::LoadBalancerContext* context,
const UdpTunnelingConfig& tunnel_config,
UpstreamTunnelCallbacks& upstream_callbacks,
StreamInfo::StreamInfo& downstream_info,
bool flush_access_log_on_tunnel_connected,
const std::vector<AccessLog::InstanceSharedPtr>& session_access_logs);
StreamInfo::StreamInfo& downstream_info);
~TunnelingConnectionPoolImpl() override = default;

bool valid() const { return conn_pool_data_.has_value(); }
Expand Down Expand Up @@ -445,8 +443,6 @@ class TunnelingConnectionPoolImpl : public TunnelingConnectionPool,
Http::ConnectionPool::Cancellable* upstream_handle_{};
const UdpTunnelingConfig& tunnel_config_;
StreamInfo::StreamInfo& downstream_info_;
const bool flush_access_log_on_tunnel_connected_;
const std::vector<AccessLog::InstanceSharedPtr>& session_access_logs_;
Upstream::HostDescriptionConstSharedPtr upstream_host_;
Ssl::ConnectionInfoConstSharedPtr ssl_info_;
StreamInfo::StreamInfo* upstream_info_;
Expand All @@ -462,17 +458,13 @@ class TunnelingConnectionPoolFactory {
* @param tunnel_config the tunneling config.
* @param upstream_callbacks the callbacks to provide to the connection if successfully created.
* @param stream_info is the downstream session stream info.
* @param flush_access_log_on_tunnel_connected indicates whether to flush access log on tunnel
* connected.
* @param session_access_logs is the list of access logs for the session.
* @return may be null if pool creation failed.
*/
TunnelingConnectionPoolPtr
createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context, const UdpTunnelingConfig& tunnel_config,
UpstreamTunnelCallbacks& upstream_callbacks, StreamInfo::StreamInfo& stream_info,
bool flush_access_log_on_tunnel_connected,
const std::vector<AccessLog::InstanceSharedPtr>& session_access_logs) const;
TunnelingConnectionPoolPtr createConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
Upstream::LoadBalancerContext* context,
const UdpTunnelingConfig& tunnel_config,
UpstreamTunnelCallbacks& upstream_callbacks,
StreamInfo::StreamInfo& stream_info) const;
};

using TunnelingConnectionPoolFactoryPtr = std::unique_ptr<TunnelingConnectionPoolFactory>;
Expand Down
13 changes: 5 additions & 8 deletions test/extensions/filters/udp/udp_proxy/udp_proxy_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1949,9 +1949,8 @@ class TunnelingConnectionPoolImplTest : public testing::Test {
header_evaluator_ = Envoy::Router::HeaderParser::configure(headers_to_add).value();
config_ = std::make_unique<NiceMock<MockUdpTunnelingConfig>>(*header_evaluator_);
stream_info_.downstream_connection_info_provider_->setConnectionID(0);
session_access_logs_ = std::make_unique<std::vector<AccessLog::InstanceSharedPtr>>();
pool_ = std::make_unique<TunnelingConnectionPoolImpl>(
cluster_, &context_, *config_, callbacks_, stream_info_, false, *session_access_logs_);
pool_ = std::make_unique<TunnelingConnectionPoolImpl>(cluster_, &context_, *config_, callbacks_,
stream_info_);
}

void createNewStream() { pool_->newStream(stream_callbacks_); }
Expand All @@ -1962,7 +1961,6 @@ class TunnelingConnectionPoolImplTest : public testing::Test {
std::unique_ptr<NiceMock<MockUdpTunnelingConfig>> config_;
NiceMock<MockUpstreamTunnelCallbacks> callbacks_;
NiceMock<StreamInfo::MockStreamInfo> stream_info_;
std::unique_ptr<std::vector<AccessLog::InstanceSharedPtr>> session_access_logs_;
NiceMock<MockHttpStreamCallbacks> stream_callbacks_;
NiceMock<Http::MockRequestEncoder> request_encoder_;
std::shared_ptr<NiceMock<Upstream::MockHostDescription>> upstream_host_{
Expand Down Expand Up @@ -2042,13 +2040,12 @@ TEST_F(TunnelingConnectionPoolImplTest, FactoryTest) {
setup();

TunnelingConnectionPoolFactory factory;
auto valid_pool = factory.createConnPool(cluster_, &context_, *config_, callbacks_, stream_info_,
false, *session_access_logs_);
auto valid_pool = factory.createConnPool(cluster_, &context_, *config_, callbacks_, stream_info_);
EXPECT_FALSE(valid_pool == nullptr);

EXPECT_CALL(cluster_, httpConnPool(_, _, _)).WillOnce(Return(absl::nullopt));
auto invalid_pool = factory.createConnPool(cluster_, &context_, *config_, callbacks_,
stream_info_, false, *session_access_logs_);
auto invalid_pool =
factory.createConnPool(cluster_, &context_, *config_, callbacks_, stream_info_);
EXPECT_TRUE(invalid_pool == nullptr);
}

Expand Down
55 changes: 55 additions & 0 deletions test/integration/udp_tunneling_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,61 @@ TEST_P(UdpTunnelingIntegrationTest, FlushAccessLogOnTunnelConnected) {
test_server_->waitForGaugeEq("udp.foo.downstream_sess_active", 0);
}

TEST_P(UdpTunnelingIntegrationTest, DontFlushTunnelConnectedAccessLogWithInvalidResponseHeaders) {
const std::string access_log_filename =
TestEnvironment::temporaryPath(TestUtility::uniqueFilename());

const std::string session_access_log_config = fmt::format(R"EOF(
access_log:
- name: envoy.access_loggers.file
typed_config:
'@type': type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
path: {}
log_format:
text_format_source:
inline_string: "%ACCESS_LOG_TYPE%\n"
)EOF",
access_log_filename);

const std::string access_log_options = R"EOF(
access_log_options:
flush_access_log_on_tunnel_connected: true
)EOF";

const TestConfig config{"host.com",
"target.com",
1,
30,
false,
"",
BufferOptions{1, 30},
absl::nullopt,
session_access_log_config,
access_log_options};
setup(config);

client_->write("hello", *listener_address_);
ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForHeadersComplete());
expectRequestHeaders(upstream_request_->headers());

Http::TestResponseHeaderMapImpl response_headers{{":status", "404"}};
upstream_request_->encodeHeaders(response_headers, true);

test_server_->waitForCounterEq("cluster.cluster_0.upstream_cx_connect_attempts_exceeded", 1);
test_server_->waitForCounterEq("cluster.cluster_0.udp.sess_tunnel_failure", 1);
test_server_->waitForCounterEq("cluster.cluster_0.udp.sess_tunnel_success", 0);
test_server_->waitForGaugeEq("udp.foo.downstream_sess_active", 0);

// Verify that UdpTunnelUpstreamConnected access log wasn't flushed.
const std::string access_log = waitForAccessLog(access_log_filename);
EXPECT_THAT(access_log,
testing::HasSubstr(AccessLogType_Name(AccessLog::AccessLogType::UdpSessionEnd)));
EXPECT_THAT(access_log, testing::Not(testing::HasSubstr(AccessLogType_Name(
AccessLog::AccessLogType::UdpTunnelUpstreamConnected))));
}

TEST_P(UdpTunnelingIntegrationTest, FlushAccessLogPeriodically) {
const std::string access_log_filename =
TestEnvironment::temporaryPath(TestUtility::uniqueFilename());
Expand Down

0 comments on commit a73a745

Please sign in to comment.