fix net_plugin_impl dtor order
spoonincode committed Mar 11, 2024
1 parent 62e95b3 commit b47a64c
Showing 1 changed file with 41 additions and 64 deletions.
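
The commit replaces the lazily allocated unique_ptr members (dispatcher, expire_timer, keepalive_timer) with direct members initialized from thread_pool.get_executor(), and moves thread_pool to the top of net_plugin_impl so it is constructed before, and destroyed after, everything that borrows its executor. A minimal sketch of the C++ ordering rule the fix relies on, using plain Boost.Asio and a hypothetical class name rather than the plugin's own types:

    #include <boost/asio/steady_timer.hpp>
    #include <boost/asio/thread_pool.hpp>

    // Non-static data members are constructed in declaration order and destroyed in
    // reverse order. Members whose default initializers use thread_pool's executor
    // must therefore be declared after it: the pool already exists when they are
    // built and is still alive when they are torn down.
    struct net_impl_sketch {                                 // hypothetical name
       boost::asio::thread_pool  thread_pool{4};             // declared first: built first, destroyed last
       boost::asio::steady_timer expire_timer{thread_pool.get_executor()};
       boost::asio::steady_timer keepalive_timer{thread_pool.get_executor()};
       // the real class initializes its dispatch_manager the same way
    };

    int main() {
       net_impl_sketch impl;
       // destructors run keepalive_timer, expire_timer, then thread_pool
    }

With the members ordered and initialized this way, the diff below accordingly drops the make_unique calls in start_monitors() and plugin_initialize() and the explicit timer cancels in plugin_shutdown().
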
105 changes: 41 additions & 64 deletions plugins/net_plugin/net_plugin.cpp
@@ -444,10 +444,13 @@ namespace eosio {
class net_plugin_impl : public std::enable_shared_from_this<net_plugin_impl>,
public auto_bp_peering::bp_connection_manager<net_plugin_impl, connection> {
public:
uint16_t thread_pool_size = 4;
eosio::chain::named_thread_pool<struct net> thread_pool;

std::atomic<uint32_t> current_connection_id{0};

unique_ptr< sync_manager > sync_master;
unique_ptr< dispatch_manager > dispatcher;
dispatch_manager dispatcher {thread_pool.get_executor()};
connections_manager connections;

/**
@@ -488,21 +491,18 @@ namespace eosio {

alignas(hardware_destructive_interference_size)
fc::mutex expire_timer_mtx;
unique_ptr<boost::asio::steady_timer> expire_timer GUARDED_BY(expire_timer_mtx);
boost::asio::steady_timer expire_timer GUARDED_BY(expire_timer_mtx) {thread_pool.get_executor()};

alignas(hardware_destructive_interference_size)
fc::mutex keepalive_timer_mtx;
unique_ptr<boost::asio::steady_timer> keepalive_timer GUARDED_BY(keepalive_timer_mtx);
boost::asio::steady_timer keepalive_timer GUARDED_BY(keepalive_timer_mtx) {thread_pool.get_executor()};

alignas(hardware_destructive_interference_size)
std::atomic<bool> in_shutdown{false};

alignas(hardware_destructive_interference_size)
compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;

uint16_t thread_pool_size = 4;
eosio::chain::named_thread_pool<struct net> thread_pool;

boost::asio::deadline_timer accept_error_timer{thread_pool.get_executor()};


@@ -1452,7 +1452,7 @@ namespace eosio {
conn_node_id = fc::sha256();
}
if( has_last_req && !shutdown ) {
my_impl->dispatcher->retry_fetch( shared_from_this() );
my_impl->dispatcher.retry_fetch( shared_from_this() );
}
peer_lib_num = 0;
peer_requested.reset();
@@ -1966,7 +1966,7 @@ namespace eosio {
// called from connection strand
void connection::fetch_timeout( boost::system::error_code ec ) {
if( !ec ) {
my_impl->dispatcher->retry_fetch( shared_from_this() );
my_impl->dispatcher.retry_fetch( shared_from_this() );
} else if( ec != boost::asio::error::operation_aborted ) { // don't log on operation_aborted, called on destroy
peer_elog( this, "setting timer for fetch request got error ${ec}", ("ec", ec.message() ) );
}
@@ -2405,7 +2405,7 @@ namespace eosio {
const block_id_type& id = msg.known_blocks.ids.back();
peer_ilog( c, "notice_message, pending ${p}, blk_num ${n}, id ${id}...",
("p", msg.known_blocks.pending)("n", block_header::num_from_id(id))("id",id.str().substr(8,16)) );
if( !my_impl->dispatcher->have_block( id ) ) {
if( !my_impl->dispatcher.have_block( id ) ) {
verify_catchup( c, msg.known_blocks.pending, id );
} else {
// we already have the block, so update peer with our view of the world
@@ -3062,7 +3062,7 @@ namespace eosio {
const block_id_type blk_id = bh.calculate_id();
const uint32_t blk_num = last_received_block_num = block_header::num_from_id(blk_id);
// don't add_peer_block because we have not validated this block header yet
if( my_impl->dispatcher->have_block( blk_id ) ) {
if( my_impl->dispatcher.have_block( blk_id ) ) {
peer_dlog( this, "canceling wait, already received block ${num}, id ${id}...",
("num", blk_num)("id", blk_id.str().substr(8,16)) );
my_impl->sync_master->sync_recv_block( shared_from_this(), blk_id, blk_num, false );
@@ -3163,8 +3163,8 @@ namespace eosio {
}
return true;
}
bool have_trx = my_impl->dispatcher->have_txn( ptr->id() );
my_impl->dispatcher->add_peer_txn( ptr->id(), ptr->expiration(), connection_id );
bool have_trx = my_impl->dispatcher.have_txn( ptr->id() );
my_impl->dispatcher.add_peer_txn( ptr->id(), ptr->expiration(), connection_id );

if( have_trx ) {
peer_dlog( this, "got a duplicate transaction - dropping" );
Expand All @@ -3178,18 +3178,6 @@ namespace eosio {
void net_plugin_impl::plugin_shutdown() {
in_shutdown = true;

connections.stop_conn_timers();
{
fc::lock_guard g( expire_timer_mtx );
if( expire_timer )
expire_timer->cancel();
}
{
fc::lock_guard g( keepalive_timer_mtx );
if( keepalive_timer )
keepalive_timer->cancel();
}

connections.close_all();
thread_pool.stop();
}
@@ -3569,7 +3557,7 @@ namespace eosio {
case catch_up:
break;
case normal: {
my_impl->dispatcher->recv_notice( shared_from_this(), msg, false );
my_impl->dispatcher.recv_notice( shared_from_this(), msg, false );
}
}

Expand All @@ -3593,7 +3581,7 @@ namespace eosio {
break;
}
case normal : {
my_impl->dispatcher->recv_notice( shared_from_this(), msg, false );
my_impl->dispatcher.recv_notice( shared_from_this(), msg, false );
break;
}
default: {
@@ -3699,12 +3687,12 @@ namespace eosio {
// called from connection strand
void connection::handle_message( const block_id_type& id, signed_block_ptr ptr ) {
// post to dispatcher strand so that we don't have multiple threads validating the block header
my_impl->dispatcher->strand.post([id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id]() mutable {
my_impl->dispatcher.strand.post([id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id]() mutable {
controller& cc = my_impl->chain_plug->chain();

// may have come in on a different connection and posted into dispatcher strand before this one
if( my_impl->dispatcher->have_block( id ) || cc.fetch_block_state_by_id( id ) ) { // thread-safe
my_impl->dispatcher->add_peer_block( id, c->connection_id );
if( my_impl->dispatcher.have_block( id ) || cc.fetch_block_state_by_id( id ) ) { // thread-safe
my_impl->dispatcher.add_peer_block( id, c->connection_id );
c->strand.post( [c, id]() {
my_impl->sync_master->sync_recv_block( c, id, block_header::num_from_id(id), false );
});
Expand All @@ -3728,7 +3716,7 @@ namespace eosio {
if( exception ) {
c->strand.post( [c, id, blk_num=ptr->block_num()]() {
my_impl->sync_master->rejected_block( c, blk_num );
my_impl->dispatcher->rejected_block( id );
my_impl->dispatcher.rejected_block( id );
});
return;
}
Expand All @@ -3739,8 +3727,8 @@ namespace eosio {
if( block_num != 0 ) {
fc_dlog( logger, "validated block header, broadcasting immediately, connection ${cid}, blk num = ${num}, id = ${id}",
("cid", cid)("num", block_num)("id", bsp->id) );
my_impl->dispatcher->add_peer_block( bsp->id, cid ); // no need to send back to sender
my_impl->dispatcher->bcast_block( bsp->block, bsp->id );
my_impl->dispatcher.add_peer_block( bsp->id, cid ); // no need to send back to sender
my_impl->dispatcher.bcast_block( bsp->block, bsp->id );
}

app().executor().post(priority::medium, exec_queue::read_write, [ptr{std::move(ptr)}, bsp{std::move(bsp)}, id, c{std::move(c)}]() mutable {
Expand All @@ -3765,8 +3753,8 @@ namespace eosio {
try {
if( blk_num <= lib || cc.fetch_block_by_id(blk_id) ) {
c->strand.post( [sync_master = my_impl->sync_master.get(),
dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
dispatcher->add_peer_block( blk_id, c->connection_id );
&dispatcher = my_impl->dispatcher, c, blk_id, blk_num]() {
dispatcher.add_peer_block( blk_id, c->connection_id );
sync_master->sync_recv_block( c, blk_id, blk_num, true );
});
return;
Expand Down Expand Up @@ -3811,12 +3799,12 @@ namespace eosio {

if( accepted ) {
++unique_blocks_rcvd_count;
boost::asio::post( my_impl->thread_pool.get_executor(), [dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
boost::asio::post( my_impl->thread_pool.get_executor(), [&dispatcher = my_impl->dispatcher, c, blk_id, blk_num]() {
fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", blk_num)("id", blk_id.str().substr(8,16)) );
dispatcher->add_peer_block( blk_id, c->connection_id );
dispatcher.add_peer_block( blk_id, c->connection_id );

while (true) { // attempt previously unlinkable blocks where prev_unlinkable->block->previous == blk_id
unlinkable_block_state prev_unlinkable = dispatcher->pop_possible_linkable_block(blk_id);
unlinkable_block_state prev_unlinkable = dispatcher.pop_possible_linkable_block(blk_id);
if (!prev_unlinkable.block)
break;
fc_dlog( logger, "retrying previous unlinkable block #${n} ${id}...",
Expand All @@ -3827,21 +3815,21 @@ namespace eosio {
});
}
});
c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
dispatcher->recv_block( c, blk_id, blk_num );
c->strand.post( [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c, blk_id, blk_num]() {
dispatcher.recv_block( c, blk_id, blk_num );
sync_master->sync_recv_block( c, blk_id, blk_num, true );
});
} else {
c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c,
c->strand.post( [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c,
block{std::move(block)}, blk_id, blk_num, reason]() mutable {
if( reason == unlinkable || reason == no_reason ) {
dispatcher->add_unlinkable_block( std::move(block), blk_id );
dispatcher.add_unlinkable_block( std::move(block), blk_id );
}
// reason==no_reason means accept_block() return false because we are producing, don't call rejected_block which sends handshake
if( reason != no_reason ) {
sync_master->rejected_block( c, blk_num );
}
dispatcher->rejected_block( blk_id );
dispatcher.rejected_block( blk_id );
});
}
}
Expand All @@ -3850,8 +3838,8 @@ namespace eosio {
void net_plugin_impl::start_expire_timer() {
if( in_shutdown ) return;
fc::lock_guard g( expire_timer_mtx );
expire_timer->expires_from_now( txn_exp_period);
expire_timer->async_wait( [my = shared_from_this()]( boost::system::error_code ec ) {
expire_timer.expires_from_now( txn_exp_period);
expire_timer.async_wait( [my = shared_from_this()]( boost::system::error_code ec ) {
if( !ec ) {
my->expire();
} else {
Expand All @@ -3866,8 +3854,8 @@ namespace eosio {
void net_plugin_impl::ticker() {
if( in_shutdown ) return;
fc::lock_guard g( keepalive_timer_mtx );
keepalive_timer->expires_from_now(keepalive_interval);
keepalive_timer->async_wait( [my = shared_from_this()]( boost::system::error_code ec ) {
keepalive_timer.expires_from_now(keepalive_interval);
keepalive_timer.async_wait( [my = shared_from_this()]( boost::system::error_code ec ) {
my->ticker();
if( ec ) {
if( my->in_shutdown ) return;
Expand All @@ -3886,19 +3874,15 @@ namespace eosio {
}

void net_plugin_impl::start_monitors() {
{
fc::lock_guard g( expire_timer_mtx );
expire_timer = std::make_unique<boost::asio::steady_timer>( my_impl->thread_pool.get_executor() );
}
connections.start_conn_timers();
start_expire_timer();
}

void net_plugin_impl::expire() {
auto now = time_point::now();
uint32_t lib_num = get_chain_lib_num();
dispatcher->expire_blocks( lib_num );
dispatcher->expire_txns();
dispatcher.expire_blocks( lib_num );
dispatcher.expire_txns();
fc_dlog( logger, "expire_txns ${n}us", ("n", time_point::now() - now) );

start_expire_timer();
Expand All @@ -3908,9 +3892,9 @@ namespace eosio {
void net_plugin_impl::on_accepted_block_header(const signed_block_ptr& block, const block_id_type& id) {
update_chain_info();

dispatcher->strand.post([block, id]() {
dispatcher.strand.post([block, id]() {
fc_dlog(logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", block->block_num())("id", id));
my_impl->dispatcher->bcast_block(block, id);
my_impl->dispatcher.bcast_block(block, id);
});
}

Expand All @@ -3927,14 +3911,14 @@ namespace eosio {

// called from application thread
void net_plugin_impl::transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>& results) {
boost::asio::post( my_impl->thread_pool.get_executor(), [dispatcher = my_impl->dispatcher.get(), results]() {
boost::asio::post( my_impl->thread_pool.get_executor(), [&dispatcher = my_impl->dispatcher, results]() {
const auto& id = results.second->id();
if (results.first) {
fc_dlog( logger, "signaled NACK, trx-id = ${id} : ${why}", ("id", id)( "why", results.first->to_detail_string() ) );
dispatcher->rejected_transaction(results.second);
dispatcher.rejected_transaction(results.second);
} else {
fc_dlog( logger, "signaled ACK, trx-id = ${id}", ("id", id) );
dispatcher->bcast_transaction(results.second);
dispatcher.bcast_transaction(results.second);
}
});
}
Expand Down Expand Up @@ -4272,8 +4256,6 @@ namespace eosio {
app().quit();
} );

dispatcher = std::make_unique<dispatch_manager>( my_impl->thread_pool.get_executor() );

if( !p2p_accept_transactions && p2p_addresses.size() ) {
fc_ilog( logger, "\n"
"***********************************\n"
Expand Down Expand Up @@ -4321,11 +4303,6 @@ namespace eosio {
} );
}

{
fc::lock_guard g( keepalive_timer_mtx );
keepalive_timer = std::make_unique<boost::asio::steady_timer>( thread_pool.get_executor() );
}

incoming_transaction_ack_subscription = app().get_channel<compat::channels::transaction_ack>().subscribe(
[this](auto&& t) { transaction_ack(std::forward<decltype(t)>(t)); });

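A second, smaller pattern in the diff: lambdas that previously captured a raw pointer obtained from the unique_ptr (dispatcher = my_impl->dispatcher.get()) now use a C++14 init-capture of a reference to the direct member (&dispatcher = my_impl->dispatcher). A self-contained sketch of that capture style, with a hypothetical stand-in for dispatch_manager:

    #include <iostream>

    struct dispatch_manager_sketch {               // hypothetical stand-in
       void expire_txns() { std::cout << "expiring txns\n"; }
    };

    int main() {
       dispatch_manager_sketch dispatcher;         // a direct member in the real class

       // before: [dispatcher = impl.dispatcher.get()]() { dispatcher->expire_txns(); }
       // after:  init-capture a reference to the member itself
       auto task = [&dispatcher = dispatcher]() { dispatcher.expire_txns(); };
       task();                                     // prints "expiring txns"
    }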