diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts index 9421397..083db18 100644 --- a/lib/cluster-adapter.ts +++ b/lib/cluster-adapter.ts @@ -45,6 +45,7 @@ export enum MessageType { SERVER_SIDE_EMIT_RESPONSE, BROADCAST_CLIENT_COUNT, BROADCAST_ACK, + ADAPTER_CLOSE, } export type ClusterMessage = { @@ -52,7 +53,10 @@ export type ClusterMessage = { nsp: string; } & ( | { - type: MessageType.INITIAL_HEARTBEAT | MessageType.HEARTBEAT; + type: + | MessageType.INITIAL_HEARTBEAT + | MessageType.HEARTBEAT + | MessageType.ADAPTER_CLOSE; } | { type: MessageType.BROADCAST; @@ -643,11 +647,21 @@ export abstract class ClusterAdapter extends Adapter { ); } +interface CustomClusterRequest { + type: MessageType; + resolve: Function; + timeout: NodeJS.Timeout; + missingUids: Set; + responses: any[]; +} + export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { private readonly _opts: Required; private heartbeatTimer: NodeJS.Timeout; private nodesMap: Map = new Map(); // uid => timestamp of last message + private readonly cleanupTimer: NodeJS.Timeout | undefined; + private customRequests: Map = new Map(); protected constructor(nsp, opts: ClusterAdapterOptions) { super(nsp); @@ -658,9 +672,19 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { }, opts ); + this.cleanupTimer = setInterval(() => { + const now = Date.now(); + this.nodesMap.forEach((lastSeen, uid) => { + const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout; + if (nodeSeemsDown) { + debug("node %s seems down", uid); + this.removeNode(uid); + } + }); + }, 1_000); } - override init(): Promise | void { + override init() { this.publish({ type: MessageType.INITIAL_HEARTBEAT, }); @@ -677,8 +701,14 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { }, this._opts.heartbeatInterval); } - override close(): Promise | void { + override close() { + this.publish({ + type: MessageType.ADAPTER_CLOSE, + }); clearTimeout(this.heartbeatTimer); + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + } } override async onMessage(message: ClusterMessage, offset?: string) { @@ -700,6 +730,9 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { case MessageType.HEARTBEAT: // nothing to do break; + case MessageType.ADAPTER_CLOSE: + this.removeNode(message.uid); + break; default: super.onMessage(message, offset); } @@ -722,4 +755,178 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { return super.publish(message); } + + override async serverSideEmit(packet: any[]) { + const withAck = typeof packet[packet.length - 1] === "function"; + + if (!withAck) { + return this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + packet, + }, + }); + } + + const ack = packet.pop(); + const expectedResponseCount = this.nodesMap.size; + + debug( + 'waiting for %d responses to "serverSideEmit" request', + expectedResponseCount + ); + + if (expectedResponseCount <= 0) { + return ack(null, []); + } + + const requestId = randomId(); + + const timeout = setTimeout(() => { + const storedRequest = this.customRequests.get(requestId); + if (storedRequest) { + ack( + new Error( + `timeout reached: missing ${storedRequest.missingUids.size} responses` + ), + storedRequest.responses + ); + this.customRequests.delete(requestId); + } + }, DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.SERVER_SIDE_EMIT, + resolve: ack, + timeout, + missingUids: new Set([...this.nodesMap.keys()]), + responses: [], + }; + this.customRequests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.SERVER_SIDE_EMIT, + data: { + requestId, // the presence of this attribute defines whether an acknowledgement is needed + packet, + }, + }); + } + + override async fetchSockets(opts: BroadcastOptions): Promise { + const [localSockets, serverCount] = await Promise.all([ + super.fetchSockets({ + rooms: opts.rooms, + except: opts.except, + flags: { + local: true, + }, + }), + this.serverCount(), + ]); + const expectedResponseCount = serverCount - 1; + + if (opts.flags?.local || expectedResponseCount <= 0) { + return localSockets as any[]; + } + + const requestId = randomId(); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + const storedRequest = this.customRequests.get(requestId); + if (storedRequest) { + reject( + new Error( + `timeout reached: missing ${storedRequest.missingUids.size} responses` + ) + ); + this.customRequests.delete(requestId); + } + }, opts.flags.timeout || DEFAULT_TIMEOUT); + + const storedRequest = { + type: MessageType.FETCH_SOCKETS, + resolve, + timeout, + missingUids: new Set([...this.nodesMap.keys()]), + responses: localSockets as any[], + }; + this.customRequests.set(requestId, storedRequest); + + this.publish({ + type: MessageType.FETCH_SOCKETS, + data: { + opts: encodeOptions(opts), + requestId, + }, + }); + }); + } + + override onResponse(response: ClusterResponse) { + const requestId = response.data.requestId; + + debug("received response %s to request %s", response.type, requestId); + + switch (response.type) { + case MessageType.FETCH_SOCKETS_RESPONSE: { + const request = this.customRequests.get(requestId); + + if (!request) { + return; + } + + (response.data.sockets as any[]).forEach((socket) => + request.responses.push(socket) + ); + + request.missingUids.delete(response.uid); + if (request.missingUids.size === 0) { + clearTimeout(request.timeout); + request.resolve(request.responses); + this.customRequests.delete(requestId); + } + break; + } + + case MessageType.SERVER_SIDE_EMIT_RESPONSE: { + const request = this.customRequests.get(requestId); + + if (!request) { + return; + } + + request.responses.push(response.data.packet); + + request.missingUids.delete(response.uid); + if (request.missingUids.size === 0) { + clearTimeout(request.timeout); + request.resolve(null, request.responses); + this.customRequests.delete(requestId); + } + break; + } + + default: + super.onResponse(response); + } + } + + private removeNode(uid: string) { + this.customRequests.forEach((request, requestId) => { + request.missingUids.delete(uid); + if (request.missingUids.size === 0) { + clearTimeout(request.timeout); + if (request.type === MessageType.FETCH_SOCKETS) { + request.resolve(request.responses); + } else if (request.type === MessageType.SERVER_SIDE_EMIT) { + request.resolve(null, request.responses); + } + this.customRequests.delete(requestId); + } + }); + + this.nodesMap.delete(uid); + } }