Skip to content

Commit

Permalink
Feature: Allow PAYG using base (#258)
Browse files Browse the repository at this point in the history
* Feature: handle superfuild flow

* feat: control of message / flow before starting the instance

* fix: mypy issue

* fix: add setuptools for ci

* Add unit test in pair programming

* fixup! Add unit test in pair programming

* Refactor: handle_flow_reduction and handle_flow into update_flow

* Fix: add type annotations

* fixup! Merge branch 'master' into 1yam-payg

* fixup! Add unit test in pair programming

* fixup! Add unit test in pair programming

* Fix raised error when no flow to reduce + dumby fix echo

* Add qemu_support, confidential_support and stream_reward_address checks

* Fix flow outputs

* Fixes after tests

* Feature: Allow PAYG on bae

* Fix instance confidential adding payment_chain arg

* fix: mypy issue

* Merge from master

* Fix PAYG + balance checks

* Remove default rootfs to allow prompting

* Fix: Update to latest Aleph-SDK version

* Fix name / crn_name

* Fix: Remove BASE chain for now and ceil flow to up number to ensure to have enough tokens.

* Fix: Solve code quality issues.

* Fix: Solve mypy issues.

---------

Co-authored-by: Hugo Herter <[email protected]>
Co-authored-by: philogicae <[email protected]>
Co-authored-by: Andres D. Molins <[email protected]>
  • Loading branch information
4 people authored Aug 26, 2024
1 parent 740b156 commit 7a806cf
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 19 deletions.
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ classifiers = [
]

dependencies = [
"aleph-sdk-python>=1.0.0rc2",
"aleph-message>=0.4.8",
"aleph-sdk-python>=1.0.0",
"setuptools>=65.5.0",
"aleph-message>=0.4.9",
"aiohttp==3.9.5",
"typer==0.12.3",
"python-magic==0.4.27",
Expand Down Expand Up @@ -69,6 +70,7 @@ source = "vcs"
[tool.hatch.envs.default]
platforms = ["linux", "macos"]
dependencies = [
"setuptools>=65.5.0",
"pytest==8.2.2",
"pytest-asyncio==0.23.7",
"pytest-cov==5.0.0",
Expand Down
1 change: 1 addition & 0 deletions src/aleph_client/commands/help_strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@
CRN_PENDING = "Pending..."
ALLOCATION_AUTO = "Auto - Scheduler"
ALLOCATION_MANUAL = "Manual - Selection"
PAYMENT_CHAIN = "Chain you want to use to pay for your instance"
104 changes: 88 additions & 16 deletions src/aleph_client/commands/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,18 @@
import json
import logging
import shutil
from decimal import Decimal
from ipaddress import IPv6Interface
from math import ceil
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union, cast

import aiohttp
import typer
from aiohttp import ClientConnectorError, ClientResponseError, ClientSession
from aleph.sdk import AlephHttpClient, AuthenticatedAlephHttpClient
from aleph.sdk.account import _load_account
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client.vm_client import VmClient
from aleph.sdk.client.vm_confidential_client import VmConfidentialClient
from aleph.sdk.conf import settings as sdk_settings
Expand All @@ -21,6 +25,7 @@
MessageNotFoundError,
)
from aleph.sdk.query.filters import MessageFilter
from aleph.sdk.query.responses import PriceResponse
from aleph.sdk.types import AccountFromPrivateKey, StorageEnum
from aleph.sdk.utils import calculate_firmware_hash
from aleph_message.models import InstanceMessage, StoreMessage
Expand Down Expand Up @@ -49,12 +54,15 @@
setup_logging,
validated_int_prompt,
validated_prompt,
wait_for_confirmed_flow,
wait_for_processed_instance,
)
from aleph_client.conf import settings
from aleph_client.models import CRNInfo
from aleph_client.utils import AsyncTyper, fetch_json

from ..utils import has_nested_attr
from .superfluid import FlowUpdate, update_flow

logger = logging.getLogger(__name__)
app = AsyncTyper(no_args_is_help=True)
Expand All @@ -63,9 +71,10 @@
@app.command()
async def create(
payment_type: PaymentType = typer.Option(None, help=help_strings.PAYMENT_TYPE),
payment_chain: Chain = typer.Option(None, help=help_strings.PAYMENT_CHAIN),
hypervisor: HypervisorType = typer.Option(None, help=help_strings.HYPERVISOR),
name: Optional[str] = typer.Option(None, help=help_strings.INSTANCE_NAME),
rootfs: str = typer.Option("ubuntu22", help=help_strings.ROOTFS),
rootfs: str = typer.Option(None, help=help_strings.ROOTFS),
rootfs_size: int = typer.Option(None, help=help_strings.ROOTFS_SIZE),
vcpus: int = typer.Option(None, help=help_strings.VCPUS),
memory: int = typer.Option(None, help=help_strings.MEMORY),
Expand Down Expand Up @@ -133,6 +142,28 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
)
is_stream = payment_type != PaymentType.hold

# super_token_chains = get_chains_with_super_token()
super_token_chains = [Chain.AVAX.value]
if is_stream:
if payment_chain is None or payment_chain not in super_token_chains:
payment_chain = Chain(
Prompt.ask(
"Which chain do you want to use for Pay-As-You-Go?",
choices=super_token_chains,
default=Chain.AVAX.value,
)
)
if isinstance(account, ETHAccount):
account.switch_chain(payment_chain)
if account.superfluid_connector: # Quick check with theoretical min price
try:
account.superfluid_connector.can_start_flow(Decimal(0.000031)) # 0.11/h
except Exception as e:
echo(e)
raise typer.Exit(code=1)
else:
payment_chain = Chain.ETH # Hold chain for all balances

if confidential:
if hypervisor and hypervisor != HypervisorType.qemu:
echo("Only QEMU is supported as an hypervisor for confidential")
Expand Down Expand Up @@ -171,7 +202,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
if confidential:
# Confidential only support custom rootfs
rootfs = "custom"
elif rootfs not in os_choices:
elif not rootfs or rootfs not in os_choices:
rootfs = Prompt.ask(
"Use a custom rootfs or one of the following prebuilt ones:",
default=rootfs,
Expand Down Expand Up @@ -206,6 +237,7 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
if not firmware_message:
echo("Confidential Firmware hash does not exist on aleph.im")
raise typer.Exit(code=1)

name = name or validated_prompt("Instance name", lambda x: len(x) < 65)
rootfs_size = rootfs_size or validated_int_prompt(
"Disk size in MiB", default=settings.DEFAULT_ROOTFS_SIZE, min_value=10_240, max_value=102_400
Expand All @@ -228,27 +260,24 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
immutable_volume=immutable_volume,
)

# For PAYG or confidential, the user select directly the node on which to run on
# For PAYG User have to make the payment stream separately
# For now, we allow hold for confidential, but the user still has to choose on which CRN to run.
stream_reward_address = None
crn = None
if crn_url and crn_hash:
crn_url = sanitize_url(crn_url)
try:
name, score, reward_addr = "?", 0, ""
crn_name, score, reward_addr = "?", 0, ""
nodes: NodeInfo = await _fetch_nodes()
for node in nodes.nodes:
if node["address"].rstrip("/") == crn_url:
name = node["name"]
crn_name = node["name"]
score = node["score"]
reward_addr = node["stream_reward"]
break
crn_info = await fetch_crn_info(crn_url)
if crn_info:
crn = CRNInfo(
hash=ItemHash(crn_hash),
name=name or "?",
name=crn_name or "?",
url=crn_url,
version=crn_info.get("version", ""),
score=score,
Expand Down Expand Up @@ -293,13 +322,11 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
raise typer.Exit(1)

async with AuthenticatedAlephHttpClient(account=account, api_server=sdk_settings.API_HOST) as client:
payment: Optional[Payment] = None
if stream_reward_address:
payment = Payment(
chain=Chain.AVAX,
receiver=stream_reward_address,
type=payment_type,
)
payment = Payment(
chain=payment_chain,
receiver=stream_reward_address if stream_reward_address else None,
type=payment_type,
)
try:
message, status = await client.create_instance(
sync=True,
Expand Down Expand Up @@ -341,7 +368,36 @@ def validate_ssh_pubkey_file(file: Union[str, Path]) -> Path:
# Not the ideal solution
logger.debug(f"Cannot allocate {item_hash}: no CRN url")
return item_hash, crn_url
account = _load_account(private_key, private_key_file)

# Wait for the instance message to be processed
async with aiohttp.ClientSession() as session:
await wait_for_processed_instance(session, item_hash)

# Pay-As-You-Go
if payment_type == PaymentType.superfluid:
price: PriceResponse = await client.get_program_price(item_hash)
ceil_factor = 10**18
required_tokens = ceil(Decimal(price.required_tokens) * ceil_factor) / ceil_factor
if isinstance(account, ETHAccount) and account.superfluid_connector:
try: # Double check with effective price
account.superfluid_connector.can_start_flow(Decimal(0.000031)) # Min for 0.11/h
except Exception as e:
echo(e)
raise typer.Exit(code=1)
flow_hash = await update_flow(
account=account,
receiver=crn.stream_reward_address,
flow=Decimal(required_tokens),
update_type=FlowUpdate.INCREASE,
)
# Wait for the flow transaction to be confirmed
await wait_for_confirmed_flow(account, message.content.payment.receiver)
if flow_hash:
echo(
f"Flow {flow_hash} has been created:\n\t- price/sec: {price.required_tokens:.7f} ALEPH\n\t- receiver: {crn.stream_reward_address}"
)

# Notify CRN
async with VmClient(account, crn.url) as crn_client:
status, result = await crn_client.start_instance(vm_id=item_hash)
logger.debug(status, result)
Expand Down Expand Up @@ -437,6 +493,20 @@ async def delete(
echo("You are not the owner of this instance")
raise typer.Exit(code=1)

# Check for streaming payment and eventually stop it
payment: Optional[Payment] = existing_message.content.payment
if payment is not None and payment.type == PaymentType.superfluid:
price: PriceResponse = await client.get_program_price(item_hash)
if payment.receiver is not None:
if isinstance(account, ETHAccount):
account.switch_chain(payment.chain)
if account.superfluid_connector:
flow_hash = await update_flow(
account, payment.receiver, Decimal(price.required_tokens), FlowUpdate.REDUCE
)
if flow_hash:
echo(f"Flow {flow_hash} has been deleted.")

# Check status of the instance and eventually erase associated VM
node_list: NodeInfo = await _fetch_nodes()
_, details = await _get_instance_details(existing_message, node_list)
Expand Down Expand Up @@ -962,6 +1032,7 @@ async def confidential(
keep_session: bool = typer.Option(None, help=help_strings.KEEP_SESSION),
vm_secret: str = typer.Option(None, help=help_strings.VM_SECRET),
payment_type: PaymentType = typer.Option(None, help=help_strings.PAYMENT_TYPE),
payment_chain: Optional[Chain] = typer.Option(None, help=help_strings.PAYMENT_CHAIN),
name: Optional[str] = typer.Option(None, help=help_strings.INSTANCE_NAME),
rootfs: str = typer.Option("ubuntu22", help=help_strings.ROOTFS),
rootfs_size: int = typer.Option(None, help=help_strings.ROOTFS_SIZE),
Expand Down Expand Up @@ -1002,6 +1073,7 @@ async def confidential(
if not vm_id or len(vm_id) != 64:
vm_id, crn_url = await create(
payment_type,
payment_chain,
None,
name,
rootfs,
Expand Down
67 changes: 67 additions & 0 deletions src/aleph_client/commands/instance/superfluid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging
from decimal import Decimal
from enum import Enum

from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.conf import settings
from aleph_message.models import Chain
from click import echo
from eth_utils.currency import to_wei
from superfluid import Web3FlowInfo

logger = logging.getLogger(__name__)


def from_wei(wei_value: Decimal) -> Decimal:
"""Converts the given wei value to ether."""
return wei_value / Decimal(10**settings.TOKEN_DECIMALS)


class FlowUpdate(str, Enum):
REDUCE = "reduce"
INCREASE = "increase"


async def update_flow(account: ETHAccount, receiver: str, flow: Decimal, update_type: FlowUpdate):
"""
Update the flow of a Superfluid stream between a sender and receiver.
This function either increases or decreases the flow rate between the sender and receiver,
based on the update_type. If no flow exists and the update type is augmentation, it creates a new flow
with the specified rate. If the update type is reduction and the reduction amount brings the flow to zero
or below, the flow is deleted.
:param account: The SuperFluid account instance used to interact with the blockchain.
:param chain: The blockchain chain to interact with.
:param receiver: Address of the receiver in hexadecimal format.
:param flow: The flow rate to be added or removed (in ether).
:param update_type: The type of update to perform (augmentation or reduction).
:return: The transaction hash of the executed operation (create, update, or delete flow).
"""

# Retrieve current flow info
flow_info: Web3FlowInfo = await account.get_flow(receiver)

current_flow_rate_wei: Decimal = Decimal(flow_info["flowRate"] or "0")
flow_rate_wei: int = to_wei(flow, "ether")

if update_type == FlowUpdate.INCREASE:
if current_flow_rate_wei > 0:
# Update existing flow by augmenting the rate
new_flow_rate_wei = current_flow_rate_wei + flow_rate_wei
new_flow_rate_ether = from_wei(new_flow_rate_wei)
return await account.update_flow(receiver, new_flow_rate_ether)
else:
# Create a new flow if none exists
return await account.create_flow(receiver, flow)
elif update_type == FlowUpdate.REDUCE:
if current_flow_rate_wei > 0:
# Reduce the existing flow
new_flow_rate_wei = current_flow_rate_wei - flow_rate_wei
if new_flow_rate_wei > 0:
new_flow_rate_ether = from_wei(new_flow_rate_wei)
return await account.update_flow(receiver, new_flow_rate_ether)
else:
# Delete the flow if the new flow rate is zero or negative
return await account.delete_flow(receiver)
else:
echo("No existing flow to stop. Skipping...")
2 changes: 1 addition & 1 deletion src/aleph_client/commands/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def post(
file_size = os.path.getsize(path)
storage_engine = StorageEnum.ipfs if file_size > 4 * 1024 * 1024 else StorageEnum.storage

with open(path, "r") as fd:
with open(path) as fd:
content = json.load(fd)

else:
Expand Down
31 changes: 31 additions & 0 deletions src/aleph_client/commands/utils.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
from __future__ import annotations

import asyncio
import logging
import os
import sys
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union

import typer
from aiohttp import ClientSession
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.conf import settings as sdk_settings
from aleph.sdk.types import GenericMessage
from aleph_message.models import ItemHash
from pygments import highlight
from pygments.formatters.terminal256 import Terminal256Formatter
from pygments.lexers import JsonLexer
from rich.prompt import IntPrompt, Prompt, PromptError
from typer import echo

from aleph_client.utils import fetch_json

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -208,3 +215,27 @@ def has_nested_attr(obj, *attr_chain) -> bool:
return False
obj = getattr(obj, attr)
return True


async def wait_for_processed_instance(session: ClientSession, item_hash: ItemHash):
"""Wait for a message to be processed by CCN"""
while True:
url = f"{sdk_settings.API_HOST.rstrip('/')}/api/v0/messages/{item_hash}"
message = await fetch_json(session, url)
if message["status"] == "processed":
return
elif message["status"] == "pending":
typer.echo(f"Message {item_hash} is still pending, waiting 10sec...")
await asyncio.sleep(10)
elif message["status"] == "rejected":
raise Exception(f"Message {item_hash} has been rejected")


async def wait_for_confirmed_flow(account: ETHAccount, receiver: str):
"""Wait for a flow to be confirmed on-chain"""
while True:
flow = await account.get_flow(receiver)
if flow:
return
typer.echo("Flow transaction is still pending, waiting 10sec...")
await asyncio.sleep(10)
Loading

0 comments on commit 7a806cf

Please sign in to comment.