Skip to content

Commit

Permalink
adds support for flowable to async iterable conversion
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <[email protected]>
  • Loading branch information
OlegDokuka committed Nov 1, 2020
1 parent 8efe639 commit 013f048
Show file tree
Hide file tree
Showing 8 changed files with 1,018 additions and 434 deletions.
2 changes: 1 addition & 1 deletion .flowconfig
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ suppress_type=$FlowFixMe
suppress_type=$FixMe

[version]
^0.134.0
^0.136.0
36 changes: 19 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,41 +32,43 @@
"packages/*"
],
"devDependencies": {
"@babel/cli": "^7.11.6",
"@babel/core": "^7.11.6",
"@babel/cli": "^7.12.1",
"@babel/core": "^7.12.3",
"babel-jest": "^26.6.0",
"babel-eslint": "^10.1.0",
"babel-plugin-minify-replace": "^0.5.0",
"@babel/plugin-transform-async-to-generator": "^7.10.4",
"@babel/plugin-proposal-class-properties": "^7.10.4",
"@babel/plugin-transform-modules-commonjs": "^7.10.4",
"@babel/plugin-transform-flow-strip-types": "^7.10.4",
"@babel/plugin-proposal-object-rest-spread": "^7.11.0",
"@babel/plugin-transform-async-to-generator": "^7.12.1",
"@babel/plugin-proposal-async-generator-functions": "^7.12.1",
"@babel/plugin-proposal-class-properties": "^7.12.1",
"@babel/plugin-transform-modules-commonjs": "^7.12.1",
"@babel/plugin-transform-flow-strip-types": "^7.12.1",
"@babel/plugin-proposal-object-rest-spread": "^7.12.1",
"@babel/plugin-transform-runtime": "^7.11.5",
"@babel/polyfill": "^7.11.5",
"@babel/polyfill": "^7.12.1",
"@babel/runtime": "^7.12.1",
"babel-preset-fbjs": "^3.3.0",
"@babel/runtime": "^7.11.2",
"buffer": "^5.6.0",
"chalk": "^4.1.0",
"cross-env": "^7.0.2",
"eslint": "^7.10.0",
"eslint": "^7.11.0",
"eslint-config-fb-strict": "^26.0.0",
"eslint-plugin-babel": "^5.3.1",
"eslint-plugin-flowtype": "^5.2.0",
"eslint-plugin-jasmine": "^4.1.1",
"eslint-plugin-jsx-a11y": "^6.3.1",
"eslint-plugin-prefer-object-spread": "^1.2.1",
"eslint-plugin-react": "^7.21.2",
"eslint-plugin-react": "^7.21.5",
"eslint-plugin-relay": "^1.8.1",
"eslint-plugin-jest": "^24.0.2",
"fbjs": "^2.0.0",
"fbjs-scripts": "^2.0.0",
"flow-bin": "^0.134.0",
"eslint-plugin-jest": "^24.1.0",
"fbjs": "^3.0.0",
"fbjs-scripts": "^3.0.0",
"flow-bin": "^0.136.0",
"glob": "^7.1.6",
"jest": "^26.4.2",
"jest": "^26.6.0",
"lerna": "^3.22.1",
"object-assign": "^4.1.1",
"prettier": "2.1.2",
"rollup": "^2.28.2",
"rollup": "^2.32.1",
"@rollup/plugin-babel": "^5.2.1",
"@rollup/plugin-commonjs": "^15.1.0",
"@rollup/plugin-node-resolve": "^9.0.0",
Expand Down
14 changes: 8 additions & 6 deletions packages/rsocket-flowable/src/Flowable.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import type {

import FlowableMapOperator from './FlowableMapOperator';
import FlowableTakeOperator from './FlowableTakeOperator';
import FlowableAsyncIterable from './FlowableAsyncIterable';

import invariant from 'fbjs/lib/invariant';
import warning from 'fbjs/lib/warning';
Expand Down Expand Up @@ -117,6 +118,10 @@ export default class Flowable<T> implements IPublisher<T> {
);
}

toAsyncIterable(prefetch: number = 256): AsyncIterable<T> {
return new FlowableAsyncIterable<T>(this, prefetch);
}

_wrapCallback(callback: (T) => void): IPartialSubscriber<T> {
const max = this._max;
return {
Expand Down Expand Up @@ -255,17 +260,14 @@ class FlowableSubscriber<T> implements ISubscriber<T> {

_request = (n: number) => {
invariant(
Number.isInteger(n) && n >= 1 && n <= this._max,
'Flowable: Expected request value to be an integer with a ' +
'value greater than 0 and less than or equal to %s, got ' +
'`%s`.',
this._max,
Number.isInteger(n) && n >= 1,
'Flowable: Expected request value to be an integer with a value greater than 0, got `%s`.',
n,
);
if (!this._active) {
return;
}
if (n === this._max) {
if (n >= this._max) {
this._pending = this._max;
} else {
this._pending += n;
Expand Down
153 changes: 153 additions & 0 deletions packages/rsocket-flowable/src/FlowableAsyncIterable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/** Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @flow
*/

'use strict';

import type {ISubscriber, ISubscription} from 'rsocket-types';
import Flowable from './Flowable';

// $FlowFixMe
export default class FlowableAsyncIterable<T> implements AsyncIterable<T> {
_source: Flowable<T>;
_prefetch: number;

constructor(source: Flowable<T>, prefetch: number = 256) {
this._source = source;
this._prefetch = prefetch;
}

asyncIterator(): AsyncIterator<T> {
const asyncIteratorSubscriber = new AsyncIteratorSubscriber(this._prefetch);
this._source.subscribe(asyncIteratorSubscriber);
return asyncIteratorSubscriber;
}

// $FlowFixMe
[Symbol.asyncIterator](): AsyncIterator<T> {
return this.asyncIterator();
}
}

// $FlowFixMe
class AsyncIteratorSubscriber<T> implements ISubscriber<T>, AsyncIterator<T> {
_values: T[];
_prefetch: number;
_limit: number;

_subscription: ISubscription;

_produced: number;

_done: boolean;
_error: Error;

_resolve: ?(
result: Promise<IteratorResult<T, void>> | IteratorResult<T, void>,
) => void;
_reject: ?(reason?: any) => void;

constructor(prefetch: number = 256) {
this._prefetch = prefetch;
this._values = [];
this._limit =
prefetch === Number.MAX_SAFE_INTEGER
? Number.MAX_SAFE_INTEGER
: prefetch - (prefetch >> 2);
this._produced = 0;
}

onSubscribe(subscription: ISubscription): void {
this._subscription = subscription;
subscription.request(this._prefetch);
}

onNext(value: T): void {
const resolve = this._resolve;
if (resolve) {
this._resolve = undefined;
this._reject = undefined;

if (++this._produced === this._limit) {
this._produced = 0;
this._subscription.request(this._limit);
}

resolve({done: false, value});
return;
}

this._values.push(value);
}

onComplete(): void {
this._done = true;

const resolve = this._resolve;
if (resolve) {
this._resolve = undefined;
this._reject = undefined;

resolve({done: true});
}
}

onError(error: Error): void {
this._done = true;
this._error = error;

const reject = this._reject;
if (reject) {
this._resolve = undefined;
this._reject = undefined;

reject(error);
}
}

next(): Promise<IteratorResult<T, void>> {
const value = this._values.shift();
if (value) {
if (++this._produced === this._limit) {
this._produced = 0;
this._subscription.request(this._limit);
}

return Promise.resolve({done: false, value});
} else if (this._done) {
if (this._error) {
return Promise.reject(this._error);
} else {
return Promise.resolve({done: true});
}
} else {
return new Promise((resolve, reject) => {
this._resolve = resolve;
this._reject = reject;
});
}
}

return(): Promise<IteratorResult<T, void>> {
this._subscription.cancel();
return Promise.resolve({done: true});
}

// $FlowFixMe
[Symbol.asyncIterator](): AsyncIterator<T> {
return this;
}
}
Loading

0 comments on commit 013f048

Please sign in to comment.