Skip to content

Commit

Permalink
fix(websockets): unable to send messages to clients (#115)
Browse files Browse the repository at this point in the history
Now that each `cloud.Function` executes in it's own process, it is impossible to use a global WebSockets server to send messages.

Instead, starts a small local http server that sends the messages across process boundaries.
  • Loading branch information
eladb authored Mar 12, 2024
1 parent f646608 commit 4d4fdc8
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 47 deletions.
14 changes: 0 additions & 14 deletions websockets/inflight/websocket.sim.js

This file was deleted.

21 changes: 12 additions & 9 deletions websockets/lib.test.w
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ wb.onMessage(inflight (id: str, body: str): void => {
});

interface IWebSocketJS {
inflight on(cmd: str, handler: inflight(str):void): void;
inflight on(cmd: str, handler: inflight(str): void): void;
inflight send(e: str): void;
inflight close(): void;
}
Expand Down Expand Up @@ -70,7 +70,11 @@ let receiver = new cloud.Service(inflight () => {
ws.on("close", () => {
log("close socket (receiver)");
});
}, autoStart: false) as "receive message";

return () => {
ws.close();
};
}) as "receive message";

let sender = new cloud.Service(inflight () => {
let ws = Util._ws(wb.url);
Expand Down Expand Up @@ -99,19 +103,18 @@ let sender = new cloud.Service(inflight () => {
ws.on("close", () => {
log("close socket (sender)");
});
}, autoStart: false) as "send message";

return () => {
ws.close();
};
}) as "send message";

test "simple websocket test" {
receiver.start();
assert(receiver.started());

sender.start();
assert(sender.started());

log("waiting 10s");
util.sleep(10s);

sender.stop();
receiver.stop();

assert(counter.peek() == 10);
}
8 changes: 3 additions & 5 deletions websockets/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@winglibs/websockets",
"description": "WebSocket library for Wing",
"version": "0.2.3",
"version": "0.3.0",
"repository": {
"type": "git",
"url": "https://github.com/winglang/winglibs.git",
Expand All @@ -13,10 +13,8 @@
"name": "Marcio Cruz de Almeida"
},
"peerDependencies": {
"cdktf": "^0.19.1",
"winglang": "^0.54.39"
},
"dependencies": {
"cdktf": "*",
"winglang": "*",
"@aws-sdk/client-apigatewaymanagementapi": "^3.451.0",
"@cdktf/provider-aws": "^18.0.5",
"aws-cdk-lib": "^2.109.0",
Expand Down
18 changes: 13 additions & 5 deletions websockets/platform/sim.w
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
bring cloud;
bring util;
bring sim;
bring http;
bring "../commons/api.w" as api;

interface StartWebSocketApiResult {
inflight close(): inflight(): void;
inflight url(): str;
inflight local(): str;
}

pub class WebSocket_sim impl api.IWebSocket {
Expand All @@ -14,6 +16,7 @@ pub class WebSocket_sim impl api.IWebSocket {
var messageFn: inflight(str, str): void;
state: sim.State;
urlStateKey: str;
localStateKey: str;

pub url: str;

Expand All @@ -23,10 +26,13 @@ pub class WebSocket_sim impl api.IWebSocket {
this.messageFn = inflight () => {};
this.state = new sim.State();
this.urlStateKey = "url";
this.localStateKey = "local";

this.url = this.state.token(this.urlStateKey);
new cloud.Service(inflight () => {
let res = WebSocket_sim._startWebSocketApi(this.connectFn, this.disconnectFn, this.messageFn);
this.state.set(this.urlStateKey, res.url());
this.state.set(this.localStateKey, res.local());
return () => {
res.close();
};
Expand All @@ -36,9 +42,11 @@ pub class WebSocket_sim impl api.IWebSocket {
pub onConnect(handler: inflight(str): void): void {
this.connectFn = handler;
}

pub onDisconnect(handler: inflight(str): void): void {
this.disconnectFn = handler;
}

pub onMessage(handler: inflight(str, str): void): void {
this.messageFn = handler;
}
Expand All @@ -49,11 +57,11 @@ pub class WebSocket_sim impl api.IWebSocket {
onmessageFn: inflight (str, str): void,
): StartWebSocketApiResult;

extern "../inflight/websocket.sim.js" static inflight _sendMessage(
connectionId: str,
message: str,
): inflight(): void;
pub inflight sendMessage(connectionId: str, message: str) {
WebSocket_sim._sendMessage(connectionId, message);
let localUrl = this.state.get(this.localStateKey).asStr();
http.post(localUrl, body: Json.stringify({
connectionId: connectionId,
message: message
}));
}
}
57 changes: 43 additions & 14 deletions websockets/platform/sim/wb.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import { WebSocketServer } from "ws";
import getPort from "get-port";
import * as http from "http";

export const _startWebSocketApi = async (
onConnect,
onDisconnect,
onMessage) => {

export const _startWebSocketApi = async (onConnect, onDisconnect, onMessage) => {
const port = await getPort();
// const port = Math.floor(Math.random() * 1000 + 3000);
const wss = new WebSocketServer({ port });
global.wss = wss;

wss.on('connection', function connection(ws) {
ws.id = Date.now().toString().slice(-6);
Expand All @@ -26,11 +21,45 @@ export const _startWebSocketApi = async (
onConnect(ws.id);
});

return {
close: () => {
console.log("closing...");
wss.close();
},
url: () => `ws://127.0.0.1:${port}`
}
// This is a local HTTP server that will be used to send messages to a client
const server = http.createServer((req, res) => {
const body = [];
req.on('data', chunk => body.push(chunk)).on('end', () => {
try {
const jsonBody = JSON.parse(Buffer.concat(body).toString());
const { connectionId, message } = jsonBody;
if (!connectionId) {
console.error("No connectionId in body:", body);
res.statusCode = 400;
return res.end();
}

for (const ws of wss.clients) {
if (ws.id === connectionId) {
ws.send(message)
}
}

return res.end();
} catch (e) {
console.error("Failed to parse body as JSON:", body, e.message);
res.statusCode = 400;
return res.end();
}
});
});

return new Promise((ok, ko) => {
server.listen(() => {
return ok({
close: () => {
console.log("closing...");
wss.close();
server.close();
},
url: () => `ws://127.0.0.1:${port}`,
local: () => `http://127.0.0.1:${server.address().port}`,
});
});
});
};

0 comments on commit 4d4fdc8

Please sign in to comment.