Skip to content

Commit

Permalink
GH-529 Make net_plugin call producer_plugin directly for on_incoming_…
Browse files Browse the repository at this point in the history
…block instead of through chain_plugin. Remove unused block_sync in chain_plugin. Remove unneeded code from net_plugin and producer_plugin now that blocks are not processed one at a time.
  • Loading branch information
heifner committed Sep 2, 2024
1 parent a30d059 commit 9e32878
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 80 deletions.
6 changes: 0 additions & 6 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ class chain_plugin_impl {
,accepted_block_channel(app().get_channel<channels::accepted_block>())
,irreversible_block_channel(app().get_channel<channels::irreversible_block>())
,applied_transaction_channel(app().get_channel<channels::applied_transaction>())
,incoming_block_sync_method(app().get_method<incoming::methods::block_sync>())
,incoming_transaction_async_method(app().get_method<incoming::methods::transaction_async>())
{}

Expand Down Expand Up @@ -180,7 +179,6 @@ class chain_plugin_impl {
channels::applied_transaction::channel_type& applied_transaction_channel;

// retained references to methods for easy calling
incoming::methods::block_sync::method_type& incoming_block_sync_method;
incoming::methods::transaction_async::method_type& incoming_transaction_async_method;

// method provider handles
Expand Down Expand Up @@ -1208,10 +1206,6 @@ chain_apis::read_only chain_plugin::get_read_only_api(const fc::microseconds& ht
}


bool chain_plugin::accept_block(const signed_block_ptr& block, const block_id_type& id, const block_handle& bh ) {
return my->incoming_block_sync_method(block, id, bh);
}

void chain_plugin::accept_transaction(const chain::packed_transaction_ptr& trx, next_function<chain::transaction_trace_ptr> next) {
my->incoming_transaction_async_method(trx, false, transaction_metadata::trx_type::input, false, std::move(next));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,6 @@ class chain_plugin : public plugin<chain_plugin> {
chain_apis::read_write get_read_write_api(const fc::microseconds& http_max_response_time);
chain_apis::read_only get_read_only_api(const fc::microseconds& http_max_response_time) const;

bool accept_block( const chain::signed_block_ptr& block, const chain::block_id_type& id, const chain::block_handle& bh );
void accept_transaction(const chain::packed_transaction_ptr& trx, chain::plugin_interface::next_function<chain::transaction_trace_ptr> next);

// Only call this after plugin_initialize()!
Expand Down
62 changes: 18 additions & 44 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3691,6 +3691,8 @@ namespace eosio {
bool best_head = false;
sync_manager::closing_mode close_mode = sync_manager::closing_mode::handshake;
try {
EOS_ASSERT(ptr->timestamp < (fc::time_point::now() + fc::seconds(7)), block_from_the_future,
"received a block from the future, ignoring it: ${id}", ("id", id));
// this will return empty optional<block_handle> if block is not linkable
std::tie(best_head, obh) = cc.accept_block( id, ptr );
} catch( const invalid_qc_claim& ex) {
Expand All @@ -3708,7 +3710,7 @@ namespace eosio {
("cid", cid)("n", ptr->block_num())("id", id.str().substr(8,16)));
}
if( exception || !obh) {
if (!obh) {
if (!exception && !obh) {
if (prev_is_proper_svnn_block || !ptr->is_proper_svnn_block()) {
fc_dlog(logger, "unlinkable_block ${bn} : ${id}, previous ${pn} : ${pid}",
("bn", ptr->block_num())("id", id)("pn", block_header::num_from_id(ptr->previous))("pid", ptr->previous));
Expand All @@ -3730,6 +3732,13 @@ namespace eosio {
my_impl->dispatcher.add_peer_block( obh->id(), cid ); // no need to send back to sender
my_impl->dispatcher.bcast_block( obh->block(), obh->id() );

++c->unique_blocks_rcvd_count;
c->strand.post([sync_master = my_impl->sync_master.get(), c, id, block_num, timestamp=obh->timestamp()]() {
const fc::microseconds age(fc::time_point::now() - timestamp);
bool blk_applied = true; // not really applied, but accepted by controller into forkdb
sync_master->sync_recv_block(c, id, block_num, blk_applied, age);
});

if (best_head) {
fc_dlog(logger, "posting block ${n} to app thread", ("n", ptr->block_num()));
app().executor().post(priority::medium, exec_queue::read_write,
Expand All @@ -3739,12 +3748,6 @@ namespace eosio {

// ready to process immediately, so signal producer to interrupt start_block
my_impl->producer_plug->received_block(block_num);
} else {
c->strand.post([sync_master = my_impl->sync_master.get(), c, id, block_num, timestamp=obh->timestamp()]() {
const fc::microseconds age(fc::time_point::now() - timestamp);
bool blk_applied = true; // not really applied, but accepted by controller into forkdb
sync_master->sync_recv_block(c, id, block_num, blk_applied, age);
});
}
});
}
Expand All @@ -3769,51 +3772,20 @@ namespace eosio {
}
} catch( const assert_exception& ex ) {
// possible corrupted block log
fc_elog( logger, "caught assert on fetch_block_by_id, ${ex}, id ${id}, conn ${c}", ("ex", ex.to_string())("id", blk_id)("c", connection_id) );
fc_elog(logger, "caught assert on validated_block_exists, ${ex}, id ${id}, conn ${c}",
("ex", ex.to_string())("id", blk_id)("c", connection_id));
} catch( ... ) {
fc_elog( logger, "caught an unknown exception trying to fetch block ${id}, conn ${c}", ("id", blk_id)("c", connection_id) );
fc_elog(logger, "caught an unknown exception trying to fetch block ${id}, conn ${c}",
("id", blk_id)("c", connection_id));
}

fc_dlog( logger, "received signed_block: #${n} block age in secs = ${age}, connection - ${cid}, header validated, lib #${lib}",
("n", blk_num)("age", age.to_seconds())("cid", c->connection_id)("lib", lib) );

bool accepted = false;
try {
accepted = my_impl->chain_plug->accept_block(block, blk_id, bh);
my_impl->update_chain_info();
} catch( const unlinkable_block_exception &ex) {
fc_ilog(logger, "unlinkable_block_exception connection - ${cid}: #${n} ${id}...: ${m}",
("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
} catch( const block_validate_exception &ex ) {
fc_ilog(logger, "block_validate_exception connection - ${cid}: #${n} ${id}...: ${m}",
("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
} catch( const assert_exception &ex ) {
fc_wlog(logger, "block assert_exception connection - ${cid}: #${n} ${id}...: ${m}",
("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
} catch( const fc::exception &ex ) {
fc_ilog(logger, "bad block exception connection - ${cid}: #${n} ${id}...: ${m}",
("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16))("m",ex.to_string()));
my_impl->producer_plug->on_incoming_block();
} catch( ... ) {
fc_wlog(logger, "bad block connection - ${cid}: #${n} ${id}...: unknown exception",
("cid", c->connection_id)("n", blk_num)("id", blk_id.str().substr(8,16)));
}

if( accepted ) {
++unique_blocks_rcvd_count;
boost::asio::post( my_impl->thread_pool.get_executor(), [&dispatcher = my_impl->dispatcher, c, blk_id, blk_num]() {
fc_dlog( logger, "accepted signed_block : #${n} ${id}...", ("n", blk_num)("id", blk_id.str().substr(8,16)) );
dispatcher.add_peer_block( blk_id, c->connection_id );
});
c->strand.post( [sync_master = my_impl->sync_master.get(),
c, blk_id, blk_num, latency = age]() {
sync_master->sync_recv_block( c, blk_id, blk_num, true, latency );
});
} else {
c->strand.post( [sync_master = my_impl->sync_master.get(), &dispatcher = my_impl->dispatcher, c,
block{std::move(block)}, blk_id, blk_num]() mutable {
sync_master->rejected_block( c, blk_num, sync_manager::closing_mode::handshake );
dispatcher.rejected_block( blk_id );
});
// errors on applied blocks logged in controller
}
}

Expand Down Expand Up @@ -3882,6 +3854,8 @@ namespace eosio {
}

void net_plugin_impl::on_accepted_block( const signed_block_ptr& block, const block_id_type&) {
update_chain_info();

sync_master->send_handshakes_if_synced(fc::time_point::now() - block->timestamp);
if (const auto* pending_producers = chain_plug->chain().pending_producers()) {
on_pending_schedule(*pending_producers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
virtual void plugin_shutdown();
void handle_sighup() override;

bool on_incoming_block();

void pause();
void resume();
bool paused() const;
Expand Down
44 changes: 15 additions & 29 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -803,47 +803,29 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
_time_tracker.clear();
}

bool on_incoming_block(const signed_block_ptr& block, const block_id_type& id, const block_handle& bh) {
bool on_incoming_block() {
auto now = fc::time_point::now();
_time_tracker.add_idle_time(now);

assert(block);
auto blk_num = block->block_num();

if (now - block->timestamp < fc::minutes(5) || (blk_num % 1000 == 0)) // only log every 1000 during sync
fc_dlog(_log, "received incoming block ${n} ${id}", ("n", blk_num)("id", id));

auto& chain = chain_plug->chain();

// de-dupe here... no point in aborting block if we already know the block
if (chain.validated_block_exists(id)) {
fc_dlog(_log, "already have validated block ${n} ${id}", ("n", blk_num)("id", id));
_time_tracker.add_other_time();
return true; // return true because the block was already accepted
}

EOS_ASSERT(block->timestamp < (now + fc::seconds(7)), block_from_the_future, "received a block from the future, ignoring it: ${id}", ("id", id));

// start processing of block
if (in_producing_mode()) {
fc_ilog(_log, "producing, incoming block #${num} id: ${id}", ("num", blk_num)("id", id));
if (_log.is_enabled(fc::log_level::info)) {
auto fhead = chain.fork_db_head();
fc_ilog(_log, "producing, fork database head at: #${num} id: ${id}",
("num", fhead.block_num())("id", fhead.id()));
}
_time_tracker.add_other_time();
return true; // return true because block was accepted
}

// start a new speculative block
// start a new speculative block, adds to time tracker which includes this method's time
auto ensure = fc::make_scoped_exit([this]() { schedule_production_loop(); });

// abort the pending block
abort_block();

// push the new block
auto handle_error = [&](const auto& e) {
fc_ilog(_log, "Exception on block ${bn}: ${e}", ("bn", blk_num)("e", e.to_detail_string()));
app().get_channel<channels::rejected_block>().publish(priority::medium, block);
throw;
};

try {
chain.apply_blocks(
[this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); },
Expand All @@ -860,12 +842,12 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
appbase::app().quit();
return false;
} catch (const fc::exception& e) {
handle_error(e);
throw;
} catch (const std::exception& e) {
handle_error(fc::std_exception_wrapper::from_current_exception(e));
throw fc::std_exception_wrapper::from_current_exception(e);
}

now = fc::time_point::now();
now = fc::time_point::now();
if (chain.head().timestamp().next().to_time_point() >= now) {
_production_enabled = true;
}
Expand Down Expand Up @@ -1413,7 +1395,7 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia

_incoming_block_sync_provider = app().get_method<incoming::methods::block_sync>().register_provider(
[this](const signed_block_ptr& block, const block_id_type& block_id, const block_handle& bh) {
return on_incoming_block(block, block_id, bh);
return on_incoming_block();
});

_incoming_transaction_async_provider =
Expand Down Expand Up @@ -1587,6 +1569,10 @@ void producer_plugin::handle_sighup() {
fc::logger::update(transient_trx_failed_trace_logger_name, _transient_trx_failed_trace_log);
}

bool producer_plugin::on_incoming_block() {
return my->on_incoming_block();
}

void producer_plugin::pause() {
fc_ilog(_log, "Producer paused.");
my->_pause_production = true;
Expand Down

0 comments on commit 9e32878

Please sign in to comment.