Skip to content

Commit

Permalink
Switch from implicit to explicit drop of async subtasks
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Sep 2, 2024
1 parent 41bef2a commit db8d72e
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 8 deletions.
4 changes: 2 additions & 2 deletions design/mvp/Async.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ features will be added in future chunks to complete "async" in Preview 3:
concurrency
* `subtask.cancel`: allow a supertask to signal to a subtask that its result is
no longer wanted and to please wrap it up promptly
* `subtask.drop`: allow tail-calling a subtask so that the current wasm
instance can be torn down eagerly
* allow "tail-calling" a subtask so that the current wasm instance can be torn
down eagerly
* `task.index`+`task.wake`: allow tasks in the same instance to wait on and
wake each other (async condvar-style)
* `nonblocking` function type attribute: allow a function to declare in its
Expand Down
1 change: 1 addition & 0 deletions design/mvp/Binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
| 0x0b => (canon task.wait (core func))
| 0x0c => (canon task.poll (core func))
| 0x0d => (canon task.yield (core func))
| 0x0e => (canon subtask.drop (core func))
opts ::= opt*:vec(<canonopt>) => opt*
canonopt ::= 0x00 => string-encoding=utf8
| 0x01 => string-encoding=utf16
Expand Down
26 changes: 23 additions & 3 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,6 @@ once).
def process_event(self, subtask):
assert(subtask.supertask is self)
subtask.enqueued = False
if subtask.state == AsyncCallState.DONE:
self.inst.async_subtasks.remove(subtask.index)
self.num_async_subtasks -= 1
return (EventCode(subtask.state), subtask.index)
```
The `pending_tasks` queue (appended to by `enter` above) is emptied by `wait`
Expand Down Expand Up @@ -2399,6 +2396,29 @@ async def canon_task_yield(task):
return []
```

### 🔀 `canon subtask.drop`

For a canonical definition:
```wasm
(canon subtask.drop (core func $f))
```
validation specifies:
* `$f` is given type `(func (param i32))`

Calling `$f` removes the indicated subtask from the instance's table, trapping
if the subtask isn't done or isn't a subtask of the current task. The guard
on `enqueued` ensures that supertasks can only drop subtasks once they've been
officially notified of their completion (via `task.wait` or callback).
```python
async def canon_subtask_drop(task, i):
subtask = task.inst.async_subtasks.remove(i)
trap_if(subtask.enqueued)
trap_if(subtask.state != AsyncCallState.DONE)
trap_if(subtask.supertask is not task)
task.num_async_subtasks -= 1
return []
```

### 🧵 `canon thread.spawn`

For a canonical definition:
Expand Down
5 changes: 5 additions & 0 deletions design/mvp/Explainer.md
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,7 @@ canon ::= ...
| (canon task.wait (core func <id>?)) 🔀
| (canon task.poll (core func <id>?)) 🔀
| (canon task.yield (core func <id>?)) 🔀
| (canon subtask.drop (core func <id>?)) 🔀
| (canon thread.spawn <typeidx> (core func <id>?)) 🧵
| (canon thread.hw_concurrency (core func <id>?)) 🧵
```
Expand Down Expand Up @@ -1409,6 +1410,10 @@ switch to another task, allowing a long-running computation to cooperatively
interleave with other tasks. (See also [`canon_task_yield`] in the Canonical
ABI explainer.)

The `subtask.drop` built-in has type `[i32] -> []` and removes the indicated
[subtask](Async.md#subtask-and-supertask) from the current instance's subtask
table, trapping if the subtask isn't done.

##### 🧵 Threading built-ins

The [shared-everything-threads] proposal adds component model built-ins for
Expand Down
13 changes: 10 additions & 3 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,6 @@ def maybe_start_pending_task(self):
def process_event(self, subtask):
assert(subtask.supertask is self)
subtask.enqueued = False
if subtask.state == AsyncCallState.DONE:
self.inst.async_subtasks.remove(subtask.index)
self.num_async_subtasks -= 1
return (EventCode(subtask.state), subtask.index)

def poll(self):
Expand Down Expand Up @@ -1523,3 +1520,13 @@ async def canon_task_yield(task):
trap_if(task.opts.callback is not None)
await task.yield_()
return []

### 🔀 `canon subtask.drop`

async def canon_subtask_drop(task, i):
subtask = task.inst.async_subtasks.remove(i)
trap_if(subtask.enqueued)
trap_if(subtask.state != AsyncCallState.DONE)
trap_if(subtask.supertask is not task)
task.num_async_subtasks -= 1
return []
14 changes: 14 additions & 0 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,8 @@ async def consumer(task, args):
event, callidx = await task.wait()
assert(event == EventCode.CALL_DONE)
assert(callidx == 1)
assert(task.num_async_subtasks == 1)
await canon_subtask_drop(task, callidx)
assert(task.num_async_subtasks == 0)

dtor_fut = asyncio.Future()
Expand All @@ -571,6 +573,7 @@ async def dtor(task, args):
event, callidx = await task.wait()
assert(event == AsyncCallState.DONE)
assert(callidx == 1)
await canon_subtask_drop(task, callidx)
assert(task.num_async_subtasks == 0)

[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [42])
Expand Down Expand Up @@ -632,6 +635,7 @@ async def callback(task, args):
if args[0] == 42:
assert(args[1] == EventCode.CALL_DONE)
assert(args[2] == 1)
await canon_subtask_drop(task, 1)
return [53]
elif args[0] == 52:
assert(args[1] == EventCode.YIELDED)
Expand All @@ -642,6 +646,7 @@ async def callback(task, args):
assert(args[0] == 62)
assert(args[1] == EventCode.CALL_DONE)
assert(args[2] == 2)
await canon_subtask_drop(task, 2)
[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [83])
return [0]

Expand Down Expand Up @@ -708,6 +713,7 @@ async def consumer(task, args):
event, callidx = await task.wait()
assert(event == EventCode.CALL_DONE)
assert(callidx == 1)
await canon_subtask_drop(task, callidx)
assert(producer1_done == True)

assert(producer2_done == False)
Expand All @@ -716,6 +722,7 @@ async def consumer(task, args):
event, callidx = task.poll()
assert(event == EventCode.CALL_DONE)
assert(callidx == 2)
await canon_subtask_drop(task, callidx)
assert(producer2_done == True)

assert(task.poll() is None)
Expand Down Expand Up @@ -796,6 +803,9 @@ async def consumer(task, args):
assert(callidx == 2)
assert(producer2_done == True)

await canon_subtask_drop(task, 1)
await canon_subtask_drop(task, 2)

assert(task.poll() is None)

await canon_task_start(task, CoreFuncType([],[]), [])
Expand Down Expand Up @@ -851,6 +861,10 @@ async def core_func(task, args):
event, callidx = await task.wait()
assert(event == EventCode.CALL_DONE)
assert(callidx == 2)

await canon_subtask_drop(task, 1)
await canon_subtask_drop(task, 2)

return []

inst = ComponentInstance()
Expand Down

0 comments on commit db8d72e

Please sign in to comment.