Skip to content

Commit

Permalink
listener: more exception cleanup (#36198)
Browse files Browse the repository at this point in the history
Risk Level: low
Testing: updated tests
Docs Changes: n/a
Release Notes: n/a
envoyproxy/envoy-mobile#176

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Sep 24, 2024
1 parent eeea435 commit a141498
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 64 deletions.
96 changes: 54 additions & 42 deletions source/common/listener_manager/listener_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ ListenSocketFactoryImpl::ListenSocketFactoryImpl(
}
}

sockets_.push_back(createListenSocketAndApplyOptions(factory, socket_type, 0));
sockets_.push_back(THROW_OR_RETURN_VALUE(
createListenSocketAndApplyOptions(factory, socket_type, 0), Network::SocketSharedPtr));

if (sockets_[0] != nullptr && local_address_->ip() && local_address_->ip()->port() == 0) {
local_address_ = sockets_[0]->connectionInfoProvider().localAddress();
Expand All @@ -113,7 +114,8 @@ ListenSocketFactoryImpl::ListenSocketFactoryImpl(
if (bind_type_ != ListenerComponentFactory::BindType::ReusePort && sockets_[0] != nullptr) {
sockets_.push_back(sockets_[0]->duplicate());
} else {
sockets_.push_back(createListenSocketAndApplyOptions(factory, socket_type, i));
sockets_.push_back(THROW_OR_RETURN_VALUE(
createListenSocketAndApplyOptions(factory, socket_type, i), Network::SocketSharedPtr));
}
}
ASSERT(sockets_.size() == num_sockets);
Expand Down Expand Up @@ -143,15 +145,15 @@ ListenSocketFactoryImpl::ListenSocketFactoryImpl(const ListenSocketFactoryImpl&
}
}

Network::SocketSharedPtr ListenSocketFactoryImpl::createListenSocketAndApplyOptions(
absl::StatusOr<Network::SocketSharedPtr> ListenSocketFactoryImpl::createListenSocketAndApplyOptions(
ListenerComponentFactory& factory, Network::Socket::Type socket_type, uint32_t worker_index) {
// Socket might be nullptr when doing server validation.
// TODO(mattklein123): See the comment in the validation code. Make that code not return nullptr
// so this code can be simpler.
Network::SocketSharedPtr socket = THROW_OR_RETURN_VALUE(
factory.createListenSocket(local_address_, socket_type, options_, bind_type_,
socket_creation_options_, worker_index),
Network::SocketSharedPtr);
absl::StatusOr<Network::SocketSharedPtr> socket_or_error = factory.createListenSocket(
local_address_, socket_type, options_, bind_type_, socket_creation_options_, worker_index);
RETURN_IF_NOT_OK_REF(socket_or_error.status());
Network::SocketSharedPtr socket = std::move(*socket_or_error);

// Binding is done by now.
ENVOY_LOG(debug, "Create listen socket for listener {} on address {}", listener_name_,
Expand Down Expand Up @@ -371,9 +373,9 @@ ListenerImpl::ListenerImpl(const envoy::config::listener::v3::Listener& config,
// listener factory can provide additional options.
THROW_IF_NOT_OK(buildUdpListenerFactory(config, parent_.server_.options().concurrency()));
buildListenSocketOptions(config, address_opts_list);
createListenerFilterFactories(config);
validateFilterChains(config);
buildFilterChains(config);
THROW_IF_NOT_OK(createListenerFilterFactories(config));
THROW_IF_NOT_OK(validateFilterChains(config));
THROW_IF_NOT_OK(buildFilterChains(config));
if (socket_type_ != Network::Socket::Type::Datagram) {
buildSocketOptions(config);
buildOriginalDstListenerFilter(config);
Expand Down Expand Up @@ -442,9 +444,9 @@ ListenerImpl::ListenerImpl(ListenerImpl& origin,
POOL_COUNTER(listener_factory_context_->listenerScope()))}) {
buildAccessLog(config);
THROW_IF_NOT_OK(validateConfig());
createListenerFilterFactories(config);
validateFilterChains(config);
buildFilterChains(config);
THROW_IF_NOT_OK(createListenerFilterFactories(config));
THROW_IF_NOT_OK(validateFilterChains(config));
THROW_IF_NOT_OK(buildFilterChains(config));
THROW_IF_NOT_OK(buildInternalListener(config));
if (socket_type_ == Network::Socket::Type::Stream) {
// Apply the options below only for TCP.
Expand Down Expand Up @@ -685,42 +687,47 @@ void ListenerImpl::buildListenSocketOptions(
}
}

void ListenerImpl::createListenerFilterFactories(
const envoy::config::listener::v3::Listener& config) {
absl::Status
ListenerImpl::createListenerFilterFactories(const envoy::config::listener::v3::Listener& config) {
if (!config.listener_filters().empty()) {
switch (socket_type_) {
case Network::Socket::Type::Datagram: {
if (udp_listener_config_->listener_factory_->isTransportConnectionless()) {
udp_listener_filter_factories_ =
THROW_OR_RETURN_VALUE(parent_.factory_->createUdpListenerFilterFactoryList(
config.listener_filters(), *listener_factory_context_),
std::vector<Network::UdpListenerFilterFactoryCb>);
auto udp_listener_filter_factory_or_error =
parent_.factory_->createUdpListenerFilterFactoryList(config.listener_filters(),
*listener_factory_context_);
RETURN_IF_NOT_OK_REF(udp_listener_filter_factory_or_error.status());
udp_listener_filter_factories_ = std::move(*udp_listener_filter_factory_or_error);
} else {
absl::StatusOr<Filter::QuicListenerFilterFactoriesList>
quic_listener_filter_factory_or_error =
parent_.factory_->createQuicListenerFilterFactoryList(config.listener_filters(),
*listener_factory_context_);
RETURN_IF_NOT_OK_REF(quic_listener_filter_factory_or_error.status());
// This is a QUIC listener.
quic_listener_filter_factories_ =
THROW_OR_RETURN_VALUE(parent_.factory_->createQuicListenerFilterFactoryList(
config.listener_filters(), *listener_factory_context_),
Filter::QuicListenerFilterFactoriesList);
quic_listener_filter_factories_ = std::move(*quic_listener_filter_factory_or_error);
}
break;
}
case Network::Socket::Type::Stream:
listener_filter_factories_ =
THROW_OR_RETURN_VALUE(parent_.factory_->createListenerFilterFactoryList(
config.listener_filters(), *listener_factory_context_),
Filter::ListenerFilterFactoriesList);
auto listener_filter_factory_or_error = parent_.factory_->createListenerFilterFactoryList(
config.listener_filters(), *listener_factory_context_);
RETURN_IF_NOT_OK_REF(listener_filter_factory_or_error.status());
listener_filter_factories_ = std::move(*listener_filter_factory_or_error);
break;
}
}
return absl::OkStatus();
}

void ListenerImpl::validateFilterChains(const envoy::config::listener::v3::Listener& config) {
absl::Status
ListenerImpl::validateFilterChains(const envoy::config::listener::v3::Listener& config) {
if (config.filter_chains().empty() && !config.has_default_filter_chain() &&
(socket_type_ == Network::Socket::Type::Stream ||
!udp_listener_config_->listener_factory_->isTransportConnectionless())) {
// If we got here, this is a tcp listener or connection-oriented udp listener, so ensure there
// is a filter chain specified
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("error adding listener '{}': no filter chains specified",
absl::StrJoin(addresses_, ",", Network::AddressStrFormatter())));
} else if (udp_listener_config_ != nullptr &&
Expand All @@ -729,7 +736,7 @@ void ListenerImpl::validateFilterChains(const envoy::config::listener::v3::Liste
if (anyFilterChain(config, [](const auto& filter_chain) {
return !filter_chain.has_transport_socket();
})) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("error adding listener '{}': no transport socket "
"specified for connection oriented UDP listener",
absl::StrJoin(addresses_, ",", Network::AddressStrFormatter())));
Expand All @@ -738,26 +745,28 @@ void ListenerImpl::validateFilterChains(const envoy::config::listener::v3::Liste
udp_listener_config_ != nullptr &&
udp_listener_config_->listener_factory_->isTransportConnectionless()) {

throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("error adding listener '{}': {} filter chain(s) specified for "
"connection-less UDP listener.",
absl::StrJoin(addresses_, ",", Network::AddressStrFormatter()),
config.filter_chains_size()));
}
return absl::OkStatus();
}

void ListenerImpl::buildFilterChains(const envoy::config::listener::v3::Listener& config) {
absl::Status ListenerImpl::buildFilterChains(const envoy::config::listener::v3::Listener& config) {
transport_factory_context_->setInitManager(*dynamic_init_manager_);
ListenerFilterChainFactoryBuilder builder(*this, *transport_factory_context_);
THROW_IF_NOT_OK(filter_chain_manager_->addFilterChains(
return filter_chain_manager_->addFilterChains(
config.has_filter_chain_matcher() ? &config.filter_chain_matcher() : nullptr,
config.filter_chains(),
config.has_default_filter_chain() ? &config.default_filter_chain() : nullptr, builder,
*filter_chain_manager_));
*filter_chain_manager_);
}

void ListenerImpl::buildConnectionBalancer(const envoy::config::listener::v3::Listener& config,
const Network::Address::Instance& address) {
absl::Status
ListenerImpl::buildConnectionBalancer(const envoy::config::listener::v3::Listener& config,
const Network::Address::Instance& address) {
auto iter = connection_balancers_.find(address.asString());
if (iter == connection_balancers_.end() && socket_type_ == Network::Socket::Type::Stream) {
#ifdef WIN32
Expand Down Expand Up @@ -788,7 +797,7 @@ void ListenerImpl::buildConnectionBalancer(const envoy::config::listener::v3::Li
Envoy::Registry::FactoryRegistry<Network::ConnectionBalanceFactory>::getFactoryByType(
connection_balance_library_type);
if (factory == nullptr) {
throwEnvoyExceptionOrPanic(
return absl::InvalidArgumentError(
fmt::format("Didn't find a registered implementation for type: '{}'",
connection_balance_library_type));
}
Expand All @@ -799,7 +808,7 @@ void ListenerImpl::buildConnectionBalancer(const envoy::config::listener::v3::Li
break;
}
case envoy::config::listener::v3::Listener_ConnectionBalanceConfig::BALANCE_TYPE_NOT_SET: {
throwEnvoyExceptionOrPanic("No valid balance type for connection balance");
return absl::InvalidArgumentError("No valid balance type for connection balance");
}
}
} else {
Expand All @@ -808,6 +817,7 @@ void ListenerImpl::buildConnectionBalancer(const envoy::config::listener::v3::Li
}
#endif
}
return absl::OkStatus();
}

void ListenerImpl::buildSocketOptions(const envoy::config::listener::v3::Listener& config) {
Expand Down Expand Up @@ -961,14 +971,15 @@ ListenerImpl::~ListenerImpl() {

Init::Manager& ListenerImpl::initManager() { return *dynamic_init_manager_; }

void ListenerImpl::addSocketFactory(Network::ListenSocketFactoryPtr&& socket_factory) {
buildConnectionBalancer(config(), *socket_factory->localAddress());
absl::Status ListenerImpl::addSocketFactory(Network::ListenSocketFactoryPtr&& socket_factory) {
RETURN_IF_NOT_OK(buildConnectionBalancer(config(), *socket_factory->localAddress()));
if (buildUdpListenerWorkerRouter(*socket_factory->localAddress(),
parent_.server_.options().concurrency())) {
parent_.server_.hotRestart().registerUdpForwardingListener(socket_factory->localAddress(),
udp_listener_config_);
}
socket_factories_.emplace_back(std::move(socket_factory));
return absl::OkStatus();
}

bool ListenerImpl::supportUpdateFilterChain(const envoy::config::listener::v3::Listener& new_config,
Expand Down Expand Up @@ -1132,10 +1143,11 @@ bool ListenerImpl::hasDuplicatedAddress(const ListenerImpl& other) const {
return false;
}

void ListenerImpl::cloneSocketFactoryFrom(const ListenerImpl& other) {
absl::Status ListenerImpl::cloneSocketFactoryFrom(const ListenerImpl& other) {
for (auto& socket_factory : other.getSocketFactories()) {
addSocketFactory(socket_factory->clone());
RETURN_IF_NOT_OK(addSocketFactory(socket_factory->clone()));
}
return absl::OkStatus();
}

void ListenerImpl::closeAllSockets() {
Expand Down
20 changes: 10 additions & 10 deletions source/common/listener_manager/listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ class ListenSocketFactoryImpl : public Network::ListenSocketFactory,
private:
ListenSocketFactoryImpl(const ListenSocketFactoryImpl& factory_to_clone);

Network::SocketSharedPtr createListenSocketAndApplyOptions(ListenerComponentFactory& factory,
Network::Socket::Type socket_type,
uint32_t worker_index);
absl::StatusOr<Network::SocketSharedPtr>
createListenSocketAndApplyOptions(ListenerComponentFactory& factory,
Network::Socket::Type socket_type, uint32_t worker_index);

ListenerComponentFactory& factory_;
// Initially, its port number might be 0. Once a socket is created, its port
Expand Down Expand Up @@ -250,7 +250,7 @@ class ListenerImpl final : public Network::ListenerConfig,
DrainManager& localDrainManager() const {
return listener_factory_context_->listener_factory_context_base_->drainManager();
}
void addSocketFactory(Network::ListenSocketFactoryPtr&& socket_factory);
absl::Status addSocketFactory(Network::ListenSocketFactoryPtr&& socket_factory);
void setSocketAndOptions(const Network::SocketSharedPtr& socket);
const Network::Socket::OptionsSharedPtr& listenSocketOptions(uint32_t address_index) {
ASSERT(listen_socket_options_list_.size() > address_index);
Expand Down Expand Up @@ -328,7 +328,7 @@ class ListenerImpl final : public Network::ListenerConfig,
}
}

void cloneSocketFactoryFrom(const ListenerImpl& other);
absl::Status cloneSocketFactoryFrom(const ListenerImpl& other);
void closeAllSockets();

Network::Socket::Type socketType() const { return socket_type_; }
Expand Down Expand Up @@ -396,11 +396,11 @@ class ListenerImpl final : public Network::ListenerConfig,
void buildListenSocketOptions(const envoy::config::listener::v3::Listener& config,
std::vector<std::reference_wrapper<const Protobuf::RepeatedPtrField<
envoy::config::core::v3::SocketOption>>>& address_opts_list);
void createListenerFilterFactories(const envoy::config::listener::v3::Listener& config);
void validateFilterChains(const envoy::config::listener::v3::Listener& config);
void buildFilterChains(const envoy::config::listener::v3::Listener& config);
void buildConnectionBalancer(const envoy::config::listener::v3::Listener& config,
const Network::Address::Instance& address);
absl::Status createListenerFilterFactories(const envoy::config::listener::v3::Listener& config);
absl::Status validateFilterChains(const envoy::config::listener::v3::Listener& config);
absl::Status buildFilterChains(const envoy::config::listener::v3::Listener& config);
absl::Status buildConnectionBalancer(const envoy::config::listener::v3::Listener& config,
const Network::Address::Instance& address);
void buildSocketOptions(const envoy::config::listener::v3::Listener& config);
void buildOriginalDstListenerFilter(const envoy::config::listener::v3::Listener& config);
void buildProxyProtocolListenerFilter(const envoy::config::listener::v3::Listener& config);
Expand Down
21 changes: 14 additions & 7 deletions source/common/listener_manager/listener_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ ListenerManagerImpl::setupSocketFactoryForListener(ListenerImpl& new_listener,
if (!(existing_listener.hasCompatibleAddress(new_listener) && same_socket_options)) {
RETURN_IF_NOT_OK(setNewOrDrainingSocketFactory(new_listener.name(), new_listener));
} else {
new_listener.cloneSocketFactoryFrom(existing_listener);
RETURN_IF_NOT_OK(new_listener.cloneSocketFactoryFrom(existing_listener));
}
return absl::OkStatus();
}
Expand Down Expand Up @@ -1177,7 +1177,7 @@ absl::Status ListenerManagerImpl::setNewOrDrainingSocketFactory(const std::strin
}

if (draining_listener_ptr != nullptr) {
listener.cloneSocketFactoryFrom(*draining_listener_ptr);
RETURN_IF_NOT_OK(listener.cloneSocketFactoryFrom(*draining_listener_ptr));
} else {
return createListenSocketFactory(listener);
}
Expand All @@ -1191,25 +1191,32 @@ absl::Status ListenerManagerImpl::createListenSocketFactory(ListenerImpl& listen
bind_type = listener.reusePort() ? ListenerComponentFactory::BindType::ReusePort
: ListenerComponentFactory::BindType::NoReusePort;
}
absl::Status socket_status = absl::OkStatus();
TRY_ASSERT_MAIN_THREAD {
Network::SocketCreationOptions creation_options;
creation_options.mptcp_enabled_ = listener.mptcpEnabled();
for (std::vector<Network::Address::InstanceConstSharedPtr>::size_type i = 0;
i < listener.addresses().size(); i++) {
listener.addSocketFactory(std::make_unique<ListenSocketFactoryImpl>(
socket_status = listener.addSocketFactory(std::make_unique<ListenSocketFactoryImpl>(
*factory_, listener.addresses()[i], socket_type, listener.listenSocketOptions(i),
listener.name(), listener.tcpBacklogSize(), bind_type, creation_options,
server_.options().concurrency()));
if (!socket_status.ok()) {
break;
}
}
}
END_TRY
CATCH(const EnvoyException& e, {
socket_status = absl::InvalidArgumentError(e.what());
;
});
if (!socket_status.ok()) {
ENVOY_LOG(error, "listener '{}' failed to bind or apply socket options: {}", listener.name(),
e.what());
socket_status.message());
incListenerCreateFailureStat();
return absl::InvalidArgumentError(e.what());
});
return absl::OkStatus();
}
return socket_status;
}

void ListenerManagerImpl::maybeCloseSocketsForListener(ListenerImpl& listener) {
Expand Down
9 changes: 4 additions & 5 deletions test/common/listener_manager/listener_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8040,9 +8040,8 @@ TEST_P(ListenerManagerImplWithRealFiltersTest, InvalidExtendConnectionBalanceCon
Network::Address::InstanceConstSharedPtr address(
new Network::Address::Ipv4Instance("192.168.0.1", 80, nullptr));
EXPECT_CALL(*socket_factory, localAddress()).WillOnce(ReturnRef(address));
EXPECT_THROW_WITH_MESSAGE(
listener_impl.addSocketFactory(std::move(socket_factory)), EnvoyException,
"Didn't find a registered implementation for type: 'google.protobuf.test'");
EXPECT_EQ(listener_impl.addSocketFactory(std::move(socket_factory)).message(),
"Didn't find a registered implementation for type: 'google.protobuf.test'");
#endif
}

Expand All @@ -8058,8 +8057,8 @@ TEST_P(ListenerManagerImplWithRealFiltersTest, EmptyConnectionBalanceConfig) {
Network::Address::InstanceConstSharedPtr address(
new Network::Address::Ipv4Instance("192.168.0.1", 80, nullptr));
EXPECT_CALL(*socket_factory, localAddress()).WillOnce(ReturnRef(address));
EXPECT_THROW_WITH_MESSAGE(listener_impl.addSocketFactory(std::move(socket_factory)),
EnvoyException, "No valid balance type for connection balance");
EXPECT_EQ(listener_impl.addSocketFactory(std::move(socket_factory)).message(),
"No valid balance type for connection balance");
#endif
}

Expand Down

0 comments on commit a141498

Please sign in to comment.