Skip to content

Commit

Permalink
refactor(cluster): add explicit message types
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Feb 20, 2024
1 parent d3d4598 commit eff552e
Showing 1 changed file with 104 additions and 21 deletions.
125 changes: 104 additions & 21 deletions lib/cluster-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { Adapter, BroadcastOptions, Room } from "./index";
import {
Adapter,
type BroadcastFlags,
type BroadcastOptions,
type Room,
} from "./index";
import { debug as debugModule } from "debug";
import { randomBytes } from "crypto";

Expand All @@ -10,6 +15,10 @@ function randomId() {
return randomBytes(8).toString("hex");
}

type DistributiveOmit<T, K extends keyof any> = T extends any
? Omit<T, K>
: never;

export interface ClusterAdapterOptions {
/**
* The number of ms between two heartbeats.
Expand Down Expand Up @@ -38,11 +47,50 @@ export enum MessageType {
BROADCAST_ACK,
}

export interface ClusterMessage {
export type ClusterMessage = {
uid: string;
type: MessageType;
data?: Record<string, unknown>;
}
nsp: string;
} & (
| {
type: MessageType.INITIAL_HEARTBEAT | MessageType.HEARTBEAT;
}
| {
type: MessageType.BROADCAST;
data: {
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
packet: unknown;
requestId?: string;
};
}
| {
type: MessageType.SOCKETS_JOIN | MessageType.SOCKETS_LEAVE;
data: {
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
rooms: string[];
};
}
| {
type: MessageType.DISCONNECT_SOCKETS;
data: {
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
close?: boolean;
};
}
| {
type: MessageType.FETCH_SOCKETS;
data: {
opts: { rooms: string[]; except: string[]; flags: BroadcastFlags };
requestId: string;
};
}
| {
type: MessageType.SERVER_SIDE_EMIT;
data: {
requestId?: string;
packet: unknown;
};
}
);

interface ClusterRequest {
type: MessageType;
Expand All @@ -53,13 +101,39 @@ interface ClusterRequest {
responses: any[];
}

interface ClusterResponse {
type: MessageType;
data: {
requestId: string;
[key: string]: unknown;
};
}
export type ClusterResponse = {
uid: string;
nsp: string;
} & (
| {
type: MessageType.FETCH_SOCKETS_RESPONSE;
data: {
requestId: string;
sockets: unknown[];
};
}
| {
type: MessageType.SERVER_SIDE_EMIT_RESPONSE;
data: {
requestId: string;
packet: unknown;
};
}
| {
type: MessageType.BROADCAST_CLIENT_COUNT;
data: {
requestId: string;
clientCount: number;
};
}
| {
type: MessageType.BROADCAST_ACK;
data: {
requestId: string;
packet: unknown;
};
}
);

interface ClusterAckRequest {
clientCountCallback: (clientCount: number) => void;
Expand All @@ -85,7 +159,7 @@ function decodeOptions(opts): BroadcastOptions {
/**
* A cluster-ready adapter. Any extending class must:
*
* - implement {@link ClusterAdapter#publishMessage} and {@link ClusterAdapter#publishResponse}
* - implement {@link ClusterAdapter#doPublish} and {@link ClusterAdapter#doPublishResponse}
* - call {@link ClusterAdapter#onMessage} and {@link ClusterAdapter#onResponse}
*/
export abstract class ClusterAdapter extends Adapter {
Expand Down Expand Up @@ -295,6 +369,7 @@ export abstract class ClusterAdapter extends Adapter {
}

default:
// @ts-ignore
debug("unknown response type: %s", response.type);
}
}
Expand Down Expand Up @@ -537,11 +612,10 @@ export abstract class ClusterAdapter extends Adapter {
});
}

protected publish(message: Omit<ClusterMessage, "uid">) {
return this.publishMessage({
uid: this.uid,
...message,
});
protected publish(message: DistributiveOmit<ClusterMessage, "nsp" | "uid">) {
(message as ClusterMessage).uid = this.uid;
(message as ClusterMessage).nsp = this.nsp.name;
return this.doPublish(message as ClusterMessage);
}

/**
Expand All @@ -551,7 +625,16 @@ export abstract class ClusterAdapter extends Adapter {
* @protected
* @return an offset, if applicable
*/
protected abstract publishMessage(message: ClusterMessage): Promise<string>;
protected abstract doPublish(message: ClusterMessage): Promise<string>;

protected publishResponse(
requesterUid: string,
response: Omit<ClusterResponse, "nsp" | "uid">
) {
(response as ClusterResponse).uid = this.uid;
(response as ClusterResponse).nsp = this.nsp.name;
return this.doPublishResponse(requesterUid, response as ClusterResponse);
}

/**
* Send a response to the given member of the cluster.
Expand All @@ -560,7 +643,7 @@ export abstract class ClusterAdapter extends Adapter {
* @param response
* @protected
*/
protected abstract publishResponse(
protected abstract doPublishResponse(
requesterUid: string,
response: ClusterResponse
);
Expand Down Expand Up @@ -640,7 +723,7 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
return Promise.resolve(1 + this.nodesMap.size);
}

override publish(message: Omit<ClusterMessage, "uid">) {
override publish(message: DistributiveOmit<ClusterMessage, "nsp" | "uid">) {
this.scheduleHeartbeat();

return super.publish(message);
Expand Down

0 comments on commit eff552e

Please sign in to comment.