Skip to content

Commit

Permalink
feat: (wip) async adapter
Browse files Browse the repository at this point in the history
- requester fireAndForget
- requester requestResponse
- requester requestStream

refactor: renamed to SubscribingAsyncIterator + added more tests

feat: (wip) add async responders

- fireAndForget
- requestResponse

feat: AsyncIterable requestStream responder

refactor: use rxjs observer for async iterable requestStream example

feat: add requesChannel responders and requesters

refactor: remove unnecessary passing of scheduler

test: (wip) requester tests

test: async requestResponse requesters tests

test: async adapter fireAndForget requester tests

refactor: apply linting

fix: resolve issues from rebasing

test: add tests for requestStream requester

refactor: rename async package to adapter-async
Signed-off-by: Kevin Viglucci <[email protected]>
  • Loading branch information
viglucci committed Jan 12, 2022
1 parent 72c2497 commit ea42001
Show file tree
Hide file tree
Showing 29 changed files with 3,224 additions and 22,024 deletions.
20,591 changes: 0 additions & 20,591 deletions package-lock.json

This file was deleted.

20 changes: 20 additions & 0 deletions packages/rsocket-adapter-async/jest.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import type { Config } from "@jest/types";
import { pathsToModuleNameMapper } from "ts-jest/utils";
import { compilerOptions } from "../../tsconfig.json";

const config: Config.InitialOptions = {
preset: "ts-jest",
testRegex: "(\\/__tests__\\/.*|\\.(test|spec))\\.(ts)$",
moduleNameMapper: pathsToModuleNameMapper(compilerOptions.paths, {
// This has to match the baseUrl defined in tsconfig.json.
prefix: "<rootDir>/../../",
}),
modulePathIgnorePatterns: [
"<rootDir>/__tests__/test-utils",
"<rootDir>/__tests__/*.d.ts",
],
collectCoverage: true,
collectCoverageFrom: ["<rootDir>/src/**/*.ts", "!**/node_modules/**"],
};

export default config;
28 changes: 28 additions & 0 deletions packages/rsocket-adapter-async/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"name": "@rsocket/adapter-async",
"version": "1.0.0",
"main": "dist/index",
"types": "dist/index",
"files": [
"dist"
],
"scripts": {
"build": "yarn run clean && yarn run compile",
"clean": "rimraf -rf ./dist",
"compile": "tsc -p tsconfig.build.json",
"prepublishOnly": "yarn run build",
"test": "jest"
},
"dependencies": {
"@rsocket/composite-metadata": "^1.0.0",
"@rsocket/core": "^1.0.0",
"@rsocket/messaging": "^1.0.0",
"@rsocket/adapter-rxjs": "^1.0.0",
"rxjs": "^7.4.0",
"rxjs-for-await": "^1.0.0"
},
"devDependencies": {
"rimraf": "~3.0.2",
"typescript": "~4.5.2"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import SubscribingAsyncIterator from "../lib/SubscribingAsyncIterator";
import { mock } from "jest-mock-extended";
import {
OnExtensionSubscriber,
OnNextSubscriber,
OnTerminalSubscriber,
Requestable,
} from "@rsocket/core";
import { Codec } from "@rsocket/messaging";
import BufferingForwardingSubscriber from "../lib/BufferingForwardingSubscriber";
import { Buffer } from "buffer";

jest.useFakeTimers();

describe("BufferingForwardingSubscriber", function () {
it("forwards all received onNext calls when received before subscription", async function () {
const mockSubscriber = mock<
OnNextSubscriber & OnTerminalSubscriber & OnExtensionSubscriber
>();
const testObj = new BufferingForwardingSubscriber();

testObj.onNext({ data: Buffer.from("1") }, false);
testObj.onNext({ data: Buffer.from("2") }, false);
testObj.onNext({ data: Buffer.from("3") }, true);

testObj.subscribe(mockSubscriber);

expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("1") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("2") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("3") },
true
);
});

it("forwards all received onNext calls when received after subscription", async function () {
const mockSubscriber = mock<
OnNextSubscriber & OnTerminalSubscriber & OnExtensionSubscriber
>();
const testObj = new BufferingForwardingSubscriber();

testObj.subscribe(mockSubscriber);

testObj.onNext({ data: Buffer.from("1") }, false);
testObj.onNext({ data: Buffer.from("2") }, false);
testObj.onNext({ data: Buffer.from("3") }, true);

expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("1") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("2") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("3") },
true
);
});

it("forwards all received onNext calls before forwarding subsequent onComplete", async function () {
const mockSubscriber = mock<
OnNextSubscriber & OnTerminalSubscriber & OnExtensionSubscriber
>();
const testObj = new BufferingForwardingSubscriber();

testObj.subscribe(mockSubscriber);

testObj.onNext({ data: Buffer.from("1") }, false);
testObj.onNext({ data: Buffer.from("2") }, false);
testObj.onNext({ data: Buffer.from("3") }, false);
testObj.onComplete();

expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("1") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("2") },
false
);
expect(mockSubscriber.onNext).toBeCalledWith(
{ data: Buffer.from("3") },
false
);
expect(mockSubscriber.onComplete).toBeCalledWith();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
import SubscribingAsyncIterator from "../lib/SubscribingAsyncIterator";
import { mock } from "jest-mock-extended";
import { Cancellable, Requestable } from "@rsocket/core";
import { Codec } from "@rsocket/messaging";

jest.useFakeTimers();

class StringCodec implements Codec<string> {
readonly mimeType: string = "text/plain";

decode(buffer: Buffer): string {
return buffer.toString();
}

encode(entity: string): Buffer {
return Buffer.from(entity);
}
}

describe("SubscribingAsyncIterator", function () {
it("iterates over emitted values", async function () {
let subscriber;
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
for (let i = 0; i < requestN; i++) {
setTimeout(() => {
subscriber.onNext(
{
data: Buffer.from(`${i}`),
metadata: undefined,
},
i === requestN - 1
);
});
}
},
});
const requestSpy = jest.spyOn(subscription, "request");

const initialRequestN = 3;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];
for await (const value of subscriber) {
jest.runAllTimers();
values.push(value);
}

expect(values).toStrictEqual(["0", "1", "2"]);
expect(requestSpy).toBeCalledTimes(1);
});

it("iterates over emitted values until onComplete", async function () {
let subscriber;
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
for (let i = 0; i < requestN; i++) {
setTimeout(() => {
if (i === requestN - 1) {
subscriber.onComplete();
} else {
subscriber.onNext(
{
data: Buffer.from(`${i}`),
metadata: undefined,
},
false
);
}
});
}
},
});
const requestSpy = jest.spyOn(subscription, "request");

const initialRequestN = 3;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];
for await (const value of subscriber) {
jest.runAllTimers();
values.push(value);
}

expect(values).toStrictEqual(["0", "1"]);
expect(requestSpy).toBeCalledTimes(1);
});

it("cancels when break statement reached", async function () {
let subscriber;
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
for (let i = 0; i < requestN; i++) {
setTimeout(() => {
subscriber.onNext(
{
data: Buffer.from(`${i}`),
metadata: undefined,
},
i === requestN - 1
);
});
}
},
});
const requestSpy = jest.spyOn(subscription, "request");
const cancelSpy = jest.spyOn(subscription, "cancel");

const initialRequestN = 10;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];
for await (const value of subscriber) {
if (values.length == 2) {
break;
}
jest.runAllTimers();
values.push(value);
}

expect(values).toStrictEqual(["0", "1"]);
expect(requestSpy).toBeCalledTimes(1);
expect(requestSpy).toBeCalledWith(10);
expect(cancelSpy).toBeCalledTimes(1);
});

it("ends and throws with emitted exception", async function () {
let subscriber;
const expectedError = new Error("test error");
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
setTimeout(() => {
subscriber.onError(expectedError);
});
},
});
const requestSpy = jest.spyOn(subscription, "request");

const initialRequestN = 10;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];

let capturedError;
try {
for await (const value of subscriber) {
jest.runAllTimers();
values.push(value);
}
} catch (error) {
capturedError = error;
}

expect(capturedError).toBe(expectedError);
expect(values).toStrictEqual([]);
expect(requestSpy).toBeCalledWith(10);
});

it("cancels on exception processing emitted value", async function () {
let subscriber;
const subscription = mock<Requestable & Cancellable>({
request(requestN: number) {
for (let i = 0; i < requestN; i++) {
setTimeout(() => {
subscriber.onNext(
{
data: Buffer.from(`${i}`),
metadata: undefined,
},
i === requestN - 1
);
});
}
},
});
const requestSpy = jest.spyOn(subscription, "request");
const cancelSpy = jest.spyOn(subscription, "cancel");

const initialRequestN = 10;
subscriber = new SubscribingAsyncIterator(
subscription,
initialRequestN * 2,
new StringCodec()
);
subscription.request(initialRequestN);

jest.runAllTimers();

const values = [];
try {
for await (const value of subscriber) {
if (values.length == 2) {
throw new Error("test error");
}
values.push(value);
jest.runAllTimers();
}
} catch (e) {}

expect(values).toStrictEqual(["0", "1"]);
expect(requestSpy).toBeCalledTimes(1);
expect(requestSpy).toBeCalledWith(10);
expect(cancelSpy).toBeCalledTimes(1);
});
});
Loading

0 comments on commit ea42001

Please sign in to comment.