Skip to content

Commit

Permalink
GH-525 Refactor resolve_and_connect so that connection object is reused.
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Oct 1, 2024
1 parent f336eaf commit 6e8767f
Showing 1 changed file with 73 additions and 75 deletions.
148 changes: 73 additions & 75 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ namespace eosio {
struct connection_detail {
std::string host;
connection_ptr c;
tcp::resolver::results_type ips;
};

using connection_details_index = multi_index_container<
Expand Down Expand Up @@ -403,6 +402,8 @@ namespace eosio {
boost::asio::steady_timer::duration conn_period,
uint32_t maximum_client_count);

std::chrono::milliseconds get_heartbeat_timeout() const { return heartbeat_timeout; }

uint32_t get_max_client_count() const { return max_client_count; }

fc::microseconds get_connector_period() const;
Expand All @@ -419,9 +420,7 @@ namespace eosio {

void add(connection_ptr c);
string connect(const string& host, const string& p2p_address);
string resolve_and_connect(const string& host, const string& p2p_address, uint16_t consecutive_immediate_connection_close = 0);
void connect(const connection_ptr& c);
void remove(const connection_ptr& c);
string resolve_and_connect(const string& host, const string& p2p_address);
string disconnect(const string& host);
void close_all();

Expand Down Expand Up @@ -836,7 +835,7 @@ namespace eosio {
public:
enum class connection_state { connecting, connected, closing, closed };

explicit connection( const string& endpoint, const string& listen_address, uint16_t consecutive_immediate_connection_close );
explicit connection( const string& endpoint, const string& listen_address );
/// @brief ctor
/// @param socket created by boost::asio in fc::listener
/// @param address identifier of listen socket which accepted this new connection
Expand Down Expand Up @@ -1014,7 +1013,7 @@ namespace eosio {

bool populate_handshake( handshake_message& hello ) const;

bool reconnect();
bool resolve_and_connect();
void connect( const tcp::resolver::results_type& endpoints );
void start_read_message();

Expand Down Expand Up @@ -1265,21 +1264,20 @@ namespace eosio {

//---------------------------------------------------------------------------

connection::connection( const string& endpoint, const string& listen_address, uint16_t consecutive_immediate_connection_close )
connection::connection( const string& endpoint, const string& listen_address )
: peer_addr( endpoint ),
strand( my_impl->thread_pool.get_executor() ),
socket( new tcp::socket( my_impl->thread_pool.get_executor() ) ),
listen_address( listen_address ),
log_p2p_address( endpoint ),
connection_id( ++my_impl->current_connection_id ),
consecutive_immediate_connection_close( consecutive_immediate_connection_close ),
sync_response_expected_timer( my_impl->thread_pool.get_executor() ),
last_handshake_recv(),
last_handshake_sent(),
p2p_address( endpoint )
{
set_connection_type( peer_address() );
my_impl->mark_bp_connection(this);
update_endpoints();
fc_ilog( logger, "created connection - ${c} to ${n}", ("c", connection_id)("n", endpoint) );
}

Expand All @@ -1294,7 +1292,6 @@ namespace eosio {
last_handshake_recv(),
last_handshake_sent()
{
update_endpoints();
fc_dlog( logger, "new connection - ${c} object created for peer ${address}:${port} from listener ${addr}",
("c", connection_id)("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) );
}
Expand Down Expand Up @@ -1388,6 +1385,7 @@ namespace eosio {
bool connection::start_session() {
verify_strand_in_this_thread( strand, __func__, __LINE__ );

update_endpoints();
boost::asio::ip::tcp::no_delay nodelay( true );
boost::system::error_code ec;
socket->set_option( nodelay, ec );
Expand Down Expand Up @@ -2810,31 +2808,6 @@ namespace eosio {

//------------------------------------------------------------------------

bool connection::reconnect() {
switch ( no_retry ) {
case no_reason:
case wrong_version:
case benign_other:
case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
break;
default:
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
return false;
}
if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
fc::microseconds connector_period = my_impl->connections.get_connector_period();
fc::lock_guard g( conn_mtx );
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
return true; // true so doesn't remove from valid connections
}
}

connection_ptr c = shared_from_this();
my_impl->connections.remove(c);
my_impl->connections.resolve_and_connect(c->peer_address(), c->listen_address, c->consecutive_immediate_connection_close.load());
return true;
}

// called from connection strand
void connection::connect( const tcp::resolver::results_type& endpoints ) {
set_state(connection_state::connecting);
Expand Down Expand Up @@ -4635,6 +4608,7 @@ namespace eosio {
update_p2p_connection_metrics = std::move(fun);
}

// can be called from any thread
void connections_manager::connect_supplied_peers(const string& p2p_address) {
std::unique_lock g(connections_mtx);
chain::flat_set<string> peers = supplied_peers;
Expand All @@ -4659,61 +4633,85 @@ namespace eosio {
return resolve_and_connect( host, p2p_address );
}

string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address, uint16_t consecutive_immediate_connection_close ) {
string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address ) {
string::size_type colon = peer_address.find(':');
if (colon == std::string::npos || colon == 0) {
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_address) );
return "invalid peer address";
}

std::lock_guard g( connections_mtx );
if( find_connection_i( peer_address ) )
return "already connected";
{
std::lock_guard g( connections_mtx );
if( find_connection_i( peer_address ) )
return "already connected";
}

auto [host, port, type] = split_host_port_type(peer_address);

auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );

resolver->async_resolve(host, port,
[resolver, host, port, peer_address, listen_address, consecutive_immediate_connection_close, this]
( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
connection_ptr c = std::make_shared<connection>( peer_address, listen_address, consecutive_immediate_connection_close );
c->set_heartbeat_timeout( heartbeat_timeout );
std::lock_guard g( connections_mtx );
auto [it, inserted] = connections.emplace( connection_detail{
.host = peer_address,
.c = std::move(c),
.ips = results
});
if( !err ) {
it->c->connect( results );
} else {
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
it->c->set_state(connection::connection_state::closed);
++it->c->consecutive_immediate_connection_close;
}
} );
connection_ptr c = std::make_shared<connection>( peer_address, listen_address );
if (c->resolve_and_connect()) {
add(c);

return "added connection";
}

return "added connection";
return "connection failed";
}

void connections_manager::connect(const connection_ptr& c) {
std::lock_guard g( connections_mtx );
const auto& index = connections.get<by_connection>();
const auto& it = index.find(c);
if( it != index.end() ) {
it->c->connect( it->ips );
// called from any thread
bool connection::resolve_and_connect() {
switch ( no_retry ) {
case no_reason:
case wrong_version:
case benign_other:
case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate
break;
default:
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
return false;
}
}

void connections_manager::remove(const connection_ptr& c) {
std::lock_guard g( connections_mtx );
auto& index = connections.get<by_connection>();
index.erase(c);
string::size_type colon = peer_address().find(':');
if (colon == std::string::npos || colon == 0) {
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_address()) );
return false;
}

connection_ptr c = shared_from_this();

if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
fc::microseconds connector_period = my_impl->connections.get_connector_period();
fc::lock_guard g( conn_mtx );
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
return true; // true so doesn't remove from valid connections
}
}

auto [host, port, type] = split_host_port_type(c->peer_address());
if (host.empty())
return false;

strand.post([c, host, port]() {
auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );
resolver->async_resolve(host, port,
[resolver, c, host, port]
( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
c->set_heartbeat_timeout( my_impl->connections.get_heartbeat_timeout() );
if( !err ) {
c->connect( results );
} else {
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
c->set_state(connection::connection_state::closed);
++c->consecutive_immediate_connection_close;
}
} );
} );

return true;
}

// called by API
// called by API
string connections_manager::disconnect( const string& host ) {
std::lock_guard g( connections_mtx );
auto& index = connections.get<by_host>();
Expand Down Expand Up @@ -4815,7 +4813,7 @@ namespace eosio {
auto cleanup = [&num_peers, &num_rm, this](vector<connection_ptr>&& reconnecting,
vector<connection_ptr>&& removing) {
for( auto& c : reconnecting ) {
if (!c->reconnect()) {
if (!c->resolve_and_connect()) {
--num_peers;
++num_rm;
removing.push_back(c);
Expand Down

0 comments on commit 6e8767f

Please sign in to comment.