From 48d122a88dfad74f908addafcc772b90acba2b83 Mon Sep 17 00:00:00 2001 From: Kevin Viglucci Date: Sat, 8 Apr 2023 13:14:48 -0500 Subject: [PATCH] chore: add additional examples (#258) * chore: add nvmrc file * chore: add tcp request stream example * chore: add client-only composite metadata routing example * chore: fix comment typos in various examples --- .nvmrc | 1 + packages/rsocket-examples/package.json | 2 + .../ClientCompositeMetadataRouteExample.ts | 138 +++++++++++++++ .../ClientServerRequestStreamExampleTCP.ts | 167 ++++++++++++++++++ ...ientServerRequestStreamExampleWebSocket.ts | 2 +- 5 files changed, 309 insertions(+), 1 deletion(-) create mode 100644 .nvmrc create mode 100644 packages/rsocket-examples/src/ClientCompositeMetadataRouteExample.ts create mode 100644 packages/rsocket-examples/src/ClientServerRequestStreamExampleTCP.ts diff --git a/.nvmrc b/.nvmrc new file mode 100644 index 0000000..e44a38e --- /dev/null +++ b/.nvmrc @@ -0,0 +1 @@ +v18.12.1 diff --git a/packages/rsocket-examples/package.json b/packages/rsocket-examples/package.json index 16b849e..d38c079 100644 --- a/packages/rsocket-examples/package.json +++ b/packages/rsocket-examples/package.json @@ -12,6 +12,8 @@ "start-client-request-channel": "ts-node -r tsconfig-paths/register src/ClientRequestChannelExample.ts", "start-client-server-request-channel-resume": "ts-node -r tsconfig-paths/register src/ClientServerRequestChannelResumeExample.ts", "start-client-request-fnf-with-lease": "ts-node -r tsconfig-paths/register src/ClienRequestFnfnWithLeaseExampleTcp.ts", + "start-client-composite-metadata-route": "ts-node -r tsconfig-paths/register src/ClientCompositeMetadataRouteExample.ts", + "start-client-server-request-stream-tcp": "ts-node -r tsconfig-paths/register src/ClientServerRequestStreamExampleTCP.ts", "start-client-server-request-stream-websocket": "ts-node -r tsconfig-paths/register src/ClientServerRequestStreamExampleWebSocket.ts", "start-client-server-request-response-tcp": "ts-node -r tsconfig-paths/register src/ClientServerRequestResponseExampleTcp.ts", "start-client-server-request-response-websocket": "ts-node -r tsconfig-paths/register src/ClientServerRequestResponseExampleWebSocket.ts", diff --git a/packages/rsocket-examples/src/ClientCompositeMetadataRouteExample.ts b/packages/rsocket-examples/src/ClientCompositeMetadataRouteExample.ts new file mode 100644 index 0000000..b2e6e04 --- /dev/null +++ b/packages/rsocket-examples/src/ClientCompositeMetadataRouteExample.ts @@ -0,0 +1,138 @@ +import { RSocket, RSocketConnector } from "@rsocket/core"; +import { TcpClientTransport } from "@rsocket/transport-tcp-client"; +import { + encodeCompositeMetadata, + encodeRoute, + WellKnownMimeType, +} from "@rsocket/composite-metadata"; +import Logger from "./shared/logger"; +import { exit } from "process"; +import MESSAGE_RSOCKET_ROUTING = WellKnownMimeType.MESSAGE_RSOCKET_ROUTING; +import MESSAGE_RSOCKET_COMPOSITE_METADATA = WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA; + +/** + * This example assumes you have a RSocket server running on 127.0.0.1:9000 that will respond + * to requests at the following routes: + * - login (requestResponse) + * - message (requestResponse) + * - messages.incoming (requestStream) + */ + +function makeConnector() { + const connectorConnectionOptions = { + host: "127.0.0.1", + port: 9000, + }; + console.log( + `Creating connector to ${JSON.stringify(connectorConnectionOptions)}` + ); + return new RSocketConnector({ + setup: { + metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string, + }, + transport: new TcpClientTransport({ + connectionOptions: connectorConnectionOptions, + }), + }); +} + +function createRoute(route?: string) { + let compositeMetaData = undefined; + if (route) { + const encodedRoute = encodeRoute(route); + + const map = new Map(); + map.set(MESSAGE_RSOCKET_ROUTING, encodedRoute); + compositeMetaData = encodeCompositeMetadata(map); + } + return compositeMetaData; +} + +async function requestResponse(rsocket: RSocket, route: string, data: string) { + console.log(`Executing requestResponse: ${JSON.stringify({ route, data })}`); + return new Promise((resolve, reject) => { + return rsocket.requestResponse( + { + data: Buffer.from(data), + metadata: createRoute(route), + }, + { + onError: (e) => { + reject(e); + }, + onNext: (payload, isComplete) => { + Logger.info( + `requestResponse onNext payload[data: ${payload.data}; metadata: ${payload.metadata}]|${isComplete}` + ); + resolve(payload); + }, + onComplete: () => { + Logger.info(`requestResponse onComplete`); + resolve(null); + }, + onExtension: () => {}, + } + ); + }); +} + +async function main() { + const connector = makeConnector(); + const rsocket = await connector.connect(); + + await requestResponse(rsocket, "login", "user1"); + + await requestResponse( + rsocket, + "message", + JSON.stringify({ user: "user1", content: "a message" }) + ); + + await new Promise((resolve, reject) => { + let payloadsReceived = 0; + const maxPayloads = 10; + const requester = rsocket.requestStream( + { + data: Buffer.from("Hello World"), + metadata: createRoute("messages.incoming"), + }, + 3, + { + onError: (e) => reject(e), + onNext: (payload, isComplete) => { + Logger.info( + `[client] payload[data: ${payload.data}; metadata: ${payload.metadata}]|isComplete: ${isComplete}` + ); + + payloadsReceived++; + + // request 5 more payloads every 5th payload, until a max total payloads received + if (payloadsReceived % 2 == 0 && payloadsReceived < maxPayloads) { + requester.request(2); + } else if (payloadsReceived >= maxPayloads) { + requester.cancel(); + setTimeout(() => { + resolve(null); + }); + } + + if (isComplete) { + resolve(null); + } + }, + onComplete: () => { + Logger.info(`requestStream onComplete`); + resolve(null); + }, + onExtension: () => {}, + } + ); + }); +} + +main() + .then(() => exit()) + .catch((error: Error) => { + console.error(error); + exit(1); + }); diff --git a/packages/rsocket-examples/src/ClientServerRequestStreamExampleTCP.ts b/packages/rsocket-examples/src/ClientServerRequestStreamExampleTCP.ts new file mode 100644 index 0000000..0b8551e --- /dev/null +++ b/packages/rsocket-examples/src/ClientServerRequestStreamExampleTCP.ts @@ -0,0 +1,167 @@ +/* + * Copyright 2021-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + OnExtensionSubscriber, + OnNextSubscriber, + OnTerminalSubscriber, + Payload, + RSocketConnector, + RSocketServer, +} from "@rsocket/core"; +import { WebsocketClientTransport } from "@rsocket/transport-websocket-client"; +import { WebsocketServerTransport } from "@rsocket/transport-websocket-server"; +import { exit } from "process"; +import WebSocket from "ws"; +import Logger from "./shared/logger"; +import { TcpServerTransport } from "@rsocket/transport-tcp-server"; +import { TcpClientTransport } from "@rsocket/transport-tcp-client"; + +function makeServer() { + return new RSocketServer({ + transport: new TcpServerTransport({ + listenOptions: { + port: 9090, + host: "127.0.0.1", + }, + }), + acceptor: { + accept: async () => { + return { + requestStream: ( + payload: Payload, + initialRequestN, + responderStream: OnTerminalSubscriber & + OnNextSubscriber & + OnExtensionSubscriber + ) => { + Logger.info( + `[server] requestStream payload[data: ${payload.data}; metadata: ${payload.metadata}]|initialRequestN: ${initialRequestN}` + ); + + let interval = null; + let requestedResponses = initialRequestN; + let sentResponses = 0; + + // simulate async data with interval + interval = setInterval(() => { + sentResponses++; + let isComplete = sentResponses >= requestedResponses; + responderStream.onNext( + { + data: Buffer.from(new Date()), + metadata: undefined, + }, + isComplete + ); + if (isComplete) { + clearInterval(interval); + } + }, 750); + + return { + cancel() { + Logger.info("[server] stream cancelled by client"); + clearInterval(interval); + }, + request(n) { + requestedResponses += n; + Logger.info( + `[server] request n: ${n}, requestedResponses: ${requestedResponses}, sentResponses: ${sentResponses}` + ); + }, + onExtension: () => {}, + }; + }, + }; + }, + }, + }); +} + +function makeConnector() { + const connectorConnectionOptions = { + host: "127.0.0.1", + port: 9090, + }; + console.log( + `Creating connector to ${JSON.stringify(connectorConnectionOptions)}` + ); + return new RSocketConnector({ + transport: new TcpClientTransport({ + connectionOptions: connectorConnectionOptions, + }), + }); +} + +let serverCloseable; + +async function main() { + const server = makeServer(); + const connector = makeConnector(); + + serverCloseable = await server.bind(); + const rsocket = await connector.connect(); + + await new Promise((resolve, reject) => { + let payloadsReceived = 0; + const maxPayloads = 10; + const requester = rsocket.requestStream( + { + data: Buffer.from("Hello World"), + }, + 3, + { + onError: (e) => reject(e), + onNext: (payload, isComplete) => { + Logger.info( + `[client] payload[data: ${payload.data}; metadata: ${payload.metadata}]|isComplete: ${isComplete}` + ); + + payloadsReceived++; + + // request 5 more payloads every 5th payload, until a max total payloads received + if (payloadsReceived % 2 == 0 && payloadsReceived < maxPayloads) { + requester.request(2); + } else if (payloadsReceived >= maxPayloads) { + requester.cancel(); + setTimeout(() => { + resolve(null); + }); + } + + if (isComplete) { + resolve(null); + } + }, + onComplete: () => { + resolve(null); + }, + onExtension: () => {}, + } + ); + }); +} + +main() + .then(() => exit()) + .catch((error: Error) => { + console.error(error); + exit(1); + }) + .finally(() => { + serverCloseable?.close(); + }); diff --git a/packages/rsocket-examples/src/ClientServerRequestStreamExampleWebSocket.ts b/packages/rsocket-examples/src/ClientServerRequestStreamExampleWebSocket.ts index d45be0a..ed1cca7 100644 --- a/packages/rsocket-examples/src/ClientServerRequestStreamExampleWebSocket.ts +++ b/packages/rsocket-examples/src/ClientServerRequestStreamExampleWebSocket.ts @@ -126,7 +126,7 @@ async function main() { payloadsReceived++; - // request 5 more payloads event 5th payload, until a max total payloads received + // request 5 more payloads every 5th payload, until a max total payloads received if (payloadsReceived % 2 == 0 && payloadsReceived < maxPayloads) { requester.request(2); } else if (payloadsReceived >= maxPayloads) {