diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 6efd4c1f18..1a90e73f54 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -262,6 +262,8 @@ namespace eosio { struct connection_detail { std::string host; connection_ptr c; + tcp::endpoint active_ip; + tcp::resolver::results_type ips; }; using connection_details_index = multi_index_container< @@ -313,8 +315,6 @@ namespace eosio { boost::asio::steady_timer::duration conn_period, uint32_t maximum_client_count); - std::chrono::milliseconds get_heartbeat_timeout() const { return heartbeat_timeout; } - uint32_t get_max_client_count() const { return max_client_count; } fc::microseconds get_connector_period() const; @@ -332,6 +332,8 @@ namespace eosio { void add(connection_ptr c); string connect(const string& host, const string& p2p_address); string resolve_and_connect(const string& host, const string& p2p_address); + void update_connection_endpoint(connection_ptr c, const tcp::endpoint& endpoint); + void connect(const connection_ptr& c); string disconnect(const string& host); void close_all(); @@ -924,7 +926,7 @@ namespace eosio { bool populate_handshake( handshake_message& hello ) const; - bool resolve_and_connect(); + bool reconnect(); void connect( const tcp::resolver::results_type& endpoints ); void start_read_message(); @@ -1098,21 +1100,16 @@ namespace eosio { }; - std::tuple split_host_port_type(const std::string& peer_add, bool incoming) { + std::tuple split_host_port_type(const std::string& peer_add) { // host:port:[|] if (peer_add.empty()) return {}; string::size_type p = peer_add[0] == '[' ? peer_add.find(']') : 0; - string::size_type colon = p != string::npos ? peer_add.find(':', p) : string::npos; - if (colon == std::string::npos || colon == 0) { - // if incoming then not an error this peer can do anything about - if (incoming) { - fc_dlog( logger, "Invalid peer address. must be \"host:port[:|]\": ${p}", ("p", peer_add) ); - } else { - fc_elog( logger, "Invalid peer address. must be \"host:port[:|]\": ${p}", ("p", peer_add) ); - } + if (p == string::npos) { + fc_wlog( logger, "Invalid peer address: ${peer}", ("peer", peer_add) ); return {}; } + string::size_type colon = peer_add.find(':', p); string::size_type colon2 = peer_add.find(':', colon + 1); string::size_type end = colon2 == string::npos ? string::npos : peer_add.find_first_of( " :+=.,<>!$%^&(*)|-#@\t", colon2 + 1 ); // future proof by including most symbols without using regex @@ -1189,8 +1186,8 @@ namespace eosio { last_handshake_sent(), p2p_address( endpoint ) { - set_connection_type( peer_address() ); my_impl->mark_bp_connection(this); + update_endpoints(); fc_ilog( logger, "created connection - ${c} to ${n}", ("c", connection_id)("n", endpoint) ); } @@ -1205,6 +1202,7 @@ namespace eosio { last_handshake_recv(), last_handshake_sent() { + update_endpoints(); fc_dlog( logger, "new connection - ${c} object created for peer ${address}:${port} from listener ${addr}", ("c", connection_id)("address", log_remote_endpoint_ip)("port", log_remote_endpoint_port)("addr", listen_address) ); } @@ -1237,7 +1235,7 @@ namespace eosio { // called from connection strand void connection::set_connection_type( const std::string& peer_add ) { - auto [host, port, type] = split_host_port_type(peer_add, false); + auto [host, port, type] = split_host_port_type(peer_add); if( type.empty() ) { fc_dlog( logger, "Setting connection - ${c} type for: ${peer} to both transactions and blocks", ("c", connection_id)("peer", peer_add) ); connection_type = both; @@ -1298,7 +1296,6 @@ namespace eosio { bool connection::start_session() { verify_strand_in_this_thread( strand, __func__, __LINE__ ); - update_endpoints(); boost::asio::ip::tcp::no_delay nodelay( true ); boost::system::error_code ec; socket->set_option( nodelay, ec ); @@ -2742,6 +2739,31 @@ namespace eosio { //------------------------------------------------------------------------ + bool connection::reconnect() { + switch ( no_retry ) { + case no_reason: + case wrong_version: + case benign_other: + case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate + break; + default: + fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry ))); + return false; + } + if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) { + fc::microseconds connector_period = my_impl->connections.get_connector_period(); + fc::lock_guard g( conn_mtx ); + if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) { + return true; // true so doesn't remove from valid connections + } + } + connection_ptr c = shared_from_this(); + strand.post([c]() { + my_impl->connections.connect(c); + }); + return true; + } + // called from connection strand void connection::connect( const tcp::resolver::results_type& endpoints ) { set_state(connection_state::connecting); @@ -2751,6 +2773,7 @@ namespace eosio { boost::asio::bind_executor( strand, [c = shared_from_this(), socket=socket]( const boost::system::error_code& err, const tcp::endpoint& endpoint ) { if( !err && socket->is_open() && socket == c->socket ) { + my_impl->connections.update_connection_endpoint(c, endpoint); c->update_endpoints(endpoint); if( c->start_session() ) { c->send_handshake(); @@ -2800,7 +2823,7 @@ namespace eosio { fc_ilog(logger, "Accepted new connection: " + paddr_str); connections.any_of_supplied_peers([&listen_address, &paddr_str, &paddr_desc, &limit](const string& peer_addr) { - auto [host, port, type] = split_host_port_type(peer_addr, false); + auto [host, port, type] = split_host_port_type(peer_addr); if (host == paddr_str) { if (limit > 0) { fc_dlog(logger, "Connection inbound to ${la} from ${a} is a configured p2p-peer-address and will not be throttled", ("la", listen_address)("a", paddr_desc)); @@ -3296,9 +3319,9 @@ namespace eosio { } if( incoming() ) { - auto [host, port, type] = split_host_port_type(msg.p2p_address, true); + auto [host, port, type] = split_host_port_type(msg.p2p_address); if (host.size()) - set_connection_type( msg.p2p_address); + set_connection_type( msg.p2p_address ); peer_dlog( this, "checking for duplicate" ); auto is_duplicate = [&](const connection_ptr& check) { @@ -4470,7 +4493,7 @@ namespace eosio { //---------------------------------------------------------------------------- size_t connections_manager::number_connections() const { - std::shared_lock g(connections_mtx); + std::lock_guard g(connections_mtx); return connections.size(); } @@ -4499,9 +4522,8 @@ namespace eosio { update_p2p_connection_metrics = std::move(fun); } - // can be called from any thread void connections_manager::connect_supplied_peers(const string& p2p_address) { - std::shared_lock g(connections_mtx); + std::unique_lock g(connections_mtx); chain::flat_set peers = supplied_peers; g.unlock(); for (const auto& peer : peers) { @@ -4511,9 +4533,12 @@ namespace eosio { void connections_manager::add( connection_ptr c ) { std::lock_guard g( connections_mtx ); + boost::system::error_code ec; + auto endpoint = c->socket->remote_endpoint(ec); connections.insert( connection_detail{ .host = c->peer_address(), - .c = std::move(c)} ); + .c = std::move(c), + .active_ip = endpoint} ); } // called by API @@ -4525,72 +4550,62 @@ namespace eosio { } string connections_manager::resolve_and_connect( const string& peer_address, const string& listen_address ) { - auto [host, port, type] = split_host_port_type(peer_address, false); - if (host.empty()) { + string::size_type colon = peer_address.find(':'); + if (colon == std::string::npos || colon == 0) { + fc_elog( logger, "Invalid peer address. must be \"host:port[:|]\": ${p}", ("p", peer_address) ); return "invalid peer address"; } - { - std::shared_lock g( connections_mtx ); - if( find_connection_i( peer_address ) ) - return "already connected"; - } - - connection_ptr c = std::make_shared( peer_address, listen_address ); - if (c->resolve_and_connect()) { - add(std::move(c)); - - return "added connection"; - } + std::lock_guard g( connections_mtx ); + if( find_connection_i( peer_address ) ) + return "already connected"; + + auto [host, port, type] = split_host_port_type(peer_address); + + auto resolver = std::make_shared( my_impl->thread_pool.get_executor() ); + + resolver->async_resolve(host, port, + [resolver, host = host, port = port, peer_address = peer_address, listen_address = listen_address, this]( const boost::system::error_code& err, const tcp::resolver::results_type& results ) { + connection_ptr c = std::make_shared( peer_address, listen_address ); + c->set_heartbeat_timeout( heartbeat_timeout ); + std::lock_guard g( connections_mtx ); + auto [it, inserted] = connections.emplace( connection_detail{ + .host = peer_address, + .c = std::move(c), + .ips = results + }); + if( !err ) { + it->c->connect( results ); + } else { + fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}", + ("host", host)("port", port)( "error", err.message() ) ); + it->c->set_state(connection::connection_state::closed); + ++(it->c->consecutive_immediate_connection_close); + } + } ); - return "connection failed"; + return "added connection"; } - // called from any thread - bool connection::resolve_and_connect() { - switch ( no_retry ) { - case no_reason: - case wrong_version: - case benign_other: - case duplicate: // attempt reconnect in case connection has been dropped, should quickly disconnect if duplicate - break; - default: - fc_dlog( logger, "Skipping connect due to go_away reason ${r}",("r", reason_str( no_retry ))); - return false; + void connections_manager::update_connection_endpoint(connection_ptr c, + const tcp::endpoint& endpoint) { + std::unique_lock g( connections_mtx ); + auto& index = connections.get(); + const auto& it = index.find(c); + if( it != index.end() ) { + index.modify(it, [endpoint](connection_detail& cd) { + cd.active_ip = endpoint; + }); } + } - auto [host, port, type] = split_host_port_type(peer_address(), false); - if (host.empty()) - return false; - - connection_ptr c = shared_from_this(); - - if( consecutive_immediate_connection_close > def_max_consecutive_immediate_connection_close || no_retry == benign_other ) { - fc::microseconds connector_period = my_impl->connections.get_connector_period(); - fc::lock_guard g( conn_mtx ); - if( last_close == fc::time_point() || last_close > fc::time_point::now() - connector_period ) { - return true; // true so doesn't remove from valid connections - } + void connections_manager::connect(const connection_ptr& c) { + std::lock_guard g( connections_mtx ); + const auto& index = connections.get(); + const auto& it = index.find(c); + if( it != index.end() ) { + it->c->connect( it->ips ); } - - strand.post([c, host, port]() { - auto resolver = std::make_shared( my_impl->thread_pool.get_executor() ); - resolver->async_resolve(host, port, - [resolver, c, host, port] - ( const boost::system::error_code& err, const tcp::resolver::results_type& results ) { - c->set_heartbeat_timeout( my_impl->connections.get_heartbeat_timeout() ); - if( !err ) { - c->connect( results ); - } else { - fc_wlog( logger, "Unable to resolve ${host}:${port} ${error}", - ("host", host)("port", port)( "error", err.message() ) ); - c->set_state(connection::connection_state::closed); - ++c->consecutive_immediate_connection_close; - } - } ); - } ); - - return true; } // called by API @@ -4619,11 +4634,8 @@ namespace eosio { } std::optional connections_manager::status( const string& host )const { - connection_ptr con; - { - std::shared_lock g( connections_mtx ); - con = find_connection_i( host ); - } + std::shared_lock g( connections_mtx ); + auto con = find_connection_i( host ); if( con ) { return con->get_status(); } @@ -4631,19 +4643,12 @@ namespace eosio { } vector connections_manager::connection_statuses()const { - vector conns; vector result; - { - std::shared_lock g( connections_mtx ); - auto& index = connections.get(); - result.reserve( index.size() ); - conns.reserve( index.size() ); - for( const connection_detail& cd : index ) { - conns.emplace_back( cd.c ); - } - } - for (const auto& c : conns) { - result.push_back( c->get_status() ); + std::shared_lock g( connections_mtx ); + auto& index = connections.get(); + result.reserve( index.size() ); + for( const connection_detail& cd : index ) { + result.emplace_back( cd.c->get_status() ); } return result; } @@ -4705,7 +4710,7 @@ namespace eosio { auto cleanup = [&num_peers, &num_rm, this](vector&& reconnecting, vector&& removing) { for( auto& c : reconnecting ) { - if (!c->resolve_and_connect()) { + if (!c->reconnect()) { --num_peers; ++num_rm; removing.push_back(c); @@ -4771,7 +4776,7 @@ namespace eosio { assert(update_p2p_connection_metrics); auto from = from_connection.lock(); std::shared_lock g(connections_mtx); - const auto& index = connections.get(); + auto& index = connections.get(); size_t num_clients = 0, num_peers = 0, num_bp_peers = 0; net_plugin::p2p_per_connection_metrics per_connection(index.size()); for (auto it = index.begin(); it != index.end(); ++it) {