Merge branch 'main' of github.com:AntelopeIO/leap into boost_submodule
greg7mdp committed Jul 7, 2023
2 parents 0ee195e + 189a6ac commit eea5f30
Showing 9 changed files with 167 additions and 40 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -75,6 +75,8 @@ witness_node_data_dir
*.pyc
*.pyo

*.gdb_history

Testing/*
build.tar.gz
[Bb]uild*/*
42 changes: 26 additions & 16 deletions plugins/net_plugin/net_plugin.cpp
@@ -1289,10 +1289,8 @@ namespace eosio {
void connection::_close( bool reconnect, bool shutdown ) {
socket_open = false;
boost::system::error_code ec;
if( socket->is_open() ) {
socket->shutdown( tcp::socket::shutdown_both, ec );
socket->close( ec );
}
socket->shutdown( tcp::socket::shutdown_both, ec );
socket->close( ec );
socket.reset( new tcp::socket( my_impl->thread_pool.get_executor() ) );
flush_queues();
peer_syncing_from_us = false;
@@ -1518,11 +1516,18 @@ namespace eosio {
try {
c->buffer_queue.clear_out_queue();
// May have closed connection and cleared buffer_queue
if( !c->socket_is_open() || socket != c->socket ) {
peer_ilog( c, "async write socket ${r} before callback", ("r", c->socket_is_open() ? "changed" : "closed") );
if (!c->socket->is_open() && c->socket_is_open()) { // if socket_open then close not called
peer_ilog(c, "async write socket closed before callback");
c->close();
return;
}
if (socket != c->socket ) { // different socket, c must have created a new socket, make sure previous is closed
peer_ilog( c, "async write socket changed before callback");
boost::system::error_code ec;
socket->shutdown( tcp::socket::shutdown_both, ec );
socket->close( ec );
return;
}

if( ec ) {
if( ec.value() != boost::asio::error::eof ) {
@@ -2735,7 +2740,18 @@ namespace eosio {
boost::asio::bind_executor( strand,
[conn = shared_from_this(), socket=socket]( boost::system::error_code ec, std::size_t bytes_transferred ) {
// may have closed connection and cleared pending_message_buffer
if( !conn->socket_is_open() || socket != conn->socket ) return;
if (!conn->socket->is_open() && conn->socket_is_open()) { // if socket_open then close not called
peer_dlog( conn, "async_read socket not open, closing");
conn->close();
return;
}
if (socket != conn->socket ) { // different socket, conn must have created a new socket, make sure previous is closed
peer_dlog( conn, "async_read diff socket closing");
boost::system::error_code ec;
socket->shutdown( tcp::socket::shutdown_both, ec );
socket->close( ec );
return;
}

bool close_connection = false;
try {
@@ -3078,6 +3094,7 @@ namespace eosio {
peer_head_block_num = msg.head_num;
fc::unique_lock g_conn( conn_mtx );
last_handshake_recv = msg;
auto c_time = last_handshake_sent.time;
g_conn.unlock();

set_state(connection_state::connected);
@@ -3103,16 +3120,11 @@ namespace eosio {
return;
}

if( peer_address().empty() ) {
if( incoming() ) {
auto [host, port, type] = split_host_port_type(msg.p2p_address);
if (host.size())
set_connection_type( msg.p2p_address );
}

g_conn.lock();
if( peer_address().empty() || last_handshake_recv.node_id == fc::sha256()) {
auto c_time = last_handshake_sent.time;
g_conn.unlock();
peer_dlog( this, "checking for duplicate" );
auto is_duplicate = [&](const auto& check) {
if(check.get() == this)
@@ -3159,9 +3171,7 @@ namespace eosio {
return;
}
} else {
peer_dlog( this, "skipping duplicate check, addr == ${pa}, id = ${ni}",
("pa", peer_address())( "ni", last_handshake_recv.node_id ) );
g_conn.unlock();
peer_dlog(this, "skipping duplicate check, addr == ${pa}, id = ${ni}", ("pa", peer_address())("ni", msg.node_id));
}

if( msg.chain_id != my_impl->chain_id ) {
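The async-write and async-read hunks above both apply the same Boost.Asio pattern: the completion handler captures, by value, the exact socket the operation was started on, and when it runs it compares that pointer with the connection's current socket, so a late callback can only ever shut down its own stale socket and never the replacement. A minimal, self-contained sketch of the pattern follows; the names (Connection, start_read) are illustrative stand-ins, not net_plugin's types.

// Sketch only: assumes the Connection object is owned by a std::shared_ptr.
#include <boost/asio.hpp>
#include <array>
#include <iostream>
#include <memory>

using boost::asio::ip::tcp;

struct Connection : std::enable_shared_from_this<Connection> {
   std::shared_ptr<tcp::socket> socket;   // replaced with a fresh socket on reconnect
   std::array<char, 512>        buf{};

   explicit Connection(boost::asio::io_context& ctx)
      : socket(std::make_shared<tcp::socket>(ctx)) {}

   void start_read() {
      // Capture the socket this read was started on, by value, so the handler can
      // tell whether the connection has since swapped in a new socket.
      socket->async_read_some(boost::asio::buffer(buf),
         [self = shared_from_this(), sock = socket](boost::system::error_code ec, std::size_t n) {
            if (sock != self->socket) {
               // Stale socket from a previous generation of the connection:
               // shut it down here and leave the new socket alone.
               boost::system::error_code ignored;
               sock->shutdown(tcp::socket::shutdown_both, ignored);
               sock->close(ignored);
               return;
            }
            if (ec) {
               std::cerr << "read failed: " << ec.message() << "\n";
               return;
            }
            std::cout << "read " << n << " bytes\n";
            self->start_read();
         });
   }
};

The handler never touches self->socket when the pointers differ; it only cleans up the socket it was handed, which is what the new branches in the hunks above do.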
@@ -447,7 +447,7 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock

void update_current_request(state_history::get_blocks_request_v0& req) {
fc_dlog(plugin.get_logger(), "replying get_blocks_request_v0 = ${req}", ("req", req));
to_send_block_num = req.start_block_num;
to_send_block_num = std::max(req.start_block_num, plugin.get_first_available_block_num());
for (auto& cp : req.have_positions) {
if (req.start_block_num <= cp.block_num)
continue;
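The one-line change above clamps the session's starting block to the plugin's first available block, so a get_blocks request that asks to start below that point is served from the first block the node actually has. A tiny sketch of the clamp with made-up numbers:

// Sketch only: illustrative values, not taken from the plugin.
#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
   uint32_t first_available_block = 120;   // e.g. the node was started from a snapshot
   uint32_t requested_start       = 1;     // client asked to stream from the beginning
   uint32_t to_send_block_num     = std::max(requested_start, first_available_block);
   std::cout << "streaming will start at block " << to_send_block_num << "\n";
}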
54 changes: 42 additions & 12 deletions plugins/state_history_plugin/state_history_plugin.cpp
@@ -54,6 +54,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
chain_plugin* chain_plug = nullptr;
std::optional<state_history_log> trace_log;
std::optional<state_history_log> chain_state_log;
uint32_t first_available_block = 0;
bool trace_debug_mode = false;
std::optional<scoped_connection> applied_transaction_connection;
std::optional<scoped_connection> block_start_connection;
@@ -113,10 +114,17 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl

// thread-safe
std::optional<chain::block_id_type> get_block_id(uint32_t block_num) {
if (trace_log)
return trace_log->get_block_id(block_num);
if (chain_state_log)
return chain_state_log->get_block_id(block_num);
std::optional<chain::block_id_type> id;
if( trace_log ) {
id = trace_log->get_block_id( block_num );
if( id )
return id;
}
if( chain_state_log ) {
id = chain_state_log->get_block_id( block_num );
if( id )
return id;
}
try {
return chain_plug->chain().get_block_id_for_num(block_num);
} catch (...) {
@@ -142,6 +150,11 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
return head_timestamp;
}

// thread-safe
uint32_t get_first_available_block_num() const {
return first_available_block;
}

template <typename Protocol>
void create_listener(const std::string& address) {
const boost::posix_time::milliseconds accept_timeout(200);
@@ -177,15 +190,18 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
trace_converter.add_transaction(p, t);
}

// called from main thread
void update_current() {
const auto& chain = chain_plug->chain();
std::lock_guard g(mtx);
head_id = chain.head_block_id();
lib_id = chain.last_irreversible_block_id();
head_timestamp = chain.head_block_time();
}

// called from main thread
void on_accepted_block(const block_state_ptr& block_state) {
{
const auto& chain = chain_plug->chain();
std::lock_guard g(mtx);
head_id = chain.head_block_id();
lib_id = chain.last_irreversible_block_id();
head_timestamp = chain.head_block_time();
}
update_current();

try {
store_traces(block_state);
@@ -381,12 +397,26 @@ void state_history_plugin::plugin_initialize(const variables_map& options) {

void state_history_plugin_impl::plugin_startup() {
try {
auto bsp = chain_plug->chain().head_block_state();
const auto& chain = chain_plug->chain();
update_current();
auto bsp = chain.head_block_state();
if( bsp && chain_state_log && chain_state_log->empty() ) {
fc_ilog( _log, "Storing initial state on startup, this can take a considerable amount of time" );
store_chain_state( bsp );
fc_ilog( _log, "Done storing initial state on startup" );
}
first_available_block = chain.earliest_available_block_num();
if (trace_log) {
auto first_trace_block = trace_log->block_range().first;
if( first_trace_block > 0 )
first_available_block = std::min( first_available_block, first_trace_block );
}
if (chain_state_log) {
auto first_state_block = chain_state_log->block_range().first;
if( first_state_block > 0 )
first_available_block = std::min( first_available_block, first_state_block );
}
fc_ilog(_log, "First available block for SHiP ${b}", ("b", first_available_block));
listen();
// use of executor assumes only one thread
thread_pool.start( 1, [](const fc::exception& e) {
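Besides introducing first_available_block, the diff above rewrites get_block_id so that an empty answer from the trace log falls through to the chain-state log and then to the controller, instead of returning whatever the first log happens to say. A standalone sketch of that optional-fallthrough shape; the std::map "logs" and the block ids are made up for illustration.

// Sketch only: maps stand in for trace_log, chain_state_log and the chain.
#include <cstdint>
#include <iostream>
#include <map>
#include <optional>
#include <string>

using block_id = std::string;   // stand-in for chain::block_id_type

std::optional<block_id> lookup(const std::map<uint32_t, block_id>& src, uint32_t num) {
   auto it = src.find(num);
   if (it == src.end())
      return std::nullopt;
   return it->second;
}

// Fall through to the next source only when the previous one has no answer.
std::optional<block_id> get_block_id(const std::map<uint32_t, block_id>& trace_log,
                                     const std::map<uint32_t, block_id>& state_log,
                                     const std::map<uint32_t, block_id>& chain,
                                     uint32_t num) {
   if (auto id = lookup(trace_log, num)) return id;
   if (auto id = lookup(state_log, num)) return id;
   return lookup(chain, num);
}

int main() {
   std::map<uint32_t, block_id> trace_log{{10, "trace-10"}};
   std::map<uint32_t, block_id> state_log{{12, "state-12"}};
   std::map<uint32_t, block_id> chain{{10, "chain-10"}, {12, "chain-12"}, {14, "chain-14"}};
   for (uint32_t num : {10u, 12u, 14u, 99u})
      std::cout << num << " -> "
                << get_block_id(trace_log, state_log, chain, num).value_or("<none>") << "\n";
}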
2 changes: 2 additions & 0 deletions plugins/state_history_plugin/tests/session_test.cpp
@@ -135,6 +135,8 @@ struct mock_state_history_plugin {
eosio::state_history::block_position get_block_head() { return block_head; }
eosio::state_history::block_position get_last_irreversible() { return block_head; }

uint32_t get_first_available_block_num() const { return 0; }

void add_session(std::shared_ptr<eosio::session_base> s) {
session_mgr.insert(std::move(s));
}
2 changes: 1 addition & 1 deletion tests/TestHarness/Cluster.py
@@ -1593,6 +1593,6 @@ def waitForTrxGeneratorsSpinup(self, nodeId: int, numGenerators: int, timeout: i
firstTrxs.append(line.rstrip('\n'))
Utils.Print(f"first transactions: {firstTrxs}")
status = node.waitForTransactionsInBlock(firstTrxs)
if status is None:
if not status:
Utils.Print('ERROR: Failed to spin up transaction generators: never received first transactions')
return status
4 changes: 3 additions & 1 deletion tests/TestHarness/testUtils.py
@@ -161,11 +161,13 @@ def getNodeDataDir(ext, relativeDir=None, trailingSlash=False):
return path

@staticmethod
def rmNodeDataDir(ext, rmState=True, rmBlocks=True):
def rmNodeDataDir(ext, rmState=True, rmBlocks=True, rmStateHist=True):
if rmState:
shutil.rmtree(Utils.getNodeDataDir(ext, "state"))
if rmBlocks:
shutil.rmtree(Utils.getNodeDataDir(ext, "blocks"))
if rmStateHist:
shutil.rmtree(Utils.getNodeDataDir(ext, "state-history"), ignore_errors=True)

@staticmethod
def getNodeConfigDir(ext, relativeDir=None, trailingSlash=False):
16 changes: 10 additions & 6 deletions tests/ship_streamer.cpp
@@ -154,16 +154,20 @@ int main(int argc, char* argv[]) {
// validate after streaming, so that invalid entry is included in the output
uint32_t this_block_num = 0;
if( result_document[1].HasMember("this_block") && result_document[1]["this_block"].IsObject() ) {
if( result_document[1]["this_block"].HasMember("block_num") && result_document[1]["this_block"]["block_num"].IsUint() ) {
this_block_num = result_document[1]["this_block"]["block_num"].GetUint();
const auto& this_block = result_document[1]["this_block"];
if( this_block.HasMember("block_num") && this_block["block_num"].IsUint() ) {
this_block_num = this_block["block_num"].GetUint();
}
std::string this_block_id;
if( result_document[1]["this_block"].HasMember("block_id") && result_document[1]["this_block"]["block_id"].IsString() ) {
this_block_id = result_document[1]["this_block"]["block_id"].GetString();
if( this_block.HasMember("block_id") && this_block["block_id"].IsString() ) {
this_block_id = this_block["block_id"].GetString();
}
std::string prev_block_id;
if( result_document[1]["prev_block"].HasMember("block_id") && result_document[1]["prev_block"]["block_id"].IsString() ) {
prev_block_id = result_document[1]["prev_block"]["block_id"].GetString();
if( result_document[1].HasMember("prev_block") && result_document[1]["prev_block"].IsObject() ) {
const auto& prev_block = result_document[1]["prev_block"];
if ( prev_block.HasMember("block_id") && prev_block["block_id"].IsString() ) {
prev_block_id = prev_block["block_id"].GetString();
}
}
if( !irreversible_only && !this_block_id.empty() && !prev_block_id.empty() ) {
// verify forks were sent
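The ship_streamer change above checks HasMember and the value's type before every GetUint/GetString, so a result whose prev_block is absent or not an object no longer trips rapidjson's assertions in the client. A small, self-contained sketch of that defensive access, using a hard-coded document rather than a real get_blocks_result:

// Sketch only: a tiny hard-coded document instead of a SHiP result.
#include <rapidjson/document.h>
#include <cstdint>
#include <iostream>
#include <string>

int main() {
   rapidjson::Document doc;
   doc.Parse(R"({ "this_block": { "block_num": 42, "block_id": "abc123" } })");

   uint32_t    block_num = 0;
   std::string block_id;
   std::string prev_block_id;

   // Check presence and type before every Get*, so missing or null fields are skipped.
   if (doc.HasMember("this_block") && doc["this_block"].IsObject()) {
      const auto& this_block = doc["this_block"];
      if (this_block.HasMember("block_num") && this_block["block_num"].IsUint())
         block_num = this_block["block_num"].GetUint();
      if (this_block.HasMember("block_id") && this_block["block_id"].IsString())
         block_id = this_block["block_id"].GetString();
   }
   if (doc.HasMember("prev_block") && doc["prev_block"].IsObject()) {
      const auto& prev_block = doc["prev_block"];
      if (prev_block.HasMember("block_id") && prev_block["block_id"].IsString())
         prev_block_id = prev_block["block_id"].GetString();
   }

   std::cout << "block_num=" << block_num << " block_id=" << block_id
             << " prev_block_id=" << (prev_block_id.empty() ? "<none>" : prev_block_id) << "\n";
}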
83 changes: 80 additions & 3 deletions tests/ship_streamer_test.py
@@ -50,6 +50,13 @@
WalletdName=Utils.EosWalletName
shipTempDir=None

def getLatestSnapshot(nodeId):
snapshotDir = os.path.join(Utils.getNodeDataDir(nodeId), "snapshots")
snapshotDirContents = os.listdir(snapshotDir)
assert len(snapshotDirContents) > 0
snapshotDirContents.sort()
return os.path.join(snapshotDir, snapshotDirContents[-1])

try:
TestHelper.printSystemInfo("BEGIN")

@@ -63,7 +70,7 @@

shipNodeNum = 1
specificExtraNodeosArgs={}
specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --disable-replay-opts --trace-history --chain-state-history --plugin eosio::net_api_plugin "
specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --disable-replay-opts --trace-history --chain-state-history --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "
# producer nodes will be mapped to 0 through totalProducerNodes-1, so the number totalProducerNodes will be the non-producing node
specificExtraNodeosArgs[totalProducerNodes]="--plugin eosio::test_control_api_plugin "

@@ -115,13 +122,18 @@
trans=node.regproducer(cluster.defProducerAccounts[prod], "http://mysite.com", 0, waitForTransBlock=False, exitOnError=True)

# create accounts via eosio as otherwise a bid is needed
transferAmount="100000000.0000 {0}".format(CORE_SYMBOL)
for account in accounts:
Print(f"Create new account {account.name} via {cluster.eosioAccount.name} with private key: {account.activePrivateKey}")
trans=nonProdNode.createInitializeAccount(account, cluster.eosioAccount, stakedDeposit=0, waitForTransBlock=True, stakeNet=10000, stakeCPU=10000, buyRAM=10000000, exitOnError=True)
transferAmount="100000000.0000 {0}".format(CORE_SYMBOL)
trans=nonProdNode.createInitializeAccount(account, cluster.eosioAccount, stakedDeposit=0, waitForTransBlock=False, stakeNet=10000, stakeCPU=10000, buyRAM=10000000, exitOnError=True)
nonProdNode.waitForTransBlockIfNeeded(trans, True, exitOnError=True)
for account in accounts:
Print(f"Transfer funds {transferAmount} from account {cluster.eosioAccount.name} to {account.name}")
nonProdNode.transferFunds(cluster.eosioAccount, account, transferAmount, "test transfer", waitForTransBlock=False)
nonProdNode.waitForTransBlockIfNeeded(trans, True, exitOnError=True)
for account in accounts:
trans=nonProdNode.delegatebw(account, 20000000.0000, 20000000.0000, waitForTransBlock=False, exitOnError=True)
nonProdNode.waitForTransBlockIfNeeded(trans, True, exitOnError=True)

# *** vote using accounts ***

@@ -142,6 +154,19 @@
cluster.waitOnClusterSync(blockAdvancing=3)
Print("Shutdown unneeded bios node")
cluster.biosNode.kill(signal.SIGTERM)

Print("Configure and launch txn generators")
targetTpsPerGenerator = 10
testTrxGenDurationSec=60*60
numTrxGenerators=2
cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[accounts[0].name, accounts[1].name],
acctPrivKeysList=[accounts[0].activePrivateKey,accounts[1].activePrivateKey], nodeId=prodNode1.nodeId,
tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec,
waitToComplete=False)

status = cluster.waitForTrxGeneratorsSpinup(nodeId=prodNode1.nodeId, numGenerators=numTrxGenerators)
assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators"

prodNode0.waitForProducer("defproducerc")

block_range = 350
@@ -219,9 +244,61 @@
block_num += 1
assert block_num-1 == end_block_num, f"{block_num-1} != {end_block_num}"

Print("Generate snapshot")
shipNode.createSnapshot()

Print("Shutdown state_history_plugin nodeos")
shipNode.kill(signal.SIGTERM)

Print("Shutdown bridge node")
nonProdNode.kill(signal.SIGTERM)

Print("Test starting ship from snapshot")
Utils.rmNodeDataDir(shipNodeNum)
isRelaunchSuccess = shipNode.relaunch(chainArg=" --snapshot {}".format(getLatestSnapshot(shipNodeNum)))
assert isRelaunchSuccess, "relaunch from snapshot failed"

afterSnapshotBlockNum = shipNode.getBlockNum()

Print("Verify we can stream from ship after start from a snapshot with no incoming trxs")
start_block_num = afterSnapshotBlockNum
block_range = 0
end_block_num = start_block_num + block_range
cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas"
if Utils.Debug: Utils.Print(f"cmd: {cmd}")
clients = []
files = []
starts = []
for i in range(0, args.num_clients):
start = time.perf_counter()
outFile = open(f"{shipClientFilePrefix}{i}_snapshot.out", "w")
errFile = open(f"{shipClientFilePrefix}{i}_snapshot.err", "w")
Print(f"Start client {i}")
popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
starts.append(time.perf_counter())
clients.append((popen, cmd))
files.append((outFile, errFile))
Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")

Print(f"Stopping all {args.num_clients} clients")
for index, (popen, _), (out, err), start in zip(range(len(clients)), clients, files, starts):
popen.wait()
Print(f"Stopped client {index}. Ran for {time.perf_counter() - start:.3f} seconds.")
out.close()
err.close()
outFile = open(f"{shipClientFilePrefix}{index}_snapshot.out", "r")
data = json.load(outFile)
block_num = start_block_num
for i in data:
# fork can cause block numbers to be repeated
this_block_num = i['get_blocks_result_v0']['this_block']['block_num']
if this_block_num < block_num:
block_num = this_block_num
assert block_num == this_block_num, f"{block_num} != {this_block_num}"
assert isinstance(i['get_blocks_result_v0']['deltas'], str) # verify deltas in result
block_num += 1
assert block_num-1 == end_block_num, f"{block_num-1} != {end_block_num}"

testSuccessful = True
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)
