Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.0.1 -> main] P2P: syncing fix #705

Merged
merged 7 commits into from
Sep 6, 2024
36 changes: 17 additions & 19 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,9 +1000,9 @@ namespace eosio {
// std::chrono::nanoseconds (not used) dst{0}; //!< destination timestamp, Time at the client when the reply arrived from the server.
/** @} */
// timestamp for the lastest message
std::chrono::system_clock::time_point latest_msg_time{std::chrono::system_clock::time_point::min()};
std::chrono::steady_clock::time_point latest_msg_time{std::chrono::steady_clock::time_point::min()};
std::chrono::milliseconds hb_timeout{std::chrono::milliseconds{def_keepalive_interval}};
std::chrono::system_clock::time_point latest_blk_time{std::chrono::system_clock::time_point::min()};
std::chrono::steady_clock::time_point latest_blk_time{std::chrono::steady_clock::time_point::min()};

bool connected() const;
bool closed() const; // socket is not open or is closed or closing, thread safe
Expand Down Expand Up @@ -1044,7 +1044,7 @@ namespace eosio {
*/
/** \brief Check heartbeat time and send Time_message
*/
void check_heartbeat( std::chrono::system_clock::time_point current_time );
void check_heartbeat( std::chrono::steady_clock::time_point current_time );
/** \brief Populate and queue time_message
*/
void send_time();
Expand Down Expand Up @@ -1486,8 +1486,8 @@ namespace eosio {
cancel_sync_wait();
sync_last_requested_block = 0;
org = std::chrono::nanoseconds{0};
latest_msg_time = std::chrono::system_clock::time_point::min();
latest_blk_time = std::chrono::system_clock::time_point::min();
latest_msg_time = std::chrono::steady_clock::time_point::min();
latest_blk_time = std::chrono::steady_clock::time_point::min();
set_state(connection_state::closed);
block_sync_send_start = 0ns;
block_sync_frame_bytes_sent = 0;
Expand Down Expand Up @@ -1607,8 +1607,8 @@ namespace eosio {
}

// called from connection strand
void connection::check_heartbeat( std::chrono::system_clock::time_point current_time ) {
if( latest_msg_time > std::chrono::system_clock::time_point::min() ) {
void connection::check_heartbeat( std::chrono::steady_clock::time_point current_time ) {
if( latest_msg_time > std::chrono::steady_clock::time_point::min() ) {
if( current_time > latest_msg_time + hb_timeout ) {
no_retry = benign_other;
if( !peer_address().empty() ) {
Expand All @@ -1622,7 +1622,7 @@ namespace eosio {
}
if (!my_impl->sync_master->syncing_from_peer()) {
const std::chrono::milliseconds timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms));
if (current_time > latest_blk_time + timeout) {
if (std::chrono::steady_clock::now() > latest_blk_time + timeout) {
peer_wlog(this, "half heartbeat timed out, sending handshake");
send_handshake();
return;
Expand Down Expand Up @@ -1910,7 +1910,7 @@ namespace eosio {

block_buffer_factory buff_factory;
const auto& sb = buff_factory.get_send_buffer( b );
latest_blk_time = std::chrono::system_clock::now();
latest_blk_time = std::chrono::steady_clock::now();
enqueue_buffer( sb, no_reason, to_sync_queue);
return sb->size();
}
Expand Down Expand Up @@ -2211,14 +2211,13 @@ namespace eosio {
set_state( lib_catchup );
sync_last_requested_num = 0;
sync_next_expected_num = chain_info.lib_num + 1;
} else if (is_sync_request_ahead_allowed(sync_next_expected_num)) {
// break
request_next_chunk( c );
} else if (sync_last_requested_num > 0 && is_sync_request_ahead_allowed(sync_next_expected_num)) {
request_next_chunk();
} else {
peer_dlog(c, "already syncing, start sync ignored");
return;
}

request_next_chunk( c );
}

// thread safe
Expand All @@ -2229,7 +2228,6 @@ namespace eosio {
// called from connection strand
void sync_manager::sync_wait(const connection_ptr& c) {
++sync_timers_active;
sync_active_time = std::chrono::steady_clock::now(); // reset when we receive a block
peer_dlog(c, "sync wait, active_timers ${t}", ("t", sync_timers_active.load()));
}

Expand Down Expand Up @@ -2484,7 +2482,7 @@ namespace eosio {
c->close( false, true );
return;
}
c->latest_blk_time = std::chrono::system_clock::now();
c->latest_blk_time = sync_active_time = std::chrono::steady_clock::now(); // reset when we receive a block
if (blk_applied)
c->block_status_monitor_.accepted();
if (blk_latency.count() < config::block_interval_us && c->peer_syncing_from_us) {
Expand Down Expand Up @@ -2704,7 +2702,7 @@ namespace eosio {
send_buffer_type sb = buff_factory.get_send_buffer( b );

cp->strand.post( [cp, bnum, sb{std::move(sb)}]() {
cp->latest_blk_time = std::chrono::system_clock::now();
cp->latest_blk_time = std::chrono::steady_clock::now();
bool has_block = cp->peer_lib_num >= bnum;
if( !has_block ) {
peer_dlog( cp, "bcast block ${b}", ("b", bnum) );
Expand Down Expand Up @@ -3045,15 +3043,15 @@ namespace eosio {
bytes_received += message_length;
last_bytes_received = get_time();
try {
latest_msg_time = std::chrono::system_clock::now();
auto now = latest_msg_time = std::chrono::steady_clock::now();

// if next message is a block we already have, exit early
auto peek_ds = pending_message_buffer.create_peek_datastream();
unsigned_int which{};
fc::raw::unpack( peek_ds, which );

if( which == signed_block_which ) {
latest_blk_time = std::chrono::system_clock::now();
latest_blk_time = now;
return process_next_block_message( message_length );
} else if( which == packed_transaction_which ) {
return process_next_trx_message( message_length );
Expand Down Expand Up @@ -3947,7 +3945,7 @@ namespace eosio {
fc_wlog( logger, "Peer keepalive ticked sooner than expected: ${m}", ("m", ec.message()) );
}

auto current_time = std::chrono::system_clock::now();
auto current_time = std::chrono::steady_clock::now();
my->connections.for_each_connection( [current_time]( const connection_ptr& c ) {
if( c->socket_is_open() ) {
c->strand.post([c, current_time]() {
Expand Down