From 9e32878c4eebc1e3ebe7ff10185e1674eaf0de19 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 2 Sep 2024 14:31:31 -0500 Subject: [PATCH] GH-529 Make net_plugin call producer_plugin directly for on_incoming_block instead of through chain_plugin. Remove unused block_sync in chain_plugin. Remove unneeded code from net_plugin and producer_plugin now that blocks are not processed one at a time. --- plugins/chain_plugin/chain_plugin.cpp | 6 -- .../eosio/chain_plugin/chain_plugin.hpp | 1 - plugins/net_plugin/net_plugin.cpp | 62 ++++++------------- .../eosio/producer_plugin/producer_plugin.hpp | 2 + plugins/producer_plugin/producer_plugin.cpp | 44 +++++-------- 5 files changed, 35 insertions(+), 80 deletions(-) diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index 83e67f7ac7..ad10a5d76f 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -151,7 +151,6 @@ class chain_plugin_impl { ,accepted_block_channel(app().get_channel()) ,irreversible_block_channel(app().get_channel()) ,applied_transaction_channel(app().get_channel()) - ,incoming_block_sync_method(app().get_method()) ,incoming_transaction_async_method(app().get_method()) {} @@ -180,7 +179,6 @@ class chain_plugin_impl { channels::applied_transaction::channel_type& applied_transaction_channel; // retained references to methods for easy calling - incoming::methods::block_sync::method_type& incoming_block_sync_method; incoming::methods::transaction_async::method_type& incoming_transaction_async_method; // method provider handles @@ -1208,10 +1206,6 @@ chain_apis::read_only chain_plugin::get_read_only_api(const fc::microseconds& ht } -bool chain_plugin::accept_block(const signed_block_ptr& block, const block_id_type& id, const block_handle& bh ) { - return my->incoming_block_sync_method(block, id, bh); -} - void chain_plugin::accept_transaction(const chain::packed_transaction_ptr& trx, next_function next) { my->incoming_transaction_async_method(trx, false, transaction_metadata::trx_type::input, false, std::move(next)); } diff --git a/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp b/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp index 6c97ded9a4..a241ca174f 100644 --- a/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp +++ b/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp @@ -998,7 +998,6 @@ class chain_plugin : public plugin { chain_apis::read_write get_read_write_api(const fc::microseconds& http_max_response_time); chain_apis::read_only get_read_only_api(const fc::microseconds& http_max_response_time) const; - bool accept_block( const chain::signed_block_ptr& block, const chain::block_id_type& id, const chain::block_handle& bh ); void accept_transaction(const chain::packed_transaction_ptr& trx, chain::plugin_interface::next_function next); // Only call this after plugin_initialize()! diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 746e20a019..0fab39445e 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -3691,6 +3691,8 @@ namespace eosio { bool best_head = false; sync_manager::closing_mode close_mode = sync_manager::closing_mode::handshake; try { + EOS_ASSERT(ptr->timestamp < (fc::time_point::now() + fc::seconds(7)), block_from_the_future, + "received a block from the future, ignoring it: ${id}", ("id", id)); // this will return empty optional if block is not linkable std::tie(best_head, obh) = cc.accept_block( id, ptr ); } catch( const invalid_qc_claim& ex) { @@ -3708,7 +3710,7 @@ namespace eosio { ("cid", cid)("n", ptr->block_num())("id", id.str().substr(8,16))); } if( exception || !obh) { - if (!obh) { + if (!exception && !obh) { if (prev_is_proper_svnn_block || !ptr->is_proper_svnn_block()) { fc_dlog(logger, "unlinkable_block ${bn} : ${id}, previous ${pn} : ${pid}", ("bn", ptr->block_num())("id", id)("pn", block_header::num_from_id(ptr->previous))("pid", ptr->previous)); @@ -3730,6 +3732,13 @@ namespace eosio { my_impl->dispatcher.add_peer_block( obh->id(), cid ); // no need to send back to sender my_impl->dispatcher.bcast_block( obh->block(), obh->id() ); + ++c->unique_blocks_rcvd_count; + c->strand.post([sync_master = my_impl->sync_master.get(), c, id, block_num, timestamp=obh->timestamp()]() { + const fc::microseconds age(fc::time_point::now() - timestamp); + bool blk_applied = true; // not really applied, but accepted by controller into forkdb + sync_master->sync_recv_block(c, id, block_num, blk_applied, age); + }); + if (best_head) { fc_dlog(logger, "posting block ${n} to app thread", ("n", ptr->block_num())); app().executor().post(priority::medium, exec_queue::read_write, @@ -3739,12 +3748,6 @@ namespace eosio { // ready to process immediately, so signal producer to interrupt start_block my_impl->producer_plug->received_block(block_num); - } else { - c->strand.post([sync_master = my_impl->sync_master.get(), c, id, block_num, timestamp=obh->timestamp()]() { - const fc::microseconds age(fc::time_point::now() - timestamp); - bool blk_applied = true; // not really applied, but accepted by controller into forkdb - sync_master->sync_recv_block(c, id, block_num, blk_applied, age); - }); } }); } @@ -3769,51 +3772,20 @@ namespace eosio { } } catch( const assert_exception& ex ) { // possible corrupted block log - fc_elog( logger, "caught assert on fetch_block_by_id, ${ex}, id ${id}, conn ${c}", ("ex", ex.to_string())("id", blk_id)("c", connection_id) ); + fc_elog(logger, "caught assert on validated_block_exists, ${ex}, id ${id}, conn ${c}", + ("ex", ex.to_string())("id", blk_id)("c", connection_id)); } catch( ... ) { - fc_elog( logger, "caught an unknown exception trying to fetch block ${id}, conn ${c}", ("id", blk_id)("c", connection_id) ); + fc_elog(logger, "caught an unknown exception trying to fetch block ${id}, conn ${c}", + ("id", blk_id)("c", connection_id)); } fc_dlog( logger, "received signed_block: #${n} block age in secs = ${age}, connection - ${cid}, header validated, lib #${lib}", ("n", blk_num)("age", age.to_seconds())("cid", c->connection_id)("lib", lib) ); - bool accepted = false; try { - accepted = my_impl->chain_plug->accept_block(block, blk_id, bh); - my_impl->update_chain_info(); - } catch( const unlinkable_block_exception &ex) { - fc_ilog(logger, "unlinkable_block_exception connection - ${cid}: #${n} ${id}...: ${m}", - ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string())); - } catch( const block_validate_exception &ex ) { - fc_ilog(logger, "block_validate_exception connection - ${cid}: #${n} ${id}...: ${m}", - ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string())); - } catch( const assert_exception &ex ) { - fc_wlog(logger, "block assert_exception connection - ${cid}: #${n} ${id}...: ${m}", - ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string())); - } catch( const fc::exception &ex ) { - fc_ilog(logger, "bad block exception connection - ${cid}: #${n} ${id}...: ${m}", - ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string())); + my_impl->producer_plug->on_incoming_block(); } catch( ... ) { - fc_wlog(logger, "bad block connection - ${cid}: #${n} ${id}...: unknown exception", - ("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))); - } - - if( accepted ) { - ++unique_blocks_rcvd_count; - boost::asio::post( my_impl->thread_pool.get_executor(), [&dispatcher = my_impl->dispatcher, c, blk_id, blk_num]() { - fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", blk_num)("id", blk_id.str().substr(8,16)) ); - dispatcher.add_peer_block( blk_id, c->connection_id ); - }); - c->strand.post( [sync_master = my_impl->sync_master.get(), - c, blk_id, blk_num, latency = age]() { - sync_master->sync_recv_block( c, blk_id, blk_num, true, latency ); - }); - } else { - c->strand.post( [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c, - block{std::move(block)}, blk_id, blk_num]() mutable { - sync_master->rejected_block( c, blk_num, sync_manager::closing_mode::handshake ); - dispatcher.rejected_block( blk_id ); - }); + // errors on applied blocks logged in controller } } @@ -3882,6 +3854,8 @@ namespace eosio { } void net_plugin_impl::on_accepted_block( const signed_block_ptr& block, const block_id_type&) { + update_chain_info(); + sync_master->send_handshakes_if_synced(fc::time_point::now() - block->timestamp); if (const auto* pending_producers = chain_plug->chain().pending_producers()) { on_pending_schedule(*pending_producers); diff --git a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp index 495d1b91a6..ce3067de9b 100644 --- a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp +++ b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp @@ -81,6 +81,8 @@ class producer_plugin : public appbase::plugin { virtual void plugin_shutdown(); void handle_sighup() override; + bool on_incoming_block(); + void pause(); void resume(); bool paused() const; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 5a876ae4a3..bb283c3473 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -803,47 +803,29 @@ class producer_plugin_impl : public std::enable_shared_from_thisblock_num(); - - if (now - block->timestamp < fc::minutes(5) || (blk_num % 1000 == 0)) // only log every 1000 during sync - fc_dlog(_log, "received incoming block ${n} ${id}", ("n", blk_num)("id", id)); - auto& chain = chain_plug->chain(); - // de-dupe here... no point in aborting block if we already know the block - if (chain.validated_block_exists(id)) { - fc_dlog(_log, "already have validated block ${n} ${id}", ("n", blk_num)("id", id)); - _time_tracker.add_other_time(); - return true; // return true because the block was already accepted - } - - EOS_ASSERT(block->timestamp < (now + fc::seconds(7)), block_from_the_future, "received a block from the future, ignoring it: ${id}", ("id", id)); - // start processing of block if (in_producing_mode()) { - fc_ilog(_log, "producing, incoming block #${num} id: ${id}", ("num", blk_num)("id", id)); + if (_log.is_enabled(fc::log_level::info)) { + auto fhead = chain.fork_db_head(); + fc_ilog(_log, "producing, fork database head at: #${num} id: ${id}", + ("num", fhead.block_num())("id", fhead.id())); + } _time_tracker.add_other_time(); return true; // return true because block was accepted } - // start a new speculative block + // start a new speculative block, adds to time tracker which includes this method's time auto ensure = fc::make_scoped_exit([this]() { schedule_production_loop(); }); // abort the pending block abort_block(); - // push the new block - auto handle_error = [&](const auto& e) { - fc_ilog(_log, "Exception on block ${bn}: ${e}", ("bn", blk_num)("e", e.to_detail_string())); - app().get_channel().publish(priority::medium, block); - throw; - }; - try { chain.apply_blocks( [this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); }, @@ -860,12 +842,12 @@ class producer_plugin_impl : public std::enable_shared_from_this= now) { _production_enabled = true; } @@ -1413,7 +1395,7 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia _incoming_block_sync_provider = app().get_method().register_provider( [this](const signed_block_ptr& block, const block_id_type& block_id, const block_handle& bh) { - return on_incoming_block(block, block_id, bh); + return on_incoming_block(); }); _incoming_transaction_async_provider = @@ -1587,6 +1569,10 @@ void producer_plugin::handle_sighup() { fc::logger::update(transient_trx_failed_trace_logger_name, _transient_trx_failed_trace_log); } +bool producer_plugin::on_incoming_block() { + return my->on_incoming_block(); +} + void producer_plugin::pause() { fc_ilog(_log, "Producer paused."); my->_pause_production = true;