[4.0] SHiP: Support snapshot start with full deltas #1375

Merged · 4 commits · Jul 7, 2023
Changes from 3 commits
@@ -447,7 +447,7 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock

void update_current_request(state_history::get_blocks_request_v0& req) {
fc_dlog(plugin->logger(), "replying get_blocks_request_v0 = ${req}", ("req", req));
to_send_block_num = req.start_block_num;
to_send_block_num = std::max(req.start_block_num, plugin->get_first_available_block_num());
for (auto& cp : req.have_positions) {
if (req.start_block_num <= cp.block_num)
continue;
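Note: the replaced line above is the core of the change. After a snapshot start the node holds nothing older than the snapshot head, so a get_blocks request that asks for an earlier start block is bumped forward to the first block the node can actually serve. A minimal standalone sketch of the idea (clamp_start_block is a hypothetical helper, not the plugin's actual code):

#include <algorithm>
#include <cstdint>

// Bump a requested start block forward so a session never promises blocks
// the node cannot serve (e.g. blocks predating the snapshot it started from).
uint32_t clamp_start_block(uint32_t requested_start, uint32_t first_available) {
    return std::max(requested_start, first_available);
}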
54 changes: 42 additions & 12 deletions plugins/state_history_plugin/state_history_plugin.cpp
@@ -66,6 +66,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
chain_plugin* chain_plug = nullptr;
std::optional<state_history_log> trace_log;
std::optional<state_history_log> chain_state_log;
uint32_t first_available_block = 0;
bool trace_debug_mode = false;
std::optional<scoped_connection> applied_transaction_connection;
std::optional<scoped_connection> block_start_connection;
@@ -137,10 +137,17 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

// thread-safe
std::optional<chain::block_id_type> get_block_id(uint32_t block_num) {
if (trace_log)
return trace_log->get_block_id(block_num);
if (chain_state_log)
return chain_state_log->get_block_id(block_num);
std::optional<chain::block_id_type> id;
if( trace_log ) {
id = trace_log->get_block_id( block_num );
if( id )
return id;
}
if( chain_state_log ) {
id = chain_state_log->get_block_id( block_num );
if( id )
return id;
}
try {
return chain_plug->chain().get_block_id_for_num(block_num);
} catch (...) {
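Note: the previous get_block_id returned the first configured log's answer even when that log did not contain the block, so a miss in the trace log could hide a hit in the chain-state log or the controller. The rewrite tries each source in turn and falls through on a miss. A sketch of that pattern (first_hit is a generic helper written for illustration, not from the PR):

#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

// Return the first non-empty answer from an ordered list of sources;
// each source may cover a different block range.
template <typename Id>
std::optional<Id> first_hit(const std::vector<std::function<std::optional<Id>(uint32_t)>>& sources,
                            uint32_t block_num) {
    for (const auto& lookup : sources) {
        if (auto id = lookup(block_num)) // stop at the first source that knows this block
            return id;
    }
    return std::nullopt;
}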
@@ -166,6 +174,11 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
return head_timestamp;
}

// thread-safe
uint32_t get_first_available_block_num() const {
return first_available_block;
}

template <typename Task>
void post_task_main_thread_medium(Task&& task) {
app().post(priority::medium, std::forward<Task>(task));
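Note: get_first_available_block_num reads first_available_block without a lock. That looks safe because, per the startup code later in this diff, the field is written once on the main thread before the SHiP thread pool starts, and starting a thread gives the required happens-before edge. A minimal sketch of the pattern (an assumed rationale, not stated in the PR):

#include <cstdint>
#include <thread>

struct plugin_state {
    uint32_t first_available_block = 0; // written once during startup
};

int main() {
    plugin_state st;
    st.first_available_block = 42; // main thread, before any worker exists
    std::thread worker([&st] {
        // Safe unsynchronized read: the write happens-before thread start
        // and nothing writes the field afterwards.
        uint32_t first = st.first_available_block;
        (void)first;
    });
    worker.join();
}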
@@ -269,15 +282,18 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
trace_converter.add_transaction(p, t);
}

// called from main thread
void update_current() {
const auto& chain = chain_plug->chain();
std::lock_guard g(mtx);
head_id = chain.head_block_id();
lib_id = chain.last_irreversible_block_id();
head_timestamp = chain.head_block_time();
}

// called from main thread
void on_accepted_block(const block_state_ptr& block_state) {
{
const auto& chain = chain_plug->chain();
std::lock_guard g(mtx);
head_id = chain.head_block_id();
lib_id = chain.last_irreversible_block_id();
head_timestamp = chain.head_block_time();
}
update_current();

try {
store_traces(block_state);
@@ -492,12 +508,26 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {

void state_history_plugin::plugin_startup() {
try {
auto bsp = my->chain_plug->chain().head_block_state();
const auto& chain = my->chain_plug->chain();
my->update_current();
auto bsp = chain.head_block_state();
if( bsp && my->chain_state_log && my->chain_state_log->empty() ) {
fc_ilog( _log, "Storing initial state on startup, this can take a considerable amount of time" );
my->store_chain_state( bsp );
fc_ilog( _log, "Done storing initial state on startup" );
}
my->first_available_block = chain.earliest_available_block_num();
if (my->trace_log) {
auto first_trace_block = my->trace_log->block_range().first;
if( first_trace_block > 0 )
my->first_available_block = std::min( my->first_available_block, first_trace_block );
}
if (my->chain_state_log) {
auto first_state_block = my->chain_state_log->block_range().first;
if( first_state_block > 0 )
my->first_available_block = std::min( my->first_available_block, first_state_block );
}
fc_ilog(_log, "First available block for SHiP ${b}", ("b", my->first_available_block));
my->listen();
// use of executor assumes only one thread
my->thread_pool.start( 1, [](const fc::exception& e) {
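Note: plugin_startup now derives the first block SHiP can serve as the minimum of the chain's earliest available block and the first block of each log, skipping logs whose range starts at 0 (nothing stored yet, per the guards above). A condensed sketch of the computation (simplified signatures; block_range_t is assumed to mirror the {first, last} pair the logs return):

#include <algorithm>
#include <cstdint>
#include <optional>
#include <utility>

using block_range_t = std::pair<uint32_t, uint32_t>; // {first, last}

uint32_t compute_first_available(uint32_t chain_earliest,
                                 std::optional<block_range_t> trace_log_range,
                                 std::optional<block_range_t> state_log_range) {
    uint32_t first = chain_earliest;
    for (const auto& r : {trace_log_range, state_log_range}) {
        if (r && r->first > 0) // 0 means the log holds no blocks yet
            first = std::min(first, r->first);
    }
    return first;
}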
2 changes: 2 additions & 0 deletions plugins/state_history_plugin/tests/session_test.cpp
@@ -130,6 +130,8 @@ struct mock_state_history_plugin {
eosio::state_history::block_position get_block_head() { return block_head; }
eosio::state_history::block_position get_last_irreversible() { return block_head; }

uint32_t get_first_available_block_num() const { return 0; }

void add_session(std::shared_ptr<eosio::session_base> s) {
session_mgr.insert(std::move(s));
}
2 changes: 1 addition & 1 deletion tests/TestHarness/Cluster.py
@@ -1731,6 +1731,6 @@ def waitForTrxGeneratorsSpinup(self, nodeId: int, numGenerators: int, timeout: i
firstTrxs.append(line.rstrip('\n'))
Utils.Print(f"first transactions: {firstTrxs}")
status = node.waitForTransactionsInBlock(firstTrxs)
if status is None:
if status is None or status is False:
Utils.Print('ERROR: Failed to spin up transaction generators: never received first transactions')
return status
4 changes: 3 additions & 1 deletion tests/TestHarness/testUtils.py
@@ -159,11 +159,13 @@ def getNodeDataDir(ext, relativeDir=None, trailingSlash=False):
return path

@staticmethod
def rmNodeDataDir(ext, rmState=True, rmBlocks=True):
def rmNodeDataDir(ext, rmState=True, rmBlocks=True, rmStateHist=True):
if rmState:
shutil.rmtree(Utils.getNodeDataDir(ext, "state"))
if rmBlocks:
shutil.rmtree(Utils.getNodeDataDir(ext, "blocks"))
if rmStateHist:
shutil.rmtree(Utils.getNodeDataDir(ext, "state-history"), ignore_errors=True)

@staticmethod
def getNodeConfigDir(ext, relativeDir=None, trailingSlash=False):
6 changes: 4 additions & 2 deletions tests/ship_streamer.cpp
@@ -162,8 +162,10 @@ int main(int argc, char* argv[]) {
this_block_id = result_document[1]["this_block"]["block_id"].GetString();
}
std::string prev_block_id;
if( result_document[1]["prev_block"].HasMember("block_id") && result_document[1]["prev_block"]["block_id"].IsString() ) {
prev_block_id = result_document[1]["prev_block"]["block_id"].GetString();
if( result_document[1].HasMember("prev_block") && result_document[1]["prev_block"].IsObject() ) {
if ( result_document[1]["prev_block"].HasMember("block_id") && result_document[1]["prev_block"]["block_id"].IsString() ) {
prev_block_id = result_document[1]["prev_block"]["block_id"].GetString();
}
}
if( !irreversible_only && !this_block_id.empty() && !prev_block_id.empty() ) {
// verify forks were sent
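Note: rapidjson's operator[] asserts on a missing member (undefined behavior when assertions are disabled), and prev_block is optional in the SHiP protocol; after a snapshot start the first result may not carry one. The streamer therefore checks each level before indexing. The same guard as a standalone helper (illustrative only):

#include <rapidjson/document.h>
#include <string>

// Return prev_block.block_id if present, else an empty string.
std::string prev_block_id_of(const rapidjson::Value& result) {
    if (result.HasMember("prev_block") && result["prev_block"].IsObject()) {
        const rapidjson::Value& prev = result["prev_block"];
        if (prev.HasMember("block_id") && prev["block_id"].IsString())
            return prev["block_id"].GetString();
    }
    return {};
}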
85 changes: 81 additions & 4 deletions tests/ship_streamer_test.py
@@ -55,6 +55,13 @@
WalletdName=Utils.EosWalletName
shipTempDir=None

def getLatestSnapshot(nodeId):
snapshotDir = os.path.join(Utils.getNodeDataDir(nodeId), "snapshots")
snapshotDirContents = os.listdir(snapshotDir)
assert len(snapshotDirContents) > 0
snapshotDirContents.sort()
return os.path.join(snapshotDir, snapshotDirContents[-1])

try:
TestHelper.printSystemInfo("BEGIN")

@@ -71,7 +78,7 @@

shipNodeNum = 1
specificExtraNodeosArgs={}
specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --disable-replay-opts --trace-history --chain-state-history --plugin eosio::net_api_plugin "
specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --disable-replay-opts --trace-history --chain-state-history --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "
# producer nodes will be mapped to 0 through totalProducerNodes-1, so the number totalProducerNodes will be the non-producing node
specificExtraNodeosArgs[totalProducerNodes]="--plugin eosio::test_control_api_plugin "

@@ -123,13 +130,18 @@
trans=node.regproducer(cluster.defProducerAccounts[prod], "http://mysite.com", 0, waitForTransBlock=False, exitOnError=True)

# create accounts via eosio as otherwise a bid is needed
transferAmount="100000000.0000 {0}".format(CORE_SYMBOL)
for account in accounts:
Print(f"Create new account {account.name} via {cluster.eosioAccount.name} with private key: {account.activePrivateKey}")
trans=nonProdNode.createInitializeAccount(account, cluster.eosioAccount, stakedDeposit=0, waitForTransBlock=True, stakeNet=10000, stakeCPU=10000, buyRAM=10000000, exitOnError=True)
transferAmount="100000000.0000 {0}".format(CORE_SYMBOL)
trans=nonProdNode.createInitializeAccount(account, cluster.eosioAccount, stakedDeposit=0, waitForTransBlock=False, stakeNet=10000, stakeCPU=10000, buyRAM=10000000, exitOnError=True)
nonProdNode.waitForTransBlockIfNeeded(trans, True, exitOnError=True)
for account in accounts:
Print(f"Transfer funds {transferAmount} from account {cluster.eosioAccount.name} to {account.name}")
nonProdNode.transferFunds(cluster.eosioAccount, account, transferAmount, "test transfer", waitForTransBlock=True)
nonProdNode.transferFunds(cluster.eosioAccount, account, transferAmount, "test transfer", waitForTransBlock=False)
nonProdNode.waitForTransBlockIfNeeded(trans, True, exitOnError=True)
for account in accounts:
trans=nonProdNode.delegatebw(account, 20000000.0000, 20000000.0000, waitForTransBlock=False, exitOnError=True)
nonProdNode.waitForTransBlockIfNeeded(trans, True, exitOnError=True)

# *** vote using accounts ***

Expand All @@ -150,6 +162,19 @@
cluster.waitOnClusterSync(blockAdvancing=3)
Print("Shutdown unneeded bios node")
cluster.biosNode.kill(signal.SIGTERM)

Print("Configure and launch txn generators")
targetTpsPerGenerator = 10
testTrxGenDurationSec=60*60
numTrxGenerators=2
cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[accounts[0].name, accounts[1].name],
acctPrivKeysList=[accounts[0].activePrivateKey,accounts[1].activePrivateKey], nodeId=prodNode1.nodeId,
tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec,
waitToComplete=False)

status = cluster.waitForTrxGeneratorsSpinup(nodeId=prodNode1.nodeId, numGenerators=numTrxGenerators)
assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators"

prodNode0.waitForProducer("defproducerc")

block_range = 350
@@ -226,9 +251,61 @@
block_num += 1
assert block_num-1 == end_block_num, f"{block_num-1} != {end_block_num}"

Print("Generate snapshot")
shipNode.createSnapshot()

Print("Shutdown state_history_plugin nodeos")
shipNode.kill(signal.SIGTERM)

Print("Shutdown bridge node")
nonProdNode.kill(signal.SIGTERM)

Print("Test starting ship from snapshot")
Utils.rmNodeDataDir(shipNodeNum)
isRelaunchSuccess = shipNode.relaunch(chainArg=" --snapshot {}".format(getLatestSnapshot(shipNodeNum)))
assert isRelaunchSuccess, "relaunch from snapshot failed"

afterSnapshotBlockNum = shipNode.getBlockNum()

Print("Verify we can stream from ship after start from a snapshot with no incoming trxs")
start_block_num = afterSnapshotBlockNum
block_range = 0
end_block_num = start_block_num + block_range
cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas"
if Utils.Debug: Utils.Print(f"cmd: {cmd}")
clients = []
files = []
starts = []
for i in range(0, args.num_clients):
start = time.perf_counter()
outFile = open(f"{shipClientFilePrefix}{i}_snapshot.out", "w")
errFile = open(f"{shipClientFilePrefix}{i}_snapshot.err", "w")
Print(f"Start client {i}")
popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
starts.append(time.perf_counter())
clients.append((popen, cmd))
files.append((outFile, errFile))
Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")

Print(f"Stopping all {args.num_clients} clients")
for index, (popen, _), (out, err), start in zip(range(len(clients)), clients, files, starts):
popen.wait()
Print(f"Stopped client {index}. Ran for {time.perf_counter() - start:.3f} seconds.")
out.close()
err.close()
outFile = open(f"{shipClientFilePrefix}{index}_snapshot.out", "r")
data = json.load(outFile)
block_num = start_block_num
for i in data:
# fork can cause block numbers to be repeated
this_block_num = i['get_blocks_result_v0']['this_block']['block_num']
if this_block_num < block_num:
block_num = this_block_num
assert block_num == this_block_num, f"{block_num} != {this_block_num}"
assert isinstance(i['get_blocks_result_v0']['deltas'], str) # verify deltas in result
block_num += 1
assert block_num-1 == end_block_num, f"{block_num-1} != {end_block_num}"

testSuccessful = True
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, killEosInstances=killEosInstances, killWallet=killWallet, keepLogs=keepLogs, cleanRun=killAll, dumpErrorDetails=dumpErrorDetails)