
[1.0.2] SHiP: Fix hang on shutdown #816

Merged · 8 commits · Sep 24, 2024
@@ -327,7 +327,7 @@ class session final : public session_base {
std::optional<log_catalog>& chain_state_log;
std::optional<log_catalog>& finality_data_log;

GetBlockID get_block_id;
GetBlockID get_block_id; // call from main app thread
GetBlock get_block;

///these items might be used on either the strand or main thread
34 changes: 18 additions & 16 deletions plugins/state_history_plugin/state_history_plugin.cpp
@@ -100,22 +100,24 @@ struct state_history_plugin_impl {
template <typename Protocol>
void create_listener(const std::string& address) {
const boost::posix_time::milliseconds accept_timeout(200);
// connections set must only be modified by main thread; run listener on main thread to avoid needing another post()
fc::create_listener<Protocol>(app().get_io_service(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
catch_and_log([this, &socket]() {
connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
trace_log, chain_state_log, finality_data_log,
[this](const chain::block_num_type block_num) {
return get_block_id(block_num);
},
[this](const chain::block_id_type& block_id) {
return chain_plug->chain().fetch_block_by_id(block_id);
},
[this](session_base* conn) {
boost::asio::post(app().get_io_service(), [conn, this]() {
connections.erase(connections.find(conn));
});
}, _log));
// connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread
fc::create_listener<Protocol>(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
Member:

There will need to be additional changes below here; otherwise connections will be modified on the SHiP thread, which is not allowed.

But that's really still not enough to get it 100% correct if the intent is to make the default executor of the stream be what we want -- the stream should be using a per-stream strand instead of the thread's executor. The problem is that fc::create_listener doesn't give access to the variant of async_accept() that allows creating the new socket with a different default executor than the listening socket. This is a real shortcoming, and without refactoring fc::create_listener I'm not sure offhand how to fix that.
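For reference, a minimal Boost.Asio sketch of the async_accept() overload referred to above -- not this PR's code, and independent of fc::create_listener. The accepted socket is constructed with an explicitly supplied executor (here a per-connection strand on a stand-in SHiP pool); the names and the port are illustrative.

#include <boost/asio.hpp>

namespace asio = boost::asio;
using asio::ip::tcp;

int main() {
   asio::thread_pool ship_pool{1};   // stands in for the plugin's thread_pool
   asio::io_context  main_ioc;       // stands in for app().get_io_service()

   tcp::acceptor acceptor{main_ioc, tcp::endpoint{tcp::v4(), 8999}};

   // This overload builds the peer socket with the supplied executor as its
   // default, instead of inheriting the acceptor's (listening socket's) executor.
   auto per_stream_strand = asio::make_strand(ship_pool.get_executor());
   acceptor.async_accept(per_stream_strand,
      [](boost::system::error_code ec, auto peer) {
         if (!ec) {
            // peer's completion handlers now run serialized on the strand, so
            // a multi-threaded SHiP pool could service sessions concurrently.
         }
      });

   main_ioc.run();    // drives the accept; returns after one client connects
   ship_pool.join();
}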

Member Author:

In this case there is only one SHiP thread and its implicit strand. I agree that it would be better if fc::create_listener took a strand.

Member:

Yeah, for 1.0.x that's fine, but we should really get it right on main -- the code is currently structured with the assumption that increasing SHiP threads 'just works'.

Member Author:

Seems passing a strand works.
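Presumably something along these lines -- a sketch only, assuming fc::create_listener accepts an arbitrary executor the same way it accepts thread_pool.get_executor() in this diff. The strand serializes the accept path (and whatever executor the accepted sockets inherit from the listener), though it is one shared strand rather than a per-stream strand:

auto ship_strand = boost::asio::make_strand(thread_pool.get_executor());
fc::create_listener<Protocol>(ship_strand, _log, accept_timeout, address, "",
                              [this](Protocol::socket&& socket) {
   // same per-connection handling as below
});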

boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable {
Member:

Thinking about this more, I think this creates a (fantastically remote) possibility of the socket being destroyed after its executor is destroyed. This occurs when this callback is post()ed after the main thread is stopped, then the plugin is destroyed, and then appbase is destroyed, which will destroy pending callbacks. Obviously something we can tolerate for 1.0.x, but another good reason to improve create_listener.
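A distilled sketch of that ordering (stand-in types only, not appbase; assumes a recent Boost where tcp::socket's default executor is any_io_executor): the handler queued on the main io_context still owns a socket whose executor belongs to the SHiP pool, so tearing the pool down first leaves the socket to be destroyed after its execution context is gone.

#include <boost/asio.hpp>
#include <memory>
#include <utility>

namespace asio = boost::asio;
using asio::ip::tcp;

int main() {
   auto main_ioc  = std::make_unique<asio::io_context>();    // appbase io_service stand-in
   auto ship_pool = std::make_unique<asio::thread_pool>(1);  // plugin thread_pool stand-in

   // Socket whose default executor lives on the SHiP pool, moved into a handler
   // post()ed to the main io_context that never runs (main thread already stopped).
   tcp::socket socket{ship_pool->get_executor()};
   asio::post(*main_ioc, [s = std::move(socket)]() { /* never executed */ });

   ship_pool.reset();   // plugin destroyed first, taking its thread pool with it

   main_ioc.reset();    // appbase destroyed next: dropping the pending handler
                        // destroys the socket after its execution context is gone
}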

catch_and_log([this, &socket]() {
connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
trace_log, chain_state_log, finality_data_log,
[this](const chain::block_num_type block_num) {
return get_block_id(block_num);
},
[this](const chain::block_id_type& block_id) {
return chain_plug->chain().fetch_block_by_id(block_id);
},
[this](session_base* conn) {
boost::asio::post(app().get_io_service(), [conn, this]() {
connections.erase(connections.find(conn));
});
}, _log));
});
});
});
}
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
@@ -55,6 +55,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DI
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_reqs_across_svnn_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_reqs_across_svnn_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_kill_client_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_kill_client_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/bridge_for_fork_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/bridge_for_fork_test_shape.json COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/lib_advance_test.py ${CMAKE_CURRENT_BINARY_DIR}/lib_advance_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
@@ -187,6 +188,8 @@ add_test(NAME ship_streamer_if_test COMMAND tests/ship_streamer_test.py -v --num
set_property(TEST ship_streamer_if_test PROPERTY LABELS long_running_tests)
add_test(NAME ship_streamer_if_fetch_finality_data_test COMMAND tests/ship_streamer_test.py -v --num-clients 10 --activate-if --finality-data-history ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_streamer_if_fetch_finality_data_test PROPERTY LABELS long_running_tests)
add_test(NAME ship_kill_client_test COMMAND tests/ship_kill_client_test.py -v --num-clients 20 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_kill_client_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests)
131 changes: 131 additions & 0 deletions tests/ship_kill_client_test.py
@@ -0,0 +1,131 @@
#!/usr/bin/env python3

import time
import json
import os
import shutil
import signal
import sys

from TestHarness import Account, Cluster, TestHelper, Utils, WalletMgr
from TestHarness.TestHelper import AppArgs

###############################################################
# ship_kill_client_test
#
# Set up a nodeos with SHiP (state_history_plugin).
# Connect a number of clients, then kill the clients and shut down nodeos.
# nodeos should exit cleanly and not hang or SEGFAULT.
#
###############################################################

Print=Utils.Print

appArgs = AppArgs()
extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_streamers should be started", default=1)
args = TestHelper.parse_args({"--dump-error-details","--keep-logs","-v","--leave-running","--unshared"}, applicationSpecificArgs=appArgs)

Utils.Debug=args.v
cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
dumpErrorDetails=args.dump_error_details
walletPort=TestHelper.DEFAULT_WALLET_PORT

# simpler to have two producer nodes than to set up different accounts for the trx generator
totalProducerNodes=2
totalNonProducerNodes=1
totalNodes=totalProducerNodes+totalNonProducerNodes

walletMgr=WalletMgr(True, port=walletPort)
testSuccessful=False

WalletdName=Utils.EosWalletName
shipTempDir=None

try:
    TestHelper.printSystemInfo("BEGIN")

    cluster.setWalletMgr(walletMgr)
    Print("Stand up cluster")

    shipNodeNum = 2
    specificExtraNodeosArgs={}
    specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --finality-data-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "

    if cluster.launch(pnodes=totalProducerNodes, loadSystemContract=False,
                      totalNodes=totalNodes, totalProducers=totalProducerNodes, activateIF=True, biosFinalizer=False,
                      specificExtraNodeosArgs=specificExtraNodeosArgs) is False:
        Utils.cmdError("launcher")
        Utils.errorExit("Failed to stand up cluster.")

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=5)
    Print("Cluster in Sync")

    prodNode0 = cluster.getNode(0)
    prodNode1 = cluster.getNode(1)
    shipNode = cluster.getNode(shipNodeNum)

    # cluster.waitOnClusterSync(blockAdvancing=3)
    start_block_num = shipNode.getBlockNum()

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=3)
    Print("Shutdown unneeded bios node")
    cluster.biosNode.kill(signal.SIGTERM)

    Print("Configure and launch txn generators")
    targetTpsPerGenerator = 10
    testTrxGenDurationSec=60*60
    numTrxGenerators=2
    cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[cluster.defproduceraAccount.name, cluster.defproducerbAccount.name],
                                acctPrivKeysList=[cluster.defproduceraAccount.activePrivateKey,cluster.defproducerbAccount.activePrivateKey], nodeId=prodNode1.nodeId,
                                tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec,
                                waitToComplete=False)

    status = cluster.waitForTrxGeneratorsSpinup(nodeId=prodNode1.nodeId, numGenerators=numTrxGenerators)
    assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators"

    prodNode1.waitForProducer("defproducera")

    block_range = 100000 # we are going to kill the client, so just make this a huge number
    end_block_num = start_block_num + block_range

    shipClient = "tests/ship_streamer"
    cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas --fetch-finality-data"
    if Utils.Debug: Utils.Print(f"cmd: {cmd}")
    clients = []
    files = []
    shipTempDir = os.path.join(Utils.DataDir, "ship")
    os.makedirs(shipTempDir, exist_ok = True)
    shipClientFilePrefix = os.path.join(shipTempDir, "client")

    for i in range(0, args.num_clients):
        outFile = open(f"{shipClientFilePrefix}{i}.out", "w")
        errFile = open(f"{shipClientFilePrefix}{i}.err", "w")
        Print(f"Start client {i}")
        popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
        clients.append((popen, cmd))
        files.append((outFile, errFile))
        Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")

    # allow time for all clients to connect
    shipNode.waitForHeadToAdvance(5)
    shipNode.waitForLibToAdvance()

Print(f"Kill all {args.num_clients} clients and ship node")
for index, (popen, _) in zip(range(len(clients)), clients):
popen.kill()
if index == len(clients)/2:
shipNode.kill(signal.SIGTERM)
assert not shipNode.verifyAlive(), "ship node did not shutdown"

testSuccessful = True
finally:
    TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)
    if shipTempDir is not None:
        if testSuccessful and not args.keep_logs:
            shutil.rmtree(shipTempDir, ignore_errors=True)

errorCode = 0 if testSuccessful else 1
exit(errorCode)