Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Test only, don't merge] test ship streamer #813

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 4 additions & 163 deletions tests/ship_streamer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@

prodNode0.waitForProducer("defproducerc")

block_range = 250
block_range = 1000000
end_block_num = start_block_num + block_range

shipClient = "tests/ship_streamer"
cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas"
if args.finality_data_history:
cmd += " --fetch-finality-data"
if Utils.Debug: Utils.Print(f"cmd: {cmd}")
Utils.Print(f"ship cmd: {cmd}")
clients = []
files = []
shipTempDir = os.path.join(Utils.DataDir, "ship")
Expand All @@ -158,170 +158,11 @@
files.append((outFile, errFile))
Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")

# Generate a fork
nonProdNode.waitForProducer("defproducera")
prodNode3Prod= "defproducerd"
preKillBlockNum=nonProdNode.getBlockNum()
preKillBlockProducer=nonProdNode.getBlockProducerByNum(preKillBlockNum)
forkAtProducer="defproducerb"
nonProdNode.killNodeOnProducer(producer=forkAtProducer, whereInSequence=1)
Print(f"Current block producer {preKillBlockProducer} fork will be at producer {forkAtProducer}")
prodNode0.waitForProducer("defproducerc")
prodNode3.waitForProducer(prodNode3Prod)
if nonProdNode.verifyAlive():
Utils.errorExit("Bridge did not shutdown")
Print("Fork started")

prodNode0.waitForProducer("defproducerc") # wait for fork to progress a bit
restore0BlockNum = prodNode0.getBlockNum()
restore1BlockNum = prodNode3.getBlockNum()
restoreBlockNum = max(int(restore0BlockNum), int(restore1BlockNum))
restore0LIB = prodNode0.getIrreversibleBlockNum()
restore1LIB = prodNode3.getIrreversibleBlockNum()
restoreLIB = max(int(restore0LIB), int(restore1LIB))

if int(restoreBlockNum) > int(end_block_num):
Utils.errorExit(f"Did not stream long enough {end_block_num} to cover the fork {restoreBlockNum}, increase block_range {block_range}")

Print("Restore fork")
Print("Relaunching the non-producing bridge node to connect the producing nodes again")
if nonProdNode.verifyAlive():
Utils.errorExit("Bridge is already running")
if not nonProdNode.relaunch():
Utils.errorExit(f"Failure - (non-production) node {nonProdNode.nodeNum} should have restarted")

nonProdNode.waitForProducer(forkAtProducer)
nonProdNode.waitForProducer(prodNode3Prod)
nonProdNode.waitForIrreversibleBlock(restoreLIB+1)
afterForkBlockNum = nonProdNode.getBlockNum()

assert shipNode.findInLog(f"successfully switched fork to new head"), f"No fork found in log {shipNode}"

Print(f"Stopping all {args.num_clients} clients")
for index, (popen, _), (out, err), start in zip(range(len(clients)), clients, files, starts):
popen.wait()
Print(f"Stopped client {index}. Ran for {time.perf_counter() - start:.3f} seconds.")
out.close()
err.close()
outFile = open(f"{shipClientFilePrefix}{index}.out", "r")
data = json.load(outFile)
block_num = start_block_num
for i in data:
# fork can cause block numbers to be repeated
this_block_num = i['get_blocks_result_v1']['this_block']['block_num']
if this_block_num < block_num:
block_num = this_block_num
assert block_num == this_block_num, f"{block_num} != {this_block_num}"
assert isinstance(i['get_blocks_result_v1']['block'], str) # verify block in result
block_num += 1
assert block_num-1 == end_block_num, f"{block_num-1} != {end_block_num}"

Print("Generate snapshot")
shipNode.createSnapshot()

Print("Shutdown state_history_plugin nodeos")
shipNode.kill(signal.SIGTERM)

Print("Shutdown bridge node")
nonProdNode.kill(signal.SIGTERM)

Print("Test starting ship from replay")
isRelaunchSuccess = shipNode.relaunch(chainArg=" --replay-blockchain")
assert isRelaunchSuccess, "relaunch from replay failed"

afterReplayBlockNum = shipNode.getBlockNum()

Print("Verify we can stream from ship after start from a replay with no incoming trxs")
start_block_num = afterReplayBlockNum
block_range = 0
end_block_num = start_block_num + block_range
cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas"
if args.finality_data_history:
cmd += " --fetch-finality-data"
if Utils.Debug: Utils.Print(f"cmd: {cmd}")
clients = []
files = []
starts = []
for i in range(0, args.num_clients):
start = time.perf_counter()
outFile = open(f"{shipClientFilePrefix}{i}_replay.out", "w")
errFile = open(f"{shipClientFilePrefix}{i}_replay.err", "w")
Print(f"Start client {i}")
popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
starts.append(time.perf_counter())
clients.append((popen, cmd))
files.append((outFile, errFile))
Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")

Print(f"Stopping all {args.num_clients} clients")
for index, (popen, _), (out, err), start in zip(range(len(clients)), clients, files, starts):
popen.wait()
Print(f"Stopped client {index}. Ran for {time.perf_counter() - start:.3f} seconds.")
out.close()
err.close()
outFile = open(f"{shipClientFilePrefix}{index}_replay.out", "r")
data = json.load(outFile)
block_num = start_block_num
for i in data:
# fork can cause block numbers to be repeated
this_block_num = i['get_blocks_result_v1']['this_block']['block_num']
if this_block_num < block_num:
block_num = this_block_num
assert block_num == this_block_num, f"{block_num} != {this_block_num}"
assert isinstance(i['get_blocks_result_v1']['deltas'], str) # verify deltas in result
block_num += 1
assert block_num-1 == end_block_num, f"{block_num-1} != {end_block_num}"

Print("Shutdown state_history_plugin nodeos after replay")
shipNode.kill(signal.SIGTERM)

Print("Test starting ship from snapshot")
shipNode.removeDataDir()
isRelaunchSuccess = shipNode.relaunch(rmArgs="--replay-blockchain", chainArg=" --snapshot {}".format(shipNode.getLatestSnapshot()))
assert isRelaunchSuccess, "relaunch from snapshot failed"

afterSnapshotBlockNum = shipNode.getBlockNum()

Print("Verify we can stream from ship after start from a snapshot with no incoming trxs")
start_block_num = afterSnapshotBlockNum
block_range = 0
end_block_num = start_block_num + block_range
cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas"
if args.finality_data_history:
cmd += " --fetch-finality-data"
if Utils.Debug: Utils.Print(f"cmd: {cmd}")
clients = []
files = []
starts = []
for i in range(0, args.num_clients):
start = time.perf_counter()
outFile = open(f"{shipClientFilePrefix}{i}_snapshot.out", "w")
errFile = open(f"{shipClientFilePrefix}{i}_snapshot.err", "w")
Print(f"Start client {i}")
popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
starts.append(time.perf_counter())
clients.append((popen, cmd))
files.append((outFile, errFile))
Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")
time.sleep(5.0)

Print(f"Stopping all {args.num_clients} clients")
for index, (popen, _), (out, err), start in zip(range(len(clients)), clients, files, starts):
popen.wait()
Print(f"Stopped client {index}. Ran for {time.perf_counter() - start:.3f} seconds.")
out.close()
err.close()
outFile = open(f"{shipClientFilePrefix}{index}_snapshot.out", "r")
data = json.load(outFile)
block_num = start_block_num
for i in data:
# fork can cause block numbers to be repeated
this_block_num = i['get_blocks_result_v1']['this_block']['block_num']
if this_block_num < block_num:
block_num = this_block_num
assert block_num == this_block_num, f"{block_num} != {this_block_num}"
assert isinstance(i['get_blocks_result_v1']['deltas'], str) # verify deltas in result
block_num += 1
assert block_num-1 == end_block_num, f"{block_num-1} != {end_block_num}"
popen.kill()

testSuccessful = True
finally:
Expand Down
Loading