From 85b49010f9b7c3a318fcff1e3af511f5e28783a1 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Thu, 22 Feb 2024 08:57:49 +0100 Subject: [PATCH] fix: ensure the order of the commands Before this change, the broadcast() method would send the BROADCAST command and then apply it locally (which is required to retrieve the offset of the message, when connection state recovery is enabled), while the other commands like disconnectSockets() would first apply it locally and then send the command to the other nodes. So, for example: ``` io.emit("bye"); io.disconnectSockets(); ``` In that case, the clients connected on the io instance would not receive the "bye" event, while the clients connected to the other server instances would receive it before being disconnected. --- lib/cluster-adapter.ts | 81 +++++++++++++++++++++++------------------ test/cluster-adapter.ts | 26 ++++++++++++- test/util.ts | 4 ++ 3 files changed, 74 insertions(+), 37 deletions(-) diff --git a/lib/cluster-adapter.ts b/lib/cluster-adapter.ts index cf6baa1..143bb6f 100644 --- a/lib/cluster-adapter.ts +++ b/lib/cluster-adapter.ts @@ -503,55 +503,64 @@ export abstract class ClusterAdapter extends Adapter { super.broadcastWithAck(packet, opts, clientCountCallback, ack); } - override addSockets(opts: BroadcastOptions, rooms: Room[]) { - super.addSockets(opts, rooms); - + override async addSockets(opts: BroadcastOptions, rooms: Room[]) { const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.SOCKETS_JOIN, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } } - this.publish({ - type: MessageType.SOCKETS_JOIN, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); + super.addSockets(opts, rooms); } - override delSockets(opts: BroadcastOptions, rooms: Room[]) { - super.delSockets(opts, rooms); - + override async delSockets(opts: BroadcastOptions, rooms: Room[]) { const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.SOCKETS_LEAVE, + data: { + opts: encodeOptions(opts), + rooms, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } } - this.publish({ - type: MessageType.SOCKETS_LEAVE, - data: { - opts: encodeOptions(opts), - rooms, - }, - }); + super.delSockets(opts, rooms); } - override disconnectSockets(opts: BroadcastOptions, close: boolean) { - super.disconnectSockets(opts, close); - + override async disconnectSockets(opts: BroadcastOptions, close: boolean) { const onlyLocal = opts.flags?.local; - if (onlyLocal) { - return; + + if (!onlyLocal) { + try { + await this.publishAndReturnOffset({ + type: MessageType.DISCONNECT_SOCKETS, + data: { + opts: encodeOptions(opts), + close, + }, + }); + } catch (e) { + debug("[%s] error while publishing message: %s", this.uid, e.message); + } } - this.publish({ - type: MessageType.DISCONNECT_SOCKETS, - data: { - opts: encodeOptions(opts), - close, - }, - }); + super.disconnectSockets(opts, close); } async fetchSockets(opts: BroadcastOptions): Promise { diff --git a/test/cluster-adapter.ts b/test/cluster-adapter.ts index d5d6fdb..350ecda 100644 --- a/test/cluster-adapter.ts +++ b/test/cluster-adapter.ts @@ -3,7 +3,7 @@ import { Server, Socket as ServerSocket } from "socket.io"; import { io as ioc, Socket as ClientSocket } from "socket.io-client"; import expect = require("expect.js"); import type { AddressInfo } from "net"; -import { times, shouldNotHappen } from "./util"; +import { times, shouldNotHappen, sleep } from "./util"; import { ClusterAdapterWithHeartbeat, type ClusterMessage, @@ -243,6 +243,8 @@ describe("cluster adapter", () => { it("makes all socket instances join the specified room", async () => { servers[0].socketsJoin("room1"); + await sleep(); + expect(serverSockets[0].rooms.has("room1")).to.be(true); expect(serverSockets[1].rooms.has("room1")).to.be(true); expect(serverSockets[2].rooms.has("room1")).to.be(true); @@ -254,6 +256,8 @@ describe("cluster adapter", () => { servers[0].in("room1").socketsJoin("room2"); + await sleep(); + expect(serverSockets[0].rooms.has("room2")).to.be(true); expect(serverSockets[1].rooms.has("room2")).to.be(false); expect(serverSockets[2].rooms.has("room2")).to.be(true); @@ -275,6 +279,8 @@ describe("cluster adapter", () => { servers[0].socketsLeave("room1"); + await sleep(); + expect(serverSockets[0].rooms.has("room1")).to.be(false); expect(serverSockets[1].rooms.has("room1")).to.be(false); expect(serverSockets[2].rooms.has("room1")).to.be(false); @@ -287,6 +293,8 @@ describe("cluster adapter", () => { servers[0].in("room1").socketsLeave("room2"); + await sleep(); + expect(serverSockets[0].rooms.has("room2")).to.be(false); expect(serverSockets[1].rooms.has("room2")).to.be(false); expect(serverSockets[2].rooms.has("room2")).to.be(true); @@ -318,6 +326,22 @@ describe("cluster adapter", () => { servers[0].disconnectSockets(true); }); + + it("sends a packet before all socket instances disconnect", (done) => { + const partialDone = times(3, done); + + clientSockets.forEach((clientSocket) => { + clientSocket.on("disconnect", shouldNotHappen(done)); + + clientSocket.on("bye", () => { + clientSocket.off("disconnect"); + clientSocket.on("disconnect", partialDone); + }); + }); + + servers[0].emit("bye"); + servers[0].disconnectSockets(true); + }); }); describe("fetchSockets", () => { diff --git a/test/util.ts b/test/util.ts index 8109ef1..ecdf6df 100644 --- a/test/util.ts +++ b/test/util.ts @@ -13,3 +13,7 @@ export function times(count: number, fn: () => void) { export function shouldNotHappen(done) { return () => done(new Error("should not happen")); } + +export function sleep() { + return new Promise((resolve) => process.nextTick(resolve)); +}