Skip to content

Commit

Permalink
fix: WebSocket connections should not write if readyState is CLOSING …
Browse files Browse the repository at this point in the history
…or CLOSED. WebSocketDuplex connection should handle invalid frames.
  • Loading branch information
stevensJourney committed Jun 11, 2024
1 parent b559817 commit 4aecb92
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 74 deletions.
108 changes: 58 additions & 50 deletions packages/rsocket-websocket-server/src/WebsocketDuplexConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ import {
FrameHandler,
Multiplexer,
Outbound,
serializeFrame,
} from "rsocket-core";
import { Duplex } from "stream";

export class WebsocketDuplexConnection
extends Deferred
implements DuplexConnection, Outbound
{
serializeFrame
} from 'rsocket-core';
import { Duplex } from 'stream';
import WebSocket from 'ws';

export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound {
readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler;

constructor(
Expand All @@ -40,18 +38,16 @@ export class WebsocketDuplexConnection
multiplexerDemultiplexerFactory: (
frame: Frame,
outbound: Outbound & Closeable
) => Multiplexer & Demultiplexer & FrameHandler
) => Multiplexer & Demultiplexer & FrameHandler,
private rawSocket: WebSocket.WebSocket
) {
super();

websocketDuplex.on("close", this.handleClosed);
websocketDuplex.on("error", this.handleError);
websocketDuplex.on("data", this.handleMessage);
websocketDuplex.on('close', this.handleClosed);
websocketDuplex.on('error', this.handleError);
websocketDuplex.on('data', this.handleMessage);

this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(
frame,
this
);
this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this);
}

get availability(): number {
Expand All @@ -77,32 +73,40 @@ export class WebsocketDuplexConnection
return;
}

// if (__DEV__) {
// if (this._options.debug) {
// console.log(printFrame(frame));
// }
// }
const buffer =
/* this._options.lengthPrefixedFrames
? serializeFrameWithLength(frame, this._encoders)
:*/ serializeFrame(frame);
// if (!this._socket) {
// throw new Error(
// "RSocketWebSocketClient: Cannot send frame, not connected."
// );
// }
this.websocketDuplex.write(buffer);
try {
// if (__DEV__) {
// if (this._options.debug) {
// console.log(printFrame(frame));
// }
// }
const buffer =
/* this._options.lengthPrefixedFrames
? serializeFrameWithLength(frame, this._encoders)
:*/ serializeFrame(frame);
// if (!this._socket) {
// throw new Error(
// "RSocketWebSocketClient: Cannot send frame, not connected."
// );
// }

// Work around for this issue
// https://github.com/websockets/ws/issues/1515
if (this.rawSocket.readyState == this.rawSocket.CLOSING || this.rawSocket.readyState == this.rawSocket.CLOSED) {
this.close(new Error('WebSocket is closing'));
return;
}

this.websocketDuplex.write(buffer);
} catch (ex) {
this.close(new Error(ex.reason || `Could not write to WebSocket duplex connection: ${ex}`));
}
}

private handleClosed = (e: CloseEvent): void => {
this.close(
new Error(
e.reason || "WebsocketDuplexConnection: Socket closed unexpectedly."
)
);
private handleClosed = (e: WebSocket.CloseEvent): void => {
this.close(new Error(e.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.'));
};

private handleError = (e: ErrorEvent): void => {
private handleError = (e: WebSocket.ErrorEvent): void => {
this.close(e.error);
};

Expand All @@ -125,23 +129,27 @@ export class WebsocketDuplexConnection

static create(
socket: Duplex,
connectionAcceptor: (
frame: Frame,
connection: DuplexConnection
) => Promise<void>,
connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise<void>,
multiplexerDemultiplexerFactory: (
frame: Frame,
outbound: Outbound & Closeable
) => Multiplexer & Demultiplexer & FrameHandler
) => Multiplexer & Demultiplexer & FrameHandler,
rawSocket: WebSocket.WebSocket
): void {
// TODO: timeout on no data?
socket.once("data", async (buffer) => {
const frame = deserializeFrame(buffer);
const connection = new WebsocketDuplexConnection(
socket,
frame,
multiplexerDemultiplexerFactory
);
socket.once('data', async (buffer) => {
let frame: Frame | undefined = undefined;
try {
frame = deserializeFrame(buffer);
if (!frame) {
throw new Error(`Unable to deserialize frame`);
}
} catch (ex) {
// The initial frame should always be parsable
return socket.end();
}

const connection = new WebsocketDuplexConnection(socket, frame, multiplexerDemultiplexerFactory, rawSocket);
if (connection.done) {
return;
}
Expand Down
37 changes: 15 additions & 22 deletions packages/rsocket-websocket-server/src/WebsocketServerTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import {
FrameHandler,
Multiplexer,
Outbound,
ServerTransport,
} from "rsocket-core";
import WebSocket, { Server } from "ws";
import { WebsocketDuplexConnection } from "./WebsocketDuplexConnection";
ServerTransport
} from 'rsocket-core';
import WebSocket, { Server } from 'ws';
import { WebsocketDuplexConnection } from './WebsocketDuplexConnection';

export type SocketFactory = (options: SocketOptions) => Server;

Expand All @@ -43,7 +43,7 @@ export type ServerOptions = SocketOptions & {
const defaultFactory: SocketFactory = (options: SocketOptions) => {
return new Server({
host: options.host,
port: options.port,
port: options.port
});
};

Expand All @@ -59,10 +59,7 @@ export class WebsocketServerTransport implements ServerTransport {
}

async bind(
connectionAcceptor: (
frame: Frame,
connection: DuplexConnection
) => Promise<void>,
connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise<void>,
multiplexerDemultiplexerFactory: (
frame: Frame,
outbound: Outbound & Closeable
Expand All @@ -72,22 +69,18 @@ export class WebsocketServerTransport implements ServerTransport {
const serverCloseable = new ServerCloseable(websocketServer);

const connectionListener = (websocket: WebSocket) => {
websocket.binaryType = "nodebuffer";
websocket.binaryType = 'nodebuffer';
const duplex = WebSocket.createWebSocketStream(websocket);
WebsocketDuplexConnection.create(
duplex,
connectionAcceptor,
multiplexerDemultiplexerFactory
);
WebsocketDuplexConnection.create(duplex, connectionAcceptor, multiplexerDemultiplexerFactory, websocket);
};

const closeListener = (error?: Error) => {
serverCloseable.close(error);
};

websocketServer.addListener("connection", connectionListener);
websocketServer.addListener("close", closeListener);
websocketServer.addListener("error", closeListener);
websocketServer.addListener('connection', connectionListener);
websocketServer.addListener('close', closeListener);
websocketServer.addListener('error', closeListener);

return serverCloseable;
}
Expand All @@ -96,16 +89,16 @@ export class WebsocketServerTransport implements ServerTransport {
return new Promise((resolve, reject) => {
const websocketServer = this.factory({
host: this.host,
port: this.port,
port: this.port
});

const earlyCloseListener = (error?: Error) => {
reject(error);
};

websocketServer.addListener("close", earlyCloseListener);
websocketServer.addListener("error", earlyCloseListener);
websocketServer.addListener("listening", () => resolve(websocketServer));
websocketServer.addListener('close', earlyCloseListener);
websocketServer.addListener('error', earlyCloseListener);
websocketServer.addListener('listening', () => resolve(websocketServer));
});
}
}
Expand Down
5 changes: 3 additions & 2 deletions packages/rsocket-websocket-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

"use strict";
'use strict';

export * from "./WebsocketServerTransport";
export * from './WebsocketServerTransport';
export * from './WebsocketDuplexConnection';

0 comments on commit 4aecb92

Please sign in to comment.