Skip to content

Commit

Permalink
feat: refactored concurrency handling & ipc messages + new strategy f…
Browse files Browse the repository at this point in the history
…or @discordjs/ws (#5)

* feat: refactor ipc & new worker strategy

* chore: add a space

* fix: remove unused import

* fix: add thread file as an entry

* fix: filename import

* fix: use join instead

* fix: enable shims

* fix: worker path

* feat: improve ipc inheritance and code further
  • Loading branch information
Deivu authored Jul 29, 2023
1 parent a0ff1e9 commit 0776dc1
Show file tree
Hide file tree
Showing 18 changed files with 551 additions and 198 deletions.
3 changes: 1 addition & 2 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
export * from './src/client/ShardClient';
export * from './src/client/ShardClientUtil';
export * from './src/concurrency/ConcurrencyClient';
export * from './src/concurrency/ConcurrencyManager';
export * from './src/ipc/BaseIpc';
export * from './src/ipc/Main';
export * from './src/ipc/MainWorker';
export * from './src/ipc/BaseWorker';
export * from './src/ipc/ClientWorker';
export * from './src/manager/ClusterManager';
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"build": "npm run build:ts && npm run build:docs",
"build:ts": "tsup --config tsup-config.json",
"build:docs": "typedoc --theme default --readme README.md --out docs/ --entryPointStrategy expand src/.",
"lint": "eslint --fix --ext .ts",
"lint": "eslint . --ext .ts --fix",
"prepare": "npm run build:ts"
},
"keywords": [
Expand Down
8 changes: 4 additions & 4 deletions src/Indomitable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import {
FetchSessions,
MakeAbortableRequest,
AbortableData,
ClientEvents,
InternalEvents,
InternalOps,
InternalOpsData,
LibraryEvents,
Message,
SessionObject,
Expand Down Expand Up @@ -370,8 +370,8 @@ export class Indomitable extends EventEmitter {
* @internal
*/
private async destroyClusterClient(id: number): Promise<void> {
const content: InternalEvents = {
op: ClientEvents.DESTROY_CLIENT,
const content: InternalOpsData = {
op: InternalOps.DESTROY_CLIENT,
data: {},
internal: true
};
Expand Down
78 changes: 68 additions & 10 deletions src/Util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Https, { RequestOptions } from 'node:https';
import { WebSocketShardEvents } from '@discordjs/ws';

/**
* Hoisted Environmental Variable for ease of fetching
Expand All @@ -11,22 +12,51 @@ export const EnvProcessData = {
};

/**
* Events for internal use
* Internal operation codes for the cluster -> thread
*/
export enum ClientEvents {
export enum MainStrategyOps {
CONNECT = 'connect',
DESTROY = 'destroy',
SEND = 'send',
STATUS = 'status',
RECONNECT = 'reconnect'
}

/**
* Internal operation codes for the thread <- cluster
*/
export enum ThreadStrategyOps {
REQUEST_IDENTIFY = 'requestIdentify',
CANCEL_IDENTIFY = 'cancelIdentify',
SHARD_EVENT = 'shardEvent',
RETRIEVE_SESSION = 'retrieveSession',
UPDATE_SESSION = 'updateSession'
}

/**
* Internal operation codes
*/
export enum InternalOps {
EVAL = 'eval',
RESTART = 'restart',
RESTART_ALL = 'restartAll',
DESTROY_CLIENT = 'destroyClient',
REQUEST_IDENTIFY = 'requestIdentify',
CANCEL_IDENTIFY = 'cancelIdentify',
SESSION_INFO = 'sessionInfo',
PING = 'ping'
}

/**
* Events for internal use
*/
export enum ClientEvents {
READY = 'ready',
PING = 'ping',
SHARD_READY = 'shardReady',
SHARD_RECONNECT = 'shardReconnect',
SHARD_RESUME = 'shardResume',
SHARD_DISCONNECT = 'shardDisconnect'
SHARD_DISCONNECT = 'shardDisconnect',
ERROR = 'ERROR'
}

/**
Expand All @@ -52,24 +82,52 @@ export enum LibraryEvents {
*/
export enum RawIpcMessageType {
MESSAGE = 'message',
RESPONSE = 'response'
RESPONSE = 'response',
ERROR = 'error'
}

/**
* Type for raw ipc messages of cluster -> thread
*/
export interface MainStrategyData {
op: MainStrategyOps,
data: any,
internal: true
}

/**
* Type for raw ipc messages of cluster <- thread
*/
export interface ThreadStrategyData {
op: ThreadStrategyOps,
event: WebSocketShardEvents,
data: any,
shardId: number,
internal: true
}

/**
* Data structure representing an internal event
*/
export interface InternalEvents {
op: ClientEvents,
export interface InternalOpsData {
op: InternalOps,
data: any,
internal: true
}

/**
* Data structure representing an internal discord.js event
*/
export interface ClientEventData {
op: ClientEvents,
data: any,
internal: true,
}

/**
* Data structure representing an internal error
*/
export interface InternalError {
internal: true;
error: true;
export interface IpcErrorData {
name: string;
reason: string;
stack: string;
Expand Down
21 changes: 8 additions & 13 deletions src/client/ShardClient.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Client, ClientOptions as DiscordJsClientOptions } from 'discord.js';
import { Indomitable } from '../Indomitable';
import { EnvProcessData, ClientEvents, InternalEvents, LibraryEvents } from '../Util';
import { ConcurrencyClient } from '../concurrency/ConcurrencyClient.js';
import { EnvProcessData, ClientEvents, ClientEventData } from '../Util';
import { IndomitableStrategy } from '../strategy/IndomitableStrategy';
import { ShardClientUtil } from './ShardClientUtil';
import { BaseWorker } from '../ipc/BaseWorker';

Expand All @@ -19,19 +19,14 @@ export class ShardClient {
const clientOptions = manager.clientOptions as DiscordJsClientOptions || {};
clientOptions.shards = EnvProcessData.shardIds;
clientOptions.shardCount = EnvProcessData.shardCount;
if (manager.handleConcurrency) {
if (!clientOptions.ws) clientOptions.ws = {};
clientOptions.ws.buildStrategy = ws => new IndomitableStrategy(ws, new BaseWorker(manager));
}
this.client = new manager.client(clientOptions);
// @ts-expect-error: Override shard client util with indomitable shard client util
this.client.shard = new ShardClientUtil(this.client, manager);
this.clusterId = Number(EnvProcessData.clusterId);
// pseudo code for the meantime until discord.js merges https://github.com/discordjs/discord.js/pull/9728
if (manager.handleConcurrency) {
const concurrencyClient = new ConcurrencyClient(new BaseWorker(manager));
// @ts-expect-error: Private function variable of @discordjs/ws manager
if (this.client.ws._ws) {
// @ts-expect-error: Override build identify throttler
this.client.ws._ws.options.buildIdentifyThrottler = () => Promise.resolve(concurrencyClient);
}
}
}

public async start(token: string): Promise<void> {
Expand All @@ -48,9 +43,9 @@ export class ShardClient {
private send(partial: PartialInternalEvents): void {
// @ts-ignore -- our own class
const shardClientUtil = this.client.shard as ShardClientUtil;
const content: InternalEvents = { ...partial, internal: true };
const content: ClientEventData = { ...partial, internal: true };
shardClientUtil
.send({ content, repliable: false })
.catch((error: unknown) => this.client.emit(LibraryEvents.ERROR, error as Error));
.catch((error: unknown) => this.client.emit(ClientEvents.ERROR, error as Error));
}
}
24 changes: 12 additions & 12 deletions src/client/ShardClientUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import {
EnvProcessData,
MakeAbortableRequest,
AbortableData,
ClientEvents,
InternalEvents,
InternalOps,
InternalOpsData,
Message,
SessionObject,
Transportable
Expand Down Expand Up @@ -49,8 +49,8 @@ export class ShardClientUtil extends EventEmitter {
* @returns A promise that resolves to delay in nanoseconds
*/
public async ping(): Promise<number> {
const content: InternalEvents = {
op: ClientEvents.PING,
const content: InternalOpsData = {
op: InternalOps.PING,
data: {},
internal: true
};
Expand All @@ -64,8 +64,8 @@ export class ShardClientUtil extends EventEmitter {
* @returns A promise that resolves to an array of code results
*/
public broadcastEval(script: Function, context: any = {}): Promise<unknown[]> {
const content: InternalEvents = {
op: ClientEvents.EVAL,
const content: InternalOpsData = {
op: InternalOps.EVAL,
data: `(${script.toString()})(this, ${JSON.stringify(context)})`,
internal: true
};
Expand All @@ -86,8 +86,8 @@ export class ShardClientUtil extends EventEmitter {
* @returns A session object
*/
public fetchSessions(update: boolean = false): Promise<SessionObject> {
const content: InternalEvents = {
op: ClientEvents.SESSION_INFO,
const content: InternalOpsData = {
op: InternalOps.SESSION_INFO,
data: { update },
internal: true
};
Expand All @@ -99,8 +99,8 @@ export class ShardClientUtil extends EventEmitter {
* @returns A promise that resolves to void
*/
public restart(clusterId: number): Promise<undefined> {
const content: InternalEvents = {
op: ClientEvents.RESTART,
const content: InternalOpsData = {
op: InternalOps.RESTART,
data: { clusterId },
internal: true
};
Expand All @@ -112,8 +112,8 @@ export class ShardClientUtil extends EventEmitter {
* @returns A promise that resolves to void
*/
public restartAll(): Promise<undefined> {
const content: InternalEvents = {
op: ClientEvents.RESTART_ALL,
const content: InternalOpsData = {
op: InternalOps.RESTART_ALL,
data: {},
internal: true
};
Expand Down
48 changes: 0 additions & 48 deletions src/concurrency/ConcurrencyClient.ts

This file was deleted.

Loading

0 comments on commit 0776dc1

Please sign in to comment.