
Commit

Merge branch 'main' of github.com:AntelopeIO/leap into gh-672
greg7mdp committed Jun 8, 2023
2 parents 48be938 + 1c607bd commit 839f5c9
Showing 22 changed files with 593 additions and 366 deletions.
14 changes: 13 additions & 1 deletion libraries/chain/include/eosio/chain/snapshot_scheduler.hpp
@@ -20,6 +20,8 @@
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>

#include <limits>

namespace eosio::chain {

namespace bmi = boost::multi_index;
@@ -38,10 +40,19 @@ class snapshot_scheduler {
struct snapshot_request_information {
uint32_t block_spacing = 0;
uint32_t start_block_num = 0;
uint32_t end_block_num = 0;
uint32_t end_block_num = std::numeric_limits<uint32_t>::max();
std::string snapshot_description = "";
};

// this struct is used to hold request params in the API call;
// it differentiates between 0 and empty (unset) values
struct snapshot_request_params {
std::optional<uint32_t> block_spacing;
std::optional<uint32_t> start_block_num;
std::optional<uint32_t> end_block_num;
std::optional<std::string> snapshot_description;
};

struct snapshot_request_id_information {
uint32_t snapshot_request_id = 0;
};
@@ -205,6 +216,7 @@ class snapshot_scheduler {

FC_REFLECT(eosio::chain::snapshot_scheduler::snapshot_information, (head_block_id) (head_block_num) (head_block_time) (version) (snapshot_name))
FC_REFLECT(eosio::chain::snapshot_scheduler::snapshot_request_information, (block_spacing) (start_block_num) (end_block_num) (snapshot_description))
FC_REFLECT(eosio::chain::snapshot_scheduler::snapshot_request_params, (block_spacing) (start_block_num) (end_block_num) (snapshot_description))
FC_REFLECT(eosio::chain::snapshot_scheduler::snapshot_request_id_information, (snapshot_request_id))
FC_REFLECT(eosio::chain::snapshot_scheduler::get_snapshot_requests_result, (snapshot_requests))
FC_REFLECT_DERIVED(eosio::chain::snapshot_scheduler::snapshot_schedule_information, (eosio::chain::snapshot_scheduler::snapshot_request_id_information)(eosio::chain::snapshot_scheduler::snapshot_request_information), (pending_snapshots))
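The header change above adds snapshot_request_params for the API boundary: its std::optional fields let the handler distinguish a parameter the caller explicitly set to 0 from one the caller omitted, while end_block_num now defaults to the maximum uint32_t value ("no expiry") instead of 0. A minimal sketch of how such optional parameters could be folded into the concrete request — the to_request helper and its defaulting policy are illustrative assumptions, not the producer_plugin implementation:

#include <cstdint>
#include <limits>
#include <optional>
#include <string>

struct snapshot_request_params {                 // API-facing: fields may be absent
   std::optional<uint32_t>    block_spacing;
   std::optional<uint32_t>    start_block_num;
   std::optional<uint32_t>    end_block_num;
   std::optional<std::string> snapshot_description;
};

struct snapshot_request_information {            // scheduler-facing: concrete defaults
   uint32_t    block_spacing   = 0;
   uint32_t    start_block_num = 0;
   uint32_t    end_block_num   = std::numeric_limits<uint32_t>::max();
   std::string snapshot_description;
};

// hypothetical helper: apply only the fields the caller actually supplied,
// leaving everything else at its default
snapshot_request_information to_request(const snapshot_request_params& p) {
   snapshot_request_information r;
   r.block_spacing        = p.block_spacing.value_or(r.block_spacing);
   r.start_block_num      = p.start_block_num.value_or(r.start_block_num);
   r.end_block_num        = p.end_block_num.value_or(r.end_block_num);
   r.snapshot_description = p.snapshot_description.value_or(r.snapshot_description);
   return r;
}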
70 changes: 58 additions & 12 deletions libraries/chain/include/eosio/chain/thread_utils.hpp
@@ -37,19 +37,39 @@ namespace eosio { namespace chain {

/// Spawn threads, can be re-started after stop().
/// Assumes start()/stop() called from the same thread or externally protected.
/// Blocks until all threads are created and have completed their init function, or an exception is thrown
/// during thread startup or an init function. Exceptions thrown during these stages are rethrown from start()
/// but some threads might still have been started. Calling stop() after such a failure is safe.
/// @param num_threads is number of threads spawned
/// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread.
/// if an empty function then logs and rethrows exception on thread which will terminate.
/// if an empty function then logs and rethrows exception on thread which will terminate. Not called
/// for exceptions during the init function (such exceptions are rethrown from start())
/// @param init is an optional function to call at startup to initialize any data.
/// @throw assert_exception if already started and not stopped.
void start( size_t num_threads, on_except_t on_except, init_t init = {} ) {
FC_ASSERT( !_ioc_work, "Thread pool already started" );
_ioc_work.emplace( boost::asio::make_work_guard( _ioc ) );
_ioc.restart();
_thread_pool.reserve( num_threads );
for( size_t i = 0; i < num_threads; ++i ) {
_thread_pool.emplace_back( std::thread( &named_thread_pool::run_thread, this, i, on_except, init ) );

std::promise<void> start_complete;
std::atomic<uint32_t> threads_remaining = num_threads;
std::exception_ptr pending_exception;
std::mutex pending_exception_mutex;

try {
for( size_t i = 0; i < num_threads; ++i ) {
_thread_pool.emplace_back( std::thread( &named_thread_pool::run_thread, this, i, on_except, init, std::ref(start_complete),
std::ref(threads_remaining), std::ref(pending_exception), std::ref(pending_exception_mutex) ) );
}
}
catch( ... ) {
/// only an exception from std::thread's ctor should end up here. shut down all threads to ensure no
/// potential access to the promise, atomic, etc above performed after throwing out of start
stop();
throw;
}
start_complete.get_future().get();
}

/// destroy work guard, stop io_context, join thread_pool
@@ -63,16 +83,42 @@
}

private:
void run_thread( size_t i, const on_except_t& on_except, const init_t& init ) {
std::string tn = boost::core::demangle(typeid(this).name());
auto offset = tn.rfind("::");
if (offset != std::string::npos)
tn.erase(0, offset+2);
tn = tn.substr(0, tn.find('>')) + "-" + std::to_string( i );
void run_thread( size_t i, const on_except_t& on_except, const init_t& init, std::promise<void>& start_complete,
std::atomic<uint32_t>& threads_remaining, std::exception_ptr& pending_exception, std::mutex& pending_exception_mutex ) {

std::string tn;

auto decrement_remaining = [&]() {
if( !--threads_remaining ) {
if( pending_exception )
start_complete.set_exception( pending_exception );
else
start_complete.set_value();
}
};

try {
try {
tn = boost::core::demangle(typeid(this).name());
auto offset = tn.rfind("::");
if (offset != std::string::npos)
tn.erase(0, offset+2);
tn = tn.substr(0, tn.find('>')) + "-" + std::to_string( i );
fc::set_os_thread_name( tn );
if ( init )
init();
} FC_LOG_AND_RETHROW()
}
catch( ... ) {
std::lock_guard<std::mutex> l( pending_exception_mutex );
pending_exception = std::current_exception();
decrement_remaining();
return;
}

decrement_remaining();

try {
fc::set_os_thread_name( tn );
if ( init )
init();
_ioc.run();
} catch( const fc::exception& e ) {
if( on_except ) {
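The reworked start()/run_thread() pair above makes start() block until every worker has finished its init callback: each thread decrements a shared atomic counter, and the last one to finish fulfills a std::promise, either with success or with the first captured init exception, which start() then rethrows. A stripped-down sketch of that countdown-to-promise pattern outside the named_thread_pool class, using illustrative names and omitting thread naming, the io_context and the on_except path:

#include <atomic>
#include <cstdint>
#include <exception>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

// Launch num_threads workers and block until each has run `init`,
// rethrowing the first init failure to the caller.
void start_workers(std::vector<std::thread>& pool, size_t num_threads,
                   const std::function<void()>& init) {
   std::promise<void>    start_complete;
   std::atomic<uint32_t> threads_remaining{ static_cast<uint32_t>(num_threads) };
   std::exception_ptr    pending_exception;
   std::mutex            pending_exception_mutex;

   auto decrement_remaining = [&]() {
      if( !--threads_remaining ) {                 // last thread to finish init reports back
         if( pending_exception )
            start_complete.set_exception( pending_exception );
         else
            start_complete.set_value();
      }
   };

   for( size_t i = 0; i < num_threads; ++i ) {
      pool.emplace_back( [&]() {
         try {
            if( init ) init();                     // per-thread initialization
         } catch( ... ) {
            std::lock_guard<std::mutex> l( pending_exception_mutex );
            pending_exception = std::current_exception();
            decrement_remaining();
            return;                                // this worker never enters its main loop
         }
         decrement_remaining();
         // ... a real worker would now run the io_context ...
      } );
   }
   // blocks until every worker ran init; rethrows a captured init exception.
   // the real start() additionally calls stop() if std::thread construction itself throws.
   start_complete.get_future().get();
}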
41 changes: 10 additions & 31 deletions libraries/chain/snapshot_scheduler.cpp
@@ -8,7 +8,6 @@ namespace eosio::chain {

// snapshot_scheduler_listener
void snapshot_scheduler::on_start_block(uint32_t height, chain::controller& chain) {
bool serialize_needed = false;
bool snapshot_executed = false;

auto execute_snapshot_with_log = [this, height, &snapshot_executed, &chain](const auto& req) {
@@ -25,38 +24,25 @@ void snapshot_scheduler::on_start_block(uint32_t height, chain::controller& chai
std::vector<uint32_t> unschedule_snapshot_request_ids;
for(const auto& req: _snapshot_requests.get<0>()) {
// -1 since it's called from start block
bool recurring_snapshot = req.block_spacing && (height >= req.start_block_num + 1) && (!((height - req.start_block_num - 1) % req.block_spacing));
bool onetime_snapshot = (!req.block_spacing) && (height == req.start_block_num + 1);

// assume "asap" for snapshot with missed/zero start, it can have spacing
if(!req.start_block_num) {
// update start_block_num with current height only if this is recurring
// if non recurring, will be executed and unscheduled
if(req.block_spacing && height) {
auto& snapshot_by_id = _snapshot_requests.get<by_snapshot_id>();
auto it = snapshot_by_id.find(req.snapshot_request_id);
_snapshot_requests.modify(it, [&height](auto& p) { p.start_block_num = height - 1; });
serialize_needed = true;
}
execute_snapshot_with_log(req);
} else if(recurring_snapshot || onetime_snapshot) {
bool recurring_snapshot = req.block_spacing && (height >= req.start_block_num + 1) && (!((height - req.start_block_num - 1) % req.block_spacing));
bool onetime_snapshot = (!req.block_spacing) && (height == req.start_block_num + 1);

bool marked_for_deletion = ((!req.block_spacing) && (height >= req.start_block_num + 1)) || // if one time snapshot executed or scheduled for the past, it should be gone
(height > 0 && ((height-1) >= req.end_block_num)); // any snapshot can expire by end block num (end_block_num can be max value)

if(recurring_snapshot || onetime_snapshot) {
execute_snapshot_with_log(req);
}

// cleanup - remove expired (or invalid) request
if((!req.start_block_num && !req.block_spacing) ||
(!req.block_spacing && height >= (req.start_block_num + 1)) ||
(req.end_block_num > 0 && height >= (req.end_block_num + 1))) {
if(marked_for_deletion) {
unschedule_snapshot_request_ids.push_back(req.snapshot_request_id);
}
}

for(const auto& i: unschedule_snapshot_request_ids) {
unschedule_snapshot(i);
}

// store db to filesystem
if(serialize_needed) x_serialize();
}

void snapshot_scheduler::on_irreversible_block(const signed_block_ptr& lib, const chain::controller& chain) {
@@ -80,15 +66,8 @@ snapshot_scheduler::snapshot_schedule_result snapshot_scheduler::schedule_snapsh
auto& snapshot_by_value = _snapshot_requests.get<by_snapshot_value>();
auto existing = snapshot_by_value.find(std::make_tuple(sri.block_spacing, sri.start_block_num, sri.end_block_num));
EOS_ASSERT(existing == snapshot_by_value.end(), chain::duplicate_snapshot_request, "Duplicate snapshot request");

if(sri.end_block_num > 0) {
// if "end" is specified, it should be greater than start
EOS_ASSERT(sri.start_block_num <= sri.end_block_num, chain::invalid_snapshot_request, "End block number should be greater or equal to start block number");
// if also block_spacing specified, check it
if(sri.block_spacing > 0) {
EOS_ASSERT(sri.start_block_num + sri.block_spacing <= sri.end_block_num, chain::invalid_snapshot_request, "Block spacing exceeds defined by start and end range");
}
}
EOS_ASSERT(sri.start_block_num <= sri.end_block_num, chain::invalid_snapshot_request, "End block number should be greater or equal to start block number");
EOS_ASSERT(sri.start_block_num + sri.block_spacing <= sri.end_block_num, chain::invalid_snapshot_request, "Block spacing exceeds defined by start and end range");

_snapshot_requests.emplace(snapshot_schedule_information{{_snapshot_id++}, {sri.block_spacing, sri.start_block_num, sri.end_block_num, sri.snapshot_description}, {}});
x_serialize();
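Because end_block_num now defaults to the maximum uint32_t value, the per-block scan above no longer needs a special branch for a missed or zero start block: every request is evaluated with the same three predicates (recurring hit, one-time hit, expired). A stand-alone restatement of those predicates, using an illustrative request struct rather than the scheduler's multi-index types:

#include <cstdint>
#include <limits>

struct request {
   uint32_t block_spacing   = 0;
   uint32_t start_block_num = 0;
   uint32_t end_block_num   = std::numeric_limits<uint32_t>::max();
};

// `height` is the block being started; the +1/-1 offsets account for
// on_start_block running before the block itself is produced.
bool recurring_hit(const request& r, uint32_t height) {
   return r.block_spacing
          && height >= r.start_block_num + 1
          && (height - r.start_block_num - 1) % r.block_spacing == 0;
}

bool onetime_hit(const request& r, uint32_t height) {
   return !r.block_spacing && height == r.start_block_num + 1;
}

// one-time requests are removed once executed (or scheduled in the past);
// any request expires once the previous block has reached end_block_num
bool marked_for_deletion(const request& r, uint32_t height) {
   return (!r.block_spacing && height >= r.start_block_num + 1)
          || (height > 0 && height - 1 >= r.end_block_num);
}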
2 changes: 1 addition & 1 deletion plugins/producer_api_plugin/producer_api_plugin.cpp
@@ -132,7 +132,7 @@ void producer_api_plugin::plugin_startup() {
CALL_ASYNC(producer, snapshot, producer, create_snapshot, chain::snapshot_scheduler::snapshot_information,
INVOKE_R_V_ASYNC(producer, create_snapshot), 201),
CALL_WITH_400(producer, snapshot, producer, schedule_snapshot,
INVOKE_R_R_II(producer, schedule_snapshot, chain::snapshot_scheduler::snapshot_request_information), 201),
INVOKE_R_R_II(producer, schedule_snapshot, chain::snapshot_scheduler::snapshot_request_params), 201),
CALL_WITH_400(producer, snapshot, producer, unschedule_snapshot,
INVOKE_R_R(producer, unschedule_snapshot, chain::snapshot_scheduler::snapshot_request_id_information), 201),
CALL_WITH_400(producer, producer_rw, producer, get_integrity_hash,
@@ -98,7 +98,7 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
integrity_hash_information get_integrity_hash() const;

void create_snapshot(next_function<chain::snapshot_scheduler::snapshot_information> next);
chain::snapshot_scheduler::snapshot_schedule_result schedule_snapshot(const chain::snapshot_scheduler::snapshot_request_information& schedule);
chain::snapshot_scheduler::snapshot_schedule_result schedule_snapshot(const chain::snapshot_scheduler::snapshot_request_params& srp);
chain::snapshot_scheduler::snapshot_schedule_result unschedule_snapshot(const chain::snapshot_scheduler::snapshot_request_id_information& schedule);
chain::snapshot_scheduler::get_snapshot_requests_result get_snapshot_requests() const;

14 changes: 0 additions & 14 deletions plugins/producer_plugin/producer_plugin.cpp
@@ -1327,7 +1327,6 @@ void producer_plugin_impl::plugin_startup() {
}

if (_ro_thread_pool_size > 0) {
std::atomic<uint32_t> num_threads_started = 0;
_ro_thread_pool.start(
_ro_thread_pool_size,
[](const fc::exception& e) {
@@ -1336,21 +1335,8 @@
},
[&]() {
chain.init_thread_local_data();
++num_threads_started;
});

// This will be changed with std::latch or std::atomic<>::wait
// when C++20 is used.
auto time_slept_ms = 0;
constexpr auto max_time_slept_ms = 1000;
while (num_threads_started.load() < _ro_thread_pool_size && time_slept_ms < max_time_slept_ms) {
std::this_thread::sleep_for(1ms);
++time_slept_ms;
}
EOS_ASSERT(num_threads_started.load() == _ro_thread_pool_size, producer_exception,
"read-only threads failed to start. num_threads_started: ${n}, time_slept_ms: ${t}ms",
("n", num_threads_started.load())("t", time_slept_ms));

start_write_window();
}

2 changes: 1 addition & 1 deletion plugins/state_history_plugin/state_history_plugin.cpp
@@ -358,7 +358,7 @@ void state_history_plugin_impl::plugin_initialize(const variables_map& options)

state_history_log_config ship_log_conf;
if (options.count("state-history-log-retain-blocks")) {
auto ship_log_prune_conf = ship_log_conf.emplace<state_history::prune_config>();
auto& ship_log_prune_conf = ship_log_conf.emplace<state_history::prune_config>();
ship_log_prune_conf.prune_blocks = options.at("state-history-log-retain-blocks").as<uint32_t>();
//the arbitrary limit of 1000 here is mainly so that there is enough buffer for newly applied forks to be delivered to clients
// before getting pruned out. ideally pruning would have been smart enough to know not to prune reversible blocks
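The one-character fix above (auto to auto&) matters because std::variant::emplace() returns a reference to the newly constructed alternative; binding that result with plain auto makes a copy, so the subsequent prune_blocks assignment only modified a temporary and the config stored in the variant kept its default. A self-contained illustration of the difference, using stand-in config structs rather than the state_history types:

#include <cassert>
#include <cstdint>
#include <variant>

struct prune_config     { uint32_t prune_blocks = 0; };
struct partition_config { uint32_t max_retained_files = UINT32_MAX; };

using log_config = std::variant<std::monostate, prune_config, partition_config>;

int main() {
   log_config conf;

   auto copy = conf.emplace<prune_config>();    // 'auto' copies the emplaced value
   copy.prune_blocks = 4242;                    // modifies the copy only
   assert( std::get<prune_config>(conf).prune_blocks == 0 );

   auto& ref = conf.emplace<prune_config>();    // 'auto&' binds to the stored alternative
   ref.prune_blocks = 4242;
   assert( std::get<prune_config>(conf).prune_blocks == 4242 );
}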
17 changes: 17 additions & 0 deletions plugins/state_history_plugin/tests/plugin_config_test.cpp
@@ -20,4 +20,21 @@ BOOST_AUTO_TEST_CASE(state_history_plugin_default_tests) {
auto* config = std::get_if<eosio::state_history::partition_config>(&plugin.trace_log()->config());
BOOST_REQUIRE(config);
BOOST_CHECK_EQUAL(config->max_retained_files, UINT32_MAX);
}

BOOST_AUTO_TEST_CASE(state_history_plugin_retain_blocks_tests) {
fc::temp_directory tmp;
appbase::scoped_app app;

auto tmp_path = tmp.path().string();
std::array args = {"test_state_history", "--trace-history", "--state-history-log-retain-blocks", "4242",
"--disable-replay-opts", "--data-dir", tmp_path.c_str()};

BOOST_CHECK(app->initialize<eosio::state_history_plugin>(args.size(), const_cast<char**>(args.data())));
auto& plugin = app->get_plugin<eosio::state_history_plugin>();

BOOST_REQUIRE(plugin.trace_log());
auto* config = std::get_if<eosio::state_history::prune_config>(&plugin.trace_log()->config());
BOOST_REQUIRE(config);
BOOST_CHECK_EQUAL(config->prune_blocks, 4242);
}
5 changes: 3 additions & 2 deletions tests/TestHarness/Cluster.py
@@ -1562,11 +1562,12 @@ def launchTrxGenerators(self, contractOwnerAcctName: str, acctNamesList: list, a

self.preExistingFirstTrxFiles = glob.glob(f"{Utils.DataDir}/first_trx_*.txt")
connectionPairList = [f"{self.host}:{self.getNodeP2pPort(nodeId)}"]
tpsTrxGensConfig = TpsTrxGensConfig(targetTps=targetTps, tpsLimitPerGenerator=tpsLimitPerGenerator, connectionPairList=connectionPairList, endpointApi="p2p")
tpsTrxGensConfig = TpsTrxGensConfig(targetTps=targetTps, tpsLimitPerGenerator=tpsLimitPerGenerator, connectionPairList=connectionPairList)
self.trxGenLauncher = TransactionGeneratorsLauncher(chainId=chainId, lastIrreversibleBlockId=lib_id,
contractOwnerAccount=contractOwnerAcctName, accts=','.join(map(str, acctNamesList)),
privateKeys=','.join(map(str, acctPrivKeysList)), trxGenDurationSec=durationSec, logDir=Utils.DataDir,
abiFile=abiFile, actionsData=actionsData, actionsAuths=actionsAuths, tpsTrxGensConfig=tpsTrxGensConfig)
abiFile=abiFile, actionsData=actionsData, actionsAuths=actionsAuths, tpsTrxGensConfig=tpsTrxGensConfig,
endpointMode="p2p")

Utils.Print("Launch txn generators and start generating/sending transactions")
self.trxGenLauncher.launch(waitToComplete=waitToComplete)
5 changes: 4 additions & 1 deletion tests/TestHarness/Node.py
@@ -272,7 +272,7 @@ def kill(self, killSignal):
if self.popenProc is not None:
self.popenProc.send_signal(killSignal)
self.popenProc.wait()
else:
elif self.pid is not None:
os.kill(self.pid, killSignal)

# wait for kill validation
@@ -286,6 +286,8 @@ def myFunc():
if not Utils.waitForBool(myFunc):
Utils.Print("ERROR: Failed to validate node shutdown.")
return False
else:
if Utils.Debug: Utils.Print(f"Called kill on node {self.nodeId} but it has already exited.")
except OSError as ex:
Utils.Print("ERROR: Failed to kill node (%s)." % (self.cmd), ex)
return False
@@ -377,6 +379,7 @@ def relaunch(self, chainArg=None, newChain=False, skipGenesis=True, timeout=Util
if chainArg:
cmdArr.extend(shlex.split(chainArg))
self.popenProc=self.launchCmd(cmdArr, self.data_dir, launch_time=datetime.now().strftime('%Y_%m_%d_%H_%M_%S'))
self.pid=self.popenProc.pid

def isNodeAlive():
"""wait for node to be responsive."""