Skip to content

Commit

Permalink
GH-529 Call sync_recv_block from on_accepted_block
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Sep 3, 2024
1 parent 9e32878 commit 4e3f9a0
Showing 1 changed file with 41 additions and 31 deletions.
72 changes: 41 additions & 31 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ namespace eosio {
void sync_wait(const connection_ptr& c);
void sync_reassign_fetch( const connection_ptr& c );
void rejected_block( const connection_ptr& c, uint32_t blk_num, closing_mode mode );
void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied,
void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num,
const fc::microseconds& blk_latency );
void recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency );
void sync_recv_notice( const connection_ptr& c, const notice_message& msg );
Expand Down Expand Up @@ -2385,28 +2385,39 @@ namespace eosio {
}
}

// called from c's connection strand
// called from c's connection strand if c != nullptr,
// otherwise c == nullptr which implies blk_applied == false and called from dispatcher strand
void sync_manager::sync_recv_block(const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num,
bool blk_applied, const fc::microseconds& blk_latency) {
peer_dlog(c, "${d} block ${bn}:${id}.. latency ${l}ms",
("d", blk_applied ? "applied" : "got")("bn", blk_num)("id", blk_id.str().substr(8,16))
("l", blk_latency.count()/1000) );
const fc::microseconds& blk_latency) {
// no connection means called when block is applied
bool blk_applied = !c;

if (c) {
peer_dlog(c, "got block ${bn}:${id}.. latency ${l}ms",
("bn", blk_num)("id", blk_id.str().substr(8,16))("l", blk_latency.count()/1000) );
} else {
fc_dlog(logger, "applied block ${bn}:${id}.. latency ${l}ms",
("bn", blk_num)("id", blk_id.str().substr(8,16))("l", blk_latency.count()/1000) );
}
if( app().is_quiting() ) {
c->close( false, true );
if (c)
c->close( false, true );
return;
}
c->latest_blk_time = std::chrono::system_clock::now();
if (blk_applied)
c->block_status_monitor_.accepted();
if (blk_latency.count() < config::block_interval_us && c->peer_syncing_from_us) {
// a peer will not send us a recent block unless it is synced
c->peer_syncing_from_us = false;
if (c) {
c->latest_blk_time = std::chrono::system_clock::now();
if (blk_applied)
c->block_status_monitor_.accepted();
if (blk_latency.count() < config::block_interval_us && c->peer_syncing_from_us) {
// a peer will not send us a recent block unless it is synced
c->peer_syncing_from_us = false;
}
}
stages state = sync_state;
peer_dlog( c, "state ${s}", ("s", stage_str( state )) );
fc_dlog(logger, "sync_state ${s}", ("s", stage_str(state)));
if( state == head_catchup ) {
fc_dlog(logger, "sync_manager in head_catchup state");
fc::unique_lock g_sync( sync_mtx );
peer_dlog( c, "sync_manager in head_catchup state" );
sync_source.reset();
g_sync.unlock();

Expand All @@ -2419,7 +2430,7 @@ namespace eosio {
g_cp_conn.unlock();
if( fork_head_id == null_id ) {
// continue
} else if( fork_head_num < blk_num || fork_head_id == blk_id ) {
} else if( c && (fork_head_num < blk_num || fork_head_id == blk_id) ) {
fc::lock_guard g_conn( c->conn_mtx );
c->conn_fork_head = null_id;
c->conn_fork_head_num = 0;
Expand All @@ -2430,19 +2441,19 @@ namespace eosio {

if( set_state_to_head_catchup ) {
if( set_state( head_catchup ) ) {
peer_dlog( c, "Switching to head_catchup, sending handshakes" );
fc_dlog(logger, "Switching to head_catchup, sending handshakes");
send_handshakes();
}
} else {
set_state( in_sync );
peer_dlog( c, "Switching to in_sync, will send handshakes when caught up" );
fc_dlog(logger, "Switching to in_sync, will send handshakes when caught up");
send_handshakes_when_synced = true;
}
} else if( state == lib_catchup ) {
fc::unique_lock g_sync( sync_mtx );
if( blk_applied && blk_num >= sync_known_lib_num ) {
peer_dlog(c, "All caught up ${b} with last known lib ${l} resending handshake",
("b", blk_num)("l", sync_known_lib_num));
fc_dlog(logger, "All caught up ${b} with last known lib ${l} resending handshake",
("b", blk_num)("l", sync_known_lib_num));
set_state( head_catchup );
g_sync.unlock();
send_handshakes();
Expand Down Expand Up @@ -2994,7 +3005,7 @@ namespace eosio {
if( my_impl->dispatcher.have_block( blk_id ) ) {
peer_dlog( this, "already received block ${num}, id ${id}..., latency ${l}ms",
("num", blk_num)("id", blk_id.str().substr(8,16))("l", age.count()/1000) );
my_impl->sync_master->sync_recv_block( shared_from_this(), blk_id, blk_num, false, age );
my_impl->sync_master->sync_recv_block( shared_from_this(), blk_id, blk_num, age );

pending_message_buffer.advance_read_ptr( message_length );
return true;
Expand All @@ -3021,7 +3032,7 @@ namespace eosio {
} else {
block_sync_bytes_received += message_length;
uint32_t lib_num = my_impl->get_chain_lib_num();
my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, false, age);
my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, age);
if( blk_num <= lib_num ) {
peer_dlog( this, "received block ${n} less than lib ${lib} while syncing", ("n", blk_num)("lib", lib_num) );
pending_message_buffer.advance_read_ptr( message_length );
Expand Down Expand Up @@ -3674,7 +3685,7 @@ namespace eosio {
my_impl->dispatcher.add_peer_block( id, c->connection_id );
c->strand.post( [c, id, ptr{std::move(ptr)}]() {
const fc::microseconds age(fc::time_point::now() - ptr->timestamp);
my_impl->sync_master->sync_recv_block( c, id, block_header::num_from_id(id), false, age );
my_impl->sync_master->sync_recv_block( c, id, block_header::num_from_id(id), age );
});
return;
}
Expand Down Expand Up @@ -3732,14 +3743,8 @@ namespace eosio {
my_impl->dispatcher.add_peer_block( obh->id(), cid ); // no need to send back to sender
my_impl->dispatcher.bcast_block( obh->block(), obh->id() );

++c->unique_blocks_rcvd_count;
c->strand.post([sync_master = my_impl->sync_master.get(), c, id, block_num, timestamp=obh->timestamp()]() {
const fc::microseconds age(fc::time_point::now() - timestamp);
bool blk_applied = true; // not really applied, but accepted by controller into forkdb
sync_master->sync_recv_block(c, id, block_num, blk_applied, age);
});

if (best_head) {
++c->unique_blocks_rcvd_count;
fc_dlog(logger, "posting block ${n} to app thread", ("n", ptr->block_num()));
app().executor().post(priority::medium, exec_queue::read_write,
[ptr{std::move(ptr)}, bh{std::move(*obh)}, id, c{std::move(c)}]() mutable {
Expand All @@ -3766,7 +3771,7 @@ namespace eosio {
c->strand.post( [sync_master = my_impl->sync_master.get(),
&dispatcher = my_impl->dispatcher, c, blk_id, blk_num, latency = age]() {
dispatcher.add_peer_block( blk_id, c->connection_id );
sync_master->sync_recv_block( c, blk_id, blk_num, true, latency );
sync_master->sync_recv_block( connection_ptr{}, blk_id, blk_num, latency );
});
return;
}
Expand Down Expand Up @@ -3847,6 +3852,11 @@ namespace eosio {
void net_plugin_impl::on_accepted_block_header(const signed_block_ptr& block, const block_id_type& id) {
update_chain_info();

my_impl->dispatcher.strand.post([sync_master = my_impl->sync_master.get(), block, id]() {
const fc::microseconds age(fc::time_point::now() - block->timestamp);
sync_master->sync_recv_block(connection_ptr{}, id, block->block_num(), age);
});

dispatcher.strand.post([block, id]() {
fc_dlog(logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", block->block_num())("id", id));
my_impl->dispatcher.bcast_block(block, id);
Expand Down

0 comments on commit 4e3f9a0

Please sign in to comment.