Skip to content

Commit

Permalink
Untangle update method (WIP) ethereum#50
Browse files Browse the repository at this point in the history
The `update()` method of `KademliaProtocol` is far too complex for sensible
refactoring, this is a first step to untangle all involved state transitions.
  • Loading branch information
konradkonrad committed Dec 22, 2016
1 parent ae18501 commit a9c9a8b
Showing 1 changed file with 95 additions and 44 deletions.
139 changes: 95 additions & 44 deletions devp2p/kademlia.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ def bond(self, node):
self.ping(node)
elif not node.ping_recv:
self.waiting_for_ping.append(node)
assert node.bonded
else:
assert node.bonded

def update(self, node, pingid=None):
"""
Expand Down Expand Up @@ -434,48 +435,35 @@ def update(self, node, pingid=None):
if not node.bonded:
self.bond(node)

def _expected_pongs():
return set(v[1] for v in self._expected_pongs.values())

if pingid and (pingid not in self._expected_pongs):
assert pingid not in self._expected_pongs
log.debug('surprising pong', remoteid=node,
expected=_expected_pongs(), pingid=pingid.encode('hex')[:8])
if pingid in self._deleted_pingids:
log.debug('surprising pong was deleted')
else:
for key in self._expected_pongs:
if key.endswith(node.pubkey):
log.debug('waiting for ping from node, but echo mismatch', node=node,
expected_echo=key[:len(node.pubkey)][:8].encode('hex'),
received_echo=pingid[:len(node.pubkey)][:8].encode('hex'))
return
return self._process_surprising_pong(node, pingid)

# check for timed out pings and eventually evict them
for _pingid, (timeout, _node, replacement) in self._expected_pongs.items():
if time.time() > timeout:
log.debug('deleting timedout node', remoteid=_node,
pingid=_pingid.encode('hex')[:8])
self._deleted_pingids.add(_pingid) # FIXME this is for testing
del self._expected_pongs[_pingid]
self.routing.remove_node(_node)
if replacement:
log.debug('adding replacement', remoteid=replacement)
self.update(replacement)
return
if _node == node: # prevent node from being added later
return
if self._check_timed_out_pings(node):
# prevent node from being added later
return

# if we had registered this node for eviction test
if pingid in self._expected_pongs:
timeout, _node, replacement = self._expected_pongs[pingid]
log.debug('received expected pong', remoteid=node)
if replacement:
log.debug('adding replacement to cache', remoteid=replacement)
self.routing.bucket_by_node(replacement).replacement_cache.append(replacement)
del self._expected_pongs[pingid]
if pingid and (pingid in self._expected_pongs):
self._handle_eviction_test_ping(node, pingid)

# add node
self._add_or_queue_eviction_test(node)

# check for not full buckets and ping replacements
self._ping_potential_replacements()

# check idle buckets
self._check_idle_buckets()

# check and remove timedout find requests
self._check_and_remove_timed_out_find_requests()

log.debug('updated', num_nodes=len(self.routing), num_buckets=len(self.routing.buckets))

def _add_or_queue_eviction_test(self, node):
"""Try to add the node. If routing proposes an eviction candidate instead,
add queue the eviction test.
"""
eviction_candidate = self.routing.add_node(node)
if eviction_candidate:
log.debug('could not add', remoteid=node, pinging=eviction_candidate)
Expand All @@ -484,12 +472,31 @@ def _expected_pongs():
else:
log.debug('added', remoteid=node)

# check for not full buckets and ping replacements
def _handle_eviction_test_ping(self, node, pingid):
"""Consume expected pong (by pingid) and handle potential eviction replacement.
"""
timeout, _node, replacement = self._expected_pongs[pingid]
log.debug('received expected pong', remoteid=node)
if replacement:
log.debug('adding replacement to cache', remoteid=replacement)
self.routing.bucket_by_node(replacement).replacement_cache.append(replacement)
del self._expected_pongs[pingid]

def _ping_potential_replacements(self):
"""Find unfilled buckets and ping potential replacements.
"""
for bucket in self.routing.not_full_buckets:
for node in bucket.replacement_cache:
self.ping(node)

# check idle buckets
def _check_and_remove_timed_out_find_requests(self):
"""Cleanup find_requests if timed out
"""
for nodeid, timeout in self._find_requests.items():
if time.time() > timeout:
del self._find_requests[nodeid]

def _check_idle_buckets(self):
"""
idle bucket refresh:
for each bucket which hasn't been touched in 3600 seconds
Expand All @@ -499,12 +506,56 @@ def _expected_pongs():
rid = random.randint(bucket.start, bucket.end)
self.find_node(rid)

# check and removed timedout find requests
for nodeid, timeout in self._find_requests.items():
if time.time() > timeout:
del self._find_requests[nodeid]
def _expected_pongs(self):
return set(v[1] for v in self._expected_pongs.values())

log.debug('updated', num_nodes=len(self.routing), num_buckets=len(self.routing.buckets))
def _process_surprising_pong(self, node, pingid):
"""Receive an unknown pingid (from pong)
Args:
node: the node in scope
pingid: the pingid
"""
assert pingid not in self._expected_pongs
log.debug('surprising pong', remoteid=node,
expected=self._expected_pongs(), pingid=pingid.encode('hex')[:8])
if pingid in self._deleted_pingids:
log.debug('surprising pong was deleted')
else:
for key in self._expected_pongs:
if key.endswith(node.pubkey):
log.debug('waiting for ping from node, but echo mismatch', node=node,
expected_echo=key[:len(node.pubkey)][:8].encode('hex'),
received_echo=pingid[:len(node.pubkey)][:8].encode('hex'))

def _check_timed_out_pings(self, node):
"""Check for timed out pings and eventually evict them.
If there are replacements registered for timed out pings, recurse into
`self.update` with the replacements.
Args:
node: the node in scope
Return:
timed_out (boolean): if a ping to the node in scope timed out
"""
timed_out = False
replacements = []
# check for timed out pings and eventually evict them
for _pingid, (timeout, _node, replacement) in self._expected_pongs.items():
if time.time() > timeout:
log.debug('deleting timedout node', remoteid=_node,
pingid=_pingid.encode('hex')[:8])
self._deleted_pingids.add(_pingid) # FIXME this is for testing
del self._expected_pongs[_pingid]
self.routing.remove_node(_node)
if replacement:
log.debug('adding replacement', remoteid=replacement)
assert replacement != node
replacements.append(replacement)
if _node == node: # prevent node from being added later
timed_out = True
if replacements:
for replacement in replacements:
self.update(replacement)
return timed_out

def _mkpingid(self, echoed, node):
assert node.pubkey
Expand Down

0 comments on commit a9c9a8b

Please sign in to comment.