-
Notifications
You must be signed in to change notification settings - Fork 629
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
test(staking): Unstake and restake node. Wait for it to sync #12247
Open
VanBarbascu
wants to merge
2
commits into
near:master
Choose a base branch
from
VanBarbascu:statesync/staking
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,217 @@ | ||
#!/usr/bin/env python3 | ||
# Spins up 5 validating nodes. Let validators track a single shard. | ||
# Add a dumper node for the state sync headers. | ||
# Add an RPC node to issue tx and change the state. | ||
# Send random transactions between accounts in different shards. | ||
# Shuffle the shard assignment of validators and check if they can sync up. | ||
|
||
from collections import defaultdict | ||
import unittest | ||
import sys | ||
import pathlib | ||
|
||
sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib')) | ||
|
||
from transaction import (sign_staking_tx) | ||
|
||
from configured_logger import logger | ||
from cluster import start_cluster | ||
import state_sync_lib | ||
from utils import wait_for_blocks | ||
import simple_test | ||
|
||
EPOCH_LENGTH = 10 | ||
|
||
NUM_VALIDATORS = 5 | ||
|
||
# Shard layout with 5 roughly equal size shards for convenience. | ||
SHARD_LAYOUT = { | ||
"V1": { | ||
"boundary_accounts": [ | ||
"fff", | ||
"lll", | ||
"rrr", | ||
], | ||
"version": 2, | ||
"shards_split_map": [], | ||
"to_parent_shard_map": [], | ||
} | ||
} | ||
|
||
NUM_SHARDS = len(SHARD_LAYOUT["V1"]["boundary_accounts"]) + 1 | ||
|
||
ALL_ACCOUNTS_PREFIXES = [ | ||
"aaa", | ||
"ggg", | ||
"lll", | ||
"sss", | ||
] | ||
|
||
|
||
class StateSyncValidatorShardSwap(unittest.TestCase): | ||
|
||
def _prepare_cluster(self, | ||
with_rpc=False, | ||
shuffle_shard_assignment=False, | ||
centralized_state_sync=False, | ||
decentralized_state_sync=False): | ||
(node_config_dump, | ||
node_config_sync) = state_sync_lib.get_state_sync_configs_pair( | ||
tracked_shards=None) | ||
|
||
# State snapshot is enabled for dumper if we test centralized state sync. | ||
# We want to dump the headers either way. | ||
node_config_dump[ | ||
"store.state_snapshot_enabled"] = True if centralized_state_sync else False | ||
node_config_dump[ | ||
"store.state_snapshot_config.state_snapshot_type"] = "EveryEpoch" if centralized_state_sync else "ForReshardingOnly" | ||
|
||
# State snapshot is enabled for validators if we test decentralized state sync. | ||
# They will share parts of the state. | ||
node_config_sync[ | ||
"store.state_snapshot_enabled"] = True if decentralized_state_sync else False | ||
node_config_sync["tracked_shards"] = [] | ||
|
||
# Validators | ||
configs = {x: node_config_sync.copy() for x in range(NUM_VALIDATORS)} | ||
|
||
# Dumper | ||
configs[NUM_VALIDATORS] = node_config_dump | ||
|
||
if with_rpc: | ||
# RPC | ||
configs[NUM_VALIDATORS + 1] = node_config_sync.copy() | ||
# RPC tracks all shards. | ||
configs[NUM_VALIDATORS + 1]["tracked_shards"] = [0] | ||
# RPC node does not participate in state parts distribution. | ||
configs[NUM_VALIDATORS + 1]["store.state_snapshot_enabled"] = False | ||
configs[NUM_VALIDATORS + 1][ | ||
"store.state_snapshot_config.state_snapshot_type"] = "ForReshardingOnly" | ||
|
||
nodes = start_cluster( | ||
num_nodes=NUM_VALIDATORS, | ||
num_observers=1 + (1 if with_rpc else 0), | ||
num_shards=NUM_SHARDS, | ||
config=None, | ||
genesis_config_changes=[ | ||
["epoch_length", EPOCH_LENGTH], ["shard_layout", SHARD_LAYOUT], | ||
[ | ||
"shuffle_shard_assignment_for_chunk_producers", | ||
shuffle_shard_assignment | ||
], ["block_producer_kickout_threshold", 0], | ||
["chunk_producer_kickout_threshold", 0] | ||
], | ||
client_config_changes=configs) | ||
|
||
for node in nodes: | ||
node.stop_checking_store() | ||
|
||
self.dumper_node = nodes[NUM_VALIDATORS] | ||
self.rpc_node = nodes[NUM_VALIDATORS + | ||
1] if with_rpc else self.dumper_node | ||
self.nodes = nodes | ||
self.validators = nodes[:NUM_VALIDATORS] | ||
|
||
def _prepare_simple_transfers(self): | ||
self.testcase = simple_test.SimpleTransferBetweenAccounts( | ||
nodes=self.nodes, | ||
rpc_node=self.rpc_node, | ||
account_prefixes=ALL_ACCOUNTS_PREFIXES, | ||
epoch_length=EPOCH_LENGTH) | ||
|
||
self.testcase.wait_for_blocks(3) | ||
|
||
self.testcase.create_accounts() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This part seems to be flaky, sometimes the test times out waiting for the result of the transactions which create the accounts. |
||
|
||
self.testcase.deploy_contracts() | ||
|
||
def _clear_cluster(self): | ||
self.testcase = None | ||
for node in self.nodes: | ||
node.cleanup() | ||
|
||
# Extra validator is the one that is assigned to a shard with more than one validator. | ||
def _get_extra_validator(self): | ||
res = self.rpc_node.get_validators() | ||
shards_to_validators = defaultdict(list) | ||
for validator in res["result"]["current_validators"]: | ||
for shard in validator["shards"]: | ||
shards_to_validators[shard].append(validator["account_id"]) | ||
for s, validators in shards_to_validators.items(): | ||
if len(validators) > 1: | ||
return validators[-1] | ||
|
||
def send_stake_tx(self, validator, stake): | ||
logger.info(f'Staking {stake} {validator.signer_key.account_id}...') | ||
nonce = self.rpc_node.get_nonce_for_pk(validator.signer_key.account_id, | ||
validator.signer_key.pk) + 10 | ||
|
||
hash_ = self.rpc_node.get_latest_block().hash_bytes | ||
tx = sign_staking_tx(validator.signer_key, validator.validator_key, | ||
stake, nonce, hash_) | ||
res = self.rpc_node.send_tx_and_wait(tx, timeout=15) | ||
|
||
# Test that validators can sync up after restaking. | ||
def _run_test_state_sync_with_restaking(self): | ||
validator = self.validators[int(self._get_extra_validator()[-1])] | ||
|
||
self.send_stake_tx(validator, 0) | ||
target_height = 6 * EPOCH_LENGTH | ||
self.testcase.random_workload_until(target_height) | ||
self.send_stake_tx(validator, 50000000000000000000000000000000) | ||
target_height = 9 * EPOCH_LENGTH | ||
# Wait for all nodes to reach epoch 9. | ||
for n in self.validators: | ||
logger.info( | ||
f"Waiting for node {n.ordinal} to reach target height {target_height}" | ||
) | ||
try: | ||
wait_for_blocks(n, target=target_height) | ||
except Exception as e: | ||
logger.error(e) | ||
assert False, f"Node {n.ordinal} did not reach target height {target_height} in time" | ||
logger.info("Test ended") | ||
|
||
def test_state_sync_with_restaking_decentralized_only(self): | ||
try: | ||
self._prepare_cluster( | ||
with_rpc=True, | ||
shuffle_shard_assignment=False, | ||
decentralized_state_sync=True, | ||
) | ||
self._prepare_simple_transfers() | ||
self._run_test_state_sync_with_restaking() | ||
except Exception as e: | ||
assert False, f"Test failed: {e}" | ||
|
||
def test_state_sync_with_restaking_centralized_only(self): | ||
try: | ||
self._prepare_cluster( | ||
with_rpc=True, | ||
shuffle_shard_assignment=False, | ||
centralized_state_sync=True, | ||
) | ||
self._prepare_simple_transfers() | ||
self._run_test_state_sync_with_restaking() | ||
except Exception as e: | ||
assert False, f"Test failed: {e}" | ||
|
||
def test_state_sync_with_restaking_both_dss_and_css(self): | ||
try: | ||
self._prepare_cluster( | ||
with_rpc=True, | ||
shuffle_shard_assignment=False, | ||
decentralized_state_sync=True, | ||
centralized_state_sync=True, | ||
) | ||
self._prepare_simple_transfers() | ||
self._run_test_state_sync_with_restaking() | ||
except Exception as e: | ||
assert False, f"Test failed: {e}" | ||
|
||
def tearDown(self): | ||
self._clear_cluster() | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of these tests are using the same config, one in which it is specified that an external source is available in state_part_dir. In 2.3.0-rc.4 we made it so that if an external source is configured, it will be used directly and no requests will be made to peers. Then in
test_state_sync_with_restaking_decentralized_only
you have the same config but you don't dump state parts in the state_part_dir. Hence the node just gets stuck looking for state parts in the external directory.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for clarifying this! I assumed that we fallback from external to peers, but instead we do just external.
In this case, how are we going to test the DSS? DSS still relies on the external storage for state parts, but if you set the external configs, will only do cloud sync.