From 7a806cfc6c6766d060b45a7050886b6908007986 Mon Sep 17 00:00:00 2001 From: 1yam <40899431+1yam@users.noreply.github.com> Date: Mon, 26 Aug 2024 20:06:42 +0200 Subject: [PATCH] Feature: Allow PAYG using base (#258) * Feature: handle superfuild flow * feat: control of message / flow before starting the instance * fix: mypy issue * fix: add setuptools for ci * Add unit test in pair programming * fixup! Add unit test in pair programming * Refactor: handle_flow_reduction and handle_flow into update_flow * Fix: add type annotations * fixup! Merge branch 'master' into 1yam-payg * fixup! Add unit test in pair programming * fixup! Add unit test in pair programming * Fix raised error when no flow to reduce + dumby fix echo * Add qemu_support, confidential_support and stream_reward_address checks * Fix flow outputs * Fixes after tests * Feature: Allow PAYG on bae * Fix instance confidential adding payment_chain arg * fix: mypy issue * Merge from master * Fix PAYG + balance checks * Remove default rootfs to allow prompting * Fix: Update to latest Aleph-SDK version * Fix name / crn_name * Fix: Remove BASE chain for now and ceil flow to up number to ensure to have enough tokens. * Fix: Solve code quality issues. * Fix: Solve mypy issues. --------- Co-authored-by: Hugo Herter Co-authored-by: philogicae Co-authored-by: Andres D. Molins --- pyproject.toml | 6 +- src/aleph_client/commands/help_strings.py | 1 + .../commands/instance/__init__.py | 104 +++++++++++++++--- .../commands/instance/superfluid.py | 67 +++++++++++ src/aleph_client/commands/message.py | 2 +- src/aleph_client/commands/utils.py | 31 ++++++ tests/unit/test_instance.py | 58 ++++++++++ 7 files changed, 250 insertions(+), 19 deletions(-) create mode 100644 src/aleph_client/commands/instance/superfluid.py diff --git a/pyproject.toml b/pyproject.toml index 8658a3bf..1a02e227 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,8 +24,9 @@ classifiers = [ ] dependencies = [ - "aleph-sdk-python>=1.0.0rc2", - "aleph-message>=0.4.8", + "aleph-sdk-python>=1.0.0", + "setuptools>=65.5.0", + "aleph-message>=0.4.9", "aiohttp==3.9.5", "typer==0.12.3", "python-magic==0.4.27", @@ -69,6 +70,7 @@ source = "vcs" [tool.hatch.envs.default] platforms = ["linux", "macos"] dependencies = [ + "setuptools>=65.5.0", "pytest==8.2.2", "pytest-asyncio==0.23.7", "pytest-cov==5.0.0", diff --git a/src/aleph_client/commands/help_strings.py b/src/aleph_client/commands/help_strings.py index 5b4e98f4..9ba18416 100644 --- a/src/aleph_client/commands/help_strings.py +++ b/src/aleph_client/commands/help_strings.py @@ -48,3 +48,4 @@ CRN_PENDING = "Pending..." ALLOCATION_AUTO = "Auto - Scheduler" ALLOCATION_MANUAL = "Manual - Selection" +PAYMENT_CHAIN = "Chain you want to use to pay for your instance" diff --git a/src/aleph_client/commands/instance/__init__.py b/src/aleph_client/commands/instance/__init__.py index ceab0408..ea169b09 100644 --- a/src/aleph_client/commands/instance/__init__.py +++ b/src/aleph_client/commands/instance/__init__.py @@ -4,14 +4,18 @@ import json import logging import shutil +from decimal import Decimal from ipaddress import IPv6Interface +from math import ceil from pathlib import Path from typing import Dict, List, Optional, Tuple, Union, cast +import aiohttp import typer from aiohttp import ClientConnectorError, ClientResponseError, ClientSession from aleph.sdk import AlephHttpClient, AuthenticatedAlephHttpClient from aleph.sdk.account import _load_account +from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.client.vm_client import VmClient from aleph.sdk.client.vm_confidential_client import VmConfidentialClient from aleph.sdk.conf import settings as sdk_settings @@ -21,6 +25,7 @@ MessageNotFoundError, ) from aleph.sdk.query.filters import MessageFilter +from aleph.sdk.query.responses import PriceResponse from aleph.sdk.types import AccountFromPrivateKey, StorageEnum from aleph.sdk.utils import calculate_firmware_hash from aleph_message.models import InstanceMessage, StoreMessage @@ -49,12 +54,15 @@ setup_logging, validated_int_prompt, validated_prompt, + wait_for_confirmed_flow, + wait_for_processed_instance, ) from aleph_client.conf import settings from aleph_client.models import CRNInfo from aleph_client.utils import AsyncTyper, fetch_json from ..utils import has_nested_attr +from .superfluid import FlowUpdate, update_flow logger = logging.getLogger(__name__) app = AsyncTyper(no_args_is_help=True) @@ -63,9 +71,10 @@ @app.command() async def create( payment_type: PaymentType = typer.Option(None, help=help_strings.PAYMENT_TYPE), + payment_chain: Chain = typer.Option(None, help=help_strings.PAYMENT_CHAIN), hypervisor: HypervisorType = typer.Option(None, help=help_strings.HYPERVISOR), name: Optional[str] = typer.Option(None, help=help_strings.INSTANCE_NAME), - rootfs: str = typer.Option("ubuntu22", help=help_strings.ROOTFS), + rootfs: str = typer.Option(None, help=help_strings.ROOTFS), rootfs_size: int = typer.Option(None, help=help_strings.ROOTFS_SIZE), vcpus: int = typer.Option(None, help=help_strings.VCPUS), memory: int = typer.Option(None, help=help_strings.MEMORY), @@ -133,6 +142,28 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: ) is_stream = payment_type != PaymentType.hold + # super_token_chains = get_chains_with_super_token() + super_token_chains = [Chain.AVAX.value] + if is_stream: + if payment_chain is None or payment_chain not in super_token_chains: + payment_chain = Chain( + Prompt.ask( + "Which chain do you want to use for Pay-As-You-Go?", + choices=super_token_chains, + default=Chain.AVAX.value, + ) + ) + if isinstance(account, ETHAccount): + account.switch_chain(payment_chain) + if account.superfluid_connector: # Quick check with theoretical min price + try: + account.superfluid_connector.can_start_flow(Decimal(0.000031)) # 0.11/h + except Exception as e: + echo(e) + raise typer.Exit(code=1) + else: + payment_chain = Chain.ETH # Hold chain for all balances + if confidential: if hypervisor and hypervisor != HypervisorType.qemu: echo("Only QEMU is supported as an hypervisor for confidential") @@ -171,7 +202,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: if confidential: # Confidential only support custom rootfs rootfs = "custom" - elif rootfs not in os_choices: + elif not rootfs or rootfs not in os_choices: rootfs = Prompt.ask( "Use a custom rootfs or one of the following prebuilt ones:", default=rootfs, @@ -206,6 +237,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: if not firmware_message: echo("Confidential Firmware hash does not exist on aleph.im") raise typer.Exit(code=1) + name = name or validated_prompt("Instance name", lambda x: len(x) < 65) rootfs_size = rootfs_size or validated_int_prompt( "Disk size in MiB", default=settings.DEFAULT_ROOTFS_SIZE, min_value=10_240, max_value=102_400 @@ -228,19 +260,16 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: immutable_volume=immutable_volume, ) - # For PAYG or confidential, the user select directly the node on which to run on - # For PAYG User have to make the payment stream separately - # For now, we allow hold for confidential, but the user still has to choose on which CRN to run. stream_reward_address = None crn = None if crn_url and crn_hash: crn_url = sanitize_url(crn_url) try: - name, score, reward_addr = "?", 0, "" + crn_name, score, reward_addr = "?", 0, "" nodes: NodeInfo = await _fetch_nodes() for node in nodes.nodes: if node["address"].rstrip("/") == crn_url: - name = node["name"] + crn_name = node["name"] score = node["score"] reward_addr = node["stream_reward"] break @@ -248,7 +277,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: if crn_info: crn = CRNInfo( hash=ItemHash(crn_hash), - name=name or "?", + name=crn_name or "?", url=crn_url, version=crn_info.get("version", ""), score=score, @@ -293,13 +322,11 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: raise typer.Exit(1) async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client: - payment: Optional[Payment] = None - if stream_reward_address: - payment = Payment( - chain=Chain.AVAX, - receiver=stream_reward_address, - type=payment_type, - ) + payment = Payment( + chain=payment_chain, + receiver=stream_reward_address if stream_reward_address else None, + type=payment_type, + ) try: message, status = await client.create_instance( sync=True, @@ -341,7 +368,36 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path: # Not the ideal solution logger.debug(f"Cannot allocate {item_hash}: no CRN url") return item_hash, crn_url - account = _load_account(private_key, private_key_file) + + # Wait for the instance message to be processed + async with aiohttp.ClientSession() as session: + await wait_for_processed_instance(session, item_hash) + + # Pay-As-You-Go + if payment_type == PaymentType.superfluid: + price: PriceResponse = await client.get_program_price(item_hash) + ceil_factor = 10**18 + required_tokens = ceil(Decimal(price.required_tokens) * ceil_factor) / ceil_factor + if isinstance(account, ETHAccount) and account.superfluid_connector: + try: # Double check with effective price + account.superfluid_connector.can_start_flow(Decimal(0.000031)) # Min for 0.11/h + except Exception as e: + echo(e) + raise typer.Exit(code=1) + flow_hash = await update_flow( + account=account, + receiver=crn.stream_reward_address, + flow=Decimal(required_tokens), + update_type=FlowUpdate.INCREASE, + ) + # Wait for the flow transaction to be confirmed + await wait_for_confirmed_flow(account, message.content.payment.receiver) + if flow_hash: + echo( + f"Flow {flow_hash} has been created:\n\t- price/sec: {price.required_tokens:.7f} ALEPH\n\t- receiver: {crn.stream_reward_address}" + ) + + # Notify CRN async with VmClient(account, crn.url) as crn_client: status, result = await crn_client.start_instance(vm_id=item_hash) logger.debug(status, result) @@ -437,6 +493,20 @@ async def delete( echo("You are not the owner of this instance") raise typer.Exit(code=1) + # Check for streaming payment and eventually stop it + payment: Optional[Payment] = existing_message.content.payment + if payment is not None and payment.type == PaymentType.superfluid: + price: PriceResponse = await client.get_program_price(item_hash) + if payment.receiver is not None: + if isinstance(account, ETHAccount): + account.switch_chain(payment.chain) + if account.superfluid_connector: + flow_hash = await update_flow( + account, payment.receiver, Decimal(price.required_tokens), FlowUpdate.REDUCE + ) + if flow_hash: + echo(f"Flow {flow_hash} has been deleted.") + # Check status of the instance and eventually erase associated VM node_list: NodeInfo = await _fetch_nodes() _, details = await _get_instance_details(existing_message, node_list) @@ -962,6 +1032,7 @@ async def confidential( keep_session: bool = typer.Option(None, help=help_strings.KEEP_SESSION), vm_secret: str = typer.Option(None, help=help_strings.VM_SECRET), payment_type: PaymentType = typer.Option(None, help=help_strings.PAYMENT_TYPE), + payment_chain: Optional[Chain] = typer.Option(None, help=help_strings.PAYMENT_CHAIN), name: Optional[str] = typer.Option(None, help=help_strings.INSTANCE_NAME), rootfs: str = typer.Option("ubuntu22", help=help_strings.ROOTFS), rootfs_size: int = typer.Option(None, help=help_strings.ROOTFS_SIZE), @@ -1002,6 +1073,7 @@ async def confidential( if not vm_id or len(vm_id) != 64: vm_id, crn_url = await create( payment_type, + payment_chain, None, name, rootfs, diff --git a/src/aleph_client/commands/instance/superfluid.py b/src/aleph_client/commands/instance/superfluid.py new file mode 100644 index 00000000..d7105993 --- /dev/null +++ b/src/aleph_client/commands/instance/superfluid.py @@ -0,0 +1,67 @@ +import logging +from decimal import Decimal +from enum import Enum + +from aleph.sdk.chains.ethereum import ETHAccount +from aleph.sdk.conf import settings +from aleph_message.models import Chain +from click import echo +from eth_utils.currency import to_wei +from superfluid import Web3FlowInfo + +logger = logging.getLogger(__name__) + + +def from_wei(wei_value: Decimal) -> Decimal: + """Converts the given wei value to ether.""" + return wei_value / Decimal(10**settings.TOKEN_DECIMALS) + + +class FlowUpdate(str, Enum): + REDUCE = "reduce" + INCREASE = "increase" + + +async def update_flow(account: ETHAccount, receiver: str, flow: Decimal, update_type: FlowUpdate): + """ + Update the flow of a Superfluid stream between a sender and receiver. + This function either increases or decreases the flow rate between the sender and receiver, + based on the update_type. If no flow exists and the update type is augmentation, it creates a new flow + with the specified rate. If the update type is reduction and the reduction amount brings the flow to zero + or below, the flow is deleted. + + :param account: The SuperFluid account instance used to interact with the blockchain. + :param chain: The blockchain chain to interact with. + :param receiver: Address of the receiver in hexadecimal format. + :param flow: The flow rate to be added or removed (in ether). + :param update_type: The type of update to perform (augmentation or reduction). + :return: The transaction hash of the executed operation (create, update, or delete flow). + """ + + # Retrieve current flow info + flow_info: Web3FlowInfo = await account.get_flow(receiver) + + current_flow_rate_wei: Decimal = Decimal(flow_info["flowRate"] or "0") + flow_rate_wei: int = to_wei(flow, "ether") + + if update_type == FlowUpdate.INCREASE: + if current_flow_rate_wei > 0: + # Update existing flow by augmenting the rate + new_flow_rate_wei = current_flow_rate_wei + flow_rate_wei + new_flow_rate_ether = from_wei(new_flow_rate_wei) + return await account.update_flow(receiver, new_flow_rate_ether) + else: + # Create a new flow if none exists + return await account.create_flow(receiver, flow) + elif update_type == FlowUpdate.REDUCE: + if current_flow_rate_wei > 0: + # Reduce the existing flow + new_flow_rate_wei = current_flow_rate_wei - flow_rate_wei + if new_flow_rate_wei > 0: + new_flow_rate_ether = from_wei(new_flow_rate_wei) + return await account.update_flow(receiver, new_flow_rate_ether) + else: + # Delete the flow if the new flow rate is zero or negative + return await account.delete_flow(receiver) + else: + echo("No existing flow to stop. Skipping...") diff --git a/src/aleph_client/commands/message.py b/src/aleph_client/commands/message.py index 5591090d..9a69c40c 100644 --- a/src/aleph_client/commands/message.py +++ b/src/aleph_client/commands/message.py @@ -128,7 +128,7 @@ async def post( file_size = os.path.getsize(path) storage_engine = StorageEnum.ipfs if file_size > 4 * 1024 * 1024 else StorageEnum.storage - with open(path, "r") as fd: + with open(path) as fd: content = json.load(fd) else: diff --git a/src/aleph_client/commands/utils.py b/src/aleph_client/commands/utils.py index d384face..6e716fc9 100644 --- a/src/aleph_client/commands/utils.py +++ b/src/aleph_client/commands/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import logging import os import sys @@ -7,13 +8,19 @@ from typing import Any, Callable, Dict, List, Optional, TypeVar, Union import typer +from aiohttp import ClientSession +from aleph.sdk.chains.ethereum import ETHAccount +from aleph.sdk.conf import settings as sdk_settings from aleph.sdk.types import GenericMessage +from aleph_message.models import ItemHash from pygments import highlight from pygments.formatters.terminal256 import Terminal256Formatter from pygments.lexers import JsonLexer from rich.prompt import IntPrompt, Prompt, PromptError from typer import echo +from aleph_client.utils import fetch_json + logger = logging.getLogger(__name__) @@ -208,3 +215,27 @@ def has_nested_attr(obj, *attr_chain) -> bool: return False obj = getattr(obj, attr) return True + + +async def wait_for_processed_instance(session: ClientSession, item_hash: ItemHash): + """Wait for a message to be processed by CCN""" + while True: + url = f"{sdk_settings.API_HOST.rstrip('/')}/api/v0/messages/{item_hash}" + message = await fetch_json(session, url) + if message["status"] == "processed": + return + elif message["status"] == "pending": + typer.echo(f"Message {item_hash} is still pending, waiting 10sec...") + await asyncio.sleep(10) + elif message["status"] == "rejected": + raise Exception(f"Message {item_hash} has been rejected") + + +async def wait_for_confirmed_flow(account: ETHAccount, receiver: str): + """Wait for a flow to be confirmed on-chain""" + while True: + flow = await account.get_flow(receiver) + if flow: + return + typer.echo("Flow transaction is still pending, waiting 10sec...") + await asyncio.sleep(10) diff --git a/tests/unit/test_instance.py b/tests/unit/test_instance.py index fba14329..cefbe81f 100644 --- a/tests/unit/test_instance.py +++ b/tests/unit/test_instance.py @@ -1,12 +1,20 @@ from __future__ import annotations from datetime import datetime, timezone +from decimal import Decimal +from typing import cast +from unittest.mock import AsyncMock, MagicMock, patch import pytest from aiohttp import InvalidURL +from aleph.sdk.chains.ethereum import ETHAccount +from aleph_message.models import Chain +from aleph_message.models.execution.base import Payment, PaymentType from aleph_message.models.execution.environment import CpuProperties +from eth_utils.currency import to_wei from multidict import CIMultiDict, CIMultiDictProxy +from aleph_client.commands.instance import delete from aleph_client.commands.instance.network import ( FORBIDDEN_HOSTS, fetch_crn_info, @@ -107,3 +115,53 @@ def test_sanitize_url_with_valid_url(): def test_sanitize_url_with_https_scheme(): url = "https://example.org" assert sanitize_url(url) == url + + +class MockETHAccount(ETHAccount): + pass + + +def create_test_account() -> MockETHAccount: + return MockETHAccount(private_key=b"deca" * 8) + + +@pytest.mark.asyncio +async def test_delete_instance(): + item_hash = "cafe" * 16 + test_account = create_test_account() + + # Mocking get_flow and delete_flow methods using patch.object + with patch.object(test_account, "get_flow", AsyncMock(return_value={"flowRate": to_wei(123, unit="ether")})): + delete_flow_mock = AsyncMock() + with patch.object(test_account, "delete_flow", delete_flow_mock): + mock_response_message = MagicMock( + sender=test_account.get_address(), + content=MagicMock( + payment=Payment( + chain=Chain.AVAX, + type=PaymentType.superfluid, + receiver=ETHAccount(private_key=b"cafe" * 8).get_address(), + ) + ), + ) + + mock_client = AsyncMock( + get_message=AsyncMock(return_value=mock_response_message), + get_program_price=AsyncMock(return_value=MagicMock(required_tokens=123)), + forget=AsyncMock(return_value=(MagicMock(), MagicMock())), + ) + + mock_client_class = MagicMock() + mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client) + + mock_load_account = MagicMock(return_value=test_account) + + with patch("aleph_client.commands.instance.AuthenticatedAlephHttpClient", mock_client_class): + with patch("aleph_client.commands.instance._load_account", mock_load_account): + await delete(item_hash) + + # The flow has been deleted since payment uses Superfluid and there is only one flow mocked + delete_flow_mock.assert_awaited_once() + + # The message has been forgotten + mock_client.forget.assert_called_once()