Skip to content

Commit

Permalink
Proper support for draining and fail over
Browse files Browse the repository at this point in the history
  • Loading branch information
havfo committed Feb 17, 2024
1 parent 9b4ab92 commit 1956396
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 8 deletions.
17 changes: 15 additions & 2 deletions src/RoomServerConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ export class RoomServerConnection extends EventEmitter {
this.emit('close');
}

@skipIfClosed
public drain(timeout: number): void {
logger.debug('drain()');

this.notify({ method: 'mediaNodeDrain', data: { timeout } });
}

@skipIfClosed
public handleConnection(): void {
logger.debug('addConnection()');
Expand Down Expand Up @@ -112,7 +119,10 @@ export class RoomServerConnection extends EventEmitter {
@skipIfClosed
public notify(notification: SocketMessage): void {
logger.debug('notify() [method: %s]', notification.method);
notification.data.load = getCpuLoad();

if (!notification.data) notification.data = {};

notification.data.load = getCpuLoad();

try {
this.connection.notify(notification);
Expand All @@ -125,11 +135,14 @@ export class RoomServerConnection extends EventEmitter {
public async request(request: SocketMessage): Promise<unknown> {
logger.debug('request() [method: %s]', request.method);

if (!request.data) request.data = {};

request.data.load = getCpuLoad();

try {
return await this.connection.request(request);
} catch (error) {
logger.error('request() [error: %o]', error);
}
}
}
}
36 changes: 34 additions & 2 deletions src/interactiveServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import MediaService from './MediaService';
import { RoomServerConnection } from './RoomServerConnection';
import RoomServer from './RoomServer';
import { Logger } from 'edumeet-common';
import { cancelDrain, drain } from './server';

const SOCKET_PATH_UNIX = '/tmp/edumeet-media-node.sock';
const SOCKET_PATH_WIN = path.join('\\\\?\\pipe', process.cwd(), 'edumeet-media-node');
Expand Down Expand Up @@ -75,8 +76,9 @@ class InteractiveServer {
this.log('- h, help : show this message');
this.log('- logLevel level : changes logLevel in all mediasoup Workers');
this.log('- logTags [tag] [tag] : changes logTags in all mediasoup Workers (values separated by space)');
this.log('- drain [timeout] : drain server with a timeout in seconds (default 3600)');
this.log('- cancelDrain : cancel server drain');
this.log('- dw, dumpWorkers : dump mediasoup Workers');
this.log('- dwrs, dumpWebRtcServer [id] : dump mediasoup WebRtcServer with given id (or the latest created one)');
this.log('- dr, dumpRouter [id] : dump mediasoup Router with given id (or the latest created one)');
this.log('- dt, dumpTransport [id] : dump mediasoup Transport with given id (or the latest created one)');
this.log('- dp, dumpProducer [id] : dump mediasoup Producer with given id (or the latest created one)');
Expand Down Expand Up @@ -133,6 +135,36 @@ class InteractiveServer {
break;
}

case 'drain': {
const timeout = params[0] ? parseInt(params[0], 10) : 3600;

if (typeof timeout !== 'number' || timeout < 0) {
this.error('invalid timeout, draining aborted');

break;
}

const draining = drain(timeout);

if (!draining) {
this.error('server is already draining, please wait until it exits or cancel the drain first');

break;
}

this.log(`draining server with timeout ${timeout} seconds...`);

break;
}

case 'cancelDrain': {
cancelDrain();

this.log('drain cancelled');

break;
}

case 'dw':
case 'dumpWorkers': {
for (const worker of workers.values()) {
Expand Down Expand Up @@ -595,4 +627,4 @@ export const interactiveServer = (

export const interactiveServerAddMediaService = (mediaService: MediaService) => {
global.mediaService = mediaService;
};
};
57 changes: 53 additions & 4 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,44 @@ const showUsage = () => {
logger.debug(' The CPU usage percent limit to start cascading.\n\n');
};

const roomServerConnections = new Map<string, RoomServerConnection>();
const roomServers = new Map<string, RoomServer>();

let draining = false;
let drainingTimeout: NodeJS.Timeout;
let drainingStarted: number;
let drainingTime: number;

export const drain = (timeout: number): boolean => {
if (draining) return false;

draining = true;

logger.debug('drain()');

drainingTimeout = setTimeout(() => {
logger.debug('drain() | closing...');

process.exit(0);
}, timeout * 1000);

drainingTime = timeout * 1000;
drainingStarted = Date.now();
roomServerConnections.forEach((roomServerConnection) => roomServerConnection.drain(timeout));

return true;
};

export const cancelDrain = () => {
if (!draining) return;

logger.debug('cancelDrain()');

draining = false;

clearTimeout(drainingTimeout);
};

(async () => {
const {
help,
Expand Down Expand Up @@ -83,9 +121,6 @@ const showUsage = () => {

logger.debug('Starting...', { listenPort, listenHost, ip, announcedIp });

const roomServerConnections = new Map<string, RoomServerConnection>();
const roomServers = new Map<string, RoomServer>();

interactiveServer(roomServerConnections, roomServers);

const mediaService = await MediaService.create({
Expand Down Expand Up @@ -159,6 +194,20 @@ const showUsage = () => {
connection: new IOServerConnection(socket)
});

if (draining) {
logger.debug(
'socket connection | draining [socketId: %s]',
socket.id
);

const remaining = Math.max(0, drainingTime - (Date.now() - drainingStarted)) / 1000;

roomServerConnection.drain(remaining);
roomServerConnection.close();

return;
}

roomServerConnections.set(socket.id, roomServerConnection);
roomServerConnection.once('close', () =>
roomServerConnections.delete(socket.id));
Expand Down Expand Up @@ -189,4 +238,4 @@ const showUsage = () => {
process.once('SIGTERM', close);

logger.debug('Started!');
})();
})();

0 comments on commit 1956396

Please sign in to comment.