Skip to content

Commit

Permalink
refactor: have runner load and save the snapshot in datastore
Browse files Browse the repository at this point in the history
  • Loading branch information
fubuloubu committed Aug 26, 2024
1 parent f254868 commit 07dd9f9
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 41 deletions.
39 changes: 12 additions & 27 deletions silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError
from .settings import Settings
from .state import AppDatastore, StateSnapshot
from .state import StateSnapshot
from .types import SilverbackID, TaskType


Expand Down Expand Up @@ -161,15 +161,11 @@ def __init__(self, settings: Settings | None = None):
self._get_user_all_taskdata = self.__register_system_task(
TaskType.SYSTEM_USER_ALL_TASKDATA, self.__get_user_all_taskdata_handler
)

# TODO: Make backup optional and settings-driven
# TODO: Allow configuring backup class
self.datastore = AppDatastore()
self._load_snapshot = self.__register_system_task(
TaskType.SYSTEM_LOAD_SNAPSHOT, self.__load_snapshot_handler
)
self._save_snapshot = self.__register_system_task(
TaskType.SYSTEM_SAVE_SNAPSHOT, self.__save_snapshot_handler
self._create_snapshot = self.__register_system_task(
TaskType.SYSTEM_CREATE_SNAPSHOT, self.__create_snapshot_handler
)

def __register_system_task(
Expand Down Expand Up @@ -201,45 +197,34 @@ def __get_user_taskdata_handler(self, task_type: TaskType) -> list[TaskData]:
def __get_user_all_taskdata_handler(self) -> list[TaskData]:
return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l]

async def __load_snapshot_handler(self) -> StateSnapshot:
async def __load_snapshot_handler(self, startup_state: StateSnapshot):
# NOTE: *DO NOT USE* in Runner, as it will not be updated by the app
self.state = SharedState()
# NOTE: attribute does not exist before this task is executed,
# ensuring no one uses it during worker startup

if not (startup_state := await self.datastore.init(app_id=self.identifier)):
logger.warning("No state snapshot detected, using empty snapshot")
# TODO: Refactor to `None` by removing
self.state["system:last_block_seen"] = -1
self.state["system:last_block_processed"] = -1
startup_state = StateSnapshot(
# TODO: Migrate these to parameters (remove explicitly from state)
last_block_seen=-1,
last_block_processed=-1,
) # Use empty snapshot

return startup_state
self.state["system:last_block_seen"] = startup_state.last_block_seen
self.state["system:last_block_processed"] = startup_state.last_block_processed
# TODO: Load user custom state (should not start with `system:`)

async def __save_snapshot_handler(
async def __create_snapshot_handler(
self,
last_block_seen: int | None = None,
last_block_processed: int | None = None,
):
# Task that backups state before/after every non-system runtime task and at shutdown
# Task that updates state checkpoints before/after every non-system runtime task/at shutdown
if last_block_seen is not None:
self.state["system:last_block_seen"] = last_block_seen

if last_block_processed is not None:
self.state["system:last_block_processed"] = last_block_processed

snapshot = StateSnapshot(
return StateSnapshot(
# TODO: Migrate these to parameters (remove explicitly from state)
last_block_processed=self.state["system:last_block_seen"] or -1,
last_block_seen=self.state["system:last_block_processed"] or -1,
last_block_seen=self.state.get("system:last_block_seen", -1),
last_block_processed=self.state.get("system:last_block_processed", -1),
)

return await self.datastore.save(snapshot)

def broker_task_decorator(
self,
task_type: TaskType,
Expand Down
43 changes: 30 additions & 13 deletions silverback/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from .application import SilverbackApp, SystemConfig, TaskData
from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure
from .recorder import BaseRecorder, TaskResult
from .state import StateSnapshot
from .state import AppDatastore, StateSnapshot
from .subscriptions import SubscriptionType, Web3SubscriptionsManager
from .types import TaskType
from .utils import (
Expand All @@ -36,6 +36,10 @@ def __init__(
**kwargs,
):
self.app = app

# TODO: Make datastore optional and settings-driven
# TODO: Allow configuring datastore class
self.datastore = AppDatastore()
self.recorder = recorder

self.max_exceptions = max_exceptions
Expand Down Expand Up @@ -74,12 +78,14 @@ async def _checkpoint(
last_block_processed: int | None = None,
):
"""Set latest checkpoint block number"""
if not self.save_snapshot_supported:
if not self._snapshotting_supported:
return # Can't support this feature

task = await self.app._save_snapshot.kiq(last_block_seen, last_block_processed)
task = await self.app._create_snapshot.kiq(last_block_seen, last_block_processed)
if (result := await task.wait_result()).is_err:
logger.error(f"Error saving snapshot: {result.error}")
else:
await self.datastore.save(result.return_value)

@abstractmethod
async def _block_task(self, task_data: TaskData):
Expand Down Expand Up @@ -133,32 +139,39 @@ async def run(self):
)

# NOTE: Bypass snapshotting if unsupported
self.save_snapshot_supported = TaskType.SYSTEM_SAVE_SNAPSHOT in system_tasks
self._snapshotting_supported = TaskType.SYSTEM_CREATE_SNAPSHOT in system_tasks

# Load the snapshot (if available)
# NOTE: Add some additional handling to see if this feature is available in bot
if TaskType.SYSTEM_LOAD_SNAPSHOT not in system_tasks:
logger.warning(
"Silverback no longer supports runner-based snapshotting, "
"please upgrade your bot SDK version to latest."
"please upgrade your bot SDK version to latest to use snapshots."
)
startup_state = StateSnapshot(
last_block_seen=-1,
last_block_processed=-1,
) # Use empty snapshot

elif (
elif not (startup_state := await self.datastore.init(app_id=self.app.identifier)):
logger.warning("No state snapshot detected, using empty snapshot")
startup_state = StateSnapshot(
# TODO: Migrate these to parameters (remove explicitly from state)
last_block_seen=-1,
last_block_processed=-1,
) # Use empty snapshot

logger.debug(f"Startup state: {startup_state}")
# NOTE: State snapshot is immediately out of date after init

# Send startup state to app
if (
result := await run_taskiq_task_wait_result(
self._create_system_task_kicker(TaskType.SYSTEM_LOAD_SNAPSHOT)
self._create_system_task_kicker(TaskType.SYSTEM_LOAD_SNAPSHOT), startup_state
)
).is_err:
raise StartupFailure(result.error)

else:
startup_state = result.return_value
logger.debug(f"Startup state: {startup_state}")
# NOTE: State snapshot is immediately out of date after init

# NOTE: Do this for other system tasks because they may not be in older SDK versions
# `if TaskType.<SYSTEM_TASK_NAME> not in system_tasks: raise StartupFailure(...)`
# or handle accordingly by having default logic if it is not available
Expand Down Expand Up @@ -274,7 +287,11 @@ async def run(self):

# NOTE: No need to handle results otherwise

await self.app.broker.shutdown()
if self._snapshotting_supported:
# Do one last checkpoint to save a snapshot of final state
await self._checkpoint()

await self.app.broker.shutdown() # Release broker


class WebsocketRunner(BaseRunner, ManagerAccessMixin):
Expand Down
2 changes: 1 addition & 1 deletion silverback/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TaskType(str, Enum):
SYSTEM_USER_TASKDATA = "system:user-taskdata"
SYSTEM_USER_ALL_TASKDATA = "system:user-all-taskdata"
SYSTEM_LOAD_SNAPSHOT = "system:load-snapshot"
SYSTEM_SAVE_SNAPSHOT = "system:save-snapshot"
SYSTEM_CREATE_SNAPSHOT = "system:create-snapshot"

# User-accessible Tasks
STARTUP = "user:startup"
Expand Down

0 comments on commit 07dd9f9

Please sign in to comment.