Skip to content

Commit

Permalink
access_log: support UPSTREAM_CONNECTION_ID in UDP and TCP tunneling f…
Browse files Browse the repository at this point in the history
…lows (#35950)

Additional Description: The following PR:
#31920, added support for
``%UPSTREAM_CONNECTION_ID`` substitution string for access logs.
However, it's only supported for HTTP flows. Adding support for UDP and
TCP tunneling
Risk Level: low
Testing: unit tests, integration tests
Docs Changes: none
Release Notes: none
Platform Specific Features: none

---------

Signed-off-by: Ohad Vano <[email protected]>
  • Loading branch information
ohadvano authored Sep 8, 2024
1 parent 88cef4f commit bf9d8b7
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 15 deletions.
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ new_features:
- area: geoip
change: |
Added ``envoy.reloadable_features.mmdb_files_reload_enabled`` runtime flag that enables reload of mmdb files by default.
- area: access_logs
change: |
Added support for %UPSTREAM_CONNECTION_ID% access log substitution string in TCP and UDP tunneling flows.
- area: redis_proxy
change: |
Added :ref:`external_auth_provider <envoy_v3_api_msg_extensions.filters.network.redis_proxy.v3.RedisProxy>` to support
Expand Down
3 changes: 3 additions & 0 deletions source/common/tcp_proxy/upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
Upstream::HostDescriptionConstSharedPtr host) {
if (downstream_info_.downstreamAddressProvider().connectionID()) {
downstream_info_.upstreamInfo()->setUpstreamConnectionId(conn_data->connection().id());
ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
conn_data->connection().id(),
downstream_info_.downstreamAddressProvider().connectionID().value());
Expand Down Expand Up @@ -368,6 +369,8 @@ void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
downstream_info_.downstreamAddressProvider().connectionID()) {
// info.downstreamAddressProvider() is being called to get the upstream connection ID,
// because the StreamInfo object here is of the upstream connection.
downstream_info_.upstreamInfo()->setUpstreamConnectionId(
info.downstreamAddressProvider().connectionID().value());
ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
info.downstreamAddressProvider().connectionID().value(),
downstream_info_.downstreamAddressProvider().connectionID().value());
Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/udp/udp_proxy/udp_proxy_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -851,8 +851,9 @@ void TunnelingConnectionPoolImpl::onPoolReady(Http::RequestEncoder& request_enco
Upstream::HostDescriptionConstSharedPtr upstream_host,
StreamInfo::StreamInfo& upstream_info,
absl::optional<Http::Protocol>) {
auto upstream_connection_id = upstream_info.downstreamAddressProvider().connectionID().value();
ENVOY_LOG(debug, "Upstream connection [C{}] ready, creating tunnel stream",
upstream_info.downstreamAddressProvider().connectionID().value());
upstream_connection_id);

upstream_handle_ = nullptr;
upstream_host_ = upstream_host;
Expand All @@ -863,6 +864,7 @@ void TunnelingConnectionPoolImpl::onPoolReady(Http::RequestEncoder& request_enco
upstream_->setRequestEncoder(request_encoder, is_ssl);
upstream_->setTunnelCreationCallbacks(*this);
downstream_info_.upstreamInfo()->setUpstreamHost(upstream_host);
downstream_info_.upstreamInfo()->setUpstreamConnectionId(upstream_connection_id);
callbacks_->resetIdleTimer();
}

Expand Down
14 changes: 14 additions & 0 deletions test/common/tcp_proxy/tcp_proxy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,20 @@ TEST_P(TcpProxyTest, AccessLogUpstreamSSLConnection) {
filter_->getStreamInfo().upstreamInfo()->upstreamSslConnection()->sessionId());
}

TEST_P(TcpProxyTest, AccessLogUpstreamConnectionId) {
int connection_id = 20;
setup(1, accessLogConfig("%UPSTREAM_CONNECTION_ID%"));

EXPECT_CALL(*upstream_connections_.at(0), id()).WillRepeatedly(Return(connection_id));
raiseEventUpstreamConnected(0);

EXPECT_EQ(connection_id, filter_->getStreamInfo().upstreamInfo()->upstreamConnectionId());
filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose);
filter_.reset();

EXPECT_EQ(access_log_data_.value(), std::to_string(connection_id));
}

// Tests that upstream flush works properly with no idle timeout configured.
TEST_P(TcpProxyTest, UpstreamFlushNoTimeout) {
setup(1);
Expand Down
23 changes: 13 additions & 10 deletions test/integration/tcp_proxy_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ TEST_P(TcpProxyIntegrationTest, AccessLogOnUpstreamConnect) {
envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config;
access_log_config.set_path(access_log_path);
access_log_config.mutable_log_format()->mutable_text_format_source()->set_inline_string(
"ACCESS_LOG_TYPE=%ACCESS_LOG_TYPE%");
"%ACCESS_LOG_TYPE%-%UPSTREAM_CONNECTION_ID%\n");
access_log->mutable_typed_config()->PackFrom(access_log_config);
config_blob->PackFrom(tcp_proxy_config);
});
Expand All @@ -541,9 +541,12 @@ TEST_P(TcpProxyIntegrationTest, AccessLogOnUpstreamConnect) {

ASSERT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection));
auto log_result = waitForAccessLog(access_log_path);
EXPECT_EQ(absl::StrCat("ACCESS_LOG_TYPE=",
AccessLogType_Name(AccessLog::AccessLogType::TcpUpstreamConnected)),
log_result);
std::vector<std::string> access_log_parts = absl::StrSplit(log_result, '-');
EXPECT_EQ(AccessLogType_Name(AccessLog::AccessLogType::TcpUpstreamConnected),
access_log_parts[0]);
uint32_t upstream_connection_id;
ASSERT_TRUE(absl::SimpleAtoi(access_log_parts[1], &upstream_connection_id));
EXPECT_GT(upstream_connection_id, 0);

ASSERT_TRUE(fake_upstream_connection->waitForData(5));
ASSERT_TRUE(tcp_client->write("", true));
Expand All @@ -552,12 +555,12 @@ TEST_P(TcpProxyIntegrationTest, AccessLogOnUpstreamConnect) {
ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
tcp_client->waitForDisconnect();
test_server_.reset();
log_result = waitForAccessLog(access_log_path);
EXPECT_EQ(
absl::StrCat(
"ACCESS_LOG_TYPE=", AccessLogType_Name(AccessLog::AccessLogType::TcpUpstreamConnected),
"ACCESS_LOG_TYPE=", AccessLogType_Name(AccessLog::AccessLogType::TcpConnectionEnd)),
log_result);

log_result = waitForAccessLog(access_log_path, 1);
access_log_parts = absl::StrSplit(log_result, '-');
EXPECT_EQ(AccessLogType_Name(AccessLog::AccessLogType::TcpConnectionEnd), access_log_parts[0]);
ASSERT_TRUE(absl::SimpleAtoi(access_log_parts[1], &upstream_connection_id));
EXPECT_GT(upstream_connection_id, 0);
}

TEST_P(TcpProxyIntegrationTest, PeriodicAccessLog) {
Expand Down
53 changes: 53 additions & 0 deletions test/integration/tcp_tunneling_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,59 @@ TEST_P(TcpTunnelingIntegrationTest, BasicUsePost) {
closeConnection(fake_upstream_connection_);
}

TEST_P(TcpTunnelingIntegrationTest, TcpTunnelingAccessLog) {
if (upstreamProtocol() == Http::CodecType::HTTP3) {
return;
}

const std::string access_log_filename =
TestEnvironment::temporaryPath(TestUtility::uniqueFilename());

config_helper_.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void {
envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy proxy_config;
proxy_config.set_stat_prefix("tcp_stats");
proxy_config.set_cluster("cluster_0");
proxy_config.mutable_tunneling_config()->set_hostname("host.com:80");

envoy::extensions::access_loggers::file::v3::FileAccessLog access_log_config;
access_log_config.mutable_log_format()->mutable_text_format_source()->set_inline_string(
"%ACCESS_LOG_TYPE%-%UPSTREAM_CONNECTION_ID%\n");
access_log_config.set_path(access_log_filename);
proxy_config.add_access_log()->mutable_typed_config()->PackFrom(access_log_config);

auto* listeners = bootstrap.mutable_static_resources()->mutable_listeners();
for (auto& listener : *listeners) {
if (listener.name() != "tcp_proxy") {
continue;
}
auto* filter_chain = listener.mutable_filter_chains(0);
auto* filter = filter_chain->mutable_filters(0);
filter->mutable_typed_config()->PackFrom(proxy_config);
break;
}
});

initialize();

// Start a connection, and verify the upgrade headers are received upstream.
tcp_client_ = makeTcpConnection(lookupPort("tcp_proxy"));
ASSERT_TRUE(fake_upstreams_[0]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
ASSERT_TRUE(fake_upstream_connection_->waitForNewStream(*dispatcher_, upstream_request_));
ASSERT_TRUE(upstream_request_->waitForHeadersComplete());
EXPECT_EQ(upstream_request_->headers().getMethodValue(), "CONNECT");

upstream_request_->encodeHeaders(default_response_headers_, false);
sendBidiData(fake_upstream_connection_);
closeConnection(fake_upstream_connection_);

auto log_result = waitForAccessLog(access_log_filename);
std::vector<std::string> access_log_parts = absl::StrSplit(log_result, '-');
EXPECT_EQ(AccessLogType_Name(AccessLog::AccessLogType::TcpConnectionEnd), access_log_parts[0]);
uint32_t upstream_connection_id;
ASSERT_TRUE(absl::SimpleAtoi(access_log_parts[1], &upstream_connection_id));
EXPECT_GT(upstream_connection_id, 0);
}

TEST_P(TcpTunnelingIntegrationTest, BasicHeaderEvaluationTunnelingConfig) {
const std::string access_log_filename =
TestEnvironment::temporaryPath(TestUtility::uniqueFilename());
Expand Down
14 changes: 10 additions & 4 deletions test/integration/udp_tunneling_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ TEST_P(UdpTunnelingIntegrationTest, FlushAccessLogOnTunnelConnected) {
path: {}
log_format:
text_format_source:
inline_string: "%ACCESS_LOG_TYPE%\n"
inline_string: "%ACCESS_LOG_TYPE%-%UPSTREAM_CONNECTION_ID%\n"
)EOF",
access_log_filename);

Expand All @@ -1133,9 +1133,15 @@ TEST_P(UdpTunnelingIntegrationTest, FlushAccessLogOnTunnelConnected) {

const std::string datagram = "hello";
establishConnection(datagram);
EXPECT_THAT(
waitForAccessLog(access_log_filename),
testing::HasSubstr(AccessLogType_Name(AccessLog::AccessLogType::UdpTunnelUpstreamConnected)));

std::string access_log = waitForAccessLog(access_log_filename);
std::vector<std::string> access_log_parts = absl::StrSplit(access_log, '-');

EXPECT_EQ(AccessLogType_Name(AccessLog::AccessLogType::UdpTunnelUpstreamConnected),
access_log_parts[0]);
uint32_t upstream_connection_id;
ASSERT_TRUE(absl::SimpleAtoi(access_log_parts[1], &upstream_connection_id));
EXPECT_GT(upstream_connection_id, 0);

// Wait for buffered datagram.
ASSERT_TRUE(upstream_request_->waitForData(*dispatcher_, expectedCapsules({datagram})));
Expand Down

0 comments on commit bf9d8b7

Please sign in to comment.