diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts index c2e2161..5c8f3bc 100644 --- a/lib/cluster-adapter.ts +++ b/lib/cluster-adapter.ts @@ -1,4 +1,9 @@ -import { Adapter, BroadcastOptions, Room } from "./index"; +import { + Adapter, + type BroadcastFlags, + type BroadcastOptions, + type Room, +} from "./index"; import { debug as debugModule } from "debug"; import { randomBytes } from "crypto"; @@ -10,6 +15,10 @@ function randomId() { return randomBytes(8).toString("hex"); } +type DistributiveOmit = T extends any + ? Omit + : never; + export interface ClusterAdapterOptions { /** * The number of ms between two heartbeats. @@ -38,11 +47,50 @@ export enum MessageType { BROADCAST_ACK, } -export interface ClusterMessage { +export type ClusterMessage = { uid: string; - type: MessageType; - data?: Record; -} + nsp: string; +} & ( + | { + type: MessageType.INITIAL_HEARTBEAT | MessageType.HEARTBEAT; + } + | { + type: MessageType.BROADCAST; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + packet: unknown; + requestId?: string; + }; + } + | { + type: MessageType.SOCKETS_JOIN | MessageType.SOCKETS_LEAVE; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + rooms: string[]; + }; + } + | { + type: MessageType.DISCONNECT_SOCKETS; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + close?: boolean; + }; + } + | { + type: MessageType.FETCH_SOCKETS; + data: { + opts: { rooms: string[]; except: string[]; flags: BroadcastFlags }; + requestId: string; + }; + } + | { + type: MessageType.SERVER_SIDE_EMIT; + data: { + requestId?: string; + packet: unknown; + }; + } +); interface ClusterRequest { type: MessageType; @@ -53,13 +101,39 @@ interface ClusterRequest { responses: any[]; } -interface ClusterResponse { - type: MessageType; - data: { - requestId: string; - [key: string]: unknown; - }; -} +export type ClusterResponse = { + uid: string; + nsp: string; +} & ( + | { + type: MessageType.FETCH_SOCKETS_RESPONSE; + data: { + requestId: string; + sockets: unknown[]; + }; + } + | { + type: MessageType.SERVER_SIDE_EMIT_RESPONSE; + data: { + requestId: string; + packet: unknown; + }; + } + | { + type: MessageType.BROADCAST_CLIENT_COUNT; + data: { + requestId: string; + clientCount: number; + }; + } + | { + type: MessageType.BROADCAST_ACK; + data: { + requestId: string; + packet: unknown; + }; + } +); interface ClusterAckRequest { clientCountCallback: (clientCount: number) => void; @@ -85,7 +159,7 @@ function decodeOptions(opts): BroadcastOptions { /** * A cluster-ready adapter. Any extending class must: * - * - implement {@link ClusterAdapter#publishMessage} and {@link ClusterAdapter#publishResponse} + * - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse} * - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse} */ export abstract class ClusterAdapter extends Adapter { @@ -295,6 +369,7 @@ export abstract class ClusterAdapter extends Adapter { } default: + // @ts-ignore debug("unknown response type: %s", response.type); } } @@ -537,11 +612,10 @@ export abstract class ClusterAdapter extends Adapter { }); } - protected publish(message: Omit) { - return this.publishMessage({ - uid: this.uid, - ...message, - }); + protected publish(message: DistributiveOmit) { + (message as ClusterMessage).uid = this.uid; + (message as ClusterMessage).nsp = this.nsp.name; + return this.doPublish(message as ClusterMessage); } /** @@ -551,7 +625,16 @@ export abstract class ClusterAdapter extends Adapter { * @protected * @return an offset, if applicable */ - protected abstract publishMessage(message: ClusterMessage): Promise; + protected abstract doPublish(message: ClusterMessage): Promise; + + protected publishResponse( + requesterUid: string, + response: Omit + ) { + (response as ClusterResponse).uid = this.uid; + (response as ClusterResponse).nsp = this.nsp.name; + return this.doPublishResponse(requesterUid, response as ClusterResponse); + } /** * Send a response to the given member of the cluster. @@ -560,7 +643,7 @@ export abstract class ClusterAdapter extends Adapter { * @param response * @protected */ - protected abstract publishResponse( + protected abstract doPublishResponse( requesterUid: string, response: ClusterResponse ); @@ -640,7 +723,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter { return Promise.resolve(1 + this.nodesMap.size); } - override publish(message: Omit) { + override publish(message: DistributiveOmit) { this.scheduleHeartbeat(); return super.publish(message);