Skip to content

Commit

Permalink
Merge branch 'main' of github.com:AntelopeIO/leap into gh-672
Browse files Browse the repository at this point in the history
  • Loading branch information
greg7mdp committed Jun 5, 2023
2 parents a4f54a2 + c697250 commit 2bd8818
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 43 deletions.
4 changes: 2 additions & 2 deletions .github/actions/parallel-ctest-containers/dist/index.mjs

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions .github/actions/parallel-ctest-containers/main.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ const log_tarball_prefix = core.getInput('log-tarball-prefix', {required: true})
const tests_label = core.getInput('tests-label', {required: true});
const test_timeout = core.getInput('test-timeout', {required: true});

const repo_name = process.env.GITHUB_REPOSITORY.split('/')[1];

try {
if(child_process.spawnSync("docker", ["run", "--name", "base", "-v", `${process.cwd()}/build.tar.zst:/build.tar.zst`, "--workdir", "/__w/leap/leap", container, "sh", "-c", "zstdcat /build.tar.zst | tar x"], {stdio:"inherit"}).status)
if(child_process.spawnSync("docker", ["run", "--name", "base", "-v", `${process.cwd()}/build.tar.zst:/build.tar.zst`, "--workdir", `/__w/${repo_name}/${repo_name}`, container, "sh", "-c", "zstdcat /build.tar.zst | tar x"], {stdio:"inherit"}).status)
throw new Error("Failed to create base container");
if(child_process.spawnSync("docker", ["commit", "base", "baseimage"], {stdio:"inherit"}).status)
throw new Error("Failed to create base image");
Expand Down Expand Up @@ -45,13 +47,13 @@ try {
let packer = tar.pack();

extractor.on('entry', (header, stream, next) => {
if(!header.name.startsWith(`__w/leap/leap/build`)) {
if(!header.name.startsWith(`__w/${repo_name}/${repo_name}/build`)) {
stream.on('end', () => next());
stream.resume();
return;
}

header.name = header.name.substring(`__w/leap/leap/`.length);
header.name = header.name.substring(`__w/${repo_name}/${repo_name}/`.length);
if(header.name !== "build/" && error_log_paths.filter(p => header.name.startsWith(p)).length === 0) {
stream.on('end', () => next());
stream.resume();
Expand Down
96 changes: 58 additions & 38 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ auto catch_and_log(F f) {
}

struct state_history_plugin_impl : std::enable_shared_from_this<state_history_plugin_impl> {
constexpr static uint64_t default_frame_size = 1024 * 1024;

private:
chain_plugin* chain_plug = nullptr;
std::optional<state_history_log> trace_log;
std::optional<state_history_log> chain_state_log;
Expand All @@ -65,12 +68,16 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
block_id_type lib_id;
time_point head_timestamp;

constexpr static uint64_t default_frame_size = 1024 * 1024;

named_thread_pool<struct ship> thread_pool;

bool plugin_started = false;

public:
void plugin_initialize(const variables_map& options);
void plugin_startup();
void plugin_shutdown();
session_manager& get_session_manager() { return session_mgr; }

static fc::logger& get_logger() { return _log; }

std::optional<state_history_log>& get_trace_log() { return trace_log; }
Expand Down Expand Up @@ -174,7 +181,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
// this method is called from the main thread and "plugin_started" is set on the main thread as well when plugin is started
if (plugin_started) {
boost::asio::post(get_ship_executor(), [self = this->shared_from_this(), block_state]() {
self->session_mgr.send_update(block_state);
self->get_session_manager().send_update(block_state);
});
}

Expand Down Expand Up @@ -242,8 +249,8 @@ struct ship_listener : fc::listener<ship_listener<Protocol>, Protocol> {
// Create a session object and run it
catch_and_log([&] {
auto s = std::make_shared<session<state_history_plugin_impl, socket_type>>(
state_, std::move(socket), state_.session_mgr);
state_.session_mgr.insert(s);
state_, std::move(socket), state_.get_session_manager());
state_.get_session_manager().insert(s);
s->start();
});
}
Expand Down Expand Up @@ -300,24 +307,22 @@ void state_history_plugin::set_program_options(options_description& cli, options
options("state-history-log-retain-blocks", bpo::value<uint32_t>(), "if set, periodically prune the state history files to store only configured number of most recent blocks");
}

void state_history_plugin::plugin_initialize(const variables_map& options) {
void state_history_plugin_impl::plugin_initialize(const variables_map& options) {
try {
handle_sighup(); // setup logging

EOS_ASSERT(options.at("disable-replay-opts").as<bool>(), plugin_exception,
"state_history_plugin requires --disable-replay-opts");

my->chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT(my->chain_plug, chain::missing_chain_plugin_exception, "");
auto& chain = my->chain_plug->chain();
my->applied_transaction_connection.emplace(chain.applied_transaction.connect(
chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT(chain_plug, chain::missing_chain_plugin_exception, "");
auto& chain = chain_plug->chain();
applied_transaction_connection.emplace(chain.applied_transaction.connect(
[&](std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&> t) {
my->on_applied_transaction(std::get<0>(t), std::get<1>(t));
on_applied_transaction(std::get<0>(t), std::get<1>(t));
}));
my->accepted_block_connection.emplace(
chain.accepted_block.connect([&](const block_state_ptr& p) { my->on_accepted_block(p); }));
my->block_start_connection.emplace(
chain.block_start.connect([&](uint32_t block_num) { my->on_block_start(block_num); }));
accepted_block_connection.emplace(
chain.accepted_block.connect([&](const block_state_ptr& p) { on_accepted_block(p); }));
block_start_connection.emplace(
chain.block_start.connect([&](uint32_t block_num) { on_block_start(block_num); }));

auto dir_option = options.at("state-history-dir").as<std::filesystem::path>();
std::filesystem::path state_history_dir;
Expand All @@ -328,13 +333,13 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {
if (auto resmon_plugin = app().find_plugin<resource_monitor_plugin>())
resmon_plugin->monitor_directory(state_history_dir);

my->endpoint_address = options.at("state-history-endpoint").as<string>();
endpoint_address = options.at("state-history-endpoint").as<string>();

if (options.count("state-history-unix-socket-path")) {
std::filesystem::path sock_path = options.at("state-history-unix-socket-path").as<string>();
if (sock_path.is_relative())
sock_path = app().data_dir() / sock_path;
my->unix_path = sock_path.generic_string();
unix_path = sock_path.generic_string();
}

if (options.at("delete-state-history").as<bool>()) {
Expand All @@ -344,7 +349,7 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {
std::filesystem::create_directories(state_history_dir);

if (options.at("trace-history-debug-mode").as<bool>()) {
my->trace_debug_mode = true;
trace_debug_mode = true;
}

bool has_state_history_partition_options =
Expand Down Expand Up @@ -373,46 +378,61 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {
}

if (options.at("trace-history").as<bool>())
my->trace_log.emplace("trace_history", state_history_dir , ship_log_conf);
trace_log.emplace("trace_history", state_history_dir , ship_log_conf);
if (options.at("chain-state-history").as<bool>())
my->chain_state_log.emplace("chain_state_history", state_history_dir, ship_log_conf);
chain_state_log.emplace("chain_state_history", state_history_dir, ship_log_conf);
}
FC_LOG_AND_RETHROW()
} // state_history_plugin::plugin_initialize

void state_history_plugin::plugin_startup() {
auto bsp = my->chain_plug->chain().head_block_state();
if( bsp && my->chain_state_log && my->chain_state_log->empty() ) {
fc_ilog( _log, "Storing initial state on startup, this can take a considerable amount of time" );
my->store_chain_state( bsp );
fc_ilog( _log, "Done storing initial state on startup" );
void state_history_plugin::plugin_initialize(const variables_map& options) {
handle_sighup(); // setup logging
my->plugin_initialize(options);
}

void state_history_plugin_impl::plugin_startup() {
auto bsp = chain_plug->chain().head_block_state();
if (bsp && chain_state_log && chain_state_log->empty()) {
fc_ilog(_log, "Storing initial state on startup, this can take a considerable amount of time");
store_chain_state(bsp);
fc_ilog(_log, "Done storing initial state on startup");
}
my->listen();
listen();
// use of executor assumes only one thread
my->thread_pool.start( 1, [](const fc::exception& e) {
fc_elog( _log, "Exception in SHiP thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
thread_pool.start(1, [](const fc::exception& e) {
fc_elog(_log, "Exception in SHiP thread pool, exiting: ${e}", ("e", e.to_detail_string()));
app().quit();
});
my->plugin_started = true;
plugin_started = true;
}

void state_history_plugin::plugin_startup() {
my->plugin_startup();
}

void state_history_plugin_impl::plugin_shutdown() {
applied_transaction_connection.reset();
accepted_block_connection.reset();
block_start_connection.reset();
thread_pool.stop();
}

void state_history_plugin::plugin_shutdown() {
my->applied_transaction_connection.reset();
my->accepted_block_connection.reset();
my->block_start_connection.reset();
my->thread_pool.stop();
my->plugin_shutdown();
}

void state_history_plugin::handle_sighup() {
fc::logger::update(logger_name, _log);
}

const state_history_log* state_history_plugin::trace_log() const {
return my->trace_log ? std::addressof(*my->trace_log) : nullptr;
const auto& log = my->get_trace_log();
return log ? std::addressof(*log) : nullptr;
}

const state_history_log* state_history_plugin::chain_state_log() const {
return my->chain_state_log ? std::addressof(*my->chain_state_log) : nullptr;
const auto& log = my->get_chain_state_log();
return log ? std::addressof(*log) : nullptr;
}

} // namespace eosio

0 comments on commit 2bd8818

Please sign in to comment.