From f887c6f2a4f9fe6a8914f0a72961f7e68c133d7d Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Fri, 14 Oct 2022 14:22:14 +0200 Subject: [PATCH] Feature: use the Aleph.im P2P service (#328) Problem: P2P communication with the JS P2P daemon is sometimes unstable. Solution: develop a new P2P service for Aleph.im nodes to replace this daemon. * Rewrote all the calls to the P2P daemon to use the new service instead * Adapted Docker Compose files * Decommissioned the P2P protocol implementation as streams are not supported by the new service for now * The key generation mechanism now writes the private key file in PKCS8 DER format, for compatibility with the P2P service * Added a migration script to migrate the private key of existing nodes to the new format --- .github/workflows/pyaleph-ci.yml | 6 - deployment/docker-build/config.yml | 1 - deployment/docker-build/docker-compose.yml | 43 ++-- .../docker-build/jsp2pd/build_jsp2pd.sh | 13 -- .../docker-build/jsp2pd/jsp2pd.dockerfile | 14 -- .../scripts/0003-convert-key-to-pkcs8-der.py | 78 +++++++ .../samples/docker-compose/docker-compose.yml | 37 +++- .../samples/docker-compose/sample-config.yml | 7 +- .../docker-monitoring/docker-compose.yml | 37 +++- .../docker-monitoring/sample-config.yml | 3 +- deployment/scripts/run_aleph_ccn.sh | 3 + docs/guides/private_net.rst | 1 - setup.cfg | 1 + src/aleph/commands.py | 23 +-- src/aleph/config.py | 21 +- src/aleph/jobs/job_utils.py | 1 - src/aleph/network.py | 18 +- src/aleph/services/ipfs/pubsub.py | 4 +- src/aleph/services/keys.py | 15 +- src/aleph/services/p2p/__init__.py | 34 +-- src/aleph/services/p2p/jobs.py | 10 +- src/aleph/services/p2p/manager.py | 18 +- src/aleph/services/p2p/peers.py | 19 +- src/aleph/services/p2p/protocol.py | 195 +----------------- src/aleph/services/p2p/pubsub.py | 34 +-- src/aleph/services/p2p/singleton.py | 19 +- src/aleph/services/peers/monitor.py | 52 ++--- src/aleph/services/peers/publish.py | 12 +- src/aleph/services/utils.py | 17 -- src/aleph/storage.py | 7 +- src/aleph/web/controllers/p2p.py | 3 +- tests/p2p/conftest.py | 24 --- tests/p2p/test_connect.py | 85 -------- tests/p2p/test_identify.py | 15 -- tests/p2p/test_pubsub.py | 79 ------- 35 files changed, 292 insertions(+), 657 deletions(-) delete mode 100644 deployment/docker-build/jsp2pd/build_jsp2pd.sh delete mode 100644 deployment/docker-build/jsp2pd/jsp2pd.dockerfile create mode 100644 deployment/migrations/scripts/0003-convert-key-to-pkcs8-der.py delete mode 100644 tests/p2p/conftest.py delete mode 100644 tests/p2p/test_connect.py delete mode 100644 tests/p2p/test_identify.py delete mode 100644 tests/p2p/test_pubsub.py diff --git a/.github/workflows/pyaleph-ci.yml b/.github/workflows/pyaleph-ci.yml index 694d64436..aa9013e60 100644 --- a/.github/workflows/pyaleph-ci.yml +++ b/.github/workflows/pyaleph-ci.yml @@ -28,10 +28,6 @@ jobs: with: # Fetch the whole history for all tags and branches (required for aleph.__version__) fetch-depth: 0 - # Install nodejs for jsp2pd - - uses: actions/setup-node@v2 - with: - node-version: '16' - name: Set up Python 3.8 id: setup-python uses: actions/setup-python@v2 @@ -45,8 +41,6 @@ jobs: with: path: ${{ steps.pip-cache.outputs.dir }} key: ${{ runner.os }}-python-${{ steps.setup-python.outputs.python-version }}-pip-${{ hashFiles('setup.cfg') }} - - name: Install jsp2pd - run: npm install --global libp2p-daemon@0.10.2 - name: Install Python dependencies run: | pip install .[testing] diff --git a/deployment/docker-build/config.yml b/deployment/docker-build/config.yml index 39ed308fa..3d4d26e52 100644 --- a/deployment/docker-build/config.yml +++ b/deployment/docker-build/config.yml @@ -43,7 +43,6 @@ p2p: http_port: 4024 port: 4025 control_port: 4030 - listen_port: 4031 reconnect_delay: 60 sentry: diff --git a/deployment/docker-build/docker-compose.yml b/deployment/docker-build/docker-compose.yml index 37a3f33da..36c3014ec 100644 --- a/deployment/docker-build/docker-compose.yml +++ b/deployment/docker-build/docker-compose.yml @@ -8,22 +8,39 @@ volumes: pyaleph-mongodb: services: - p2pd: + + rabbitmq: restart: always - image: alephim/jsp2pd:0.10.2-1.0.0 + image: rabbitmq:3.10.7-management networks: - pyaleph environment: - PRIVATE_KEY_FILE: "/etc/jsp2pd/keys/serialized-node-secret.key" - LISTEN_MADDR: "/ip4/0.0.0.0/tcp/4030" - HOST_MADDRS: "/ip4/0.0.0.0/tcp/4025" - PUBSUB: "true" - PUBSUB_ROUTER: "floodsub" + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest ports: - - "4025:4025" - - "4030:4030" + - "127.0.0.1:5672:5672" + - "127.0.0.1:15672:15672" + + p2p-service: + restart: always + image: alephim/p2p-service:0.1.1 + networks: + - pyaleph volumes: - - ../../keys:/etc/jsp2pd/keys + - ../../config.yml:/etc/p2p-service/config.yml + - ../../keys/node-secret.pkcs8.der:/etc/p2p-service/node-secret.pkcs8.der + depends_on: + - rabbitmq + environment: + RUST_LOG: info + ports: + - "4025:4025" + - "127.0.0.1:4030:4030" + command: + - "--config" + - "/etc/p2p-service/config.yml" + - "--private-key-file" + - "/etc/p2p-service/node-secret.pkcs8.der" ipfs: restart: always @@ -31,8 +48,8 @@ services: ports: - "4001:4001" - "4001:4001/udp" - - "5001:5001" - - "8080:8080" + - "127.0.0.1:5001:5001" + - "127.0.0.1:8080:8080" volumes: - "pyaleph-ipfs:/data/ipfs" environment: @@ -50,7 +67,7 @@ services: networks: - pyaleph ports: - - "27017:27017" + - "127.0.0.1:27017:27017" networks: pyaleph: diff --git a/deployment/docker-build/jsp2pd/build_jsp2pd.sh b/deployment/docker-build/jsp2pd/build_jsp2pd.sh deleted file mode 100644 index 34903e41d..000000000 --- a/deployment/docker-build/jsp2pd/build_jsp2pd.sh +++ /dev/null @@ -1,13 +0,0 @@ -#!/usr/bin/env bash - -set -euo pipefail - -SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -JSP2PD_VERSION=0.10.2 -DOCKERFILE_VERSION=1.0.0 - -docker build \ - -f "${SCRIPT_DIR}/jsp2pd.dockerfile" \ - --build-arg JSP2PD_VERSION=${JSP2PD_VERSION} \ - -t alephim/jsp2pd:${JSP2PD_VERSION}-${DOCKERFILE_VERSION} \ - "${SCRIPT_DIR}" diff --git a/deployment/docker-build/jsp2pd/jsp2pd.dockerfile b/deployment/docker-build/jsp2pd/jsp2pd.dockerfile deleted file mode 100644 index 83abf1393..000000000 --- a/deployment/docker-build/jsp2pd/jsp2pd.dockerfile +++ /dev/null @@ -1,14 +0,0 @@ -FROM node:17-alpine -ARG JSP2PD_VERSION - -RUN npm install --global libp2p-daemon@${JSP2PD_VERSION} - -USER node - -ENV PRIVATE_KEY_FILE="" -ENV LISTEN_MADDR=/ip4/0.0.0.0/tcp/4024 -ENV HOST_MADDRS=/ip4/0.0.0.0/tcp/4025 -ENV PUBSUB=false -ENV PUBSUB_ROUTER=gossipsub - -ENTRYPOINT jsp2pd --id ${PRIVATE_KEY_FILE} --listen=${LISTEN_MADDR} --hostAddrs=${HOST_MADDRS} --pubsub=${PUBSUB} --pubsubRouter=${PUBSUB_ROUTER} diff --git a/deployment/migrations/scripts/0003-convert-key-to-pkcs8-der.py b/deployment/migrations/scripts/0003-convert-key-to-pkcs8-der.py new file mode 100644 index 000000000..857ed28ce --- /dev/null +++ b/deployment/migrations/scripts/0003-convert-key-to-pkcs8-der.py @@ -0,0 +1,78 @@ +""" +This migration converts the PEM private key file to PKCS8 DER for compatibility +with the new Aleph.im P2P service. The Rust implementation of libp2p can only load +RSA keys in that format. +""" + + +import logging +import os +from pathlib import Path +from typing import Optional + +import yaml +from Crypto.PublicKey import RSA +from p2pclient.libp2p_stubs.crypto.rsa import RSAPrivateKey + +from aleph.exceptions import InvalidKeyDirException + +LOGGER = logging.getLogger(os.path.basename(__file__)) + + +PKCS8_DER_KEY_FILE = "node-secret.pkcs8.der" + + +def convert_pem_key_file_to_pkcs8_der( + pem_key_file: Path, pkcs8_der_key_file: Path +) -> None: + with pem_key_file.open() as pem: + private_key = RSAPrivateKey(RSA.import_key(pem.read())) + + with pkcs8_der_key_file.open("wb") as der: + der.write(private_key.impl.export_key(format="DER", pkcs=8)) + + +def get_key_from_config(config_file: Path) -> Optional[str]: + """ + In previous versions of the CCN, it was possible to set the key value directly + in the config file. This function tries to find it in the config or returns None. + + :param config_file: Path to the CCN configuration file. + :return: The private key used to identify the node on the P2P network, or None + if the key is not provided in the config file. + """ + with open(config_file) as f: + config = yaml.safe_load(f) + + try: + return config["p2p"]["key"] + except KeyError: + return None + + +def upgrade(**kwargs): + key_dir = Path(kwargs["key_dir"]) + pem_key_file = key_dir / "node-secret.key" + + # Nothing to do if the PKCS8 DER key file already exists + pkcs8_der_key_file = key_dir / PKCS8_DER_KEY_FILE + if pkcs8_der_key_file.is_file(): + LOGGER.info( + "Key file %s already exists, nothing to do", + pkcs8_der_key_file, + ) + return + + if not key_dir.is_dir(): + raise InvalidKeyDirException( + f"The specified key directory ('{key_dir}') is not a directory." + ) + + LOGGER.info("Converting the private key file to PKCS8 DER format...") + convert_pem_key_file_to_pkcs8_der(pem_key_file, pkcs8_der_key_file) + LOGGER.info("Successfully created %s.", pkcs8_der_key_file) + + +def downgrade(**kwargs): + # Nothing to do, the key file is still present in the key directory + pass diff --git a/deployment/samples/docker-compose/docker-compose.yml b/deployment/samples/docker-compose/docker-compose.yml index d6627fd9d..0aaabb110 100644 --- a/deployment/samples/docker-compose/docker-compose.yml +++ b/deployment/samples/docker-compose/docker-compose.yml @@ -18,28 +18,45 @@ services: depends_on: - mongodb - ipfs - - p2pd + - p2p-service networks: - pyaleph logging: options: max-size: 50m - p2pd: + p2p-service: restart: always - image: alephim/jsp2pd:0.10.2-1.0.0 + image: alephim/p2p-service:0.1.1 networks: - pyaleph + volumes: + - ./config.yml:/etc/p2p-service/config.yml + - ./keys/node-secret.pkcs8.der:/etc/p2p-service/node-secret.pkcs8.der + depends_on: + - rabbitmq environment: - PRIVATE_KEY_FILE: "/etc/jsp2pd/keys/serialized-node-secret.key" - LISTEN_MADDR: "/ip4/0.0.0.0/tcp/4030" - HOST_MADDRS: "/ip4/0.0.0.0/tcp/4025" - PUBSUB: "true" - PUBSUB_ROUTER: "floodsub" + RUST_LOG: info ports: - "4025:4025" - volumes: - - ./keys:/etc/jsp2pd/keys + - "127.0.0.1:4030:4030" + command: + - "--config" + - "/etc/p2p-service/config.yml" + - "--private-key-file" + - "/etc/p2p-service/node-secret.pkcs8.der" + + rabbitmq: + restart: always + image: rabbitmq:3.10.7-management + networks: + - pyaleph + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + ports: + - "127.0.0.1:5672:5672" + - "127.0.0.1:15672:15672" ipfs: restart: always diff --git a/deployment/samples/docker-compose/sample-config.yml b/deployment/samples/docker-compose/sample-config.yml index 30cea443a..58872f7a6 100644 --- a/deployment/samples/docker-compose/sample-config.yml +++ b/deployment/samples/docker-compose/sample-config.yml @@ -35,12 +35,15 @@ aleph: queue_topic: ALEPH-TEST p2p: - daemon_host: p2pd + daemon_host: p2p-service http_port: 4024 port: 4025 control_port: 4030 - listen_port: 4031 reconnect_delay: 60 +rabbitmq: + host: rabbitmq + port: 5672 + sentry: dsn: "" diff --git a/deployment/samples/docker-monitoring/docker-compose.yml b/deployment/samples/docker-monitoring/docker-compose.yml index 6e156ed87..d22f96fc8 100644 --- a/deployment/samples/docker-monitoring/docker-compose.yml +++ b/deployment/samples/docker-monitoring/docker-compose.yml @@ -20,28 +20,45 @@ services: depends_on: - mongodb - ipfs - - p2pd + - p2p-service networks: - pyaleph logging: options: max-size: 50m - p2pd: + p2p-service: restart: always - image: alephim/jsp2pd:0.10.2-1.0.0 + image: alephim/p2p-service:0.1.1 networks: - pyaleph + volumes: + - ./config.yml:/etc/p2p-service/config.yml + - ./keys/node-secret.pkcs8.der:/etc/p2p-service/node-secret.pkcs8.der + depends_on: + - rabbitmq environment: - PRIVATE_KEY_FILE: "/etc/jsp2pd/keys/serialized-node-secret.key" - LISTEN_MADDR: "/ip4/0.0.0.0/tcp/4030" - HOST_MADDRS: "/ip4/0.0.0.0/tcp/4025" - PUBSUB: "true" - PUBSUB_ROUTER: "floodsub" + RUST_LOG: info ports: - "4025:4025" - volumes: - - ./keys:/etc/jsp2pd/keys + - "127.0.0.1:4030:4030" + command: + - "--config" + - "/etc/p2p-service/config.yml" + - "--private-key-file" + - "/etc/p2p-service/node-secret.pkcs8.der" + + rabbitmq: + restart: always + image: rabbitmq:3.10.7-management + networks: + - pyaleph + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + ports: + - "127.0.0.1:5672:5672" + - "127.0.0.1:15672:15672" ipfs: restart: always diff --git a/deployment/samples/docker-monitoring/sample-config.yml b/deployment/samples/docker-monitoring/sample-config.yml index 30cea443a..e76b64cbb 100644 --- a/deployment/samples/docker-monitoring/sample-config.yml +++ b/deployment/samples/docker-monitoring/sample-config.yml @@ -35,11 +35,10 @@ aleph: queue_topic: ALEPH-TEST p2p: - daemon_host: p2pd + daemon_host: p2p-service http_port: 4024 port: 4025 control_port: 4030 - listen_port: 4031 reconnect_delay: 60 sentry: diff --git a/deployment/scripts/run_aleph_ccn.sh b/deployment/scripts/run_aleph_ccn.sh index 8ae45e9c1..dc72b52b8 100644 --- a/deployment/scripts/run_aleph_ccn.sh +++ b/deployment/scripts/run_aleph_ccn.sh @@ -41,8 +41,11 @@ function wait_for_it() DB_URI=$(get_config mongodb.uri | sed "s-mongodb://--") IPFS_HOST=$(get_config ipfs.host) IPFS_PORT=$(get_config ipfs.port) +RABBITMQ_HOST=$(get_config rabbitmq.host) +RABBITMQ_PORT=$(get_config rabbitmq.port) wait_for_it "${DB_URI}" wait_for_it -h "${IPFS_HOST}" -p "${IPFS_PORT}" +wait_for_it -h "${RABBITMQ_HOST}" -p "${RABBITMQ_PORT}" exec pyaleph "${PYALEPH_ARGS[@]}" diff --git a/docs/guides/private_net.rst b/docs/guides/private_net.rst index 959ed7497..e9a9ce4e7 100644 --- a/docs/guides/private_net.rst +++ b/docs/guides/private_net.rst @@ -55,7 +55,6 @@ Example of config file at this point (I disabled IPFS but you can leave it enabl p2p: host: 0.0.0.0 control_port: 4030 - listen_port: 4031 http_port: 4024 port: 4025 peers: [] diff --git a/setup.cfg b/setup.cfg index 891e6e543..a0d3442ed 100644 --- a/setup.cfg +++ b/setup.cfg @@ -41,6 +41,7 @@ install_requires = aioipfs@git+https://github.com/aleph-im/aioipfs.git@76d5624661e879a13b70f3ea87dc9c9604c7eda7 aleph-client==0.4.6 aleph-message==0.2.2 + aleph-p2p-client@git+https://github.com/aleph-im/p2p-service-client-python@f9fc6057be5bf1712180129c831116a740441434 aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@97fe92ffa6e21ef5ec17ef4fa16c86022b30044c coincurve==15.0.1 configmanager==1.35.1 diff --git a/src/aleph/commands.py b/src/aleph/commands.py index f3170c220..4d10488f5 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -21,7 +21,6 @@ import sentry_sdk from aleph_message.models import MessageType from configmanager import Config -from p2pclient import Client as P2PClient from setproctitle import setproctitle import aleph.config @@ -35,7 +34,7 @@ from aleph.network import listener_tasks from aleph.services import p2p from aleph.services.keys import generate_keypair, save_keys -from aleph.services.p2p import singleton +from aleph.services.p2p import singleton, init_p2p_client from aleph.web import app __author__ = "Moshe Malawach" @@ -67,13 +66,13 @@ async def run_server( port: int, shared_stats: dict, extra_web_config: dict, - p2p_client: P2PClient, ): # These imports will run in different processes from aiohttp import web from aleph.web.controllers.listener import broadcast LOGGER.debug("Setup of runner") + p2p_client = await init_p2p_client(config, service_name=f"api-server-{port}") app["config"] = config app["extra_config"] = extra_web_config @@ -99,7 +98,6 @@ def run_server_coroutine( host: str, port: int, shared_stats: Dict, - p2p_client: P2PClient, enable_sentry: bool = True, extra_web_config: Optional[Dict] = None, ): @@ -127,7 +125,7 @@ def run_server_coroutine( # https://github.com/getsentry/raven-python/issues/1110 try: loop.run_until_complete( - run_server(config, host, port, shared_stats, extra_web_config, p2p_client) + run_server(config, host, port, shared_stats, extra_web_config) ) except Exception as e: if enable_sentry: @@ -172,10 +170,10 @@ async def main(args): LOGGER.error(msg) raise InvalidConfigException(msg) - # We only check that the serialized key exists. - serialized_key_file_path = os.path.join(args.key_dir, "serialized-node-secret.key") - if not os.path.isfile(serialized_key_file_path): - msg = f"Serialized node key ({serialized_key_file_path}) not found." + # We only check that the private key exists. + private_key_file_path = os.path.join(args.key_dir, "node-secret.pkcs8.der") + if not os.path.isfile(private_key_file_path): + msg = f"Serialized node key ({private_key_file_path}) not found." LOGGER.critical(msg) raise KeyNotFoundException(msg) @@ -218,9 +216,10 @@ async def main(args): use_processes=True, ) - # handler = app.make_handler(loop=loop) LOGGER.debug("Initializing p2p") - p2p_client, p2p_tasks = await p2p.init_p2p(config, api_servers) + p2p_client, p2p_tasks = await p2p.init_p2p( + config, service_name="network-monitor", api_servers=api_servers + ) tasks += p2p_tasks LOGGER.debug("Initialized p2p") @@ -241,7 +240,6 @@ async def main(args): config.aleph.host.value, config.p2p.http_port.value, shared_stats, - p2p_client, args.sentry_disabled is False and config.sentry.dsn.value, extra_web_config, ), @@ -253,7 +251,6 @@ async def main(args): config.aleph.host.value, config.aleph.port.value, shared_stats, - p2p_client, args.sentry_disabled is False and config.sentry.dsn.value, extra_web_config, ), diff --git a/src/aleph/config.py b/src/aleph/config.py index b6229f872..63ce40ce2 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -7,7 +7,7 @@ def get_defaults(): return { "logging": { "level": logging.WARNING, - "max_log_file_size": 1_000_000_000 # 1GB, + "max_log_file_size": 1_000_000_000, # 1GB, }, "aleph": { "queue_topic": "ALEPH-QUEUE", @@ -21,15 +21,15 @@ def get_defaults(): }, "pending_txs": { "max_concurrency": 20, - } - } + }, + }, }, "p2p": { "http_port": 4024, "port": 4025, "control_port": 4030, - "listen_port": 4031, - "daemon_host": "p2pd", + "daemon_host": "p2p-service", + "mq_host": "rabbitmq", "reconnect_delay": 60, "alive_topic": "ALIVE", "clients": ["http"], @@ -38,6 +38,7 @@ def get_defaults(): "/ip4/95.216.100.234/tcp/4025/p2p/Qmaxufiqdyt5uVWcy1Xh2nh3Rs3382ArnSP2umjCiNG2Vs", "/ip4/62.210.93.220/tcp/4025/p2p/QmXdci5feFmA2pxTg8p3FCyWmSKnWYAAmr7Uys1YCTFD8U", ], + "topics": ["ALIVE", "ALEPH-QUEUE"], }, "storage": {"folder": "./data/", "store_files": False, "engine": "mongodb"}, "nuls": { @@ -89,9 +90,17 @@ def get_defaults(): "peers": [ "/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx", "/ip4/51.159.57.71/tcp/4001/p2p/12D3KooWBH3JVSBwHLNzxv7EzniBP3tDmjJaoa3EJBF9wyhZtHt2", - "/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF" + "/ip4/62.210.93.220/tcp/4001/p2p/12D3KooWLcmvqojHzUnR7rr8YhFKGDD8z7fmsPyBfAm2rT3sFGAF", ], }, + "rabbitmq": { + "host": "127.0.0.1", + "port": 5672, + "username": "guest", + "password": "guest", + "pub_exchange": "p2p-publish", + "sub_exchange": "p2p-subscribe", + }, "sentry": { "dsn": None, "traces_sample_rate": None, diff --git a/src/aleph/jobs/job_utils.py b/src/aleph/jobs/job_utils.py index 7a326a406..0bc6e6328 100644 --- a/src/aleph/jobs/job_utils.py +++ b/src/aleph/jobs/job_utils.py @@ -38,7 +38,6 @@ def prepare_loop(config_values: Dict) -> Tuple[asyncio.AbstractEventLoop, Config init_db_globals(config) init_ipfs_globals(config) - _ = init_p2p_client(config) return loop, config diff --git a/src/aleph/network.py b/src/aleph/network.py index 05ab8c514..f4422e725 100644 --- a/src/aleph/network.py +++ b/src/aleph/network.py @@ -3,7 +3,7 @@ from typing import Coroutine, Dict, List from urllib.parse import unquote -from p2pclient import Client as P2PClient +from aleph_p2p_client import AlephP2PServiceClient from aleph.exceptions import InvalidMessageError from aleph.register_chain import VERIFIER_REGISTER @@ -26,21 +26,17 @@ ] -async def get_pubsub_message(ipfs_pubsub_message: Dict) -> BasePendingMessage: +async def decode_pubsub_message(message_data: bytes) -> BasePendingMessage: """ Extracts an Aleph message out of a pubsub message. Note: this function validates the format of the message, but does not perform extra validation (ex: signature checks). """ - message_data = ipfs_pubsub_message.get("data", b"").decode("utf-8") - try: - message_dict = json.loads(unquote(message_data)) + message_dict = json.loads(unquote(message_data.decode("utf-8"))) except json.JSONDecodeError: - raise InvalidMessageError( - "Data is not JSON: {}".format(ipfs_pubsub_message.get("data", "")) - ) + raise InvalidMessageError("Data is not JSON: {!r}".format(message_data)) LOGGER.debug("New message! %r" % message_dict) @@ -62,11 +58,13 @@ async def verify_signature(message: BasePendingMessage) -> None: raise InvalidMessageError("Signature validation error") -def listener_tasks(config, p2p_client: P2PClient) -> List[Coroutine]: +def listener_tasks(config, p2p_client: AlephP2PServiceClient) -> List[Coroutine]: from aleph.services.p2p.protocol import incoming_channel as incoming_p2p_channel # for now (1st milestone), we only listen on a single global topic... - tasks: List[Coroutine] = [incoming_p2p_channel(p2p_client, config.aleph.queue_topic.value)] + tasks: List[Coroutine] = [ + incoming_p2p_channel(p2p_client, config.aleph.queue_topic.value) + ] if config.ipfs.enabled.value: tasks.append(incoming_ipfs_channel(config.aleph.queue_topic.value)) return tasks diff --git a/src/aleph/services/ipfs/pubsub.py b/src/aleph/services/ipfs/pubsub.py index 46c8edb1f..6e8e8404a 100644 --- a/src/aleph/services/ipfs/pubsub.py +++ b/src/aleph/services/ipfs/pubsub.py @@ -41,14 +41,14 @@ async def pub(topic: str, message: Union[str, bytes]): async def incoming_channel(topic) -> None: - from aleph.network import get_pubsub_message + from aleph.network import decode_pubsub_message from aleph.chains.common import process_one_message while True: try: async for mvalue in sub(topic): try: - message = await get_pubsub_message(mvalue) + message = await decode_pubsub_message(mvalue["data"]) LOGGER.debug("New message %r" % message) asyncio.create_task(process_one_message(message)) except InvalidMessageError: diff --git a/src/aleph/services/keys.py b/src/aleph/services/keys.py index 2eeae0bb2..74a1bc9fc 100644 --- a/src/aleph/services/keys.py +++ b/src/aleph/services/keys.py @@ -20,10 +20,11 @@ def generate_keypair(print_key: bool) -> KeyPair: def save_keys(key_pair: KeyPair, key_dir: str) -> None: """ - Saves the private and public keys to the specified directory. The keys are stored in 3 formats: - - The private key is stored in PEM format for ease of use, and in a serialized format compatible with the P2P - daemon (DER + protobuf encoding). + Saves the private and public keys to the specified directory. The keys are stored in 2 formats: + - The private key is stored in PKCS8 DER (binary) format for compatibility with the Aleph.im P2P service. - The public key is stored in PEM format. + + TODO review: do we really need to store the public key? If so, in which format, PEM or DER? """ # Create the key directory if it does not exist if os.path.exists(key_dir): @@ -33,15 +34,11 @@ def save_keys(key_pair: KeyPair, key_dir: str) -> None: os.makedirs(key_dir) # Save the private and public keys in the key directory, as well as the serialized private key for p2pd. - private_key_path = os.path.join(key_dir, "node-secret.key") + private_key_path = os.path.join(key_dir, "node-secret.pkcs8.der") public_key_path = os.path.join(key_dir, "node-pub.key") - serialized_key_path = os.path.join(key_dir, "serialized-node-secret.key") with open(private_key_path, "wb") as key_file: - key_file.write(key_pair.private_key.impl.export_key()) + key_file.write(key_pair.private_key.impl.export_key(format="DER", pkcs=8)) with open(public_key_path, "wb") as key_file: key_file.write(key_pair.public_key.impl.export_key()) - - with open(serialized_key_path, "wb") as f: - f.write(key_pair.private_key.serialize()) diff --git a/src/aleph/services/p2p/__init__.py b/src/aleph/services/p2p/__init__.py index 28d640eca..96aa7745d 100644 --- a/src/aleph/services/p2p/__init__.py +++ b/src/aleph/services/p2p/__init__.py @@ -1,41 +1,41 @@ -import socket from typing import Coroutine, List, Tuple +from aleph_p2p_client import AlephP2PServiceClient, make_p2p_service_client from configmanager import Config -from multiaddr import Multiaddr -from p2pclient import Client as P2PClient from . import singleton from .manager import initialize_host -def init_p2p_client(config: Config) -> P2PClient: - host = config.p2p.daemon_host.value - host_ip_addr = socket.gethostbyname(host) - - control_port = config.p2p.control_port.value - listen_port = config.p2p.listen_port.value - control_maddr = Multiaddr(f"/ip4/{host_ip_addr}/tcp/{control_port}") - listen_maddr = Multiaddr(f"/ip4/0.0.0.0/tcp/{listen_port}") - p2p_client = P2PClient(control_maddr=control_maddr, listen_maddr=listen_maddr) +async def init_p2p_client(config: Config, service_name: str) -> AlephP2PServiceClient: + p2p_client = await make_p2p_service_client( + service_name=service_name, + mq_host=config.p2p.mq_host.value, + mq_port=config.rabbitmq.port.value, + mq_username=config.rabbitmq.username.value, + mq_password=config.rabbitmq.password.value, + mq_pub_exchange_name=config.rabbitmq.pub_exchange.value, + mq_sub_exchange_name=config.rabbitmq.sub_exchange.value, + http_host=config.p2p.daemon_host.value, + http_port=config.p2p.control_port.value, + ) return p2p_client async def init_p2p( - config: Config, api_servers: List[str], listen: bool = True -) -> Tuple[P2PClient, List[Coroutine]]: - p2p_client = init_p2p_client(config) + config: Config, service_name: str, api_servers: List[str], listen: bool = True +) -> Tuple[AlephP2PServiceClient, List[Coroutine]]: + p2p_client = await init_p2p_client(config, service_name) port = config.p2p.port.value - singleton.streamer, tasks = await initialize_host( + tasks = await initialize_host( config=config, p2p_client=p2p_client, api_servers=api_servers, host=config.p2p.daemon_host.value, port=port, listen=listen, - protocol_active=("protocol" in config.p2p.clients.value), ) return p2p_client, tasks diff --git a/src/aleph/services/p2p/jobs.py b/src/aleph/services/p2p/jobs.py index 957c49826..c2d23dad7 100644 --- a/src/aleph/services/p2p/jobs.py +++ b/src/aleph/services/p2p/jobs.py @@ -1,16 +1,14 @@ import asyncio import logging +from dataclasses import dataclass from typing import List, Optional +from aleph_p2p_client import AlephP2PServiceClient from configmanager import Config -from p2pclient import Client as P2PClient from aleph.model.p2p import get_peers from .http import api_get_request from .peers import connect_peer -from .protocol import AlephProtocol - -from dataclasses import dataclass @dataclass @@ -23,7 +21,7 @@ class PeerStatus: LOGGER = logging.getLogger("P2P.jobs") -async def reconnect_p2p_job(config: Config, p2p_client: P2PClient, streamer: Optional[AlephProtocol]) -> None: +async def reconnect_p2p_job(config: Config, p2p_client: AlephP2PServiceClient) -> None: await asyncio.sleep(2) while True: try: @@ -32,7 +30,7 @@ async def reconnect_p2p_job(config: Config, p2p_client: P2PClient, streamer: Opt ) for peer in peers: try: - await connect_peer(p2p_client=p2p_client, streamer=streamer, peer_maddr=peer) + await connect_peer(p2p_client=p2p_client, peer_maddr=peer) except Exception: LOGGER.debug("Can't reconnect to %s" % peer) diff --git a/src/aleph/services/p2p/manager.py b/src/aleph/services/p2p/manager.py index da7dc71a8..0b1715df9 100644 --- a/src/aleph/services/p2p/manager.py +++ b/src/aleph/services/p2p/manager.py @@ -1,14 +1,13 @@ import logging -from typing import Coroutine, List, Optional, Tuple +from typing import Coroutine, List +from aleph_p2p_client import AlephP2PServiceClient from configmanager import Config -from p2pclient import Client as P2PClient from aleph.services.ipfs.common import get_public_address from aleph.services.peers.monitor import monitor_hosts_ipfs, monitor_hosts_p2p from aleph.services.peers.publish import publish_host from aleph.services.utils import get_IP -from .protocol import AlephProtocol LOGGER = logging.getLogger("P2P.host") @@ -19,13 +18,12 @@ async def initialize_host( config: Config, - p2p_client: P2PClient, + p2p_client: AlephP2PServiceClient, api_servers: List[str], host: str = "0.0.0.0", port: int = 4025, listen: bool = True, - protocol_active: bool = True, -) -> Tuple[Optional[AlephProtocol], List[Coroutine]]: +) -> List[Coroutine]: from .jobs import reconnect_p2p_job, tidy_http_peers_job @@ -33,14 +31,12 @@ async def initialize_host( transport_opt = f"/ip4/{host}/tcp/{port}" - protocol = await AlephProtocol.create(p2p_client) if protocol_active else None - tasks = [ - reconnect_p2p_job(config=config, p2p_client=p2p_client, streamer=protocol), + reconnect_p2p_job(config=config, p2p_client=p2p_client), tidy_http_peers_job(config=config, api_servers=api_servers), ] if listen: - peer_id, _ = await p2p_client.identify() + peer_id = (await p2p_client.identify()).peer_id LOGGER.info("Listening on " + f"{transport_opt}/p2p/{peer_id}") ip = await get_IP() public_address = f"/ip4/{ip}/tcp/{port}/p2p/{peer_id}" @@ -90,4 +86,4 @@ async def initialize_host( except Exception: LOGGER.exception("Can't publish public IPFS address") - return protocol, tasks + return tasks diff --git a/src/aleph/services/p2p/peers.py b/src/aleph/services/p2p/peers.py index 4a40cd048..cf751804d 100644 --- a/src/aleph/services/p2p/peers.py +++ b/src/aleph/services/p2p/peers.py @@ -1,29 +1,22 @@ -from typing import Optional - +from aleph_p2p_client import AlephP2PServiceClient from multiaddr import Multiaddr -from p2pclient import Client as P2PClient from p2pclient.libp2p_stubs.peer.peerinfo import info_from_p2p_addr -from .protocol import AlephProtocol - -async def connect_peer(p2p_client: P2PClient, streamer: Optional[AlephProtocol], peer_maddr: str) -> None: +async def connect_peer(p2p_client: AlephP2PServiceClient, peer_maddr: str) -> None: """ Connects to the specified peer. :param p2p_client: P2P daemon client. - :param streamer: Protocol streamer, if configured. :param peer_maddr: Fully qualified multi-address of the peer to connect to: /ip4//tcp//p2p/ """ peer_info = info_from_p2p_addr(Multiaddr(peer_maddr)) - peer_id, _ = await p2p_client.identify() + peer_id = (await p2p_client.identify()).peer_id + # Discard attempts to connect to self. if str(peer_info.peer_id) == str(peer_id): - # LOGGER.debug("Can't connect to myself.") return - await p2p_client.connect(peer_info.peer_id, peer_info.addrs) - - if streamer is not None: - await streamer.add_peer(peer_info.peer_id) + for multiaddr in peer_info.addrs: + await p2p_client.dial(peer_id=str(peer_info.peer_id), multiaddr=str(multiaddr)) diff --git a/src/aleph/services/p2p/protocol.py b/src/aleph/services/p2p/protocol.py index de632fbbe..38a4ba5ec 100644 --- a/src/aleph/services/p2p/protocol.py +++ b/src/aleph/services/p2p/protocol.py @@ -1,198 +1,33 @@ import asyncio -import base64 -import json import logging -import random -from typing import Any, Dict, Optional, Set -from anyio.abc import SocketStream -from anyio.exceptions import IncompleteRead -from p2pclient import Client as P2PClient -from p2pclient.datastructures import StreamInfo -from p2pclient.exceptions import ControlFailure -from p2pclient.libp2p_stubs.peer.id import ID +from aleph_p2p_client import AlephP2PServiceClient -from aleph import __version__ -from aleph.exceptions import AlephStorageException, InvalidMessageError -from aleph.network import get_pubsub_message -from aleph.services.utils import pubsub_msg_to_dict -from .pubsub import receive_pubsub_messages, subscribe - -MAX_READ_LEN = 2 ** 32 - 1 +from aleph.exceptions import InvalidMessageError +from aleph.network import decode_pubsub_message LOGGER = logging.getLogger("P2P.protocol") -STREAM_COUNT = 5 - -HELLO_PACKET = {"command": "hello"} - -CONNECT_LOCK = asyncio.Lock() - - -class AlephProtocol: - p2p_client: P2PClient - PROTOCOL_ID = "/aleph/p2p/0.1.0" - - def __init__(self, p2p_client: P2PClient, streams_per_host: int = 5): - self.p2p_client = p2p_client - self.streams_per_host = streams_per_host - self.peers: Set[ID] = set() - - @classmethod - async def create( - cls, p2p_client: P2PClient, streams_per_host: int = 5 - ) -> "AlephProtocol": - """ - Creates a new protocol instance. This factory coroutine must be called instead of calling the constructor - directly in order to register the stream handlers. - """ - protocol = cls(p2p_client=p2p_client, streams_per_host=streams_per_host) - await p2p_client.stream_handler(cls.PROTOCOL_ID, cls.stream_request_handler) - return protocol - - @staticmethod - async def stream_request_handler( - stream_info: StreamInfo, stream: SocketStream - ) -> None: - """ - Handles the reception of a message from another peer under the aleph protocol. - - Receives a message, performs the corresponding action and returns a result message to the sender. - """ - - from aleph.storage import get_hash_content - - read_bytes = await stream.receive_some(MAX_READ_LEN) - if read_bytes is None: - return - - result: Dict[str, Any] - - try: - read_string = read_bytes.decode("utf-8") - message_json = json.loads(read_string) - if message_json["command"] == "hash_content": - try: - content = await get_hash_content( - message_json["hash"], use_network=False, timeout=1 - ) - except AlephStorageException: - result = {"status": "success", "content": None} - else: - result = { - "status": "success", - "hash": message_json["hash"], - "content": base64.encodebytes(content.value).decode("utf-8"), - } - elif message_json["command"] == "get_message": - result = {"status": "error", "reason": "not implemented"} - elif message_json["command"] == "publish_message": - result = {"status": "error", "reason": "not implemented"} - elif message_json["command"] == "hello": - result = { - "status": "success", - "content": {"version": __version__}, - } - else: - result = {"status": "error", "reason": "unknown command"} - LOGGER.debug(f"received {read_string}") - except Exception as e: - result = {"status": "error", "reason": repr(e)} - LOGGER.exception("Error while reading data") - - await stream.send_all(json.dumps(result).encode("utf-8")) - - async def make_request( - self, request_structure: Dict[str, Any] - ) -> Optional[Dict[str, Any]]: - peers = list(self.peers) - # Randomize the list of peers to contact to distribute the load evenly - random.shuffle(peers) - - for peer in peers: - stream_info, stream = await self.p2p_client.stream_open( - peer, (self.PROTOCOL_ID,) - ) - msg = json.dumps(request_structure).encode("UTF-8") - try: - await stream.send_all(msg) - response = await stream.receive_some(MAX_READ_LEN) - finally: - await stream.close() - - try: - value = json.loads(response.decode("UTF-8")) - except json.JSONDecodeError: - logging.warning("Could not decode response from %s", peer) - continue - - if value.get("content") is not None: - return value - - logging.info("Could not retrieve content from any peer") - return None - - async def request_hash(self, item_hash) -> Optional[bytes]: - # this should be done better, finding best peers to query from. - query = {"command": "hash_content", "hash": item_hash} - item = await self.make_request(query) - if ( - item is not None - and item["status"] == "success" - and item["content"] is not None - ): - # TODO: IMPORTANT /!\ verify the hash of received data! - return base64.decodebytes(item["content"].encode("utf-8")) - else: - LOGGER.debug(f"can't get hash {item_hash}") - return None - - async def add_peer(self, peer_id: ID) -> None: - if peer_id not in self.peers: - try: - stream_info, stream = await self.p2p_client.stream_open( - peer_id, [self.PROTOCOL_ID] - ) - except ControlFailure as error: - LOGGER.debug("failed to add new peer %s, error %s", peer_id, error) - return - - try: - await stream.send_all(json.dumps(HELLO_PACKET).encode("utf-8")) - _ = await stream.receive_some(MAX_READ_LEN) - except Exception as error: - LOGGER.debug("failed to add new peer %s, error %s", peer_id, error) - return - finally: - await stream.close() - - self.peers.add(peer_id) - - -async def incoming_channel(p2p_client: P2PClient, topic: str) -> None: +async def incoming_channel(p2p_client: AlephP2PServiceClient, topic: str) -> None: LOGGER.debug("incoming channel started...") from aleph.chains.common import delayed_incoming - stream = await subscribe(p2p_client, topic) - - # The communication with the P2P daemon sometimes fails repeatedly, spamming - # IncompleteRead exceptions. We still want to log these to Sentry without sending - # thousands of logs. - incomplete_read_threshold = 150 - incomplete_read_counter = 0 + await p2p_client.subscribe(topic) while True: try: - async for pubsub_message in receive_pubsub_messages(stream): + async for message in p2p_client.receive_messages(topic): try: - msg_dict = pubsub_msg_to_dict(pubsub_message) - LOGGER.debug("Received from P2P:", msg_dict) + protocol, topic, peer_id = message.routing_key.split(".") + LOGGER.debug("Received new %s message on topic %s from %s", protocol, topic, peer_id) + # we should check the sender here to avoid spam # and such things... try: - message = await get_pubsub_message(msg_dict) + message = await decode_pubsub_message(message.body) except InvalidMessageError: + LOGGER.warning("Received invalid message on P2P topic %s from %s", topic, peer_id) continue LOGGER.debug("New message %r" % message) @@ -200,14 +35,6 @@ async def incoming_channel(p2p_client: P2PClient, topic: str) -> None: except Exception: LOGGER.exception("Can't handle message") - except IncompleteRead: - if (incomplete_read_counter % incomplete_read_threshold) == 0: - LOGGER.exception( - "Incomplete read (%d times), reconnecting. Try to restart the application.", - incomplete_read_counter, - ) - incomplete_read_counter += 1 - except Exception: LOGGER.exception("Exception in pubsub, reconnecting.") diff --git a/src/aleph/services/p2p/pubsub.py b/src/aleph/services/p2p/pubsub.py index 1b5f87169..da650fb68 100644 --- a/src/aleph/services/p2p/pubsub.py +++ b/src/aleph/services/p2p/pubsub.py @@ -1,38 +1,12 @@ import logging -from typing import AsyncIterator, Union +from typing import Union -from p2pclient import Client as P2PClient -from p2pclient.pb.p2pd_pb2 import PSMessage -from p2pclient.utils import read_pbmsg_safe -import anyio +from aleph_p2p_client import AlephP2PServiceClient LOGGER = logging.getLogger("P2P.pubsub") -async def subscribe(p2p_client: P2PClient, topic: str) -> anyio.abc.SocketStream: - """ - Subscribes to the specified topic. - :param p2p_client: P2P daemon client. - :param topic: Topic on which to subscribe. - :return: A socket stream object. This stream can be used to read data posted by other peers on the topic. - """ - return await p2p_client.pubsub_subscribe(topic) - - -async def receive_pubsub_messages(stream: anyio.abc.SocketStream) -> AsyncIterator[PSMessage]: - """ - Receives messages from a P2P pubsub topic in a loop and yields them one by one. - :param stream: The stream (= return value of the `subscribe` function) to read data from. - """ - while True: - pubsub_msg = PSMessage() - await read_pbmsg_safe(stream, pubsub_msg) - LOGGER.debug("New message received %r" % pubsub_msg) - - yield pubsub_msg - - -async def publish(p2p_client: P2PClient, topic: str, message: Union[bytes, str]) -> None: +async def publish(p2p_client: AlephP2PServiceClient, topic: str, message: Union[bytes, str]) -> None: """ Publishes a message on the specified topic. :param p2p_client: P2P daemon client. @@ -41,4 +15,4 @@ async def publish(p2p_client: P2PClient, topic: str, message: Union[bytes, str]) """ data = message if isinstance(message, bytes) else message.encode("UTF-8") - await p2p_client.pubsub_publish(topic, data) + await p2p_client.publish(data=data, topic=topic) diff --git a/src/aleph/services/p2p/singleton.py b/src/aleph/services/p2p/singleton.py index 0aac1c3f1..64121fd8b 100644 --- a/src/aleph/services/p2p/singleton.py +++ b/src/aleph/services/p2p/singleton.py @@ -1,23 +1,6 @@ -from typing import List, Optional, TypeVar - -from p2pclient import Client as P2PClient -from .protocol import AlephProtocol - -streamer: Optional[AlephProtocol] = None +from typing import List, Optional # TODO: this global variable is currently used to distribute the list of HTTP nodes # on the network. Rewrite the retry and storage modules to pass this state # as a parameter instead. api_servers: Optional[List[str]] = None - -T = TypeVar("T") - - -def _get_singleton(singleton: Optional[T], name: str) -> T: - if singleton is None: - raise ValueError(f"{name} is null!") - return singleton - - -def get_streamer() -> AlephProtocol: - return _get_singleton(streamer, "Streamer") diff --git a/src/aleph/services/peers/monitor.py b/src/aleph/services/peers/monitor.py index 58b1753ed..15aa88916 100644 --- a/src/aleph/services/peers/monitor.py +++ b/src/aleph/services/peers/monitor.py @@ -1,29 +1,24 @@ import asyncio import json import logging -from typing import Any, Dict from urllib.parse import unquote -from anyio.exceptions import IncompleteRead -from p2pclient import Client as P2PClient -from p2pclient.pb.p2pd_pb2 import PSMessage -from p2pclient.utils import read_pbmsg_safe +from aleph_p2p_client import AlephP2PServiceClient from aleph.services.ipfs.pubsub import sub as sub_ipfs -from aleph.services.utils import pubsub_msg_to_dict from aleph.types import Protocol LOGGER = logging.getLogger("P2P.peers") -async def handle_incoming_host(pubsub_msg: Dict[str, Any], source: Protocol = Protocol.P2P): +async def handle_incoming_host( + data: bytes, sender: str, source: Protocol = Protocol.P2P +): from aleph.model.p2p import add_peer - sender = pubsub_msg["from"] - try: - LOGGER.debug("New message received %r" % pubsub_msg) - message_data = pubsub_msg.get("data", b"").decode("utf-8") + LOGGER.debug("New message received from %s", sender) + message_data = data.decode("utf-8") content = json.loads(unquote(message_data)) # TODO: replace this validation by marshaling (ex: Pydantic) @@ -51,29 +46,18 @@ async def handle_incoming_host(pubsub_msg: Dict[str, Any], source: Protocol = Pr LOGGER.exception("Exception in pubsub peers monitoring") -async def monitor_hosts_p2p(p2p_client: P2PClient, alive_topic: str) -> None: - # The communication with the P2P daemon sometimes fails repeatedly, spamming - # IncompleteRead exceptions. We still want to log these to Sentry without sending - # thousands of logs. - incomplete_read_threshold = 150 - incomplete_read_counter = 0 - +async def monitor_hosts_p2p( + p2p_client: AlephP2PServiceClient, alive_topic: str +) -> None: while True: try: - stream = await p2p_client.pubsub_subscribe(alive_topic) - while True: - pubsub_msg = PSMessage() - await read_pbmsg_safe(stream, pubsub_msg) - msg_dict = pubsub_msg_to_dict(pubsub_msg) - await handle_incoming_host(msg_dict, source=Protocol.P2P) - - except IncompleteRead: - if (incomplete_read_counter % incomplete_read_threshold) == 0: - LOGGER.exception( - "Incomplete read (%d times), reconnecting. Try to restart the application.", - incomplete_read_counter, + await p2p_client.subscribe(alive_topic) + async for alive_message in p2p_client.receive_messages(alive_topic): + protocol, topic, peer_id = alive_message.routing_key.split(".") + await handle_incoming_host( + data=alive_message.body, sender=peer_id, source=Protocol.P2P ) - incomplete_read_counter += 1 + except Exception: LOGGER.exception("Exception in pubsub peers monitoring, resubscribing") @@ -83,7 +67,9 @@ async def monitor_hosts_p2p(p2p_client: P2PClient, alive_topic: str) -> None: async def monitor_hosts_ipfs(alive_topic: str): while True: try: - async for mvalue in sub_ipfs(alive_topic): - await handle_incoming_host(mvalue, source=Protocol.IPFS) + async for message in sub_ipfs(alive_topic): + await handle_incoming_host( + data=message["data"], sender=message["from"], source=Protocol.IPFS + ) except Exception: LOGGER.exception("Exception in pubsub peers monitoring, resubscribing") diff --git a/src/aleph/services/peers/publish.py b/src/aleph/services/peers/publish.py index 39b956f6a..818c30693 100644 --- a/src/aleph/services/peers/publish.py +++ b/src/aleph/services/peers/publish.py @@ -1,18 +1,18 @@ import asyncio import json import logging +from typing import List, Optional -from aleph.services.ipfs.pubsub import pub as pub_ipfs +from aleph_p2p_client import AlephP2PServiceClient -from p2pclient import Client as P2PClient -from typing import List, Optional +from aleph.services.ipfs.pubsub import pub as pub_ipfs LOGGER = logging.getLogger("peers.publish") async def publish_host( address: str, - p2p_client: P2PClient, + p2p_client: AlephP2PServiceClient, p2p_alive_topic: str, ipfs_alive_topic: str, interests: Optional[List[str]] = None, @@ -43,7 +43,9 @@ async def publish_host( try: LOGGER.debug("Publishing alive message on p2p pubsub") - await asyncio.wait_for(p2p_client.pubsub_publish(p2p_alive_topic, msg), 10) + await asyncio.wait_for( + p2p_client.publish(data=msg, topic=p2p_alive_topic), 10 + ) except Exception: LOGGER.warning("Can't publish alive message on p2p") diff --git a/src/aleph/services/utils.py b/src/aleph/services/utils.py index 03b6948e5..4b0513552 100644 --- a/src/aleph/services/utils.py +++ b/src/aleph/services/utils.py @@ -1,10 +1,8 @@ import logging import re import socket -from typing import Any, Dict import aiohttp -from p2pclient.pb.p2pd_pb2 import PSMessage logger = logging.getLogger(__name__) @@ -48,18 +46,3 @@ async def get_IP() -> str: except Exception as error: logging.exception("Error when fetching IPv4 from service") return get_ip4_from_socket() - - -def pubsub_msg_to_dict(pubsub_msg: PSMessage) -> Dict[str, Any]: - """ - A compatibility method that translates a p2pd pubsub message to the equivalent dictionary. - The returned value is then passed to `handle_incoming_host`. - - TODO: use a better system than a dict to pass these parameters around. - """ - return { - "from": getattr(pubsub_msg, "from"), - "data": pubsub_msg.data, - "seqno": pubsub_msg.seqno, - "topicIDs": pubsub_msg.topicIDs, - } diff --git a/src/aleph/storage.py b/src/aleph/storage.py index 69ff6cfcc..9454d1d6d 100644 --- a/src/aleph/storage.py +++ b/src/aleph/storage.py @@ -20,7 +20,6 @@ from aleph.services.ipfs.storage import get_ipfs_content from aleph.services.ipfs.storage import pin_add as ipfs_pin_add from aleph.services.p2p.http import request_hash as p2p_http_request_hash -from aleph.services.p2p.singleton import get_streamer from aleph.utils import get_sha256, run_in_executor LOGGER = logging.getLogger("STORAGE") @@ -68,11 +67,7 @@ async def fetch_content_from_network( config = get_config() enabled_clients = config.p2p.clients.value - if "protocol" in enabled_clients: - streamer = get_streamer() - content = await streamer.request_hash(content_hash) - - if content is None and "http" in enabled_clients: + if "http" in enabled_clients: content = await p2p_http_request_hash(content_hash, timeout=timeout) if content is not None: diff --git a/src/aleph/web/controllers/p2p.py b/src/aleph/web/controllers/p2p.py index 47e20e68a..ca9b09a3b 100644 --- a/src/aleph/web/controllers/p2p.py +++ b/src/aleph/web/controllers/p2p.py @@ -4,6 +4,7 @@ from typing import Dict, cast from aiohttp import web +from aleph_p2p_client import AlephP2PServiceClient from configmanager import Config from aleph.exceptions import InvalidMessageError @@ -51,7 +52,7 @@ async def pub_json(request: web.Request): failed_publications.append(Protocol.IPFS) try: - p2p_client = request.app["p2p_client"] + p2p_client: AlephP2PServiceClient = request.app["p2p_client"] await asyncio.wait_for(pub_p2p(p2p_client, request_data.get("topic"), request_data.get("data")), 10) except Exception: LOGGER.exception("Can't publish on p2p") diff --git a/tests/p2p/conftest.py b/tests/p2p/conftest.py deleted file mode 100644 index 788d79d0a..000000000 --- a/tests/p2p/conftest.py +++ /dev/null @@ -1,24 +0,0 @@ -import pytest -from async_exit_stack import AsyncExitStack -from p2pclient.daemon import make_p2pd_pair_ip4 - - -@pytest.fixture -async def p2p_clients(request): - nb_p2p_daemons = request.param - assert isinstance(nb_p2p_daemons, int) - - async with AsyncExitStack() as stack: - p2pd_tuples = [ - await stack.enter_async_context( - make_p2pd_pair_ip4( - daemon_executable="jsp2pd", - enable_control=True, - enable_connmgr=False, - enable_dht=False, - enable_pubsub=True, - ) - ) - for _ in range(nb_p2p_daemons) - ] - yield tuple(p2pd_tuple.client for p2pd_tuple in p2pd_tuples) diff --git a/tests/p2p/test_connect.py b/tests/p2p/test_connect.py deleted file mode 100644 index 74508e2a6..000000000 --- a/tests/p2p/test_connect.py +++ /dev/null @@ -1,85 +0,0 @@ -"""Tests to validate the connection / reconnection features of the P2P service.""" - -from typing import Tuple - -import pytest -from p2pclient import Client as P2PClient -from aleph.services.p2p.peers import connect_peer -from aleph.services.p2p.protocol import AlephProtocol - - -@pytest.mark.skip("Will not work anymore until P2P daemon is upgraded") -@pytest.mark.asyncio -@pytest.mark.parametrize("p2p_clients", [2], indirect=True) -async def test_p2p_client_connect(p2p_clients: Tuple[P2PClient, P2PClient]): - """ - Sanity check: verify that connecting two peers makes each peer appear in the peer list of the other peer. - This test is redundant with some tests in p2pclient itself. - """ - client1, client2 = p2p_clients - client1_peer_id, client1_maddrs = await client1.identify() - client2_peer_id, client2_maddrs = await client2.identify() - await client2.connect(client1_peer_id, client1_maddrs) - - client1_peers = await client1.list_peers() - client2_peers = await client2.list_peers() - assert client1_peer_id in [peer.peer_id for peer in client2_peers] - assert client2_peer_id in [peer.peer_id for peer in client1_peers] - - -@pytest.mark.skip("Will not work anymore until P2P daemon is upgraded") -@pytest.mark.asyncio -@pytest.mark.parametrize("p2p_clients", [2], indirect=True) -async def test_connect_peer_no_streamer(p2p_clients: Tuple[P2PClient, P2PClient]): - """ - Checks that we can connect to a peer using the custom connect_peer function, without managing protocol connections. - """ - client1, client2 = p2p_clients - client1_peer_id, client1_maddrs = await client1.identify() - client2_peer_id, client2_maddrs = await client2.identify() - - peer_maddr = f"{client2_maddrs[0]}/p2p/{client2_peer_id}" - - await connect_peer(p2p_client=client1, streamer=None, peer_maddr=peer_maddr) - - assert client1_peer_id in [peer.peer_id for peer in await client2.list_peers()] - assert client2_peer_id in [peer.peer_id for peer in await client1.list_peers()] - - -@pytest.mark.skip("Will not work anymore until P2P daemon is upgraded") -@pytest.mark.asyncio -@pytest.mark.parametrize("p2p_clients", [2], indirect=True) -async def test_connect_peer_streamer(p2p_clients: Tuple[P2PClient, P2PClient]): - """ - Checks that we can connect to a peer using the custom connect_peer function while managing protocol connections. - """ - - client1, client2 = p2p_clients - streamer_client1 = await AlephProtocol.create(client1) - # The receiver must have a handler registered for the protocol to be able to open a stream. - # The handler is registered when creating the protocol streamer instance. - streamer_client2 = await AlephProtocol.create(client2) - - client1_peer_id, client1_maddrs = await client1.identify() - client2_peer_id, client2_maddrs = await client2.identify() - - peer_maddr = f"{client2_maddrs[0]}/p2p/{client2_peer_id}" - - await connect_peer(p2p_client=client1, streamer=streamer_client1, peer_maddr=peer_maddr) - - assert client1_peer_id in [peer.peer_id for peer in await client2.list_peers()] - assert client2_peer_id in [peer.peer_id for peer in await client1.list_peers()] - - # Check that the peer was added successfully - assert client2_peer_id in streamer_client1.peers - - -@pytest.mark.skip("Will not work anymore until P2P daemon is upgraded") -@pytest.mark.asyncio -@pytest.mark.parametrize("p2p_clients", [1], indirect=True) -async def test_connect_to_self(p2p_clients: Tuple[P2PClient]): - """Checks that nothing bad happens if we try to connect to ourselves.""" - client = p2p_clients[0] - peer_id, maddrs = await client.identify() - - await connect_peer(p2p_client=client, streamer=None, peer_maddr=f"/p2p/{peer_id}") diff --git a/tests/p2p/test_identify.py b/tests/p2p/test_identify.py deleted file mode 100644 index aceb49f31..000000000 --- a/tests/p2p/test_identify.py +++ /dev/null @@ -1,15 +0,0 @@ -from typing import Tuple - -import pytest -from p2pclient import Client as P2PClient - - -@pytest.mark.skip("Will not work anymore until P2P daemon is upgraded") -@pytest.mark.asyncio -@pytest.mark.parametrize("p2p_clients", [1], indirect=True) -async def test_p2p_client_identify(p2p_clients: Tuple[P2PClient]): - """Sanity check to make sure that the fixture deploys the P2P daemon and that the client can reach it.""" - - assert len(p2p_clients) == 1 - client = p2p_clients[0] - _peer_id, _maddrs = await client.identify() diff --git a/tests/p2p/test_pubsub.py b/tests/p2p/test_pubsub.py deleted file mode 100644 index 8f4823901..000000000 --- a/tests/p2p/test_pubsub.py +++ /dev/null @@ -1,79 +0,0 @@ -"""Tests to validate the sending and receiving of messages on pubsub P2P topics.""" - -import asyncio -from typing import Tuple - -import pytest -from p2pclient import Client as P2PClient - -from aleph.services.p2p.pubsub import publish, receive_pubsub_messages, subscribe - - -@pytest.mark.skip("Will not work anymore until P2P daemon is upgraded") -@pytest.mark.asyncio -@pytest.mark.parametrize("p2p_clients", [2], indirect=True) -async def test_pubsub(p2p_clients: Tuple[P2PClient, P2PClient]): - topic = "test-topic" - - client1, client2 = p2p_clients - client1_peer_id, client1_maddrs = await client1.identify() - client2_peer_id, client2_maddrs = await client2.identify() - await client2.connect(client1_peer_id, client1_maddrs) - - # Check that the peers are connected - assert client1_peer_id in [peer.peer_id for peer in await client2.list_peers()] - assert client2_peer_id in [peer.peer_id for peer in await client1.list_peers()] - - # TODO: without this sleep, the test hangs randomly. Figure out why. - await asyncio.sleep(1) - - stream = await subscribe(client2, topic) - - msg = "Hello, peer" - await publish(client1, topic, msg) - - received_msg = await asyncio.wait_for(receive_pubsub_messages(stream).__anext__(), timeout=10) - print(received_msg) - assert received_msg.data.decode("UTF-8") == msg - assert topic in received_msg.topicIDs - - -@pytest.mark.skip("Will not work anymore until P2P daemon is upgraded") -@pytest.mark.asyncio -@pytest.mark.parametrize("p2p_clients", [3], indirect=True) -async def test_pubsub_multiple_subscribers(p2p_clients): - """ - Tests that a pubsub message can go through several peers to reach its destination. - Note that the peers must all subscribe to the topic in order to publish the message to the other peers. - The connection in this test is: Peer #3 -> Peer #1 -> Peer #2. - """ - topic = "test-topic-multi" - client1, client2, client3 = p2p_clients - client1_peer_id, client1_maddrs = await client1.identify() - client2_peer_id, client2_maddrs = await client2.identify() - client3_peer_id, client3_maddrs = await client3.identify() - await client2.connect(client1_peer_id, client1_maddrs) - await client3.connect(client1_peer_id, client1_maddrs) - - # TODO: without this sleep, the test hangs randomly. Figure out why. - await asyncio.sleep(1) - - # Check that the peers are connected - assert {client2_peer_id, client3_peer_id}.issubset(peer.peer_id for peer in await client1.list_peers()) - assert client1_peer_id in [peer.peer_id for peer in await client2.list_peers()] - assert client1_peer_id in [peer.peer_id for peer in await client3.list_peers()] - - stream_client1 = await subscribe(client1, topic) - stream_client2 = await subscribe(client2, topic) - msg = "Hello, distant peer" - await publish(client3, topic, msg) - - # Check that the neighboring peer received the message - received_msg_client1 = await asyncio.wait_for(receive_pubsub_messages(stream_client1).__anext__(), timeout=10) - assert received_msg_client1.data.decode("UTF-8") == msg - assert topic in received_msg_client1.topicIDs - - # Check that the distant peer also received the message - received_msg_client2 = await asyncio.wait_for(receive_pubsub_messages(stream_client2).__anext__(), timeout=10) - assert received_msg_client2.data.decode("UTF-8") == msg - assert topic in received_msg_client2.topicIDs