[CHIA-1638] Pace block requests #18729

Merged 4 commits on Nov 1, 2024 · Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chia/_tests/plot_sync/test_sender.py
@@ -45,7 +45,7 @@ def test_set_connection_values(bt: BlockTools, seeded_random: random.Random) ->
# Test setting a valid connection works
sender.set_connection(farmer_connection) # type:ignore[arg-type]
assert sender._connection is not None
- assert sender._connection == farmer_connection # type: ignore[comparison-overlap]
assert id(sender._connection) == id(farmer_connection)


@pytest.mark.anyio
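Why this change type-checks: mypy's comparison-overlap error fires when == compares values whose static types cannot overlap, which is why the old assertion needed a type: ignore. The test only cares that both names refer to the same object, and an identity check expresses that without the suppression. A minimal illustration of the idiom (hypothetical types, not from this PR):

class Sender:
    def __init__(self) -> None:
        self._connection: object = None

sender = Sender()
farmer_connection = object()
sender._connection = farmer_connection

# id() equality asserts "same object" and type-checks for any operands
assert id(sender._connection) == id(farmer_connection)
# the equivalent idiomatic spelling
assert sender._connection is farmer_connection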
72 changes: 68 additions & 4 deletions chia/full_node/full_node.py
@@ -86,6 +86,7 @@
from chia.util.errors import ConsensusError, Err, TimestampError, ValidationError
from chia.util.ints import uint8, uint32, uint64, uint128
from chia.util.limited_semaphore import LimitedSemaphore
from chia.util.network import is_localhost
from chia.util.path import path_from_root
from chia.util.profiler import enable_profiler, mem_profile_task, profile_task
from chia.util.safe_cancel_task import cancel_task_safe
@@ -1124,27 +1125,84 @@ async def sync_from_fork_point(
blockchain = AugmentedBlockchain(self.blockchain)

async def fetch_blocks(output_queue: asyncio.Queue[Optional[tuple[WSChiaConnection, list[FullBlock]]]]) -> None:
# the rate limit for respond_blocks is 100 messages / 60 seconds.
# But the limit is scaled to 30% for outbound messages, so that's 30
# messages per 60 seconds.
# That's 2 seconds per request.
seconds_per_request = 2
start_height, end_height = 0, 0
- new_peers_with_peak: list[WSChiaConnection] = peers_with_peak[:]

# the timestamp of when the next request_block message is allowed to
# be sent. It's initialized to the current time, and bumped by
# seconds_per_request every time we send a request. This ensures we
# won't exceed the 100 requests / 60 seconds rate limit.
# Whichever peer has the lowest timestamp is the one we request
# from. Peers that take more than 5 seconds to respond are pushed to
# the end of the queue, making them less likely to be requested from.

# This should be cleaned up to not be a hard-coded value, and maybe
# allow higher request rates (and align the request_blocks and
# respond_blocks rate limits).
now = time.monotonic()
new_peers_with_peak: list[tuple[WSChiaConnection, float]] = [(c, now) for c in peers_with_peak[:]]
self.log.info(f"peers with peak: {len(new_peers_with_peak)}")
random.shuffle(new_peers_with_peak)
try:
# block request ranges are *inclusive*. This requires some
# gymnastics with this range (+1 to make it exclusive, like normal
# ranges) and then -1 when forming the request message
for start_height in range(fork_point_height, target_peak_sb_height + 1, batch_size):
end_height = min(target_peak_sb_height, start_height + batch_size - 1)
request = RequestBlocks(uint32(start_height), uint32(end_height), True)
new_peers_with_peak.sort(key=lambda pair: pair[1])
fetched = False
- for peer in random.sample(new_peers_with_peak, len(new_peers_with_peak)):
for idx, (peer, timestamp) in enumerate(new_peers_with_peak):
if peer.closed:
continue

start = time.monotonic()
if start < timestamp:
# rate limit ourselves, since we sent a message to
# this peer too recently
await asyncio.sleep(timestamp - start)
start = time.monotonic()

# update the timestamp, now that we're sending a request.
# It's OK for the timestamp to fall behind wall-clock
# time; it just means we're allowed to send more
# requests to catch up
if is_localhost(peer.peer_info.host):
# we don't apply rate limits to localhost, and our
# tests depend on it
bump = 0.1
else:
bump = seconds_per_request

new_peers_with_peak[idx] = (
new_peers_with_peak[idx][0],
new_peers_with_peak[idx][1] + bump,
)
response = await peer.call_api(FullNodeAPI.request_blocks, request, timeout=30)
end = time.monotonic()
if end - start > 5:
self.log.info(f"sync pipeline, peer took {end - start:0.2f} to respond to request_blocks")
if response is None:
self.log.info(f"peer timed out after {end - start:.1f} s")
await peer.close()
elif isinstance(response, RespondBlocks):
if end - start > 5:
self.log.info(f"peer took {end - start:.1f} s to respond to request_blocks")
# this isn't a great peer, reduce its priority
# to prefer any peers that had to wait for it.
# Setting its next allowed timestamp to now
# means that any other peer that has waited for
# this one will have its next allowed timestamp in
# the past, and will be preferred multiple times
# over this peer.
new_peers_with_peak[idx] = (
new_peers_with_peak[idx][0],
end,
)
start = time.monotonic()
await output_queue.put((peer, response.blocks))
end = time.monotonic()
@@ -1159,8 +1217,12 @@ async def fetch_blocks(output_queue: asyncio.Queue[Optional[tuple[WSChiaConnection, list[FullBlock]]]]) -> None:
self.log.error(f"failed fetching {start_height} to {end_height} from peers")
return
if self.sync_store.peers_changed.is_set():
- new_peers_with_peak = self.get_peers_with_peak(peak_hash)
existing_peers = {id(c): timestamp for c, timestamp in new_peers_with_peak}
peers = self.get_peers_with_peak(peak_hash)
new_peers_with_peak = [(c, existing_peers.get(id(c), end)) for c in peers]
random.shuffle(new_peers_with_peak)
self.sync_store.peers_changed.clear()
self.log.info(f"peers with peak: {len(new_peers_with_peak)}")
except Exception as e:
self.log.error(f"Exception fetching {start_height} to {end_height} from peer {e}")
finally:
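Taken together, the hunk above implements a simple pacing scheme: every peer carries the timestamp at which the next request to it is allowed, the peer with the lowest timestamp is asked first, we sleep if even that peer is not yet allowed, and each request bumps the peer's timestamp by seconds_per_request. The sketch below is a condensed, self-contained rendering of that scheme under hypothetical names (StubPeer, request_blocks); it is not the PR's code.

import asyncio
import random
import time

# respond_blocks allows 100 messages per 60 s, scaled to 30% for
# outbound traffic: 30 messages / 60 s, i.e. one request every 2 s
SECONDS_PER_REQUEST = 60 / (100 * 0.30)  # == 2.0

class StubPeer:
    """Hypothetical stand-in for WSChiaConnection."""
    def __init__(self, name: str, delay: float) -> None:
        self.name = name
        self.delay = delay

    async def request_blocks(self, height: int) -> str:
        await asyncio.sleep(self.delay)
        return f"blocks@{height} from {self.name}"

async def paced_fetch(peers: list[StubPeer], heights: range) -> None:
    now = time.monotonic()
    # pair every peer with the time its next request is allowed
    paced = [(p, now) for p in peers]
    random.shuffle(paced)
    for height in heights:
        # always ask the peer that has been idle the longest
        paced.sort(key=lambda pair: pair[1])
        for idx, (peer, allowed_at) in enumerate(paced):
            start = time.monotonic()
            if start < allowed_at:
                # we contacted this peer too recently; wait it out
                await asyncio.sleep(allowed_at - start)
                start = time.monotonic()
            # bump the slot; it may fall behind wall-clock time, which
            # only means a short burst of requests is allowed later
            paced[idx] = (peer, allowed_at + SECONDS_PER_REQUEST)
            blocks = await peer.request_blocks(height)
            if time.monotonic() - start > 5:
                # slow peer: reset its slot to "now" so peers that had
                # to wait on it sort ahead of it on the next pass
                paced[idx] = (peer, time.monotonic())
            print(blocks)
            break

asyncio.run(paced_fetch([StubPeer("a", 0.05), StubPeer("b", 0.1)], range(3)))

Note also how the peers_changed branch above rebuilds the peer list without forgetting pacing state: the old (connection, timestamp) pairs are indexed by id(connection), so a peer that survives the refresh keeps its next-allowed timestamp rather than getting a fresh one.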
@@ -1267,7 +1329,9 @@ async def ingest_blocks(
block_rate_time = now
block_rate_height = end_height

self.log.info(f"Added blocks {start_height} to {end_height} ({block_rate} blocks/s)")
self.log.info(
f"Added blocks {start_height} to {end_height} ({block_rate} blocks/s) (from: {peer.peer_info.ip})"
)
peak: Optional[BlockRecord] = self.blockchain.get_peak()
if state_change_summary is not None:
assert peak is not None
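For context on the block_rate figure in the updated log line: the hunk shows the window being reset (block_rate_time = now, block_rate_height = end_height) after logging, which suggests a per-window rate of blocks added divided by elapsed seconds. A hedged sketch of that reading follows; the PR itself only appends the peer's address to the message, and the division is an assumption about code outside the diff.

import time

class BlockRateWindow:
    """Hypothetical helper mirroring the block_rate_* bookkeeping."""
    def __init__(self, start_height: int) -> None:
        self.block_rate_time = time.monotonic()
        self.block_rate_height = start_height

    def rate_and_reset(self, end_height: int) -> float:
        now = time.monotonic()
        elapsed = max(now - self.block_rate_time, 1e-9)
        # blocks added since the window started, per second
        rate = (end_height - self.block_rate_height) / elapsed
        # start a new window, as the hunk does after logging
        self.block_rate_time = now
        self.block_rate_height = end_height
        return rate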