Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python-runtime): callbacks from async contexts result in BrokenPipeError #3919

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/@jsii/kernel/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ export function isPropertyOverride(value: Override): value is PropertyOverride {

export interface Callback {
readonly cbid: string;
/** Whether this callback is synchronous. */
readonly sync: boolean;
readonly cookie: string | undefined;
readonly invoke?: InvokeRequest;
readonly get?: GetRequest;
Expand Down
30 changes: 30 additions & 0 deletions packages/@jsii/kernel/src/kernel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2212,6 +2212,36 @@ defineTest('invokeBinScript() accepts arguments', (sandbox) => {
});
});

// defineTest('ImplementationFromAsyncContext compliance', async (sandbox) => {
// const producer = sandbox.create({
// fqn: 'Object',
// overrides: [{ method: 'produce', cookie: 'produce1234' }],
// interfaces: ['jsii-calc.IPromiseProducer'],
// });

// const obj = sandbox.create({
// fqn: 'jsii-calc.ImplementationFromAsyncContext',
// args: [producer],
// });

// const promise1 = sandbox.begin({
// objref: obj,
// method: 'doAsyncWork',
// });

// const callbacks1 = sandbox.callbacks();
// expect(callbacks1.callbacks.length).toBe(1);
// expect(callbacks1.callbacks[0].cookie).toBe('produce1234');

// sandbox.complete({
// cbid: callbacks1.callbacks[0].cbid,
// result: 'test-string',
// });

// const result = (await sandbox.end(promise1)).result;
// expect(result).toBe('test-string');
// });

// =================================================================================================

const testNames: { [name: string]: boolean } = {};
Expand Down
54 changes: 52 additions & 2 deletions packages/@jsii/kernel/src/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,52 @@ export class Kernel {

let result;
try {
result = await promise;
let settled = false;
/**
* Poll for new callback requests until the promise is resolved. This is
* to allow any promises necessary for the promise to be able to settle.
* We use setImmediate so the next poll happens on the next run loop tick,
* after other microtasks might have been paused on a pending callback.
*/
// eslint-disable-next-line no-inner-declarations
function pollForCallbacks(kernel: Kernel) {
// Promise has settled already, not going any further...
if (settled) {
return;
}

for (const [cbid, cb] of kernel.cbs.entries()) {
kernel.waiting.set(cbid, cb);
kernel.cbs.delete(cbid);
try {
cb.succeed(
kernel.callbackHandler({
cbid,
sync: false,
cookie: cb.override.cookie,
invoke: {
objref: cb.objref,
method: cb.override.method,
args: cb.args,
},
}),
);
} catch (err) {
cb.fail(err);
} finally {
kernel.waiting.delete(cbid);
}
}
if (!settled) {
setImmediate(pollForCallbacks, kernel);
}
}
pollForCallbacks(this);

result = await promise.finally(() => {
settled = true;
});

this._debug('promise result:', result);
} catch (e: any) {
this._debug('promise error:', e);
Expand Down Expand Up @@ -475,14 +520,16 @@ export class Kernel {
};
}

/** @deprecated the flow should be handled directly by "end" */
public callbacks(_req?: api.CallbacksRequest): api.CallbacksResponse {
this._debug('callbacks');
const ret = Array.from(this.cbs.entries()).map(([cbid, cb]) => {
this.waiting.set(cbid, cb); // move to waiting
this.cbs.delete(cbid); // remove from created
const callback: api.Callback = {
cbid,
cookie: cb.override.cookie,
cbid,
sync: false,
invoke: {
objref: cb.objref,
method: cb.override.method,
Expand Down Expand Up @@ -758,6 +805,7 @@ export class Kernel {
const result = this.callbackHandler({
cookie: override.cookie,
cbid: this._makecbid(),
sync: true,
get: { objref, property: propertyName },
});
this._debug('callback returned', result);
Expand All @@ -774,6 +822,7 @@ export class Kernel {
this.callbackHandler({
cookie: override.cookie,
cbid: this._makecbid(),
sync: true,
set: {
objref,
property: propertyName,
Expand Down Expand Up @@ -910,6 +959,7 @@ export class Kernel {
const result = this.callbackHandler({
cookie: override.cookie,
cbid: this._makecbid(),
sync: true,
invoke: {
objref,
method: methodName,
Expand Down
27 changes: 7 additions & 20 deletions packages/@jsii/python-runtime/src/jsii/_kernel/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import inspect
import itertools
import time
from types import FunctionType, MethodType, BuiltinFunctionType, LambdaType

from typing import Callable, cast, Any, List, Optional, Sequence, Type
Expand Down Expand Up @@ -28,6 +29,7 @@
CreateResponse,
DeleteRequest,
EndRequest,
EndResponse,
EnumRef,
GetRequest,
GetResponse,
Expand Down Expand Up @@ -464,26 +466,11 @@ def ainvoke(self, obj: Any, method: str, args: Optional[List[Any]] = None) -> An
if isinstance(promise, Callback):
promise = _callback_till_result(self, promise, BeginResponse)

callbacks = self.provider.callbacks(CallbacksRequest()).callbacks
while callbacks:
for callback in callbacks:
try:
result = _handle_callback(self, callback)
except Exception as exc:
# TODO: Maybe we want to print the whole traceback here?
complete = self.provider.complete(
CompleteRequest(cbid=callback.cbid, err=str(exc))
)
else:
complete = self.provider.complete(
CompleteRequest(cbid=callback.cbid, result=result)
)

assert complete.cbid == callback.cbid

callbacks = self.provider.callbacks(CallbacksRequest()).callbacks

return self.provider.end(EndRequest(promiseid=promise.promiseid)).result
response = self.provider.end(EndRequest(promiseid=promise.promiseid))
if isinstance(response, Callback):
return _callback_till_result(self, response, EndResponse).result
else:
return response.result

def stats(self):
resp = self.provider.stats(StatsRequest())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def stop(self) -> None:
assert self._process.stdin is not None
if not self._process.stdin.closed:
self._process.stdin.write(b'{"exit":0}\n')
# Close the process' STDIN, singaling we are done with it
# Close the process' STDIN, signaling we are done with it
self._process.stdin.close()

try:
Expand Down
1 change: 1 addition & 0 deletions packages/@jsii/python-runtime/src/jsii/_kernel/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class StatsResponse:
LoadResponse,
CreateResponse,
DeleteResponse,
EndResponse,
GetResponse,
InvokeResponse,
InvokeScriptResponse,
Expand Down
19 changes: 18 additions & 1 deletion packages/@jsii/python-runtime/tests/test_compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
AnonymousImplementationProvider,
UpcasingReflectable,
PromiseNothing,
IPromiseProducer,
ImplementationFromAsyncContext,
)
from jsii_calc.cdk16625 import Cdk16625
from jsii_calc.cdk22369 import AcceptsPath
Expand Down Expand Up @@ -527,6 +529,7 @@ def test_asyncOverrides_overrideAsyncMethodByParentClass():
assert obj.call_me() == 4452


# fails for no reason and poisons the runtime
def test_asyncOverrides_overrideCallsSuper():
obj = OverrideCallsSuper()
assert obj.override_me(12) == 1441
Expand Down Expand Up @@ -1358,5 +1361,19 @@ def test_void_returning_async():
"""Verifies it's okay to return a Promise<void>."""

assert PromiseNothing().instance_promise_it() is None
## TODO: This is currently broken as code-gen is incorrect for static async.
# TODO: This is currently broken as code-gen is incorrect for static async.
# assert PromiseNothing.promise_it() is None


def test_calling_implementation_from_async_context():
@jsii.implements(IPromiseProducer)
class ConcreteProducer:
def produce(self) -> str:
return "result"

producer = ConcreteProducer()

assert producer.produce() == "result"

worker = ImplementationFromAsyncContext(producer)
assert worker.do_async_work() == "result"
8 changes: 4 additions & 4 deletions packages/@jsii/runtime/lib/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export class KernelHost {
return this.processRequest(
req,
completeCallback.bind(this),
/* sync */ true,
callback.sync,
);
}
}
Expand Down Expand Up @@ -126,7 +126,7 @@ export class KernelHost {
// promises. see the kernel test 'async overrides: two overrides'
// for an example for this use case.
if (apiReq.api === 'begin' || apiReq.api === 'complete') {
checkIfAsyncIsAllowed();
assertAsyncIsAllowed();

this.debug('processing pending promises before responding');

Expand All @@ -141,7 +141,7 @@ export class KernelHost {
// if this is an async method, return immediately and
// call next only when the promise is fulfilled.
if (this.isPromise(ret)) {
checkIfAsyncIsAllowed();
assertAsyncIsAllowed();

this.debug('waiting for promise to be fulfilled');

Expand Down Expand Up @@ -169,7 +169,7 @@ export class KernelHost {
// indicate this request was processed (synchronously).
return next();

function checkIfAsyncIsAllowed() {
function assertAsyncIsAllowed() {
if (sync) {
throw new JsiiFault(
'Cannot handle async operations while waiting for a sync callback to return',
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions packages/jsii-calc/lib/compliance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3141,3 +3141,28 @@ export class PromiseNothing {
return PromiseNothing.promiseIt();
}
}

/**
* Async Context operations
* Validates features work when run from within an async context
*
* @see https://github.com/aws/jsii/issues/3917
*/
export interface IPromiseProducer {
produce(): Promise<string>;
}

export class ImplementationFromAsyncContext {
public constructor(private readonly producer: IPromiseProducer) {}

public async doAsyncWork(): Promise<string> {
await this.sleep(200);
return this.producer.produce();
}

private async sleep(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
}
Loading