diff --git a/src/RoomServerConnection.ts b/src/RoomServerConnection.ts index 07fbeee..3efcb94 100644 --- a/src/RoomServerConnection.ts +++ b/src/RoomServerConnection.ts @@ -58,6 +58,13 @@ export class RoomServerConnection extends EventEmitter { this.emit('close'); } + @skipIfClosed + public drain(timeout: number): void { + logger.debug('drain()'); + + this.notify({ method: 'mediaNodeDrain', data: { timeout } }); + } + @skipIfClosed public handleConnection(): void { logger.debug('addConnection()'); @@ -112,7 +119,10 @@ export class RoomServerConnection extends EventEmitter { @skipIfClosed public notify(notification: SocketMessage): void { logger.debug('notify() [method: %s]', notification.method); - notification.data.load = getCpuLoad(); + + if (!notification.data) notification.data = {}; + + notification.data.load = getCpuLoad(); try { this.connection.notify(notification); @@ -125,11 +135,14 @@ export class RoomServerConnection extends EventEmitter { public async request(request: SocketMessage): Promise { logger.debug('request() [method: %s]', request.method); + if (!request.data) request.data = {}; + request.data.load = getCpuLoad(); + try { return await this.connection.request(request); } catch (error) { logger.error('request() [error: %o]', error); } } -} \ No newline at end of file +} diff --git a/src/interactiveServer.ts b/src/interactiveServer.ts index bdf43fd..59615ef 100644 --- a/src/interactiveServer.ts +++ b/src/interactiveServer.ts @@ -16,6 +16,7 @@ import MediaService from './MediaService'; import { RoomServerConnection } from './RoomServerConnection'; import RoomServer from './RoomServer'; import { Logger } from 'edumeet-common'; +import { cancelDrain, drain } from './server'; const SOCKET_PATH_UNIX = '/tmp/edumeet-media-node.sock'; const SOCKET_PATH_WIN = path.join('\\\\?\\pipe', process.cwd(), 'edumeet-media-node'); @@ -75,8 +76,9 @@ class InteractiveServer { this.log('- h, help : show this message'); this.log('- logLevel level : changes logLevel in all mediasoup Workers'); this.log('- logTags [tag] [tag] : changes logTags in all mediasoup Workers (values separated by space)'); + this.log('- drain [timeout] : drain server with a timeout in seconds (default 3600)'); + this.log('- cancelDrain : cancel server drain'); this.log('- dw, dumpWorkers : dump mediasoup Workers'); - this.log('- dwrs, dumpWebRtcServer [id] : dump mediasoup WebRtcServer with given id (or the latest created one)'); this.log('- dr, dumpRouter [id] : dump mediasoup Router with given id (or the latest created one)'); this.log('- dt, dumpTransport [id] : dump mediasoup Transport with given id (or the latest created one)'); this.log('- dp, dumpProducer [id] : dump mediasoup Producer with given id (or the latest created one)'); @@ -133,6 +135,36 @@ class InteractiveServer { break; } + case 'drain': { + const timeout = params[0] ? parseInt(params[0], 10) : 3600; + + if (typeof timeout !== 'number' || timeout < 0) { + this.error('invalid timeout, draining aborted'); + + break; + } + + const draining = drain(timeout); + + if (!draining) { + this.error('server is already draining, please wait until it exits or cancel the drain first'); + + break; + } + + this.log(`draining server with timeout ${timeout} seconds...`); + + break; + } + + case 'cancelDrain': { + cancelDrain(); + + this.log('drain cancelled'); + + break; + } + case 'dw': case 'dumpWorkers': { for (const worker of workers.values()) { @@ -595,4 +627,4 @@ export const interactiveServer = ( export const interactiveServerAddMediaService = (mediaService: MediaService) => { global.mediaService = mediaService; -}; \ No newline at end of file +}; diff --git a/src/server.ts b/src/server.ts index 68f5d94..30356f0 100644 --- a/src/server.ts +++ b/src/server.ts @@ -52,6 +52,44 @@ const showUsage = () => { logger.debug(' The CPU usage percent limit to start cascading.\n\n'); }; +const roomServerConnections = new Map(); +const roomServers = new Map(); + +let draining = false; +let drainingTimeout: NodeJS.Timeout; +let drainingStarted: number; +let drainingTime: number; + +export const drain = (timeout: number): boolean => { + if (draining) return false; + + draining = true; + + logger.debug('drain()'); + + drainingTimeout = setTimeout(() => { + logger.debug('drain() | closing...'); + + process.exit(0); + }, timeout * 1000); + + drainingTime = timeout * 1000; + drainingStarted = Date.now(); + roomServerConnections.forEach((roomServerConnection) => roomServerConnection.drain(timeout)); + + return true; +}; + +export const cancelDrain = () => { + if (!draining) return; + + logger.debug('cancelDrain()'); + + draining = false; + + clearTimeout(drainingTimeout); +}; + (async () => { const { help, @@ -83,9 +121,6 @@ const showUsage = () => { logger.debug('Starting...', { listenPort, listenHost, ip, announcedIp }); - const roomServerConnections = new Map(); - const roomServers = new Map(); - interactiveServer(roomServerConnections, roomServers); const mediaService = await MediaService.create({ @@ -159,6 +194,20 @@ const showUsage = () => { connection: new IOServerConnection(socket) }); + if (draining) { + logger.debug( + 'socket connection | draining [socketId: %s]', + socket.id + ); + + const remaining = Math.max(0, drainingTime - (Date.now() - drainingStarted)) / 1000; + + roomServerConnection.drain(remaining); + roomServerConnection.close(); + + return; + } + roomServerConnections.set(socket.id, roomServerConnection); roomServerConnection.once('close', () => roomServerConnections.delete(socket.id)); @@ -189,4 +238,4 @@ const showUsage = () => { process.once('SIGTERM', close); logger.debug('Started!'); -})(); \ No newline at end of file +})();