Skip to content

Commit

Permalink
Merge pull request #826 from AntelopeIO/resolve-on-reconnect-main
Browse files Browse the repository at this point in the history
[1.0.2 -> main] P2P: Resolve on reconnect
  • Loading branch information
heifner authored Sep 26, 2024
2 parents d589ea5 + 03ad7e5 commit 881f997
Showing 1 changed file with 44 additions and 31 deletions.
75 changes: 44 additions & 31 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ namespace eosio {
std::string host;
connection_ptr c;
tcp::endpoint active_ip;
tcp::resolver::results_type ips;
};

using connection_details_index = multi_index_container<
Expand Down Expand Up @@ -331,9 +330,9 @@ namespace eosio {

void add(connection_ptr c);
string connect(const string& host, const string& p2p_address);
string resolve_and_connect(const string& host, const string& p2p_address);
string resolve_and_connect(const string& host, const string& p2p_address, const connection_ptr& c = {});
void update_connection_endpoint(connection_ptr c, const tcp::endpoint& endpoint);
void connect(const connection_ptr& c);
void reconnect(const connection_ptr& c);
string disconnect(const string& host);
void close_all();

Expand Down Expand Up @@ -839,7 +838,7 @@ namespace eosio {

fc::sha256 conn_node_id;
string short_conn_node_id;
string listen_address; // address sent to peer in handshake
const string listen_address; // address sent to peer in handshake
string log_p2p_address;
string log_remote_endpoint_ip;
string log_remote_endpoint_port;
Expand Down Expand Up @@ -2754,16 +2753,21 @@ namespace eosio {
fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry )));
return false;
}

if (incoming())
return false;

if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) {
fc::microseconds connector_period = my_impl->connections.get_connector_period();
fc::lock_guard g( conn_mtx );
if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) {
return true; // true so doesn't remove from valid connections
}
}

connection_ptr c = shared_from_this();
strand.post([c]() {
my_impl->connections.connect(c);
my_impl->connections.reconnect(c);
});
return true;
}
Expand Down Expand Up @@ -4553,39 +4557,48 @@ namespace eosio {
return resolve_and_connect( host, p2p_address );
}

string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address ) {
string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address,
const connection_ptr& c )
{
assert(!c || (c->peer_address() == peer_address && c->listen_address == listen_address));

string::size_type colon = peer_address.find(':');
if (colon == std::string::npos || colon == 0) {
fc_elog( logger, "Invalid peer address. must be \"host:port[:<blk>|<trx>]\": ${p}", ("p", peer_address) );
return "invalid peer address";
}

std::lock_guard g( connections_mtx );
if( find_connection_i( peer_address ) )
return "already connected";
{
std::lock_guard g(connections_mtx);
if (find_connection_i(peer_address))
return "already connected";
}

auto [host, port, type] = split_host_port_type(peer_address);

auto resolver = std::make_shared<tcp::resolver>( my_impl->thread_pool.get_executor() );

resolver->async_resolve(host, port,
[resolver, host = host, port = port, peer_address = peer_address, listen_address = listen_address, this]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
connection_ptr c = std::make_shared<connection>( peer_address, listen_address );
c->set_heartbeat_timeout( heartbeat_timeout );
std::lock_guard g( connections_mtx );
auto [it, inserted] = connections.emplace( connection_detail{
.host = peer_address,
.c = std::move(c),
.ips = results
[this, resolver, c_org{c}, host, port, peer_address, listen_address]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) {
connection_ptr c = c_org ? c_org : std::make_shared<connection>( peer_address, listen_address );
c->strand.post([this, resolver, c, err, results, host, port, peer_address]() {
c->set_heartbeat_timeout( heartbeat_timeout );
{
std::lock_guard g( connections_mtx );
connections.emplace( connection_detail{
.host = peer_address,
.c = c,
});
}
if( !err ) {
c->connect( results );
} else {
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
c->set_state(connection::connection_state::closed);
++(c->consecutive_immediate_connection_close);
}
});
if( !err ) {
it->c->connect( results );
} else {
fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}",
("host", host)("port", port)( "error", err.message() ) );
it->c->set_state(connection::connection_state::closed);
++(it->c->consecutive_immediate_connection_close);
}
} );

return "added connection";
Expand All @@ -4603,13 +4616,13 @@ namespace eosio {
}
}

void connections_manager::connect(const connection_ptr& c) {
std::lock_guard g( connections_mtx );
const auto& index = connections.get<by_connection>();
const auto& it = index.find(c);
if( it != index.end() ) {
it->c->connect( it->ips );
void connections_manager::reconnect(const connection_ptr& c) {
{
std::lock_guard g( connections_mtx );
auto& index = connections.get<by_connection>();
index.erase(c);
}
resolve_and_connect(c->peer_address(), c->listen_address, c);
}

// called by API
Expand Down

0 comments on commit 881f997

Please sign in to comment.