-
Notifications
You must be signed in to change notification settings - Fork 97
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: async adapter #208
base: 1.0.x-alpha
Are you sure you want to change the base?
feat: async adapter #208
Conversation
// TODO: is using `@rsocket/rxjs` as intermediary adapter a bad idea? | ||
// - considerations: | ||
// - do we lose support for backpressure that we wouldn't have otherwise? | ||
// - what is bundle size consequences of relying on `@rsocket/rxjs`? | ||
// - what is bundle size consequences of relying on `rxjs` and `rxjs-for-await` | ||
const $responderObs = RxRequestersFactory.requestChannel( | ||
$requesterObs, | ||
inputCodec, | ||
outputCodec, | ||
prefetch | ||
)(rsocket, metadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is where I leveraged the Rx adapter to support requestChannel more easily, but doing so raises some questions, which I've detailed in the comment, and below:
- do we lose support for backpressure that we wouldn't have otherwise?
- what is bundle size consequences of relying on
@rsocket/rxjs
? - what is bundle size consequences of relying on
rxjs
andrxjs-for-await
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@OlegDokuka can you help out with providing an alternative async adapter implementation for requestChannel here?
const subscriberFactory = RxRespondersFactory.requestChannel( | ||
($in) => from(handler(eachValueFrom($in))), | ||
codecs, | ||
prefetch | ||
); | ||
|
||
return subscriberFactory(payload, initialRequestN, isCompleted, s); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same idea here as above, I leveraged the RX adapter to more easily provide this functionality, but not sure if it's the best idea.
export function requestStream<T, R>( | ||
handler: DefaultResponderHandlerSignature<T>, | ||
codecs: { | ||
inputCodec: Codec<T>; | ||
outputCodec: Codec<R>; | ||
} | ||
) { | ||
return Object.assign< | ||
DefaultResponderHandlerSignature<Payload>, | ||
{ requestType: FrameTypes.REQUEST_STREAM } | ||
>( | ||
(payload, initialRequestN, subscriber) => { | ||
return handler( | ||
codecs.inputCodec.decode(payload.data), | ||
initialRequestN, | ||
subscriber | ||
); | ||
}, | ||
{ requestType: FrameTypes.REQUEST_STREAM } | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I mentioned in the last PR, I started adding a set of "default" responders to aid with my testing, as well as an additional feature. I only got as far as this single responder though, so if we don't want to land this with only a single responder, we can pick it out and loop back around to adding the rest later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is also the question of if these default request/responders should be in the messaging package or elsewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Reviewed where I am using this default responder and its only in an example currently so lets not land this API addition with this effort.
- requester fireAndForget - requester requestResponse - requester requestStream refactor: renamed to SubscribingAsyncIterator + added more tests feat: (wip) add async responders - fireAndForget - requestResponse feat: AsyncIterable requestStream responder refactor: use rxjs observer for async iterable requestStream example feat: add requesChannel responders and requesters refactor: remove unnecessary passing of scheduler test: (wip) requester tests test: async requestResponse requesters tests test: async adapter fireAndForget requester tests refactor: apply linting fix: resolve issues from rebasing test: add tests for requestStream requester refactor: rename async package to adapter-async Signed-off-by: Kevin Viglucci <[email protected]>
Signed-off-by: Kevin Viglucci <[email protected]>
Signed-off-by: Kevin Viglucci <[email protected]>
ee225e6
to
f30d5b6
Compare
Signed-off-by: Kevin Viglucci <[email protected]>
04fca05
to
43cd833
Compare
Signed-off-by: Kevin Viglucci <[email protected]>
479727a
to
303c6f3
Compare
Adds an adapter package that supports
async/await
and async generators for the various interaction patterns.