From 881136196558e73503ee382488b6afa08a1d11fd Mon Sep 17 00:00:00 2001 From: Luke Wagner Date: Fri, 4 Oct 2024 11:25:39 -0500 Subject: [PATCH] CABI refactor: improve call_and_handle_blocking interface --- design/mvp/CanonicalABI.md | 74 +++++++++++++------------ design/mvp/canonical-abi/definitions.py | 39 +++++++------ 2 files changed, 61 insertions(+), 52 deletions(-) diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index b7497d2f..20638bfa 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -304,7 +304,7 @@ class EventCode(IntEnum): EventTuple = tuple[EventCode, int] EventCallback = Callable[[], EventTuple] -OnBlockCallback = Callable[[Awaitable], any] +OnBlockCallback = Callable[[Awaitable], Any] ``` The `CallState` enum describes the linear sequence of states that an async call necessarily transitions through: [`STARTING`](Async.md#backpressure), `STARTED`, @@ -340,24 +340,27 @@ async def default_on_block(f): await current_task.acquire() return v -async def call_and_handle_blocking(callee): - blocked = asyncio.Future() +class Blocked: pass + +async def call_and_handle_blocking(callee, *args) -> Blocked|Any: + blocked_or_result = asyncio.Future[Blocked|Any]() async def on_block(f): - if not blocked.done(): - blocked.set_result(True) + if not blocked_or_result.done(): + blocked_or_result.set_result(Blocked()) else: current_task.release() + assert(not f.done()) v = await f await current_task.acquire() return v async def do_call(): - await callee(on_block) - if not blocked.done(): - blocked.set_result(False) + result = await callee(*args, on_block) + if not blocked_or_result.done(): + blocked_or_result.set_result(result) else: current_task.release() asyncio.create_task(do_call()) - return await blocked + return await blocked_or_result ``` Talking through this little Python pretzel of control flow: 1. 
`call_and_handle_blocking` starts by running `do_call` in a fresh Python @@ -365,20 +368,23 @@ Talking through this little Python pretzel of control flow: `do_call`. Since `current_task` isn't `release()`d or `acquire()`d as part of this process, the net effect is to directly transfer control flow from `call_and_handle_blocking` to `do_call` task without allowing other tasks to - run (as if by `cont.new` + `resume` in [stack-switching]). + run (as if by the `cont.new` + `resume` instructions of [stack-switching]). 2. `do_call` passes the local `on_block` closure to `callee`, which the - Canonical ABI ensures will be called whenever there is a need to block. -3. If `on_block` is called, the first time it resolves `blocking`. Because + Canonical ABI ensures will be called whenever there is a need to block on + I/O (represented by the future `f`). +3. If `on_block` is called, the first time it is called it will signal that + the `callee` has `Blocked` before `await`ing the unresolved future. Because the `current_task` lock is not `release()`d or `acquire()`d as part of this - process, the net effect is to directly transfer control flow from `do_call` - back to `call_and_handle_blocking` without allowing other tasks to run (as - if by `suspend` in [stack-switching]). + process, the net effect is to transfer control flow directly from + `on_block` to `call_and_handle_blocking` without allowing any other tasks + to execute (as if by the `suspend` instruction of [stack-switching]). 4. If `on_block` is called more than once, there is no longer a caller to directly switch to, so the `current_task` lock is `release()`d, just like in `default_on_block`, so that the Python async scheduler can pick another task to switch to. 5. If `do_call` finishes without `on_block` ever having been called, it - resolves `blocking` to `False` to communicate this fact to the caller. + resolves `blocked_or_result` to the (not-`Blocked`) return value of `callee` to + communicate this fact to the caller. 
With these tricky primitives defined, the rest of the logic below can simply use `on_block` when there is a need to block and `call_and_handle_blocking` @@ -616,7 +622,7 @@ tree. class Subtask(CallContext): ft: FuncType flat_args: CoreValueIter - flat_results: Optional[list[any]] + flat_results: Optional[list[Any]] state: CallState lenders: list[ResourceHandle] notify_supertask: bool @@ -2147,25 +2153,25 @@ async def canon_lower(opts, ft, callee, task, flat_args): async def do_call(on_block): await callee(task, subtask.on_start, subtask.on_return, on_block) [] = subtask.finish() - if await call_and_handle_blocking(do_call): - subtask.notify_supertask = True - task.need_to_drop += 1 - i = task.inst.async_subtasks.add(subtask) - flat_results = [pack_async_result(i, subtask.state)] - else: - flat_results = [0] + match await call_and_handle_blocking(do_call): + case Blocked(): + subtask.notify_supertask = True + task.need_to_drop += 1 + i = task.inst.async_subtasks.add(subtask) + flat_results = [pack_async_result(i, subtask.state)] + case None: + flat_results = [0] return flat_results ``` -In the asynchronous case, `Task.call_and_handle_blocking` returns `True` if the -call to `do_call` blocks. In this blocking case, the `Subtask` is added to -stored in an instance-wide table and given an `i32` index that is later -returned by `task.wait` to indicate that the subtask made progress. The -`need_to_drop` increment is matched by a decrement in `canon_subtask_drop` and -ensures that all subtasks of a supertask are allowed to complete before the -supertask completes. The `notify_supertask` flag is set to tell `Subtask` -methods (below) to asynchronously notify the supertask of progress. 
Lastly, -the current state of the subtask is eagerly returned to the caller, packed -with the `i32` subtask index: +In the asynchronous case, if `do_call` blocks before `Subtask.finish` +(signalled by `callee` calling `on_block`), the `Subtask` is added to an +instance-wide table and given an `i32` index that is later returned by +`task.wait` to signal subtask's progress. The `need_to_drop` increment is +matched by a decrement in `canon_subtask_drop` and ensures that all subtasks +of a supertask are allowed to complete before the supertask completes. The +`notify_supertask` flag is set to tell `Subtask` methods (below) to +asynchronously notify the supertask of progress. Lastly, the current progress +of the subtask is returned to the caller, packed with the `i32` subtask index: ```python def pack_async_result(i, state): assert(0 < i < 2**30) diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index 2dd0ea0b..cbcbd4ef 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -7,7 +7,7 @@ from __future__ import annotations from dataclasses import dataclass from functools import partial -from typing import Optional, Callable, Awaitable, Literal, MutableMapping, TypeVar, Generic +from typing import Any, Optional, Callable, Awaitable, Literal, MutableMapping, TypeVar, Generic from enum import IntEnum import math import struct @@ -304,7 +304,7 @@ class EventCode(IntEnum): EventTuple = tuple[EventCode, int] EventCallback = Callable[[], EventTuple] -OnBlockCallback = Callable[[Awaitable], any] +OnBlockCallback = Callable[[Awaitable], Any] current_task = asyncio.Lock() asyncio.run(current_task.acquire()) @@ -315,24 +315,26 @@ async def default_on_block(f): await current_task.acquire() return v -async def call_and_handle_blocking(callee): - blocked = asyncio.Future() +class Blocked: pass + +async def call_and_handle_blocking(callee, *args) -> Blocked|Any: + blocked_or_result = 
asyncio.Future[Blocked|Any]() async def on_block(f): - if not blocked.done(): - blocked.set_result(True) + if not blocked_or_result.done(): + blocked_or_result.set_result(Blocked()) else: current_task.release() v = await f await current_task.acquire() return v async def do_call(): - await callee(on_block) - if not blocked.done(): - blocked.set_result(False) + result = await callee(*args, on_block) + if not blocked_or_result.done(): + blocked_or_result.set_result(result) else: current_task.release() asyncio.create_task(do_call()) - return await blocked + return await blocked_or_result class Task(CallContext): ft: FuncType @@ -457,7 +459,7 @@ def exit(self): class Subtask(CallContext): ft: FuncType flat_args: CoreValueIter - flat_results: Optional[list[any]] + flat_results: Optional[list[Any]] state: CallState lenders: list[ResourceHandle] notify_supertask: bool @@ -1454,13 +1456,14 @@ async def canon_lower(opts, ft, callee, task, flat_args): async def do_call(on_block): await callee(task, subtask.on_start, subtask.on_return, on_block) [] = subtask.finish() - if await call_and_handle_blocking(do_call): - subtask.notify_supertask = True - task.need_to_drop += 1 - i = task.inst.async_subtasks.add(subtask) - flat_results = [pack_async_result(i, subtask.state)] - else: - flat_results = [0] + match await call_and_handle_blocking(do_call): + case Blocked(): + subtask.notify_supertask = True + task.need_to_drop += 1 + i = task.inst.async_subtasks.add(subtask) + flat_results = [pack_async_result(i, subtask.state)] + case None: + flat_results = [0] return flat_results def pack_async_result(i, state):