Skip to content

Commit

Permalink
Add IPv8-based listening point adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
ichorid committed May 8, 2018
1 parent 2a018b6 commit c2c1783
Show file tree
Hide file tree
Showing 2 changed files with 409 additions and 0 deletions.
121 changes: 121 additions & 0 deletions endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from util import is_valid_address_or_log
from .candidate import Candidate
from ipv8_endpoint import EndpointListener


if sys.platform == 'win32':
Expand Down Expand Up @@ -394,3 +395,123 @@ def data_came_in(self, packets, cache=True):
if self.mim.data_came_in(packets):
return
super(MIMEndpoint, self).data_came_in(packets, cache)


class IPv8toDispersyAdapter(EndpointListener, Endpoint):
"""
This is a short-term solution for transition to IPv8 communities.
Thus, a lot of copy-paste code is OK.
If you are still seeing this code by the end of 2018, something went
horribly wrong...
"""

def __init__(self, ipv8_endpoint):
#super(IPv8toDispersyAdapter, self).__init__(ipv8_endpoint)
EndpointListener.__init__(self, ipv8_endpoint)
Endpoint.__init__(self)
self._ipv8_endpoint = ipv8_endpoint
self._address = self._ipv8_endpoint.get_address()
self._ipv8_endpoint.add_listener(self)
self.packet_handlers = {}

def get_address(self):
return self._address

def send(self, candidates, packets, prefix=None):
assert isinstance(candidates, (tuple, list, set)), type(candidates)
assert all(isinstance(candidate, Candidate) for candidate in candidates), [type(candidate) for candidate in candidates]
assert isinstance(packets, (tuple, list, set)), type(packets)
assert all(isinstance(packet, str) for packet in packets), [type(packet) for packet in packets]
assert all(len(packet) > 0 for packet in packets), [len(packet) for packet in packets]
prefix = prefix or ''
packets = [prefix + packet for packet in packets]

if any(len(packet) > 2 ** 16 - 60 for packet in packets):
raise RuntimeError("UDP does not support %d byte packets" % max(len(packet) for packet in packets))

send_packet = False
for candidate, packet in product(candidates, packets):
if self.send_packet(candidate, packet):
send_packet = True

return send_packet

def send_packet(self, candidate, packet, prefix=None):
assert self._dispersy, "Should not be called before open(...)"
assert isinstance(candidate, Candidate), type(candidate)
assert isinstance(packet, str), type(packet)
assert len(packet) > 0

packet = (prefix or '') + packet

if len(packet) > 2 ** 16 - 60:
raise RuntimeError("UDP does not support %d byte packets" % len(packet))

self._dispersy.statistics.total_up += len(packet)
self._dispersy.statistics.total_send += 1

data = TUNNEL_PREFIX + packet if candidate.tunnel else packet

self._ipv8_endpoint.send(candidate.sock_addr, data)

if self._logger.isEnabledFor(logging.DEBUG):
self.log_packet(candidate.sock_addr, packet)
return True

def listen_to(self, prefix, handler):
self.packet_handlers[prefix] = handler

def stop_listen_to(self, prefix):
del self.packet_handlers[prefix]

def on_packet(self, packet):
l = list()
l.append(packet)
self.data_came_in(l)

def data_came_in(self, packets, cache=True):
assert self._dispersy, "Should not be called before open(...)"
assert isinstance(packets, (list, tuple)), type(packets)

normal_packets = []
for packet in packets:
prefix = next((p for p in self.packet_handlers if
packet[1].startswith(p)), None)
if prefix:
sock_addr, data = packet
self.packet_handlers[prefix](sock_addr, data[len(prefix):])
else:
normal_packets.append(packet)

if normal_packets:
self._dispersy.statistics.total_down += sum(len(data) for _, data in normal_packets)
if self._logger.isEnabledFor(logging.DEBUG):
for sock_addr, data in normal_packets:
self.log_packet(sock_addr, data, outbound=False)

# The endpoint runs on it's own thread, so we can't do a callLater here
reactor.callFromThread(self.dispersythread_data_came_in, normal_packets, time(), cache)

def dispersythread_data_came_in(self, packets, timestamp, cache=True):
assert self._dispersy, "Should not be called before open(...)"

def strip_if_tunnel(packets):
for sock_addr, data in packets:
if data.startswith(TUNNEL_PREFIX):
yield True, sock_addr, data[TUNNEL_PREFIX_LENGHT:]
else:
yield False, sock_addr, data

try:
self._dispersy.on_incoming_packets([(Candidate(sock_addr, tunnel), data)
for tunnel, sock_addr, data
in strip_if_tunnel(packets)
if is_valid_address_or_log(sock_addr, data)],
cache,
timestamp,
u"standalone_ep")
except AssertionError:
# TODO(Martijn): this effectively disables all assertions, making it harder to crash Tribler clients with
# malformed packets. We should replace this with a more robust design once we redesign Dispersy.
self._logger.exception("Ignored assertion error in Dispersy")

Loading

0 comments on commit c2c1783

Please sign in to comment.