Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable rust endpoint for anonymous downloads #7908

Merged
merged 5 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements-core.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ sentry-sdk==1.31.0
yappi==1.4.0
yarl==1.9.2 # keep this dependency higher than 1.6.3. See: https://github.com/aio-libs/yarl/issues/517
bitarray==2.7.6
pyipv8==2.12.0
pyipv8==2.13.0
libtorrent==1.2.19
file-read-backwards==3.0.0
Brotli==1.0.9 # to prevent AttributeError on macOs: module 'brotli' has no attribute 'error' (in urllib3.response)
human-readable==1.3.2
colorlog==6.7.0
filelock==3.13.0
ipv8-rust-tunnels==0.1.15
6 changes: 3 additions & 3 deletions scripts/experiments/tunnel_community/speed_test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from binascii import unhexlify
from pathlib import Path

from ipv8.messaging.anonymization.tunnel import EXIT_NODE, ORIGINATOR
from ipv8.messaging.anonymization.tunnel import BACKWARD, FORWARD
from ipv8.taskmanager import task

from scripts.experiments.tunnel_community.speed_test_exit import EXPERIMENT_NUM_CIRCUITS, EXPERIMENT_NUM_HOPS, \
Expand Down Expand Up @@ -34,8 +34,8 @@ async def on_circuit_ready(self, address):
self.index += 1
self.logger.info(f"on_circuit_ready: {self.index}/{EXPERIMENT_NUM_CIRCUITS}")
circuit = self.tunnel_community.circuits[self.tunnel_community.ip_to_circuit_id(address[0])]
self.results += await self.run_speed_test(ORIGINATOR, circuit, index, EXPERIMENT_NUM_MB)
self.results += await self.run_speed_test(EXIT_NODE, circuit, index, EXPERIMENT_NUM_MB)
self.results += await self.run_speed_test(BACKWARD, circuit, index, EXPERIMENT_NUM_MB)
self.results += await self.run_speed_test(FORWARD, circuit, index, EXPERIMENT_NUM_MB)
self.tunnel_community.remove_circuit(circuit.circuit_id)
if self.index >= EXPERIMENT_NUM_CIRCUITS:
self._graceful_shutdown()
Expand Down
12 changes: 6 additions & 6 deletions scripts/experiments/tunnel_community/speed_test_exit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
from pathlib import Path

from ipv8.messaging.anonymization.tunnel import EXIT_NODE, ORIGINATOR
from ipv8.messaging.anonymization.tunnel import BACKWARD, FORWARD
from ipv8.messaging.anonymization.utils import run_speed_test
from ipv8.taskmanager import TaskManager

Expand Down Expand Up @@ -60,25 +60,25 @@ async def on_tribler_started(self):
circuit = community.create_circuit(EXPERIMENT_NUM_HOPS)
if circuit and (await circuit.ready):
index += 1
self.results += await self.run_speed_test(ORIGINATOR, circuit, index, EXPERIMENT_NUM_MB)
self.results += await self.run_speed_test(EXIT_NODE, circuit, index, EXPERIMENT_NUM_MB)
self.results += await self.run_speed_test(BACKWARD, circuit, index, EXPERIMENT_NUM_MB)
self.results += await self.run_speed_test(FORWARD, circuit, index, EXPERIMENT_NUM_MB)
self.logger.info(f"Remove circuit: {index}/{EXPERIMENT_NUM_CIRCUITS}")
community.remove_circuit(circuit.circuit_id)
else:
await asyncio.sleep(1)
self._graceful_shutdown()

async def run_speed_test(self, direction, circuit, index, size):
request_size = 0 if direction == ORIGINATOR else 1024
response_size = 1024 if direction == ORIGINATOR else 0
request_size = 0 if direction == BACKWARD else 1024
response_size = 1024 if direction == BACKWARD else 0
num_requests = size * 1024
component = self.session.get_instance(TunnelsComponent)
task = asyncio.create_task(run_speed_test(component.community, circuit, request_size,
response_size, num_requests, window=50))
results = []
prev_transferred = ts = 0
while not task.done():
cur_transferred = circuit.bytes_down if direction == ORIGINATOR else circuit.bytes_up
cur_transferred = circuit.bytes_down if direction == BACKWARD else circuit.bytes_up
results.append((ts, index, direction, (cur_transferred - prev_transferred) / 1024))
prev_transferred = cur_transferred
ts += 1
Expand Down
1 change: 1 addition & 0 deletions src/tribler/core/components/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def tribler_config(tmp_path) -> TriblerConfig:
config.torrent_checking.enabled = False
config.ipv8.enabled = False
config.ipv8.walk_scaling_enabled = False
config.ipv8.rust_endpoint = False
config.discovery_community.enabled = False
config.libtorrent.enabled = False
config.libtorrent.dht_readiness_timeout = 0
Expand Down
5 changes: 0 additions & 5 deletions src/tribler/core/components/ipv8/eva/payload.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
from dataclasses import dataclass

from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from ipv8.messaging.payload_dataclass import overwrite_dataclass

dataclass = overwrite_dataclass(dataclass)


@vp_compile
Expand Down
18 changes: 12 additions & 6 deletions src/tribler/core/components/ipv8/ipv8_component.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from typing import Optional

try:
from ipv8_rust_tunnels.endpoint import RustEndpoint as UDPEndpoint
except ImportError:
from ipv8.messaging.interfaces.udp.endpoint import UDPEndpoint

Check warning on line 6 in src/tribler/core/components/ipv8/ipv8_component.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/ipv8/ipv8_component.py#L5-L6

Added lines #L5 - L6 were not covered by tests

from ipv8.bootstrapping.dispersy.bootstrapper import DispersyBootstrapper
from ipv8.configuration import ConfigBuilder, DISPERSY_BOOTSTRAPPER
from ipv8.dht.churn import PingChurn
from ipv8.dht.discovery import DHTDiscoveryCommunity
from ipv8.dht.routing import RoutingTable
from ipv8.messaging.interfaces.dispatcher.endpoint import DispatcherEndpoint
from ipv8.messaging.interfaces.dispatcher.endpoint import DispatcherEndpoint, INTERFACES
from ipv8.messaging.interfaces.udp.endpoint import UDPv4Address
from ipv8.peer import Peer
from ipv8.peerdiscovery.churn import RandomChurn
Expand Down Expand Up @@ -51,7 +56,7 @@
self.rendevous_hook = RendezvousHook(self.rendezvous_db)

port = config.ipv8.port
address = config.ipv8.address
address = '127.0.0.1' if config.gui_test_mode else config.ipv8.address
self.logger.info('Starting ipv8')
self.logger.info(f'Port: {port}. Address: {address}')
ipv8_config_builder = (ConfigBuilder()
Expand All @@ -62,10 +67,11 @@
.set_working_directory(str(config.state_dir))
.set_walker_interval(config.ipv8.walk_interval))

if config.gui_test_mode:
endpoint = DispatcherEndpoint([])
else:
endpoint = DispatcherEndpoint(["UDPIPv4"], UDPIPv4={'port': port, 'ip': address})
if config.ipv8.rust_endpoint and not config.gui_test_mode:
INTERFACES["UDPIPv4"] = UDPEndpoint

Check warning on line 71 in src/tribler/core/components/ipv8/ipv8_component.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/ipv8/ipv8_component.py#L71

Added line #L71 was not covered by tests

endpoint = DispatcherEndpoint(["UDPIPv4"], UDPIPv4={'port': port, 'ip': address})

ipv8 = IPv8(ipv8_config_builder.finalize(),
enable_statistics=config.ipv8.statistics and not config.gui_test_mode,
endpoint_override=endpoint)
Expand Down
1 change: 1 addition & 0 deletions src/tribler/core/components/ipv8/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Ipv8Settings(TriblerConfigSection):
enabled: bool = True
port: int = 7759
address: str = '0.0.0.0'
rust_endpoint: bool = True
bootstrap_override: Optional[str] = None
statistics: bool = False
rendezvous_stats: bool = False
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
from dataclasses import dataclass

from ipv8.messaging.payload_dataclass import overwrite_dataclass, type_from_format

dataclass = overwrite_dataclass(dataclass)
from ipv8.messaging.payload_dataclass import dataclass, type_from_format


@dataclass
Expand Down Expand Up @@ -30,21 +26,24 @@ class StatementOperationSignature:
signature: type_from_format('64s')


@dataclass(msg_id=STATEMENT_OPERATION_MESSAGE_ID)
@dataclass
class RawStatementOperationMessage:
""" RAW payload class is used for reducing ipv8 unpacking operations
For more information take a look at: https://github.com/Tribler/tribler/pull/6396#discussion_r728334323
"""
msg_id = STATEMENT_OPERATION_MESSAGE_ID
operation: RAW_DATA
signature: RAW_DATA


@dataclass(msg_id=STATEMENT_OPERATION_MESSAGE_ID)
@dataclass
class StatementOperationMessage:
msg_id = STATEMENT_OPERATION_MESSAGE_ID
operation: StatementOperation
signature: StatementOperationSignature


@dataclass(msg_id=1)
@dataclass
class RequestStatementOperationMessage:
msg_id = 1
count: int
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
SOCKS_VERSION,
socks5_serializer,
)
from tribler.core.components.socks_servers.socks5.udp_connection import SocksUDPConnection
from tribler.core.components.socks_servers.socks5.udp_connection import SocksUDPConnection, RustUDPConnection


class ConnectionState:
Expand Down Expand Up @@ -174,7 +174,11 @@
# The DST.ADDR and DST.PORT fields contain the address and port that the client expects
# to use to send UDP datagrams on for the association. The server MAY use this information
# to limit access to the association.
self.udp_connection = SocksUDPConnection(self, request.destination)
if self.socksserver and self.socksserver.rust_endpoint:
self.udp_connection = RustUDPConnection(self.socksserver.rust_endpoint, self.socksserver.hops)

Check warning on line 178 in src/tribler/core/components/socks_servers/socks5/connection.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/socks_servers/socks5/connection.py#L178

Added line #L178 was not covered by tests
else:
self.udp_connection = SocksUDPConnection(self, request.destination)

await self.udp_connection.open()
ip, _ = self.transport.get_extra_info('sockname')
port = self.udp_connection.get_listen_port()
Expand Down
11 changes: 8 additions & 3 deletions src/tribler/core/components/socks_servers/socks5/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from asyncio import get_event_loop
from typing import Optional
from typing import Optional, List

from tribler.core.components.socks_servers.socks5.connection import Socks5Connection
from tribler.core.components.tunnel.community.dispatcher import TunnelDispatcher
Expand All @@ -11,12 +11,17 @@ class Socks5Server:
This object represents a Socks5 server.
"""

def __init__(self, port=None, output_stream: Optional[TunnelDispatcher] = None):
def __init__(self, hops:int ,
port=None,
output_stream: Optional[TunnelDispatcher] = None,
rust_endpoint=None):
self._logger = logging.getLogger(self.__class__.__name__)
self.hops = hops
self.port = port
self.output_stream = output_stream
self.server = None
self.sessions = []
self.sessions: List[Socks5Connection] = []
self.rust_endpoint = rust_endpoint

async def start(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

@pytest.fixture(name='socks5_server')
async def fixture_socks5_server(free_port):
socks5_server = Socks5Server(free_port, Mock())
socks5_server = Socks5Server(1, free_port, Mock())
yield socks5_server
await socks5_server.stop()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from unittest.mock import Mock

import pytest

from tribler.core.components.socks_servers.socks5.udp_connection import SocksUDPConnection
from tribler.core.components.socks_servers.socks5.udp_connection import SocksUDPConnection, RustUDPConnection


@pytest.fixture
Expand Down Expand Up @@ -37,3 +39,30 @@ def test_send_diagram(connection):
assert connection.send_datagram(b'a')
connection.remote_udp_address = None
assert not connection.send_datagram(b'a')


async def test_rust_udp_connection():
"""
Test the rust SOCKS5 UDP connection
"""
rust_endpoint = Mock()
rust_endpoint.create_udp_associate = Mock(return_value=5000)
connection = RustUDPConnection(rust_endpoint, 1)

await connection.open()
rust_endpoint.create_udp_associate.assert_called_with(0, 1)
assert connection.get_listen_port() == 5000

await connection.open()
rust_endpoint.create_udp_associate.assert_called_once()

connection.remote_udp_address = ('1.2.3.4', 5)
rust_endpoint.set_udp_associate_default_remote.assert_called_with(('1.2.3.4', 5))
assert connection.remote_udp_address is None

connection.close()
rust_endpoint.close_udp_associate.assert_called_once()
assert connection.get_listen_port() is None

connection.close()
rust_endpoint.close_udp_associate.assert_called_once()
33 changes: 33 additions & 0 deletions src/tribler/core/components/socks_servers/socks5/udp_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,36 @@ def close(self):
if self.transport:
self.transport.close()
self.transport = None


class RustUDPConnection:

def __init__(self, rust_endpoint, hops):
self.rust_endpoint = rust_endpoint
self.hops = hops
self.port = None
self.logger = logging.getLogger(self.__class__.__name__)

@property
def remote_udp_address(self) -> None:
# Ensure this connection doesn't get picked up by the dispatcher
return None

@remote_udp_address.setter
def remote_udp_address(self, address: tuple) -> None:
self.rust_endpoint.set_udp_associate_default_remote(address)

async def open(self):
if self.port is not None:
self.logger.error("UDP connection is already open on port %s", self.port)
return

self.port = self.rust_endpoint.create_udp_associate(0, self.hops)

def get_listen_port(self):
return self.port

def close(self):
if self.port is not None:
self.rust_endpoint.close_udp_associate(self.port)
self.port = None
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from typing import List

from ipv8.messaging.interfaces.dispatcher.endpoint import DispatcherEndpoint

from ipv8_rust_tunnels.endpoint import RustEndpoint

from tribler.core.components.component import Component
from tribler.core.components.exceptions import NoneComponent
from tribler.core.components.ipv8.ipv8_component import Ipv8Component
from tribler.core.components.reporter.reporter_component import ReporterComponent
from tribler.core.components.socks_servers.socks5.server import Socks5Server
from tribler.core.utilities.network_utils import default_network_utils

NUM_SOCKS_PROXIES = 5

NUM_SOCKS_PROXIES = 3
SOCKS5_SERVER_PORTS = 'socks5_server_ports'


Expand All @@ -17,9 +24,19 @@ async def run(self):
await self.get_component(ReporterComponent)
self.socks_servers = []
self.socks_ports = []

# If IPv8 has been started using the RustEndpoint, find it
ipv8_component = await self.maybe_component(Ipv8Component)
rust_endpoint = None
if not isinstance(ipv8_component, NoneComponent):
ipv4_endpoint = ipv8_component.ipv8.endpoint
if isinstance(ipv4_endpoint, DispatcherEndpoint):
ipv4_endpoint = ipv4_endpoint.interfaces.get("UDPIPv4", None)
rust_endpoint = ipv4_endpoint if isinstance(ipv4_endpoint, RustEndpoint) else None

# Start the SOCKS5 servers
for _ in range(NUM_SOCKS_PROXIES):
socks_server = Socks5Server()
for hops in range(NUM_SOCKS_PROXIES):
socks_server = Socks5Server(hops + 1, rust_endpoint=rust_endpoint)
self.socks_servers.append(socks_server)
await socks_server.start()
socks_port = socks_server.port
Expand Down
3 changes: 2 additions & 1 deletion src/tribler/core/components/tunnel/community/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ def on_socks5_udp_data(self, udp_connection, request):
return False

self._logger.debug("Sending data over circuit %d destined for %r:%r", circuit.circuit_id, *request.destination)
self.tunnels.send_data(circuit.peer, circuit.circuit_id, request.destination, ('0.0.0.0', 0), request.data)
self.tunnels.send_data(circuit.hop.address, circuit.circuit_id,
request.destination, ('0.0.0.0', 0), request.data) # nosec B104
return True

@task
Expand Down
7 changes: 4 additions & 3 deletions src/tribler/core/components/tunnel/community/payload.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from __future__ import annotations

from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from ipv8.messaging.anonymization.payload import CellablePayload
from ipv8.messaging.lazy_payload import vp_compile


@vp_compile
class HTTPRequestPayload(VariablePayload):
class HTTPRequestPayload(CellablePayload):
msg_id = 28
format_list = ['I', 'I', 'address', 'varlenH']
names = ['circuit_id', 'identifier', 'target', 'request']


@vp_compile
class HTTPResponsePayload(VariablePayload):
class HTTPResponsePayload(CellablePayload):
msg_id = 29
format_list = ['I', 'I', 'H', 'H', 'varlenH']
names = ['circuit_id', 'identifier', 'part', 'total', 'response']
Loading
Loading