Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
fubuloubu committed May 30, 2024
1 parent 0eaa266 commit 20ee2b1
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 98 deletions.
34 changes: 24 additions & 10 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from typing import Annotated

from ape import chain
Expand All @@ -14,16 +15,21 @@
# We can add parameters, which are values in state that can be updated by external triggers
app.add_parameter("bad_number", default=3)

# Cannot call `app.state` outside of an app function handler
# app.state.something # NOTE: raises AttributeError

# NOTE: Don't do any networking until after initializing app
USDC = tokens["USDC"]
YFI = tokens["YFI"]


@app.on_startup()
def app_startup(startup_state: StateSnapshot):
# NOTE: This is called just as the app is put into "run" state,
# and handled by the first available worker
# raise Exception # NOTE: Any exception raised on startup aborts immediately
# This is called just as the app is put into "run" state,
# and handled by the first available worker

# Any exception raised on startup aborts immediately:
# raise Exception # NOTE: raises StartupFailure

# This is a great place to set `app.state` values that aren't parameters
# NOTE: Non-parameter state is `None` by default
Expand All @@ -36,7 +42,7 @@ def app_startup(startup_state: StateSnapshot):
# Can handle some resource initialization for each worker, like LLMs or database connections
class MyDB:
def execute(self, query: str):
pass
pass # Handle query somehow...


@app.on_worker_startup()
Expand All @@ -45,9 +51,11 @@ def worker_startup(worker_state: TaskiqState): # NOTE: You need the type hint t
# NOTE: Can put anything here, any python object works
worker_state.db = MyDB()
worker_state.block_count = 0
# raise Exception # NOTE: Any exception raised on worker startup aborts immediately

# Cannot call `app.state` because it is not set up yet
# Any exception raised on worker startup aborts immediately:
# raise Exception # NOTE: raises StartupFailure

# Cannot call `app.state` because it is not set up yet on worker startup functions
# app.state.something # NOTE: raises AttributeError


Expand All @@ -64,7 +72,7 @@ def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
# Set new_block_timeout to adjust the expected block time.
@app.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25)
# NOTE: Typing isn't required, it will still be an Ape `ContractLog` type
def exec_event1(log):
async def exec_event1(log):
if log.log_index % 7 == app.state.bad_number:
# If you raise any exception, Silverback will track the failure and keep running
# NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself
Expand All @@ -73,12 +81,14 @@ def exec_event1(log):
# You can update state whenever you want
app.state.logs_processed += 1

# Do any other long running tasks...
await asyncio.sleep(5)
return {"amount": log.amount}


@app.on_(YFI.Approval)
# Any handler function can be async too
async def exec_event2(log: ContractLog):
def exec_event2(log: ContractLog):
if log.log_index % 7 == 6:
# If you ever want the app to immediately shutdown under some scenario, raise this exception
raise CircuitBreaker("Oopsie!")
Expand All @@ -91,12 +101,16 @@ async def exec_event2(log: ContractLog):
# A final job to execute on Silverback shutdown
@app.on_shutdown()
def app_shutdown():
# raise Exception # NOTE: Any exception raised on shutdown is ignored
# NOTE: Any exception raised on worker shutdown is ignored:
# raise Exception
return {"some_metric": 123}


# Just in case you need to release some resources or something inside each worker
@app.on_worker_shutdown()
def worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here
# This is a good time to release resources
state.db = None
# raise Exception # NOTE: Any exception raised on worker shutdown is ignored

# NOTE: Any exception raised on worker shutdown is ignored:
# raise Exception
71 changes: 71 additions & 0 deletions silverback/_cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from decimal import Decimal
from uuid import uuid4

import click
from ape.cli import (
Expand All @@ -12,11 +14,15 @@
)
from ape.exceptions import Abort
from taskiq import AsyncBroker
from taskiq.brokers.inmemory_broker import InMemoryBroker
from taskiq.cli.worker.run import shutdown_broker
from taskiq.kicker import AsyncKicker
from taskiq.receiver import Receiver

from silverback._importer import import_from_string
from silverback.runner import PollingRunner
from silverback.settings import Settings
from silverback.types import ScalarType, TaskType, is_scalar_type


@click.group()
Expand Down Expand Up @@ -130,3 +136,68 @@ def run(cli_ctx, account, runner, recorder, max_exceptions, path):
def worker(cli_ctx, account, workers, max_exceptions, shutdown_timeout, path):
app = import_from_string(path)
asyncio.run(run_worker(app.broker, worker_count=workers, shutdown_timeout=shutdown_timeout))


class ScalarParam(click.ParamType):
name = "scalar"

def convert(self, val, param, ctx) -> ScalarType:
if not isinstance(val, str) or is_scalar_type(val):
return val

elif val.lower() in ("f", "false"):
return False

elif val.lower() in ("t", "true"):
return True

try:
return int(val)
except Exception:
pass

try:
return float(val)
except Exception:
pass

# NOTE: Decimal allows the most values, so leave last
return Decimal(val)


@cli.command(cls=ConnectedProviderCommand, help="Set parameters against a running silverback app")
@network_option(
default=os.environ.get("SILVERBACK_NETWORK_CHOICE", "auto"),
callback=_network_callback,
)
@click.option(
"-p",
"--param",
"param_updates",
type=(str, ScalarParam()),
multiple=True,
)
def set_param(param_updates):

if len(param_updates) > 1:
task_name = str(TaskType._SET_PARAM_BATCH)
arg = dict(param_updates)
else:
param_name, arg = param_updates[0]
task_name = f"{TaskType._SET_PARAM}:{param_name}"

async def set_parameters():
broker = Settings().get_broker()
if isinstance(broker, InMemoryBroker):
raise RuntimeError("Cannot use with default in-memory broker")

kicker = AsyncKicker(task_name, broker, labels={})
task = await kicker.kiq(arg)
result = await task.wait_result()

if result.is_err:
click.echo(result.error)
else:
click.echo(result.return_value)

asyncio.run(set_parameters())
118 changes: 93 additions & 25 deletions silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ class TaskData:
handler: AsyncTaskiqDecoratedTask


@dataclass
class ParameterInfo:
default: ScalarType | None
update_handler: AsyncTaskiqDecoratedTask | None


class SharedState(defaultdict):
def __init__(self):
# Any unknown key returns None
Expand Down Expand Up @@ -109,10 +115,31 @@ def __init__(self, settings: Settings | None = None):

# NOTE: The runner needs to know the set of things that the app is tracking as a parameter
# NOTE: We also need to know the defaults in case the parameters are not in the backup
self.parameter_defaults: dict[str, ScalarType | None] = dict()

self.__parameters: dict[str, ParameterInfo] = {
# System state parameters
"system:last_block_seen": ParameterInfo(
default=-1,
# NOTE: Don't allow external updates
update_handler=None,
),
"system:last_block_processed": ParameterInfo(
default=-1,
# NOTE: Don't allow external updates
update_handler=None,
),
}

# Register system tasks
self._create_system_startup_task()
# TODO: Make backup optional and settings-driven
self.backup = AppDatastore(app_id=self.identifier)
self._create_system_backup_task()
self._create_batch_parameter_task()

@property
def parameters(self) -> dict[str, ParameterInfo]:
# NOTE: makes this variable read-only
return self.__parameters

def _create_system_startup_task(self):
# Add a task to load all parameters from state at startup
Expand All @@ -124,22 +151,24 @@ async def startup_handler() -> StateSnapshot:
# NOTE: attribute does not exist before this task is executed,
# ensuring no one uses it during worker startup

self.backup = AppDatastore()
if not (startup_state := await self.backup.init(app_id=self.identifier)):
return StateSnapshot(last_block_seen=-1, last_block_processed=-1)
if not (startup_state := await self.backup.load()):
logger.warning("No state snapshot detected, using empty snapshot")
startup_state = StateSnapshot() # Use empty snapshot

logger.info("Finding cached parameters: [" + ", ".join(self.parameter_defaults) + "]")
for param_name, param_info in self.parameters.items():

for param_name, default in self.parameter_defaults.items():
if (cached_value := startup_state.parameter_values.get(param_name)) is not None:
logger.debug(f"Found cached value for '{param_name}': {cached_value}")
if (cached_value := startup_state.parameters.get(param_name)) is not None:
logger.info(f"Found cached value for app.state['{param_name}']: {cached_value}")
self.state[param_name] = cached_value

else:
logger.debug(
f"Cached value not found for '{param_name}', using default: {default}"
elif param_info.default is not None:
logger.info(
f"Cached value not found for app.state['{param_name}']"
f", using default: {param_info.default}"
)
self.state[param_name] = default
self.state[param_name] = param_info.default

# NOTE: `None` default doesn't need to be set because that's how SharedState works

return startup_state

Expand All @@ -150,18 +179,27 @@ async def startup_handler() -> StateSnapshot:
task_type=str(TaskType._RESTORE),
)

def _create_snapshot(self) -> StateSnapshot:
return StateSnapshot(
parameters={param_name: self.state[param_name] for param_name in self.parameters},
)

def _create_system_backup_task(self):
# TODO: Make backups optional
# TODO: Allow configuring backup class

# Add a task to backup state before/after every non-system runtime task and at shutdown
async def backup_handler(snapshot: StateSnapshot):
for param_name in self.parameter_defaults:
# Save our current parameter values, if set
if (current_value := self.state[param_name]) is not None:
snapshot.parameter_values[param_name] = current_value
async def backup_handler(
last_block_seen: int | None = None,
last_block_processed: int | None = None,
):
if last_block_seen is not None:
self.state["system:last_block_seen"] = last_block_seen

if last_block_processed is not None:
self.state["system:last_block_processed"] = last_block_processed

return await self.backup.save(snapshot)
return await self.backup.save(self._create_snapshot())

self.backup_task = self.broker.register_task(
backup_handler,
Expand All @@ -170,17 +208,47 @@ async def backup_handler(snapshot: StateSnapshot):
task_type=str(TaskType._BACKUP),
)

def _create_batch_parameter_task(self):
async def batch_parameters_handler(parameter_updates: dict):
# NOTE: This is one blocking atomic task, it must be handled atomically
datapoints = {}
for param_name, new_value in parameter_updates.items():
if "system:" in param_name:
logger.error(f"Cannot update system parameter '{param_name}'")

elif param_name not in self.parameters:
logger.error(f"Unrecognized parameter '{param_name}'")

else:
datapoints[param_name] = ParamChangeDatapoint(
old=self.state[param_name], new=new_value
)
logger.success(f"Update: app.state['{param_name}'] = {new_value}")
self.state[param_name] = new_value

await self.backup.save(self._create_snapshot())
return datapoints

self.batch_parameters_task = self.broker.register_task(
batch_parameters_handler,
# NOTE: Name makes it impossible to conflict with user's handler fn names
task_name=str(TaskType._SET_PARAM_BATCH),
task_type=str(TaskType._SET_PARAM_BATCH),
)

def add_parameter(self, param_name: str, default: ScalarType | None = None):
if param_name in self.parameter_defaults:
raise ValueError(f"{param_name} already added!")
if "system:" in param_name:
raise ValueError("Cannot override system parameters")

# Update this to track parameter existance/default value
self.parameter_defaults[param_name] = default
if param_name in self.parameters:
raise ValueError(f"{param_name} already added!")

# This handler will handle parameter changes during runtime
async def update_handler(new_value):
datapoint = ParamChangeDatapoint(old=self.state[param_name], new=new_value)
logger.success(f"Update: app.state['{param_name}'] = {new_value}")
self.state[param_name] = new_value
await self.backup.save(self._create_snapshot())
return datapoint

broker_task = self.broker.register_task(
Expand All @@ -190,8 +258,8 @@ async def update_handler(new_value):
task_type=str(TaskType._SET_PARAM),
)

self.tasks[TaskType._SET_PARAM].append(TaskData(container=None, handler=broker_task))
# TODO: Allow accepting parameter updates to .kiq this task somehow
# Update this to track parameter existance/default value/update handler
self.__parameters[param_name] = ParameterInfo(default=default, update_handler=broker_task)

def broker_task_decorator(
self,
Expand Down
2 changes: 1 addition & 1 deletion silverback/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
message.labels["transaction_hash"] = log.transaction_hash
message.labels["log_index"] = str(log.log_index)

elif task_type in (TaskType.STARTUP, TaskType._BACKUP):
elif task_type is TaskType.STARTUP:
message.args[0] = StateSnapshot.model_validate(message.args[0])

# Record task start (appears on worker in distributed mode)
Expand Down
Loading

0 comments on commit 20ee2b1

Please sign in to comment.