Skip to content

Commit

Permalink
CABI refactor: improve call_and_handle_blocking interface
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Oct 4, 2024
1 parent 11fe9a6 commit 8811361
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 52 deletions.
74 changes: 40 additions & 34 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class EventCode(IntEnum):

EventTuple = tuple[EventCode, int]
EventCallback = Callable[[], EventTuple]
OnBlockCallback = Callable[[Awaitable], any]
OnBlockCallback = Callable[[Awaitable], Any]
```
The `CallState` enum describes the linear sequence of states that an async call
necessarily transitions through: [`STARTING`](Async.md#backpressure), `STARTED`,
Expand Down Expand Up @@ -340,45 +340,51 @@ async def default_on_block(f):
await current_task.acquire()
return v

async def call_and_handle_blocking(callee):
blocked = asyncio.Future()
class Blocked: pass

async def call_and_handle_blocking(callee, *args) -> Blocked|Any:
blocked_or_result = asyncio.Future[Blocked|Any]()
async def on_block(f):
if not blocked.done():
blocked.set_result(True)
if not blocked_or_result.done():
blocked_or_result.set_result(Blocked())
else:
current_task.release()
assert(not f.done())
v = await f
await current_task.acquire()
return v
async def do_call():
await callee(on_block)
if not blocked.done():
blocked.set_result(False)
result = await callee(*args, on_block)
if not blocked_or_result.done():
blocked_or_result.set_result(result)
else:
current_task.release()
asyncio.create_task(do_call())
return await blocked
return await blocked_or_result
```
Talking through this little Python pretzel of control flow:
1. `call_and_handle_blocking` starts by running `do_call` in a fresh Python
task and then immediately `await`ing a future that will be resolved by
`do_call`. Since `current_task` isn't `release()`d or `acquire()`d as part
of this process, the net effect is to directly transfer control flow from
`call_and_handle_blocking` to `do_call` task without allowing other tasks to
run (as if by `cont.new` + `resume` in [stack-switching]).
run (as if by the `cont.new` + `resume` instructions of [stack-switching]).
2. `do_call` passes the local `on_block` closure to `callee`, which the
Canonical ABI ensures will be called whenever there is a need to block.
3. If `on_block` is called, the first time it resolves `blocking`. Because
Canonical ABI ensures will be called whenever there is a need to block on
I/O (represented by the future `f`).
3. If `on_block` is called, the first time it is called it will signal that
the `callee` has `Blocked` before `await`ing the unresolved future. Because
the `current_task` lock is not `release()`d or `acquire()`d as part of this
process, the net effect is to directly transfer control flow from `do_call`
back to `call_and_handle_blocking` without allowing other tasks to run (as
if by `suspend` in [stack-switching]).
process, the net effect is to transfer control flow directly from
`on_block` to `call_and_handle_blocking` without allowing any other tasks
to execute (as if by the `suspend` instruction of [stack-switching]).
4. If `on_block` is called more than once, there is no longer a caller to
directly switch to, so the `current_task` lock is `release()`d, just like
in `default_on_block`, so that the Python async scheduler can pick another
task to switch to.
5. If `do_call` finishes without `on_block` ever having been called, it
resolves `blocking` to `False` to communicate this fact to the caller.
resolves `blocking` to the (not-`Blocking`) return value of `callee` to
communicate this fact to the caller.

With these tricky primitives defined, the rest of the logic below can simply
use `on_block` when there is a need to block and `call_and_handle_blocking`
Expand Down Expand Up @@ -616,7 +622,7 @@ tree.
class Subtask(CallContext):
ft: FuncType
flat_args: CoreValueIter
flat_results: Optional[list[any]]
flat_results: Optional[list[Any]]
state: CallState
lenders: list[ResourceHandle]
notify_supertask: bool
Expand Down Expand Up @@ -2147,25 +2153,25 @@ async def canon_lower(opts, ft, callee, task, flat_args):
async def do_call(on_block):
await callee(task, subtask.on_start, subtask.on_return, on_block)
[] = subtask.finish()
if await call_and_handle_blocking(do_call):
subtask.notify_supertask = True
task.need_to_drop += 1
i = task.inst.async_subtasks.add(subtask)
flat_results = [pack_async_result(i, subtask.state)]
else:
flat_results = [0]
match await call_and_handle_blocking(do_call):
case Blocked():
subtask.notify_supertask = True
task.need_to_drop += 1
i = task.inst.async_subtasks.add(subtask)
flat_results = [pack_async_result(i, subtask.state)]
case None:
flat_results = [0]
return flat_results
```
In the asynchronous case, `Task.call_and_handle_blocking` returns `True` if the
call to `do_call` blocks. In this blocking case, the `Subtask` is added to
stored in an instance-wide table and given an `i32` index that is later
returned by `task.wait` to indicate that the subtask made progress. The
`need_to_drop` increment is matched by a decrement in `canon_subtask_drop` and
ensures that all subtasks of a supertask are allowed to complete before the
supertask completes. The `notify_supertask` flag is set to tell `Subtask`
methods (below) to asynchronously notify the supertask of progress. Lastly,
the current state of the subtask is eagerly returned to the caller, packed
with the `i32` subtask index:
In the asynchronous case, if `do_call` blocks before `Subtask.finish`
(signalled by `callee` calling `on_block`), the `Subtask` is added to an
instance-wide table and given an `i32` index that is later returned by
`task.wait` to signal subtask's progress. The `need_to_drop` increment is
matched by a decrement in `canon_subtask_drop` and ensures that all subtasks
of a supertask are allowed to complete before the supertask completes. The
`notify_supertask` flag is set to tell `Subtask` methods (below) to
asynchronously notify the supertask of progress. Lastly, the current progress
of the subtask is returned to the caller, packed with the `i32` subtask index:
```python
def pack_async_result(i, state):
assert(0 < i < 2**30)
Expand Down
39 changes: 21 additions & 18 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from __future__ import annotations
from dataclasses import dataclass
from functools import partial
from typing import Optional, Callable, Awaitable, Literal, MutableMapping, TypeVar, Generic
from typing import Any, Optional, Callable, Awaitable, Literal, MutableMapping, TypeVar, Generic
from enum import IntEnum
import math
import struct
Expand Down Expand Up @@ -304,7 +304,7 @@ class EventCode(IntEnum):

EventTuple = tuple[EventCode, int]
EventCallback = Callable[[], EventTuple]
OnBlockCallback = Callable[[Awaitable], any]
OnBlockCallback = Callable[[Awaitable], Any]

current_task = asyncio.Lock()
asyncio.run(current_task.acquire())
Expand All @@ -315,24 +315,26 @@ async def default_on_block(f):
await current_task.acquire()
return v

async def call_and_handle_blocking(callee):
blocked = asyncio.Future()
class Blocked: pass

async def call_and_handle_blocking(callee, *args) -> Blocked|Any:
blocked_or_result = asyncio.Future[Blocked|Any]()
async def on_block(f):
if not blocked.done():
blocked.set_result(True)
if not blocked_or_result.done():
blocked_or_result.set_result(Blocked())
else:
current_task.release()
v = await f
await current_task.acquire()
return v
async def do_call():
await callee(on_block)
if not blocked.done():
blocked.set_result(False)
result = await callee(*args, on_block)
if not blocked_or_result.done():
blocked_or_result.set_result(result)
else:
current_task.release()
asyncio.create_task(do_call())
return await blocked
return await blocked_or_result

class Task(CallContext):
ft: FuncType
Expand Down Expand Up @@ -457,7 +459,7 @@ def exit(self):
class Subtask(CallContext):
ft: FuncType
flat_args: CoreValueIter
flat_results: Optional[list[any]]
flat_results: Optional[list[Any]]
state: CallState
lenders: list[ResourceHandle]
notify_supertask: bool
Expand Down Expand Up @@ -1454,13 +1456,14 @@ async def canon_lower(opts, ft, callee, task, flat_args):
async def do_call(on_block):
await callee(task, subtask.on_start, subtask.on_return, on_block)
[] = subtask.finish()
if await call_and_handle_blocking(do_call):
subtask.notify_supertask = True
task.need_to_drop += 1
i = task.inst.async_subtasks.add(subtask)
flat_results = [pack_async_result(i, subtask.state)]
else:
flat_results = [0]
match await call_and_handle_blocking(do_call):
case Blocked():
subtask.notify_supertask = True
task.need_to_drop += 1
i = task.inst.async_subtasks.add(subtask)
flat_results = [pack_async_result(i, subtask.state)]
case None:
flat_results = [0]
return flat_results

def pack_async_result(i, state):
Expand Down

0 comments on commit 8811361

Please sign in to comment.