From 4d4fdc8211813951edc061a07911fdda0d297feb Mon Sep 17 00:00:00 2001 From: Elad Ben-Israel Date: Tue, 12 Mar 2024 13:12:37 +0200 Subject: [PATCH] fix(websockets): unable to send messages to clients (#115) Now that each `cloud.Function` executes in it's own process, it is impossible to use a global WebSockets server to send messages. Instead, starts a small local http server that sends the messages across process boundaries. --- websockets/inflight/websocket.sim.js | 14 ------- websockets/lib.test.w | 21 +++++----- websockets/package.json | 8 ++-- websockets/platform/sim.w | 18 ++++++--- websockets/platform/sim/wb.js | 57 +++++++++++++++++++++------- 5 files changed, 71 insertions(+), 47 deletions(-) delete mode 100644 websockets/inflight/websocket.sim.js diff --git a/websockets/inflight/websocket.sim.js b/websockets/inflight/websocket.sim.js deleted file mode 100644 index 3e387b15..00000000 --- a/websockets/inflight/websocket.sim.js +++ /dev/null @@ -1,14 +0,0 @@ -export const _sendMessage = (connectionId, message) => { - let wss = global.wss; - if (!wss) { - return; - } - - wss.clients.forEach((ws) => { - if (ws.id !== connectionId) { - return; - } - - ws.send(message) - }); -} diff --git a/websockets/lib.test.w b/websockets/lib.test.w index 52105066..44cfbefd 100644 --- a/websockets/lib.test.w +++ b/websockets/lib.test.w @@ -37,7 +37,7 @@ wb.onMessage(inflight (id: str, body: str): void => { }); interface IWebSocketJS { - inflight on(cmd: str, handler: inflight(str):void): void; + inflight on(cmd: str, handler: inflight(str): void): void; inflight send(e: str): void; inflight close(): void; } @@ -70,7 +70,11 @@ let receiver = new cloud.Service(inflight () => { ws.on("close", () => { log("close socket (receiver)"); }); -}, autoStart: false) as "receive message"; + + return () => { + ws.close(); + }; +}) as "receive message"; let sender = new cloud.Service(inflight () => { let ws = Util._ws(wb.url); @@ -99,19 +103,18 @@ let sender = new cloud.Service(inflight () => { ws.on("close", () => { log("close socket (sender)"); }); -}, autoStart: false) as "send message"; + + return () => { + ws.close(); + }; +}) as "send message"; test "simple websocket test" { - receiver.start(); assert(receiver.started()); - - sender.start(); assert(sender.started()); + log("waiting 10s"); util.sleep(10s); - sender.stop(); - receiver.stop(); - assert(counter.peek() == 10); } diff --git a/websockets/package.json b/websockets/package.json index eb8d8900..a2f9e223 100644 --- a/websockets/package.json +++ b/websockets/package.json @@ -1,7 +1,7 @@ { "name": "@winglibs/websockets", "description": "WebSocket library for Wing", - "version": "0.2.3", + "version": "0.3.0", "repository": { "type": "git", "url": "https://github.com/winglang/winglibs.git", @@ -13,10 +13,8 @@ "name": "Marcio Cruz de Almeida" }, "peerDependencies": { - "cdktf": "^0.19.1", - "winglang": "^0.54.39" - }, - "dependencies": { + "cdktf": "*", + "winglang": "*", "@aws-sdk/client-apigatewaymanagementapi": "^3.451.0", "@cdktf/provider-aws": "^18.0.5", "aws-cdk-lib": "^2.109.0", diff --git a/websockets/platform/sim.w b/websockets/platform/sim.w index ac1916f4..02052b43 100644 --- a/websockets/platform/sim.w +++ b/websockets/platform/sim.w @@ -1,11 +1,13 @@ bring cloud; bring util; bring sim; +bring http; bring "../commons/api.w" as api; interface StartWebSocketApiResult { inflight close(): inflight(): void; inflight url(): str; + inflight local(): str; } pub class WebSocket_sim impl api.IWebSocket { @@ -14,6 +16,7 @@ pub class WebSocket_sim impl api.IWebSocket { var messageFn: inflight(str, str): void; state: sim.State; urlStateKey: str; + localStateKey: str; pub url: str; @@ -23,10 +26,13 @@ pub class WebSocket_sim impl api.IWebSocket { this.messageFn = inflight () => {}; this.state = new sim.State(); this.urlStateKey = "url"; + this.localStateKey = "local"; + this.url = this.state.token(this.urlStateKey); new cloud.Service(inflight () => { let res = WebSocket_sim._startWebSocketApi(this.connectFn, this.disconnectFn, this.messageFn); this.state.set(this.urlStateKey, res.url()); + this.state.set(this.localStateKey, res.local()); return () => { res.close(); }; @@ -36,9 +42,11 @@ pub class WebSocket_sim impl api.IWebSocket { pub onConnect(handler: inflight(str): void): void { this.connectFn = handler; } + pub onDisconnect(handler: inflight(str): void): void { this.disconnectFn = handler; } + pub onMessage(handler: inflight(str, str): void): void { this.messageFn = handler; } @@ -49,11 +57,11 @@ pub class WebSocket_sim impl api.IWebSocket { onmessageFn: inflight (str, str): void, ): StartWebSocketApiResult; - extern "../inflight/websocket.sim.js" static inflight _sendMessage( - connectionId: str, - message: str, - ): inflight(): void; pub inflight sendMessage(connectionId: str, message: str) { - WebSocket_sim._sendMessage(connectionId, message); + let localUrl = this.state.get(this.localStateKey).asStr(); + http.post(localUrl, body: Json.stringify({ + connectionId: connectionId, + message: message + })); } } diff --git a/websockets/platform/sim/wb.js b/websockets/platform/sim/wb.js index 25b51489..fbc245f7 100644 --- a/websockets/platform/sim/wb.js +++ b/websockets/platform/sim/wb.js @@ -1,15 +1,10 @@ import { WebSocketServer } from "ws"; import getPort from "get-port"; +import * as http from "http"; -export const _startWebSocketApi = async ( - onConnect, - onDisconnect, - onMessage) => { - +export const _startWebSocketApi = async (onConnect, onDisconnect, onMessage) => { const port = await getPort(); - // const port = Math.floor(Math.random() * 1000 + 3000); const wss = new WebSocketServer({ port }); - global.wss = wss; wss.on('connection', function connection(ws) { ws.id = Date.now().toString().slice(-6); @@ -26,11 +21,45 @@ export const _startWebSocketApi = async ( onConnect(ws.id); }); - return { - close: () => { - console.log("closing..."); - wss.close(); - }, - url: () => `ws://127.0.0.1:${port}` - } + // This is a local HTTP server that will be used to send messages to a client + const server = http.createServer((req, res) => { + const body = []; + req.on('data', chunk => body.push(chunk)).on('end', () => { + try { + const jsonBody = JSON.parse(Buffer.concat(body).toString()); + const { connectionId, message } = jsonBody; + if (!connectionId) { + console.error("No connectionId in body:", body); + res.statusCode = 400; + return res.end(); + } + + for (const ws of wss.clients) { + if (ws.id === connectionId) { + ws.send(message) + } + } + + return res.end(); + } catch (e) { + console.error("Failed to parse body as JSON:", body, e.message); + res.statusCode = 400; + return res.end(); + } + }); + }); + + return new Promise((ok, ko) => { + server.listen(() => { + return ok({ + close: () => { + console.log("closing..."); + wss.close(); + server.close(); + }, + url: () => `ws://127.0.0.1:${port}`, + local: () => `http://127.0.0.1:${server.address().port}`, + }); + }); + }); };