diff --git a/index.ts b/index.ts index 6a37bea..f2b3f65 100644 --- a/index.ts +++ b/index.ts @@ -1,9 +1,8 @@ export * from './src/client/ShardClient'; export * from './src/client/ShardClientUtil'; -export * from './src/concurrency/ConcurrencyClient'; export * from './src/concurrency/ConcurrencyManager'; export * from './src/ipc/BaseIpc'; -export * from './src/ipc/Main'; +export * from './src/ipc/MainWorker'; export * from './src/ipc/BaseWorker'; export * from './src/ipc/ClientWorker'; export * from './src/manager/ClusterManager'; diff --git a/package.json b/package.json index d0a8a7b..2e415cf 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ "build": "npm run build:ts && npm run build:docs", "build:ts": "tsup --config tsup-config.json", "build:docs": "typedoc --theme default --readme README.md --out docs/ --entryPointStrategy expand src/.", - "lint": "eslint --fix --ext .ts", + "lint": "eslint . --ext .ts --fix", "prepare": "npm run build:ts" }, "keywords": [ diff --git a/src/Indomitable.ts b/src/Indomitable.ts index 25c7204..1bd5ad8 100644 --- a/src/Indomitable.ts +++ b/src/Indomitable.ts @@ -11,8 +11,8 @@ import { FetchSessions, MakeAbortableRequest, AbortableData, - ClientEvents, - InternalEvents, + InternalOps, + InternalOpsData, LibraryEvents, Message, SessionObject, @@ -370,8 +370,8 @@ export class Indomitable extends EventEmitter { * @internal */ private async destroyClusterClient(id: number): Promise { - const content: InternalEvents = { - op: ClientEvents.DESTROY_CLIENT, + const content: InternalOpsData = { + op: InternalOps.DESTROY_CLIENT, data: {}, internal: true }; diff --git a/src/Util.ts b/src/Util.ts index 446c301..a3dcf86 100644 --- a/src/Util.ts +++ b/src/Util.ts @@ -1,4 +1,5 @@ import Https, { RequestOptions } from 'node:https'; +import { WebSocketShardEvents } from '@discordjs/ws'; /** * Hoisted Environmental Variable for ease of fetching @@ -11,9 +12,31 @@ export const EnvProcessData = { }; /** - * Events for internal use + * Internal operation codes for the cluster -> thread */ -export enum ClientEvents { +export enum MainStrategyOps { + CONNECT = 'connect', + DESTROY = 'destroy', + SEND = 'send', + STATUS = 'status', + RECONNECT = 'reconnect' +} + +/** + * Internal operation codes for the thread <- cluster + */ +export enum ThreadStrategyOps { + REQUEST_IDENTIFY = 'requestIdentify', + CANCEL_IDENTIFY = 'cancelIdentify', + SHARD_EVENT = 'shardEvent', + RETRIEVE_SESSION = 'retrieveSession', + UPDATE_SESSION = 'updateSession' +} + +/** + * Internal operation codes + */ +export enum InternalOps { EVAL = 'eval', RESTART = 'restart', RESTART_ALL = 'restartAll', @@ -21,12 +44,19 @@ export enum ClientEvents { REQUEST_IDENTIFY = 'requestIdentify', CANCEL_IDENTIFY = 'cancelIdentify', SESSION_INFO = 'sessionInfo', + PING = 'ping' +} + +/** + * Events for internal use + */ +export enum ClientEvents { READY = 'ready', - PING = 'ping', SHARD_READY = 'shardReady', SHARD_RECONNECT = 'shardReconnect', SHARD_RESUME = 'shardResume', - SHARD_DISCONNECT = 'shardDisconnect' + SHARD_DISCONNECT = 'shardDisconnect', + ERROR = 'ERROR' } /** @@ -52,24 +82,52 @@ export enum LibraryEvents { */ export enum RawIpcMessageType { MESSAGE = 'message', - RESPONSE = 'response' + RESPONSE = 'response', + ERROR = 'error' +} + +/** + * Type for raw ipc messages of cluster -> thread + */ +export interface MainStrategyData { + op: MainStrategyOps, + data: any, + internal: true +} + +/** + * Type for raw ipc messages of cluster <- thread + */ +export interface ThreadStrategyData { + op: ThreadStrategyOps, + event: WebSocketShardEvents, + data: any, + shardId: number, + internal: true } /** * Data structure representing an internal event */ -export interface InternalEvents { - op: ClientEvents, +export interface InternalOpsData { + op: InternalOps, data: any, internal: true } +/** + * Data structure representing an internal discord.js event + */ +export interface ClientEventData { + op: ClientEvents, + data: any, + internal: true, +} + /** * Data structure representing an internal error */ -export interface InternalError { - internal: true; - error: true; +export interface IpcErrorData { name: string; reason: string; stack: string; diff --git a/src/client/ShardClient.ts b/src/client/ShardClient.ts index 467b743..8a2079c 100644 --- a/src/client/ShardClient.ts +++ b/src/client/ShardClient.ts @@ -1,7 +1,7 @@ import type { Client, ClientOptions as DiscordJsClientOptions } from 'discord.js'; import { Indomitable } from '../Indomitable'; -import { EnvProcessData, ClientEvents, InternalEvents, LibraryEvents } from '../Util'; -import { ConcurrencyClient } from '../concurrency/ConcurrencyClient.js'; +import { EnvProcessData, ClientEvents, ClientEventData } from '../Util'; +import { IndomitableStrategy } from '../strategy/IndomitableStrategy'; import { ShardClientUtil } from './ShardClientUtil'; import { BaseWorker } from '../ipc/BaseWorker'; @@ -19,19 +19,14 @@ export class ShardClient { const clientOptions = manager.clientOptions as DiscordJsClientOptions || {}; clientOptions.shards = EnvProcessData.shardIds; clientOptions.shardCount = EnvProcessData.shardCount; + if (manager.handleConcurrency) { + if (!clientOptions.ws) clientOptions.ws = {}; + clientOptions.ws.buildStrategy = ws => new IndomitableStrategy(ws, new BaseWorker(manager)); + } this.client = new manager.client(clientOptions); // @ts-expect-error: Override shard client util with indomitable shard client util this.client.shard = new ShardClientUtil(this.client, manager); this.clusterId = Number(EnvProcessData.clusterId); - // pseudo code for the meantime until discord.js merges https://github.com/discordjs/discord.js/pull/9728 - if (manager.handleConcurrency) { - const concurrencyClient = new ConcurrencyClient(new BaseWorker(manager)); - // @ts-expect-error: Private function variable of @discordjs/ws manager - if (this.client.ws._ws) { - // @ts-expect-error: Override build identify throttler - this.client.ws._ws.options.buildIdentifyThrottler = () => Promise.resolve(concurrencyClient); - } - } } public async start(token: string): Promise { @@ -48,9 +43,9 @@ export class ShardClient { private send(partial: PartialInternalEvents): void { // @ts-ignore -- our own class const shardClientUtil = this.client.shard as ShardClientUtil; - const content: InternalEvents = { ...partial, internal: true }; + const content: ClientEventData = { ...partial, internal: true }; shardClientUtil .send({ content, repliable: false }) - .catch((error: unknown) => this.client.emit(LibraryEvents.ERROR, error as Error)); + .catch((error: unknown) => this.client.emit(ClientEvents.ERROR, error as Error)); } } diff --git a/src/client/ShardClientUtil.ts b/src/client/ShardClientUtil.ts index d3c3e87..1b34891 100644 --- a/src/client/ShardClientUtil.ts +++ b/src/client/ShardClientUtil.ts @@ -7,8 +7,8 @@ import { EnvProcessData, MakeAbortableRequest, AbortableData, - ClientEvents, - InternalEvents, + InternalOps, + InternalOpsData, Message, SessionObject, Transportable @@ -49,8 +49,8 @@ export class ShardClientUtil extends EventEmitter { * @returns A promise that resolves to delay in nanoseconds */ public async ping(): Promise { - const content: InternalEvents = { - op: ClientEvents.PING, + const content: InternalOpsData = { + op: InternalOps.PING, data: {}, internal: true }; @@ -64,8 +64,8 @@ export class ShardClientUtil extends EventEmitter { * @returns A promise that resolves to an array of code results */ public broadcastEval(script: Function, context: any = {}): Promise { - const content: InternalEvents = { - op: ClientEvents.EVAL, + const content: InternalOpsData = { + op: InternalOps.EVAL, data: `(${script.toString()})(this, ${JSON.stringify(context)})`, internal: true }; @@ -86,8 +86,8 @@ export class ShardClientUtil extends EventEmitter { * @returns A session object */ public fetchSessions(update: boolean = false): Promise { - const content: InternalEvents = { - op: ClientEvents.SESSION_INFO, + const content: InternalOpsData = { + op: InternalOps.SESSION_INFO, data: { update }, internal: true }; @@ -99,8 +99,8 @@ export class ShardClientUtil extends EventEmitter { * @returns A promise that resolves to void */ public restart(clusterId: number): Promise { - const content: InternalEvents = { - op: ClientEvents.RESTART, + const content: InternalOpsData = { + op: InternalOps.RESTART, data: { clusterId }, internal: true }; @@ -112,8 +112,8 @@ export class ShardClientUtil extends EventEmitter { * @returns A promise that resolves to void */ public restartAll(): Promise { - const content: InternalEvents = { - op: ClientEvents.RESTART_ALL, + const content: InternalOpsData = { + op: InternalOps.RESTART_ALL, data: {}, internal: true }; diff --git a/src/concurrency/ConcurrencyClient.ts b/src/concurrency/ConcurrencyClient.ts deleted file mode 100644 index 5b3d831..0000000 --- a/src/concurrency/ConcurrencyClient.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { ClientEvents, InternalEvents } from '../Util'; -import { BaseWorker } from '../ipc/BaseWorker'; - -/** - * Internal class that is passed to @discordjs/ws to handle concurrency - */ -export class ConcurrencyClient { - private ipc: BaseWorker; - constructor(ipc: BaseWorker) { - this.ipc = ipc; - } - - /** - * Method to try and acquire a lock for identify - */ - public async waitForIdentify(shardId: number, signal: AbortSignal): Promise { - const content: InternalEvents = { - op: ClientEvents.REQUEST_IDENTIFY, - data: { shardId }, - internal: true - }; - const listener = () => this.abortIdentify(shardId); - try { - signal.addEventListener('abort', listener); - await this.ipc.send({ content, repliable: true }); - } catch (error: any) { - // only throw the error when it's an invoked abort ident from ws package - // will be removed once https://github.com/discordjs/discord.js/issues/9688 is resolved - if (error.message.includes('aborted the identify')) throw error; - } finally { - signal.removeEventListener('abort', listener); - } - } - - /** - * Aborts an acquire lock request - */ - private abortIdentify(shardId: number): void { - const content: InternalEvents = { - op: ClientEvents.CANCEL_IDENTIFY, - data: { shardId }, - internal: true - }; - this.ipc - .send({ content, repliable: false }) - .catch(() => null); - } -} diff --git a/src/ipc/BaseIpc.ts b/src/ipc/BaseIpc.ts index 2cd046b..c18ad1d 100644 --- a/src/ipc/BaseIpc.ts +++ b/src/ipc/BaseIpc.ts @@ -1,14 +1,17 @@ import { Serializable } from 'node:child_process'; +import { randomUUID } from 'crypto'; import { Indomitable } from '../Indomitable.js'; import { InternalAbortSignal, InternalPromise, + IpcErrorData, LibraryEvents, + Message, RawIpcMessage, RawIpcMessageType, - SavePromiseOptions, Transportable + SavePromiseOptions, + Transportable } from '../Util.js'; -import { randomUUID } from 'crypto'; /** * Base class where primary and worker ipc inherits @@ -65,6 +68,7 @@ export abstract class BaseIpc { this.waitForPromise({ id, resolve, reject, signal: transportable.signal }); }); } + /** * Taps into message event of worker or primary process to handle ipc communication * @internal @@ -75,8 +79,9 @@ export abstract class BaseIpc { if (!(data as any).internal) return; switch((data as RawIpcMessage).type) { case RawIpcMessageType.MESSAGE: - return await this.handleMessage(data as RawIpcMessage); + return await this.handleUnparsedMessage(data as RawIpcMessage); case RawIpcMessageType.RESPONSE: + case RawIpcMessageType.ERROR: return this.handlePromise(data as RawIpcMessage); } } catch (error: unknown) { @@ -108,16 +113,58 @@ export abstract class BaseIpc { if (promise.controller) { promise.controller.signal.removeEventListener('abort', promise.controller.listener); } - if (data.content?.internal && data.content?.error) { - const error = new Error(data.content.reason || 'Unknown error reason'); - error.stack = data.content.stack; - error.name = data.content.name; - return promise.reject(error); + if (data.type === RawIpcMessageType.ERROR) { + const content = data.content as IpcErrorData; + const error = new Error(content.reason); + error.stack = content.stack; + error.name = content.name; + promise.reject(error); + return; } promise.resolve(data.content); } + private async handleUnparsedMessage(data: RawIpcMessage): Promise { + const reply = (content: any) => { + if (!data.id) return; + const response: RawIpcMessage = { + id: data.id, + content, + internal: true, + type: RawIpcMessageType.RESPONSE + }; + this.sendData(response); + }; + const message: Message = { + repliable: !!data.id, + content: data.content, + reply + }; + if (!data.content.internal) + return this.emitMessage(message); + try { + await this.handleMessage(message); + } catch (error: any) { + if (!message.repliable) return; + const response: RawIpcMessage = { + id: data.id, + content: { + name: error.name, + reason: error.reason, + stack: error.stack + }, + internal: true, + type: RawIpcMessageType.ERROR + }; + this.sendData(response); + } + } + + protected emitMessage(message: Message): void { + this.manager.emit(LibraryEvents.MESSAGE, message); + } + protected abstract available(): boolean; protected abstract sendData(data: RawIpcMessage): void; - protected abstract handleMessage(data: RawIpcMessage): Promise|boolean|void; + protected abstract handleMessage(message: Message): Promise; } diff --git a/src/ipc/BaseWorker.ts b/src/ipc/BaseWorker.ts index 5e5ad2e..ff56fcd 100644 --- a/src/ipc/BaseWorker.ts +++ b/src/ipc/BaseWorker.ts @@ -1,7 +1,7 @@ import { Serializable } from 'node:child_process'; import { BaseIpc } from './BaseIpc.js'; import { Indomitable } from '../Indomitable'; -import { RawIpcMessage } from '../Util'; +import { Message, RawIpcMessage } from '../Util'; /** * Basic worker ipc class, basic child process ipc handler @@ -22,5 +22,7 @@ export class BaseWorker extends BaseIpc { process.send!(data); } - protected handleMessage(data: RawIpcMessage): boolean|void {} + protected handleMessage(message: Message): Promise { + return Promise.resolve(); + } } diff --git a/src/ipc/ClientWorker.ts b/src/ipc/ClientWorker.ts index 44e1d1d..c2b6bc5 100644 --- a/src/ipc/ClientWorker.ts +++ b/src/ipc/ClientWorker.ts @@ -2,15 +2,14 @@ import { Indomitable } from '../Indomitable'; import { ShardClientUtil } from '../client/ShardClientUtil'; import { BaseWorker } from './BaseWorker'; import { - ClientEvents, - InternalError, - InternalEvents, + InternalOps, + InternalOpsData, LibraryEvents, - Message, - RawIpcMessage, - RawIpcMessageType + Message } from '../Util'; +const internalOpsValues = Object.values(InternalOps); + /** * Extended worker ipc class, shard client util ipc class */ @@ -21,44 +20,23 @@ export class ClientWorker extends BaseWorker { this.shard = shard; } - protected handleMessage(data: RawIpcMessage): boolean|void { - const reply = (content: any) => { - if (!data.id) return; - const response: RawIpcMessage = { - id: data.id, - content, - internal: true, - type: RawIpcMessageType.RESPONSE - }; - this.sendData(response); - }; - const message: Message = { - repliable: !!data.id, - content: data.content, - reply - }; - if (!message.content.internal) - return this.shard.emit(LibraryEvents.MESSAGE, message); - try { - const content = message.content as InternalEvents; - switch (content.op) { - case ClientEvents.EVAL: + protected emitMessage(message: Message): void { + this.shard.emit(LibraryEvents.MESSAGE, message); + } + + protected handleMessage(message: Message): Promise { + if (!internalOpsValues.includes(message.content.op)) + return Promise.resolve(); + const content = message.content as InternalOpsData; + switch (content.op) { + case InternalOps.EVAL: // @ts-expect-error - message.reply(this.shard.client._eval(content.data)); - break; - case ClientEvents.DESTROY_CLIENT: + message.reply(this.shard.client._eval(content.data)); + break; + case InternalOps.DESTROY_CLIENT: this.shard.client!.destroy(); - message.reply(null); - } - } catch (error: any) { - if (!message.repliable) throw error as Error; - message.reply({ - internal: true, - error: true, - name: error.name, - reason: error.reason, - stack: error.stack - } as InternalError); + message.reply(null); } + return Promise.resolve(); } } diff --git a/src/ipc/MainStrategyWorker.ts b/src/ipc/MainStrategyWorker.ts new file mode 100644 index 0000000..cd59549 --- /dev/null +++ b/src/ipc/MainStrategyWorker.ts @@ -0,0 +1,70 @@ +import EventEmitter from 'node:events'; +import { Worker } from 'node:worker_threads'; +import { BaseIpc } from './BaseIpc'; +import { + InternalOps, + InternalOpsData, + Message, + RawIpcMessage, + ThreadStrategyData, + ThreadStrategyOps +} from '../Util'; +import { IndomitableStrategy } from '../strategy/IndomitableStrategy'; + +export class MainStrategyWorker extends BaseIpc { + public readonly id: number; + public readonly thread: Worker; + public readonly strategy: IndomitableStrategy; + constructor(id: number, thread: Worker, strategy: IndomitableStrategy) { + // @ts-expect-error: Indomitable will not be used in this class + super(new EventEmitter()); + this.id = id; + this.thread = thread; + this.strategy = strategy; + } + + protected available(): boolean { + return true; + } + + protected sendData(data: RawIpcMessage) { + return this.thread.postMessage(data); + } + + protected async handleMessage(message: Message): Promise { + const content = message.content as ThreadStrategyData; + switch(content.op) { + case ThreadStrategyOps.SHARD_EVENT: + this.strategy.manager.emit(content.event, { ...content.data, shardId: content.shardId }); + break; + case ThreadStrategyOps.REQUEST_IDENTIFY: { + const request: InternalOpsData = { + op: InternalOps.REQUEST_IDENTIFY, + data: { shardId: content.data.shardId }, + internal: true + }; + await this.strategy.ipc.send({ content: request, repliable: true }); + message.reply(null); + break; + } + case ThreadStrategyOps.CANCEL_IDENTIFY: { + const request: InternalOpsData = { + op: InternalOps.CANCEL_IDENTIFY, + data: { shardId: content.data.shardId }, + internal: true + }; + await this.strategy.ipc.send({ content: request }); + message.reply(null); + break; + } + case ThreadStrategyOps.RETRIEVE_SESSION: { + const session = await this.strategy.manager.options.retrieveSessionInfo(content.data.shardId); + message.reply(session); + break; + } + case ThreadStrategyOps.UPDATE_SESSION: + await this.strategy.manager.options.updateSessionInfo(content.data.shardId, content.data.sessionInfo); + break; + } + } +} diff --git a/src/ipc/Main.ts b/src/ipc/MainWorker.ts similarity index 55% rename from src/ipc/Main.ts rename to src/ipc/MainWorker.ts index 434aae9..9e015cf 100644 --- a/src/ipc/Main.ts +++ b/src/ipc/MainWorker.ts @@ -1,19 +1,22 @@ import { BaseIpc } from './BaseIpc.js'; import { ClusterManager } from '../manager/ClusterManager.js'; import { + InternalOps, + InternalOpsData, ClientEvents, - InternalError, - InternalEvents, + ClientEventData, LibraryEvents, Message, - RawIpcMessage, - RawIpcMessageType + RawIpcMessage } from '../Util'; +const internalOpsValues = Object.values(InternalOps); +const clientEventsValues = Object.values(ClientEvents); + /** * Primary ipc class. Only initialized at main process */ -export class Main extends BaseIpc { +export class MainWorker extends BaseIpc { public readonly cluster: ClusterManager; constructor(cluster: ClusterManager) { super(cluster.manager); @@ -28,40 +31,18 @@ export class Main extends BaseIpc { this.cluster.worker?.send(data); } - protected async handleMessage(data: RawIpcMessage): Promise { - const reply = (content: any) => { - if (!data.id) return; - const response: RawIpcMessage = { - id: data.id, - content, - internal: true, - type: RawIpcMessageType.RESPONSE - }; - this.sendData(response); - }; - const message: Message = { - repliable: !!data.id, - content: data.content, - reply - }; - if (!message.content.internal) - return this.manager.emit(LibraryEvents.MESSAGE, message); - // internal error handling - try { - const content = message.content as InternalEvents; - this.manager.emit(LibraryEvents.DEBUG, `Received internal message. op: ${content.op} | data: ${JSON.stringify(content.data || {})}`); + protected async handleMessage(message: Message): Promise { + this.manager.emit(LibraryEvents.DEBUG, `Received internal message. op: ${message.content.op} | data: ${JSON.stringify(message.content.data || {})}`); + if (internalOpsValues.includes(message.content.op)) { + const content = message.content as InternalOpsData; switch(content.op) { - case ClientEvents.READY: { - this.manager.emit(LibraryEvents.CLIENT_READY, content.data); - break; - } - case ClientEvents.PING: { + case InternalOps.PING: { const end = process.hrtime.bigint().toString(); message.reply(end); break; } - case ClientEvents.EVAL: { - // don't touch eval data, just forward it to clusters since this is already an instance of InternalEvent + case InternalOps.EVAL: { + // don't touch eval data, just forward it to clusters since this is already an instance of InternalEvent const data = await this.manager.broadcast({ content, repliable: true @@ -69,25 +50,32 @@ export class Main extends BaseIpc { message.reply(data); break; } - case ClientEvents.SESSION_INFO: { + case InternalOps.SESSION_INFO: { if (content.data.update || !this.manager.cachedSession) this.manager.cachedSession = await this.manager.fetchSessions(); message.reply(this.manager.cachedSession); break; } - case ClientEvents.REQUEST_IDENTIFY: + case InternalOps.REQUEST_IDENTIFY: await this.manager.concurrencyManager!.waitForIdentify(content.data.shardId); message.reply(null); break; - case ClientEvents.CANCEL_IDENTIFY: - this.manager.concurrencyManager!.abortIdentify(content.data.shardId); + case InternalOps.CANCEL_IDENTIFY: + this.manager.concurrencyManager!.abortIdentify(content.data.shardId); break; - case ClientEvents.RESTART: + case InternalOps.RESTART: await this.manager.restart(content.data.clusterId); break; - case ClientEvents.RESTART_ALL: + case InternalOps.RESTART_ALL: await this.manager.restartAll(); break; + } + } else if (clientEventsValues.includes(message.content.op)) { + const content = message.content as ClientEventData; + switch(content.op) { + case ClientEvents.READY: + this.manager.emit(LibraryEvents.CLIENT_READY, content.data); + break; case ClientEvents.SHARD_READY: this.manager.emit(LibraryEvents.SHARD_READY, content.data); break; @@ -100,15 +88,6 @@ export class Main extends BaseIpc { case ClientEvents.SHARD_DISCONNECT: this.manager.emit(LibraryEvents.SHARD_DISCONNECT, content.data); } - } catch (error: any) { - if (!message.repliable) throw error as Error; - message.reply({ - internal: true, - error: true, - name: error.name, - reason: error.reason, - stack: error.stack - } as InternalError); } } } diff --git a/src/ipc/ThreadStrategyWorker.ts b/src/ipc/ThreadStrategyWorker.ts new file mode 100644 index 0000000..4a9f276 --- /dev/null +++ b/src/ipc/ThreadStrategyWorker.ts @@ -0,0 +1,52 @@ +import EventEmitter from 'node:events'; +import { BaseIpc } from './BaseIpc'; +import { MainStrategyData, MainStrategyOps, Message, RawIpcMessage } from '../Util'; +import { parentPort } from 'worker_threads'; +import { WebSocketShard } from '@discordjs/ws'; + +export class ThreadStrategyWorker extends BaseIpc { + private shard: WebSocketShard|undefined; + constructor() { + // @ts-expect-error: Indomitable will not be used in the thread process + super(new EventEmitter()); + parentPort!.on('message', message => this.handleRawResponse(message, () => null)); + } + + public build(shard: WebSocketShard): void { + if (!this.shard) this.shard = shard; + } + + protected available(): boolean { + return !!parentPort; + } + + protected sendData(data: RawIpcMessage) { + return parentPort!.postMessage(data); + } + + protected async handleMessage(message: Message): Promise { + const content = message.content as MainStrategyData; + if (!this.shard) throw new Error('Shard isn\'t initialized yet'); + switch(content.op) { + case MainStrategyOps.CONNECT: + await this.shard.connect(); + message.reply(null); + break; + case MainStrategyOps.DESTROY: + await this.shard.destroy(content.data || {}); + message.reply(null); + break; + case MainStrategyOps.SEND: + await this.shard.send(content.data || {}); + message.reply(null); + break; + case MainStrategyOps.RECONNECT: + await this.shard.destroy(content.data); + message.reply(null); + break; + case MainStrategyOps.STATUS: + message.reply(this.shard.status); + break; + } + } +} diff --git a/src/manager/ClusterManager.ts b/src/manager/ClusterManager.ts index 247f5c5..abe2bec 100644 --- a/src/manager/ClusterManager.ts +++ b/src/manager/ClusterManager.ts @@ -1,7 +1,7 @@ import Cluster, { Worker } from 'node:cluster'; import { clearTimeout } from 'timers'; import { Indomitable, ShardEventData } from '../Indomitable.js'; -import { Main } from '../ipc/Main.js'; +import { MainWorker } from '../ipc/MainWorker'; import { Delay, LibraryEvents } from '../Util.js'; /** @@ -19,7 +19,7 @@ export interface ClusterManagerOptions { export class ClusterManager { public readonly manager: Indomitable; public readonly id: number; - public readonly ipc: Main; + public readonly ipc: MainWorker; public shards: number[]; public started: boolean; public ready: boolean; @@ -35,7 +35,7 @@ export class ClusterManager { this.manager = options.manager; this.id = options.id; this.shards = options.shards; - this.ipc = new Main(this); + this.ipc = new MainWorker(this); this.started = false; this.started = false; this.ready = false; diff --git a/src/strategy/IndomitableFetchingStrategy.ts b/src/strategy/IndomitableFetchingStrategy.ts new file mode 100644 index 0000000..308b549 --- /dev/null +++ b/src/strategy/IndomitableFetchingStrategy.ts @@ -0,0 +1,66 @@ +import { FetchingStrategyOptions, IContextFetchingStrategy, SessionInfo, WebSocketShardEvents } from '@discordjs/ws'; +import { ThreadStrategyWorker } from '../ipc/ThreadStrategyWorker'; +import { ThreadStrategyData, ThreadStrategyOps } from '../Util'; + +export class IndomitableFetchingStrategy implements IContextFetchingStrategy { + private readonly ipc: ThreadStrategyWorker; + public readonly options: FetchingStrategyOptions; + constructor(ipc: ThreadStrategyWorker, options: FetchingStrategyOptions) { + this.ipc = ipc; + this.options = options; + } + + public async retrieveSessionInfo(shardId: number): Promise { + const content: ThreadStrategyData = { + op: ThreadStrategyOps.RETRIEVE_SESSION, + event: WebSocketShardEvents.Ready, + data: { shardId }, + shardId: shardId, + internal: true + }; + return await this.ipc.send({ content, repliable: true }) as SessionInfo; + } + + public async updateSessionInfo(shardId: number, sessionInfo: SessionInfo | null): Promise { + const content: ThreadStrategyData = { + op: ThreadStrategyOps.UPDATE_SESSION, + event: WebSocketShardEvents.Ready, + data: { shardId, sessionInfo }, + shardId: shardId, + internal: true + }; + await this.ipc.send({ content }); + } + + public async waitForIdentify(shardId: number, signal: AbortSignal): Promise { + const content: ThreadStrategyData = { + op: ThreadStrategyOps.REQUEST_IDENTIFY, + event: WebSocketShardEvents.Ready, + data: { shardId }, + shardId: shardId, + internal: true + }; + const listener = () => this.abortIdentify(shardId); + try { + signal.addEventListener('abort', listener); + await this.ipc + .send({ content, repliable: true }) + .catch(() => null); + } finally { + signal.removeEventListener('abort', listener); + } + } + + private abortIdentify(shardId: number): void { + const content: ThreadStrategyData = { + op: ThreadStrategyOps.CANCEL_IDENTIFY, + event: WebSocketShardEvents.Ready, + data: { shardId }, + shardId: shardId, + internal: true + }; + this.ipc + .send({ content, repliable: false }) + .catch(() => null); + } +} diff --git a/src/strategy/IndomitableStrategy.ts b/src/strategy/IndomitableStrategy.ts new file mode 100644 index 0000000..5aa910e --- /dev/null +++ b/src/strategy/IndomitableStrategy.ts @@ -0,0 +1,120 @@ +import { join } from 'node:path'; +import { once } from 'node:events'; +import { Worker } from 'node:worker_threads'; +import { + FetchingStrategyOptions, + IShardingStrategy, + managerToFetchingStrategyOptions, + WebSocketManager, + WebSocketShardDestroyOptions, + WebSocketShardDestroyRecovery, + WebSocketShardStatus +} from '@discordjs/ws'; +import { Collection } from 'discord.js'; +import { MainStrategyData, MainStrategyOps } from '../Util'; +import { MainStrategyWorker } from '../ipc/MainStrategyWorker'; +import { BaseWorker } from '../ipc/BaseWorker'; + +export interface WorkerData extends FetchingStrategyOptions { + shardId: number; +} + +export interface IndomitableWorker { + thread: Worker; + ipc: MainStrategyWorker; +} + +export class IndomitableStrategy implements IShardingStrategy { + public readonly manager: WebSocketManager; + public readonly ipc: BaseWorker; + public readonly workers: Collection; + constructor(manager: WebSocketManager, ipc: BaseWorker) { + this.manager = manager; + this.ipc = ipc; + this.workers = new Collection(); + } + + public async spawn(shardIds: number[]): Promise { + const strategyOptions = await managerToFetchingStrategyOptions(this.manager); + const promises = shardIds.map(shardId => this.createWorker(shardId, { ...strategyOptions, shardId })); + await Promise.all(promises); + } + + public async connect(): Promise { + const promises = []; + for (const worker of this.workers.values()) { + const content: MainStrategyData = { + op: MainStrategyOps.CONNECT, + data: {}, + internal: true + }; + promises.push(worker.ipc.send({ content, repliable: true })); + } + await Promise.all(promises); + } + + public async destroy(data: Omit = {}): Promise { + const promises = []; + for (const worker of this.workers.values()) { + const content: MainStrategyData = { + op: MainStrategyOps.DESTROY, + data, + internal: true + }; + promises.push(worker.ipc.send({ content, repliable: true })); + } + await Promise.all(promises); + } + + public async reconnect(shardId: number): Promise { + const worker = this.workers.get(shardId); + if (!worker) + throw new Error(`No worker found for shard #${shardId}`); + const content: MainStrategyData = { + op: MainStrategyOps.RECONNECT, + data: { recovery: WebSocketShardDestroyRecovery.Reconnect }, + internal: true + }; + await worker.ipc.send({ content, repliable: true }); + } + + public async send(shardId: number, data: any): Promise { + const worker = this.workers.get(shardId); + if (!worker) + throw new Error(`No worker found for shard #${shardId}`); + const content: MainStrategyData = { + op: MainStrategyOps.SEND, + data, + internal: true + }; + await worker.ipc.send({ content, repliable: true }); + } + + public async fetchStatus(): Promise> { + const collection: Collection = new Collection(); + const promises = this.workers.map(async (worker, id) => { + const content: MainStrategyData = { + op: MainStrategyOps.STATUS, + data: {}, + internal: true + }; + const status = await worker.ipc.send({ content, repliable: true }); + collection.set(id, status as WebSocketShardStatus); + }); + await Promise.all(promises); + return collection; + } + + private async createWorker(shardId: number, workerData: WorkerData): Promise { + const thread = new Worker(join(__dirname, 'src/strategy/', 'Thread.js'), { workerData }); + await once(thread, 'online'); + const ipc = new MainStrategyWorker(shardId, thread, this); + thread + .on('error', error => { + throw error; + }) + .on('message', message => ipc.handleRawResponse(message, () => null)); + this.workers.set(shardId, { thread, ipc }); + return thread; + } +} diff --git a/src/strategy/Thread.ts b/src/strategy/Thread.ts new file mode 100644 index 0000000..3dec08f --- /dev/null +++ b/src/strategy/Thread.ts @@ -0,0 +1,34 @@ +import { WebSocketShard, WebSocketShardEvents } from '@discordjs/ws'; +import { workerData } from 'worker_threads'; +import { WorkerData } from './IndomitableStrategy'; +import { IndomitableFetchingStrategy } from './IndomitableFetchingStrategy'; +import { ThreadStrategyWorker } from '../ipc/ThreadStrategyWorker'; +import { ThreadStrategyData, ThreadStrategyOps } from '../Util'; + +const options = workerData as WorkerData; + +const ipc = new ThreadStrategyWorker(); +const strategy = new IndomitableFetchingStrategy(ipc, options); +const shard = new WebSocketShard(strategy, options.shardId); + +ipc.build(shard); + +for (const event of Object.values(WebSocketShardEvents)) { + // @ts-expect-error: unknown fix + shard.on(event, data => { + const content: ThreadStrategyData = { + op: ThreadStrategyOps.SHARD_EVENT, + event, + data, + shardId: shard.id, + internal: true + }; + ipc.send({ content }) + .catch(() => null); + }); +} + + + + + diff --git a/tsup-config.json b/tsup-config.json index 73d44f4..c66cde7 100644 --- a/tsup-config.json +++ b/tsup-config.json @@ -1,8 +1,9 @@ { "dts": true, - "entry": ["index.ts"], + "entry": ["index.ts", "src/strategy/Thread.ts"], "splitting": false, "sourcemap": true, "clean": true, - "format": ["esm", "cjs"] -} \ No newline at end of file + "format": ["esm", "cjs"], + "shims": true +}