diff --git a/doc/servermd/StreamAPI.md b/doc/servermd/StreamAPI.md
index 8783d7989..f66589b53 100644
--- a/doc/servermd/StreamAPI.md
+++ b/doc/servermd/StreamAPI.md
@@ -19,7 +19,7 @@ Because it's REST, management clients can be implemented by different programmin
To enable stream API, add experimental targets `stream-service` and `customized-agent` during packing. `stream-service` is the module that provide stream related API. `customized-agent` is the module that provide server side customization for stream related API.
After packing, the stream API configuration is in stream_service/service.toml.
-Edit portal/portal.toml, set `stream_engine_name` to the same value of stream API configuration.
+Edit portal/portal.toml, set `stream_engine_name` to the same value as the stream API configuration (`service.name` or `scheduler.name` in stream_service/service.toml).
Edit management_api/management_api.toml, set `stream_engine` and `control_agent` to the same values of stream API configuration.
Start OWT service with updated configuration, stream API should be enabled.
@@ -236,7 +236,9 @@ request body:
object(ListQuery):
{
- KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
+ query: {
+ KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
+ }
}
response body:
@@ -273,7 +275,9 @@ request body:
type: string(publishType), // E.g, "streaming", "video", ...
participant: string(participantId), // Or use domain name as participant ID.
media: object(MediaTrack) | object(MediaInfo),
- info: object(TypeSpecificInfo)
+ info: object(TypeSpecificInfo),
+ connection: object(ConnectionInfo) | undefined, // For "streaming"
+ processor: string(processorId) | undefined, // For "audio", "video"
}
object(MediaTrack) { // For WebRTC publications
tracks: [ object(TrackInfo) ],
@@ -287,6 +291,11 @@ request body:
parameters: object(VideoParameters)
}
}
+ object(ConnectionInfo) { // For streaming publications
+ url: string(streamingUrl),
+ transportProtocol: "tcp" | "udp",
+ bufferSize: number(bufferSize),
+ },
For *object(TrackInfo)*, refers to [tracks in MediaOptions](../Client-Portal%20Protocol.md#331-participant-joins-a-room).
For *format* and *parameters*, refers to [REST API](RESTAPI.md#53-streams-StreamAPIsection53).
@@ -367,7 +376,9 @@ request body:
object(ListQuery):
{
- KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
+ query: {
+ KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
+ }
}
response body:
@@ -404,7 +415,9 @@ request body:
type: string(subscribeType), // E.g, "streaming", "video", ...
participant: string(participantId), // Or use domain name as participant ID.
media: object(MediaTrack) | object(MediaInfo),
- info: object(TypeSpecificInfo)
+ info: object(TypeSpecificInfo),
+ connection: object(ConnectionInfo) | undefined, // For "streaming", "recording"
+ processor: string(processorId) | undefined, // For "audio", "video", "analytics"
}
object(MediaTrack) { // For WebRTC subscriptions
tracks: [ object(TrackInfo) ],
@@ -413,12 +426,21 @@ request body:
audio: {
from: string(sourceAudioId), // Could be publication ID or source track ID
format: object(AudioFormat),
- },
+ } | boolean(enable),
video: {
from: string(sourceVideoId), // Could be publication ID or source track ID
format: object(VideoFormat),
parameters: object(VideoParameters)
- }
+ } | boolean(enable)
+ }
+ object(ConnectionInfo) {
+ container: "mkv" | "mp4" | undefined, // For "recording"
+ url: string(url) | undefined, // For "streaming"
+ algorithm: string(algorithmName) | undefined, // For "analytics"
+ video: { // For "analytics"
+ format: object(VideoFormat), // Analytics output video format
+ parameters: object(VideoParameters), // Analytics output video parameters
+ } | undefined,
}
For *object(TrackInfo)*, refers to [tracks in MediaOptions](../Client-Portal%20Protocol.md#331-participant-joins-a-room).
@@ -497,7 +519,9 @@ request body:
object(ListQuery):
{
- KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
+ query: {
+ KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
+ }
}
response body:
@@ -577,6 +601,14 @@ request body:
id: string(analyticsId)
}
+ // For "sip" type processor
+ sip: {
+ server: string(serverHost),
+ user: string(sipUser),
+ password: string(sipPasswd)
+ },
+ stream: string(outgoingSipStream)
+
For *object(Region)*, refers to [REST API](RESTAPI.md#51-rooms-StreamAPIsection51).
response body:
@@ -605,7 +637,109 @@ response body:
**Empty**
-## 5.5 Nodes {#StreamAPIsection5_5}
+## 5.5 Participants {#StreamAPIsection5_5}
+Description:
+Participants represent the owners of publications and subscriptions in the OWT server.
+
+Resources:
+
+- /v1.1/stream-engine/participants
+- /v1.1/stream-engine/participants/{participantId}
+
+Data Model:
+
+ Object(Participant) {
+ id: string(ParticipantID),
+ domain: string(domainName), // For example, room ID
+ portal: string(portalId),
+ notifying: boolean(notifyOthers), // Notify other participants about join/leave.
+ }
+
+### List Participants {#StreamAPIsection5_5_1}
+**GET ${host}/v1.1/stream-engine/participants**
+**GET ${host}/v1.1/stream-engine/participants/{participantId}**
+
+Description:
+List participants in the stream engine.
+
+request body:
+
+| type | content |
+|:-------------|:-------|
+| json | object(ListQuery) |
+
+**Note**: Definition of *ListQuery*.
+
+ object(ListQuery):
+ {
+ query: {
+ KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
+ }
+ }
+
+response body:
+
+| type | content |
+|:-------------|:-------|
+| json | Object(ListResult) |
+
+**Note**: Definition of *ListResult*.
+
+ object(ListResult):
+ {
+ total: number(ListSize),
+ start: number(offsetInList),
+ data: [ object(Participant) ]
+ }
+
+### Create Participant {#StreamAPIsection5_5_2}
+**POST ${host}/v1.1/stream-engine/participants**
+
+Description:
+Create a participant with the given configuration.
+
+request body:
+
+| type | content |
+|:-------------|:-------|
+| json | object(ParticipantRequest) |
+
+**Note**: Definition of *ParticipantRequest*.
+
+ Object(ParticipantRequest) {
+ id: string(ParticipantID),
+ domain: string(domainName),
+ notifying: boolean(notifyOthers), // Notify other participants about join/leave.
+ }
+
+response body:
+
+| type | content |
+|:-------------|:-------|
+| json | object(IdObject) |
+
+**Note**: Definition of *IdObject*.
+
+ Object(IdObject) {
+ id: string(createdParticipantId)
+ }
+
+### Delete Participant {#StreamAPIsection5_5_3}
+**DELETE ${host}/v1.1/stream-engine/participants/{participantId}**
+
+Description:
+Drop the specified participant; all related publications and subscriptions will be stopped as well.
+
+request body:
+
+ **Empty**
+
+response body:
+
+ **Empty**
+
+
+## 5.6 Nodes {#StreamAPIsection5_6}
Description:
Node represents working process in stream engine.
@@ -624,7 +758,7 @@ Data Model:
streamAddr: {ip: string(host), port: number(port)}
}
-### List Nodes {#StreamAPIsection5_5_1}
+### List Nodes {#StreamAPIsection5_6_1}
**GET ${host}/v1.1/stream-engine/nodes**
Description:
@@ -640,7 +774,9 @@ request body:
object(ListQuery):
{
- KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
+ query: {
+ KEY: VALUE // Specified key-value pair for query result, such as `{name: "default"}`
+ }
}
response body:
diff --git a/source/agent/analytics/index.js b/source/agent/analytics/index.js
index 9ff1ebe7c..3db7467bd 100644
--- a/source/agent/analytics/index.js
+++ b/source/agent/analytics/index.js
@@ -210,6 +210,9 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
this.connectionclose = () => {
destroyStream(options.controller, newStreamId);
+ // Notify stream engine if needed
+ const data = {id: newStreamId};
+ notifyStatus(options.controller, connectionId, 'onStreamRemoved', data);
}
inputs[connectionId] = true;
@@ -259,11 +262,10 @@ module.exports = function (rpcClient, rpcId, agentId, clusterIp) {
}
}
- // For Stream Engine, onSessionProgress(id, name, data)
+ generateStream(options.controller, newStreamId, streamInfo);
+ // Notify stream engine if needed
streamInfo.id = newStreamId;
- notifyStatus(controller, connectionId, 'onNewStream', streamInfo);
-
- // generateStream(options.controller, newStreamId, streamInfo);
+ notifyStatus(controller, connectionId, 'onStreamAdded', streamInfo);
} catch (e) {
log.error("Parse stream added data with error:", e);
}
diff --git a/source/agent/conference/rpcRequest.js b/source/agent/conference/rpcRequest.js
index 72c774da0..7f9d9dfb5 100644
--- a/source/agent/conference/rpcRequest.js
+++ b/source/agent/conference/rpcRequest.js
@@ -574,10 +574,12 @@ const RpcRequest = function(rpcChannel, listener) {
};
that.addSipNode = function (workerNode) {
- grpcNode[workerNode] = grpcTools.startClient(
- 'sip',
- workerNode
- );
+ if (enableGrpc) {
+ grpcNode[workerNode] = grpcTools.startClient(
+ 'sip',
+ workerNode
+ );
+ }
}
return that;
diff --git a/source/agent/customized/conferenceLite.js b/source/agent/customized/conferenceLite.js
index c23b96122..258d10942 100644
--- a/source/agent/customized/conferenceLite.js
+++ b/source/agent/customized/conferenceLite.js
@@ -55,7 +55,7 @@ class AggregatedStream {
this.streams = [];
this.processor = null;
}
- addSetting(id, setting) {
+ addSetting(id, setting, data) {
const stream = this.streams.find((stream) => {
if (this.type === 'audio') {
return isAudioSettingMatch(stream.setting, setting);
@@ -64,10 +64,12 @@ class AggregatedStream {
}
});
if (!stream) {
- this.streams.push({id, setting});
+ this.streams.push({id, setting, data});
return true;
+ } else {
+ stream.id = id;
+ return false;
}
- return false;
}
getSetting(setting) {
const stream = this.streams.find((stream) => {
@@ -77,7 +79,7 @@ class AggregatedStream {
return isVideoSettingMatch(stream.setting, setting);
}
});
- return stream?.id || null;
+ return stream;
}
getSettings() {
return this.streams.map((stream) => {
@@ -225,15 +227,6 @@ class Conference {
initProms.push(p);
this.videos.set(mixedId, new AggregatedStream(mixedId, 'video'));
}
- const pubReq = {
- id: mixedId,
- type: 'virtual',
- data: mixedPub,
- domain: roomId,
- participant: roomId
- };
- // initProms.push(rpcChannel.makeRPC(
- // STREAM_ENGINE, 'publish', [pubReq]));
this.mixedStreams.set(mixedId, mixedPub);
}
await Promise.all(initProms);
@@ -252,14 +245,7 @@ class Conference {
// Publish from portal
async publish(req) {
log.debug('publish:', req);
- if (req.type === 'virtual') {
- return req;
- }
// Validate request
- if (!this.participants.has(req.participant) &&
- req.participant !== this.roomId) {
- throw new Error('Invalid participant ID');
- }
req.id = req.id || randomUUID().replace(/-/g, '');
if (req.type !== 'video' && req.type !== 'audio') {
req.info = Object.assign(req.info || {}, {
@@ -296,7 +282,18 @@ class Conference {
formatPreference
};
// Check subscribe source
- if (req.type === 'webrtc') {
+ if (req.type !== 'audio' && req.type !== 'video') {
+ if (req.type !== 'webrtc') {
+ req.media.tracks = [];
+ if (req.media.audio) {
+ req.media.audio.type = 'audio';
+ req.media.tracks.push(req.media.audio);
+ }
+ if (req.media.video) {
+ req.media.video.type = 'video';
+ req.media.tracks.push(req.media.video);
+ }
+ }
// Save track from for later linkup
for (const track of req.media.tracks) {
const tmpTrackId = track.id ?? req.id;
@@ -409,54 +406,16 @@ class Conference {
if (from && this.audios.has(from)) {
log.debug('Pending link from:', track.id, from);
const stream = this.audios.get(from);
- const mappedId = stream.getSetting(track);
- if (mappedId) {
- track.from = mappedId;
+ const audioSetting = {format: track.format};
+ const mapped = stream.getSetting(audioSetting);
+ if (mapped) {
+ track.from = await mapped.data;
+ log.debug('Update track audio from:', track.id, track.from);
} else {
- // Generate audio
- if (!stream.processor) {
- if (this.mixedStreams.has(from)) {
- log.error('Audio mixer is not ready');
- throw new Error('Audio mixer is not ready');
- }
- const txReq = {
- type: 'audio',
- id: this.roomId + '-' + from,
- transcoding: {id: from},
- domain: this.roomId,
- participant: this.roomId
- };
- stream.processor = {id: txReq.id};
- stream.processor = await this.rpcChannel.makeRPC(
- STREAM_ENGINE, 'addProcessor', [txReq]);
- // Add input
- const audioOption = {
- from,
- format: stream.getSettings()[0].format,
- }
- const inputReq = {
- type: 'audio',
- id: randomUUID().replace(/-/g, ''),
- media: {audio: audioOption},
- processor: stream.processor.id,
- participant: this.roomId,
- info: {owner: ''}
- };
- await this.rpcChannel.makeRPC(
- STREAM_ENGINE, 'subscribe', [inputReq]);
- }
- const genReq = {
- type: 'audio',
- id: randomUUID().replace(/-/g, ''),
- media: {audio: {format: track.format}},
- processor: stream.processor.id,
- participant: this.roomId,
- info: {owner: '', hidden: true}
- };
- const outputId = await this.rpcChannel.makeRPC(
- STREAM_ENGINE, 'publish', [genReq]);
- stream.addSetting(outputId.id, {format: track.format});
- track.from = outputId.id;
+ const transcode = this._transcodeAudio(
+ stream, from, audioSetting);
+ stream.addSetting(null, audioSetting, transcode);
+ track.from = await transcode;
log.debug('Update track audio from:', track.id, track.from);
}
hasUpdate = true;
@@ -474,59 +433,19 @@ class Conference {
}
if (from && this.videos.has(from)) {
const stream = this.videos.get(from);
- const mappedId = stream.getSetting(track);
- if (mappedId) {
- track.from = mappedId;
+ const videoSetting = {
+ format: track.format,
+ parameters: track.parameters,
+ };
+ const mapped = stream.getSetting(videoSetting);
+ if (mapped) {
+ track.from = await mapped.data;
+ log.debug('Update track video from:', track.id, track.from);
} else {
- // Generate video
- if (!stream.processor) {
- if (this.mixedStreams.has(from)) {
- log.error('Video mixer is not ready');
- throw new Error('Video mixer is not ready');
- }
- const txReq = {
- type: 'video',
- id: this.roomId + '-' + from,
- transcoding: {id: from},
- domain: this.roomId,
- participant: this.roomId,
- };
- stream.processor = {id: txReq.id};
- stream.processor = await this.rpcChannel.makeRPC(
- STREAM_ENGINE, 'addProcessor', [txReq]);
- // Add input
- const videoOption = {
- from,
- format: stream.getSettings()[0].format,
- parameters: stream.getSettings()[0].parameters,
- }
- const inputReq = {
- type: 'video',
- id: randomUUID().replace(/-/g, ''),
- media: {video: videoOption},
- processor: stream.processor.id,
- participant: this.roomId,
- info: {owner: ''}
- };
- await this.rpcChannel.makeRPC(
- STREAM_ENGINE, 'subscribe', [inputReq]);
- }
- const videoSetting = {
- format: track.format,
- parameters: track.parameters,
- };
- const genReq = {
- type: 'video',
- id: randomUUID().replace(/-/g, ''),
- media: {video: videoSetting},
- processor: stream.processor.id,
- participant: this.roomId,
- info: {owner: '', hidden: true}
- };
- const outputId = await this.rpcChannel.makeRPC(
- STREAM_ENGINE, 'publish', [genReq]);
- stream.addSetting(outputId.id, videoSetting);
- track.from = outputId.id;
+ const transcode = this._transcodeVideo(
+ stream, from, videoSetting);
+ stream.addSetting(null, videoSetting, transcode);
+ track.from = await transcode;
log.debug('Update track video from:', track.id, track.from);
}
hasUpdate = true;
@@ -559,6 +478,102 @@ class Conference {
req.id = req.id || randomUUID().replace(/-/g, '');
return req;
}
+
+ async _transcodeAudio(stream, from, audioSetting) {
+ // Generate audio
+ if (!stream.processor) {
+ if (this.mixedStreams.has(from)) {
+ log.error('Audio mixer is not ready');
+ throw new Error('Audio mixer is not ready');
+ }
+ const txReq = {
+ type: 'audio',
+ id: this.roomId + '-' + from,
+ transcoding: {id: from},
+ domain: this.roomId,
+ participant: this.roomId
+ };
+ stream.processor = {id: txReq.id};
+ stream.processor = await this.rpcChannel.makeRPC(
+ STREAM_ENGINE, 'addProcessor', [txReq]);
+ // Add input
+ const audioOption = {
+ from,
+ format: stream.getSettings()[0].format,
+ }
+ const inputReq = {
+ type: 'audio',
+ id: randomUUID().replace(/-/g, ''),
+ media: {audio: audioOption},
+ processor: stream.processor.id,
+ participant: this.roomId,
+ info: {owner: ''}
+ };
+ await this.rpcChannel.makeRPC(
+ STREAM_ENGINE, 'subscribe', [inputReq]);
+ }
+ const genReq = {
+ type: 'audio',
+ id: randomUUID().replace(/-/g, ''),
+ media: {audio: {format: audioSetting.format}},
+ processor: stream.processor.id,
+ participant: this.roomId,
+ info: {owner: '', hidden: true}
+ };
+ const outputId = await this.rpcChannel.makeRPC(
+ STREAM_ENGINE, 'publish', [genReq]);
+ stream.addSetting(outputId.id, audioSetting);
+    log.debug('Transcode audio completed:', stream.id, outputId.id, audioSetting);
+    return outputId.id;
+  }
+
+ async _transcodeVideo(stream, from, videoSetting) {
+ // Generate video
+ if (!stream.processor) {
+ if (this.mixedStreams.has(from)) {
+ log.error('Video mixer is not ready');
+ throw new Error('Video mixer is not ready');
+ }
+ const txReq = {
+ type: 'video',
+ id: this.roomId + '-' + from,
+ transcoding: {id: from},
+ domain: this.roomId,
+ participant: this.roomId,
+ };
+ stream.processor = {id: txReq.id};
+ stream.processor = await this.rpcChannel.makeRPC(
+ STREAM_ENGINE, 'addProcessor', [txReq]);
+ // Add input
+ const videoOption = {
+ from,
+ format: stream.getSettings()[0].format,
+ parameters: stream.getSettings()[0].parameters,
+ }
+ const inputReq = {
+ type: 'video',
+ id: randomUUID().replace(/-/g, ''),
+ media: {video: videoOption},
+ processor: stream.processor.id,
+ participant: this.roomId,
+ info: {owner: ''}
+ };
+ await this.rpcChannel.makeRPC(
+ STREAM_ENGINE, 'subscribe', [inputReq]);
+ }
+ const genReq = {
+ type: 'video',
+ id: randomUUID().replace(/-/g, ''),
+ media: {video: videoSetting},
+ processor: stream.processor.id,
+ participant: this.roomId,
+ info: {owner: '', hidden: true}
+ };
+ const outputId = await this.rpcChannel.makeRPC(
+ STREAM_ENGINE, 'publish', [genReq]);
+ stream.addSetting(outputId.id, videoSetting);
+ log.debug('Transcode video completed:', stream.id, outputId.id, videoSetting);
+ return outputId.id;
+ }
}
module.exports.Conference = Conference;
diff --git a/source/agent/sip/index.js b/source/agent/sip/index.js
index 6c773e3f4..9e7ed2d13 100644
--- a/source/agent/sip/index.js
+++ b/source/agent/sip/index.js
@@ -247,6 +247,7 @@ module.exports = function (rpcC, selfRpcId, parentRpcId, clusterWorkerIP) {
var router = new InternalConnectionRouter(global.config.internal);
if (enableGRPC) {
+ erizo.id = null;
const grpcTools = require('./grpcTools');
cluster_name = global.config?.cluster?.grpc_host || 'localhost:10080';
makeRPC = function (_, node, method, args, onOk, onError) {
@@ -713,7 +714,9 @@ module.exports = function (rpcC, selfRpcId, parentRpcId, clusterWorkerIP) {
that.init = function(options, callback) {
log.debug('init SipGateway:', options.sip_server, options.sip_user);
- erizo.id = rpcC.rpcAddress
+ if (!erizo.id) {
+      erizo.id = rpcC.rpcAddress;
+ }
if (typeof options.room_id !== 'string' || options.room_id === '') {
log.error('Invalid room id');
diff --git a/source/agent/sip/log4cxx.properties b/source/agent/sip/log4cxx.properties
index 8f75a46dc..97a9f0900 100644
--- a/source/agent/sip/log4cxx.properties
+++ b/source/agent/sip/log4cxx.properties
@@ -11,6 +11,9 @@ log4j.appender.A1.layout.ConversionPattern=%d - %p: %c - %m%n
# The raw UDP and TCP transports which are used for the connection between the Gateway and AVS.
log4j.logger.owt.RawTransport=INFO
+log4j.logger.owt.TransportSession=INFO
+log4j.logger.owt.TransportServer=INFO
+log4j.logger.owt.TransportClient=INFO
# If the SctpTransport log is set to debug, heavy IO would affact the connections
log4j.logger.owt.SctpTransport=INFO
diff --git a/source/management_api/resource/v1.1/index.js b/source/management_api/resource/v1.1/index.js
index 74ee22b96..17833ee56 100644
--- a/source/management_api/resource/v1.1/index.js
+++ b/source/management_api/resource/v1.1/index.js
@@ -12,6 +12,7 @@ const tokensResource = require('../v1/tokensResource');
const publicationsResource = require('./publicationsResource');
const subscriptionsResource = require('./subscriptionsResource');
const processorsResource = require('./processorsResource');
+const participantsResource = require('./participantsResource');
const routerV1 = require('../v1');
// Stream(including external streaming-in) management
@@ -44,6 +45,11 @@ router.get('/stream-engine/processors/:processors', processorsResource.get);
router.post('/stream-engine/processors', processorsResource.add);
router.delete('/stream-engine/processors/:processor', processorsResource.delete);
+router.get('/stream-engine/participants', participantsResource.getList);
+router.get('/stream-engine/participants/:participant', participantsResource.get);
+router.post('/stream-engine/participants', participantsResource.add);
+router.delete('/stream-engine/participants/:participant', participantsResource.delete);
+
// Same as previous version
router.use(routerV1);
diff --git a/source/management_api/resource/v1.1/participantsResource.js b/source/management_api/resource/v1.1/participantsResource.js
new file mode 100644
index 000000000..061739589
--- /dev/null
+++ b/source/management_api/resource/v1.1/participantsResource.js
@@ -0,0 +1,75 @@
+// Copyright (C) <2021> Intel Corporation
+//
+// SPDX-License-Identifier: Apache-2.0
+
+'use strict';
+const e = require('../../errors');
+
+// Logger
+const log = require('../../logger').logger.getLogger('ParticipantsResource');
+const rpc = require('../../rpc/rpc');
+
+const STREAM_SERVICE_ID = global.config.cluster.stream_engine;
+
+function callStreamService(methodName, args, callback) {
+ rpc.callRpc(STREAM_SERVICE_ID, methodName, args, {callback: function(ret) {
+ if (ret === 'timeout' || ret === 'error') {
+ callback(ret);
+ } else {
+ callback(null, ret);
+ }
+ }});
+}
+
+exports.getList = function (req, res, next) {
+ log.debug('Representing participants for service ', req.authData.service._id);
+ const query = req.body?.query || {};
+ callStreamService('getParticipants', [{query}], (err, rets) => {
+ if (err) {
+ next(new e.CloudError('Failed to get participants'));
+ } else {
+ res.send(rets);
+ }
+ });
+};
+
+exports.get = function (req, res, next) {
+  log.debug('Representing participant:', req.params.participant);
+ const query = {id: req.params.participant};
+ callStreamService('getParticipants', [{query}], (err, rets) => {
+ if (err) {
+ next(new e.CloudError('Failed to get participants'));
+ } else {
+ res.send(rets[0]);
+ }
+ });
+};
+
+exports.add = function (req, res, next) {
+ log.debug('Join for service ', req.authData.service._id);
+ const data = {
+ id: req.body?.id,
+ domain: req.body?.domain,
+ portal: req.body?.portal,
+ notifying: req.body?.notifying
+ };
+ callStreamService('join', [data], (err, ret) => {
+ if (err) {
+ next(new e.CloudError('Failed to join'));
+ } else {
+ res.send(ret);
+ }
+ });
+};
+
+exports.delete = function (req, res, next) {
+ log.debug('Leave for service ', req.params.participant, req.authData.service._id);
+ callStreamService('leave', [{id: req.params.participant}], (err, ret) => {
+ if (err) {
+ next(new e.CloudError('Failed to leave'));
+ } else {
+ res.send(ret);
+ }
+ });
+};
diff --git a/source/management_api/resource/v1.1/processorsResource.js b/source/management_api/resource/v1.1/processorsResource.js
index 80c630b71..ed79b6246 100644
--- a/source/management_api/resource/v1.1/processorsResource.js
+++ b/source/management_api/resource/v1.1/processorsResource.js
@@ -3,12 +3,10 @@
// SPDX-License-Identifier: Apache-2.0
'use strict';
-const requestHandler = require('../../requestHandler');
const e = require('../../errors');
// Logger
const log = require('../../logger').logger.getLogger('ProcessorsResource');
-
const rpc = require('../../rpc/rpc');
const STREAM_SERVICE_ID = global.config.cluster.stream_engine;
@@ -24,9 +22,9 @@ function callStreamService(methodName, args, callback) {
}
exports.getList = function (req, res, next) {
- log.debug('Representing processors for domain ', req.params.domain,
- 'and service', req.authData.service._id);
- callStreamService('getProcessors', [{}], (err, pubs) => {
+ log.debug('Representing processors for service', req.authData.service._id);
+ const query = req.body?.query || {};
+ callStreamService('getProcessors', [{query}], (err, pubs) => {
if (err) {
next(new e.CloudError('Failed to get subscriptions'));
} else {
@@ -39,7 +37,7 @@ exports.get = function (req, res, next) {
log.debug('Representing processor:', req.params.processor,
' for domain ', req.params.domain);
const query = {id: req.params.processor};
- callStreamService('getProcessors', [query], (err, pubs) => {
+ callStreamService('getProcessors', [{query}], (err, pubs) => {
if (err) {
next(new e.CloudError('Failed to get processors'));
} else {
diff --git a/source/management_api/resource/v1.1/publicationsResource.js b/source/management_api/resource/v1.1/publicationsResource.js
index 89ab720ca..a7ec6fa07 100644
--- a/source/management_api/resource/v1.1/publicationsResource.js
+++ b/source/management_api/resource/v1.1/publicationsResource.js
@@ -3,12 +3,10 @@
// SPDX-License-Identifier: Apache-2.0
'use strict';
-const requestHandler = require('../../requestHandler');
const e = require('../../errors');
// Logger
const log = require('../../logger').logger.getLogger('PublicationsResource');
-
const rpc = require('../../rpc/rpc');
const STREAM_SERVICE_ID = global.config.cluster.stream_engine;
@@ -24,9 +22,9 @@ function callStreamService(methodName, args, callback) {
}
exports.getList = function (req, res, next) {
- log.debug('Representing publications for domain ', req.params.domain,
- 'and service', req.authData.service._id);
- callStreamService('getPublications', [{}], (err, pubs) => {
+ log.debug('Representing publications for service', req.authData.service._id);
+ const query = req.body?.query || {};
+ callStreamService('getPublications', [{query}], (err, pubs) => {
if (err) {
next(new e.CloudError('Failed to get publications'));
} else {
@@ -39,7 +37,7 @@ exports.get = function (req, res, next) {
log.debug('Representing publication:', req.params.publication,
' for domain ', req.params.domain);
const query = {id: req.params.publication};
- callStreamService('getPublications', [query], (err, pubs) => {
+ callStreamService('getPublications', [{query}], (err, pubs) => {
if (err) {
next(new e.CloudError('Failed to get publications'));
} else {
diff --git a/source/management_api/resource/v1.1/subscriptionsResource.js b/source/management_api/resource/v1.1/subscriptionsResource.js
index ec9f82d08..9c480dc41 100644
--- a/source/management_api/resource/v1.1/subscriptionsResource.js
+++ b/source/management_api/resource/v1.1/subscriptionsResource.js
@@ -3,12 +3,10 @@
// SPDX-License-Identifier: Apache-2.0
'use strict';
-const requestHandler = require('../../requestHandler');
const e = require('../../errors');
// Logger
const log = require('../../logger').logger.getLogger('SubscriptionsResource');
-
const rpc = require('../../rpc/rpc');
const STREAM_SERVICE_ID = global.config.cluster.stream_engine;
@@ -24,9 +22,9 @@ function callStreamService(methodName, args, callback) {
}
exports.getList = function (req, res, next) {
- log.debug('Representing subscriptions for domain ', req.params.domain,
- 'and service', req.authData.service._id);
- callStreamService('getSubscriptions', [{}], (err, pubs) => {
+ log.debug('Representing subscriptions for service', req.authData.service._id);
+ const query = req.body?.query || {};
+ callStreamService('getSubscriptions', [{query}], (err, pubs) => {
if (err) {
next(new e.CloudError('Failed to get subscriptions'));
} else {
@@ -39,7 +37,7 @@ exports.get = function (req, res, next) {
log.debug('Representing subscription:', req.params.subscription,
' for domain ', req.params.domain);
const query = {id: req.params.subscription};
- callStreamService('getSubscriptions', [query], (err, pubs) => {
+ callStreamService('getSubscriptions', [{query}], (err, pubs) => {
if (err) {
next(new e.CloudError('Failed to get subscriptions'));
} else {
diff --git a/source/portal/portal.toml b/source/portal/portal.toml
index f39fa3029..29ce3531b 100644
--- a/source/portal/portal.toml
+++ b/source/portal/portal.toml
@@ -21,7 +21,6 @@ cors = ["*"]
# Setup as GRPC server
#enable_grpc = true
-#customized_controller = "conference2"
#stream_engine_name = "stream-service"
[cluster]
diff --git a/source/sip_portal/sipErizoHelper.js b/source/sip_portal/sipErizoHelper.js
index 9624df92d..69ff066e9 100644
--- a/source/sip_portal/sipErizoHelper.js
+++ b/source/sip_portal/sipErizoHelper.js
@@ -48,7 +48,7 @@ module.exports = function (spec) {
onError(err);
} else {
const addr = result.info.ip + ':' + result.info.grpcPort;
- onOk(addr);
+ onOk({id: addr});
}
});
} else if (method === 'getNode') {
@@ -167,14 +167,13 @@ module.exports = function (spec) {
if (attempt <= 0) {
return on_failed('Failed in scheduling a sip agent.');
}
-
makeRPC(
rpcClient,
cluster,
'schedule',
['sip', for_whom/*FIXME: use room_id as taskId temporarily, should use for_whom instead later.*/, 'preference'/*FIXME: should fill-in actual preference*/, 10 * 1000],
function (result) {
- on_ok({id: result});
+ on_ok(result);
keepTrying = false;
}, function (reason) {
if (keepTrying) {
diff --git a/source/stream_service/controllers.json b/source/stream_service/controllers.json
index 34cdea8d2..ee598901d 100644
--- a/source/stream_service/controllers.json
+++ b/source/stream_service/controllers.json
@@ -19,6 +19,11 @@
"requirePath": "./controllers/streamingController",
"test": {}
},
+ "recording": {
+ "className": "StreamingController",
+ "requirePath": "./controllers/streamingController",
+ "test": {}
+ },
"analytics": {
"className": "AnalyticsController",
"requirePath": "./controllers/analyticsController",
@@ -28,10 +33,5 @@
"className": "QuicController",
"requirePath": "./controllers/quicController",
"test": {}
- },
- "virtual": {
- "className": "VirtualController",
- "requirePath": "./controllers/virtualController",
- "test": {}
}
}
\ No newline at end of file
diff --git a/source/stream_service/controllers/analyticsController.js b/source/stream_service/controllers/analyticsController.js
index f2dfaa52a..450e2ec3a 100644
--- a/source/stream_service/controllers/analyticsController.js
+++ b/source/stream_service/controllers/analyticsController.js
@@ -196,8 +196,8 @@ class AnalyticsController extends TypeController {
onSessionProgress(id, type, data) {
switch(type) {
- case 'onNewStream': {
- log.debug('onNewStream:', id, type, data);
+ case 'onStreamAdded': {
+ log.debug('onStreamAdded:', id, type, data);
const session = this.sessions.get(id);
const outputConfig = {
id: data.id,
@@ -213,6 +213,12 @@ class AnalyticsController extends TypeController {
.catch((e) => log.debug('Failed to create session on progress:', e));
break;
}
+ case 'onStreamRemoved': {
+ log.debug('onStreamRemoved:', id, type, data);
+ this.removeSession(id, 'in', 'onStreamRemoved')
+ .catch((e) => log.debug('Failed to remove session on progress:', e));
+ break;
+ }
default:
log.warn('Unknown progress type:', type);
break;
diff --git a/source/stream_service/controllers/audioController.js b/source/stream_service/controllers/audioController.js
index 045b7b5d4..825775e10 100644
--- a/source/stream_service/controllers/audioController.js
+++ b/source/stream_service/controllers/audioController.js
@@ -63,7 +63,8 @@ class AudioController extends TypeController {
const locality = await this.getWorkerNode(
'audio', audioConfig.domain, audioConfig.id, mediaPreference);
if (audioConfig.mixing) {
- const amixer = new Processor(audioConfig.id, 'amixer', audioConfig);
+ const amixer = new Processor(audioConfig.id, 'audio', audioConfig);
+ amixer.label = 'amixer';
amixer.locality = locality;
amixer.domain = audioConfig.domain;
const mixConfig = audioConfig.mixing;
@@ -78,7 +79,8 @@ class AudioController extends TypeController {
this.processors.set(audioConfig.id, amixer);
return amixer;
} else if (audioConfig.transcoding) {
- const atranscoder = new Processor(audioConfig.id, 'axcoder', audioConfig);
+ const atranscoder = new Processor(audioConfig.id, 'audio', audioConfig);
+ atranscoder.label = 'axcoder';
atranscoder.locality = locality;
atranscoder.domain = audioConfig.domain;
const transcodeConfig = audioConfig.transcoding;
@@ -93,7 +95,8 @@ class AudioController extends TypeController {
this.processors.set(audioConfig.id, atranscoder);
return atranscoder;
} else if (audioConfig.selecting) {
- const aselector = new Processor(audioConfig.id, 'aselector', audioConfig);
+ const aselector = new Processor(audioConfig.id, 'audio', audioConfig);
+ aselector.label = 'aselector';
aselector.locality = locality;
aselector.domain = audioConfig.domain;
const selectorConfig = audioConfig.selecting;
@@ -103,10 +106,15 @@ class AudioController extends TypeController {
// Create publication for active audio streams after return
process.nextTick(() => {
for (const streamId of selectorConfig.activeStreamIds) {
+ const format = {
+ codec: 'opus',
+ sampleRate: 48000,
+ channelNum: 2
+ };
const publication = new Publication(output.id, 'audio', sessionConfig.info);
publication.domain = audioConfig.domain;
publication.locality = locality;
- const audioTrack = {id: streamId, format: {codec: 'opus'}};
+ const audioTrack = {id: streamId, format};
publication.source.audio.push(audioTrack);
aselector.outputs.audio.push(audioTrack);
this.emit('session-established', streamId, publication);
diff --git a/source/stream_service/controllers/rtcController.js b/source/stream_service/controllers/rtcController.js
index 495ebafde..8ca36a3f9 100644
--- a/source/stream_service/controllers/rtcController.js
+++ b/source/stream_service/controllers/rtcController.js
@@ -238,8 +238,7 @@ class RtcController extends TypeController {
throw new Error(`Cannot find track for mute/unmute: ${id}`);
}
} else if (config.operation === 'update') {
- this.emit('session-updated', config.id,
- {type: 'update', data: Subscription.from(config.data)});
+ super.controlSession(direction, config);
} else {
throw new Error(`Operation not supported: ${config.operation}`);
}
diff --git a/source/stream_service/controllers/typeController.js b/source/stream_service/controllers/typeController.js
index bc75e0ab3..24ce56a0e 100644
--- a/source/stream_service/controllers/typeController.js
+++ b/source/stream_service/controllers/typeController.js
@@ -6,6 +6,7 @@
const { EventEmitter } = require('events');
const log = require('../logger').logger.getLogger('TypeController');
+const {Publication, Subscription} = require('../stateTypes')
/* Events
* 'session-established': (id, Publication|Subscription)
@@ -54,6 +55,16 @@ class TypeController extends EventEmitter {
const args = [locality.node, {room: domain, task: taskId}];
return this.makeRPC(locality.agent, 'recycleNode', args);
}
+
+ async controlSession(direction, config) {
+ if (config.operation === 'update') {
+ const data = direction === 'in' ?
+ Publication.from(config.data) : Subscription.from(config.data);
+ this.emit('session-updated', config.id, {type: 'update', data});
+ } else {
+ throw new Error(`Unknown control operation: ${config.operation}`);
+ }
+ }
}
exports.TypeController = TypeController;
diff --git a/source/stream_service/controllers/videoController.js b/source/stream_service/controllers/videoController.js
index e388c6a7e..c1a063022 100644
--- a/source/stream_service/controllers/videoController.js
+++ b/source/stream_service/controllers/videoController.js
@@ -26,6 +26,20 @@ function videoFormatStr(format) {
return str;
}
+function toVideoFormat(str) {
+  const i = str.indexOf('_');
+ if (i < 0) {
+ return {
+ codec: str
+ };
+ } else {
+ return {
+ codec: str.substring(0, i),
+ profile: str.substring(i + 1)
+ };
+ }
+}
+
/* Events
* 'session-established': (id, Publication|Subscription)
* 'session-updated': (id, Publication|Subscription)
@@ -60,13 +74,16 @@ class VideoController extends TypeController {
const locality = await this.getWorkerNode(
'video', videoConfig.domain, videoConfig.id, mediaPreference);
if (videoConfig.mixing) {
- const vmixer = new Processor(videoConfig.id, 'vmixer', videoConfig);
+ const vmixer = new Processor(videoConfig.id, 'video', videoConfig);
+ vmixer.label = 'vmixer';
vmixer.locality = locality;
vmixer.domain = videoConfig.domain;
const mixConfig = videoConfig.mixing;
try {
- await this.makeRPC(locality.node, 'init',
+ const ret = await this.makeRPC(locality.node, 'init',
['mixing', mixConfig, mixConfig.id, this.selfId, mixConfig.view]);
+ log.debug('Mixer init:', ret);
+ vmixer.codecs = ret.codecs;
} catch (e) {
this.recycleWorkerNode(locality, videoConfig.domain, videoConfig.id)
.catch((err) => log.debug('Failed to recycleNode:', err));
@@ -76,13 +93,16 @@ class VideoController extends TypeController {
return vmixer;
} else if (videoConfig.transcoding) {
- const vtranscoder = new Processor(videoConfig.id, 'vxcoder', videoConfig);
+ const vtranscoder = new Processor(videoConfig.id, 'video', videoConfig);
+ vtranscoder.label = 'vxcoder';
vtranscoder.locality = locality;
vtranscoder.domain = videoConfig.domain;
const transcodeConfig = videoConfig.transcoding;
try {
- await this.makeRPC(locality.node, 'init',
- ['transcoding', transcodeConfig, transcodeConfig.id, this.selfId, ''])
+ const ret = await this.makeRPC(locality.node, 'init',
+ ['transcoding', transcodeConfig, transcodeConfig.id, this.selfId, '']);
+ log.debug('Transcoder init:', ret);
+ vtranscoder.codecs = ret.codecs;
} catch (e) {
this.recycleWorkerNode(locality, videoConfig.domain, videoConfig.id)
.catch((err) => log.debug('Failed to recycleNode:', err));
@@ -163,14 +183,24 @@ class VideoController extends TypeController {
// output = {id, resolution, framerate, bitrate, keyFrameInterval}
const output = await this.makeRPC(processor.locality.node, 'generate',
[videoFormatStr(format), resolution, framerate, bitrate, keyFrameInterval]);
+ log.debug('Get output:', output);
sessionConfig.id = output.id;
+ const outputVideo = {
+ format: toVideoFormat(processor.codecs?.encode?.[0]),
+ parameters: {
+ resolution: output.resolution,
+ framerate: output.framerate,
+ bitrate: output.bitrate,
+ keyFrameInterval: output.keyFrameInterval,
+ }
+ }
this.sessions.set(output.id, session);
// Create publication
const publication = new Publication(output.id, 'video', sessionConfig.info);
publication.domain = processor.domain;
publication.locality = processor.locality;
publication.participant = sessionConfig.participant;
- const videoTrack = Object.assign({id: output.id}, sessionConfig.media.video);
+ const videoTrack = Object.assign({id: output.id}, outputVideo);
publication.source.video.push(videoTrack);
processor.outputs.video.push(videoTrack);
const ret = Promise.resolve(output.id);
@@ -183,18 +213,25 @@ class VideoController extends TypeController {
} else {
// Add input
const inputId = sessionConfig.id;
+ if (!sessionConfig.media?.video) {
+ return Promise.reject('No media.video for mixer input');
+ }
+ if (!sessionConfig.media.video.format) {
+ sessionConfig.media.video.format =
+ toVideoFormat(processor.codecs?.decode?.[0]);
+ }
const inputConfig = {
controller: this.selfId,
publisher: sessionConfig.info?.owner || 'common',
video: {
- codec: videoFormatStr(sessionConfig.media?.video?.format)
+ codec: videoFormatStr(sessionConfig.media.video.format)
},
};
await this.makeRPC(processor.locality.node, 'publish',
[inputId, 'internal', inputConfig]);
this.sessions.set(inputId, session);
// Create subscription
- const subscription = new Subscription(inputId, 'video', sessionConfig.info);
+ const subscription = new Subscription(sessionConfig.id, 'video', sessionConfig.info);
subscription.domain = processor.domain;
subscription.locality = processor.locality;
subscription.participant = processor.participant;
@@ -210,18 +247,15 @@ class VideoController extends TypeController {
async removeSession(id, direction, reason) {
if (this.sessions.has(id)) {
- const rpcChannel = this.rpcChannel;
const session = this.sessions.get(id);
const processor = this.processors.get(session.processor);
if (!processor) {
throw new Error(`Processor for ${id} not found`);
}
-
if (session.direction === 'in') {
// Degenerate
- rpcChannel.makeRPC(processor.locality.node, 'degenerate', [id])
+ this.makeRPC(processor.locality.node, 'degenerate', [id])
.catch((e) => log.debug('degenerate:', e));
-
const idx = processor.outputs.video.findIndex((track) => track.id === id);
if (idx >= 0) {
processor.outputs.video.splice(idx, 1);
@@ -232,7 +266,7 @@ class VideoController extends TypeController {
log.debug('session:', session);
// Let cutoff do remove-input
const inputId = session.media?.video?.from;
- rpcChannel.makeRPC(processor.locality.node, 'unpublish', [inputId])
+ this.makeRPC(processor.locality.node, 'unpublish', [inputId])
.catch((e) => log.debug('ignore unpublish callback'));
const idx = processor.inputs.video.findIndex((track) => track.id === id);
if (idx >= 0) {
diff --git a/source/stream_service/dist.json b/source/stream_service/dist.json
index 90c9492dc..6cc34c608 100644
--- a/source/stream_service/dist.json
+++ b/source/stream_service/dist.json
@@ -23,6 +23,7 @@
"../common/rpcChannel.js",
"../common/rpcStarter.js",
"../common/grpcTools.js",
+ "../common/formatUtil.js",
"../protos/protoConfig.json",
"../protos/*.proto",
"../../scripts/release/initauth.js",
diff --git a/source/stream_service/scheduler.js b/source/stream_service/scheduler.js
index 75150c8b9..b6db2ebc4 100644
--- a/source/stream_service/scheduler.js
+++ b/source/stream_service/scheduler.js
@@ -19,6 +19,7 @@ function stringHash(str) {
}
const CHECK_INTERVAL = 60 * 1000; // 1 min
+const MAX_RPC_FAILS = 5;
class ServiceScheduler {
static supportedMethods = [
@@ -37,6 +38,7 @@ class ServiceScheduler {
'removeProcessor',
'getProcessors',
'onSessionSignaling',
+ 'getParticipants',
];
constructor(rpcChannel, stateStores) {
this.rpcChannel = rpcChannel;
@@ -44,6 +46,7 @@ class ServiceScheduler {
this.checkAliveTimer = setInterval(() => {
this.checkService();
}, CHECK_INTERVAL);
+ this.failureCounts = new Map(); // Node => count
}
async scheduleService(req) {
@@ -58,18 +61,24 @@ class ServiceScheduler {
const scheduled = await this.stateStores.read('scheduleMaps', {_id: hash});
if (scheduled) {
log.debug('scheduled:', scheduled);
- return scheduled.node;
- } else {
- const node = serviceNodes[hash % serviceNodes.length].id;
- log.debug('node:', node);
- try {
- const map = {_id: hash, node};
- await this.stateStores.create('scheduleMaps', map);
- } catch (e) {
- log.debug('Failed to update schedule map:', e?.message);
+ if (!serviceNodes.find((node) => (node.id === scheduled.node))) {
+ // Out of date
+ log.debug('Node out of date:', scheduled);
+ this.stateStores.delete('scheduleMaps', {_id: hash})
+ .catch((e) => log.debug('Failed to delete scheduleMaps:', hash));
+ } else {
+ return scheduled.node;
}
- return node;
}
+ const node = serviceNodes[hash % serviceNodes.length].id;
+ log.debug('node:', node);
+ try {
+ const map = {_id: hash, node};
+ await this.stateStores.create('scheduleMaps', map);
+ } catch (e) {
+ log.debug('Failed to update schedule map:', e?.message);
+ }
+ return node;
}
// Check service availability
@@ -78,13 +87,18 @@ class ServiceScheduler {
this.stateStores.readMany('streamEngineNodes', {})
.then(async (ret) => {
const serviceNodes = ret.data || [];
- const req = {id: 'non-existent'};
+ const req = {query: {_id: 'non-existent'}};
for (const service of serviceNodes) {
try {
await this.rpcChannel.makeRPC(
service.id, 'getNodes', [req]);
+ this.failureCounts.delete(service.id);
} catch (e) {
log.warn('Failed to call service node:', service.id);
+ if (!this.failureCounts.has(service.id)) {
+ this.failureCounts.set(service.id, 1);
+ }
+ this._handleCheckFailure(service.id);
}
}
}).catch((e) => {
@@ -97,20 +111,38 @@ class ServiceScheduler {
const self = this;
for (const method of ServiceScheduler.supportedMethods) {
api[method] = async function (req, callback) {
+ let serviceNode = null;
try {
- const serviceNode = await self.scheduleService(req);
+ serviceNode = await self.scheduleService(req);
log.debug('Schedule req:', req, serviceNode);
const ret = await self.rpcChannel.makeRPC(
serviceNode, method, [req]);
log.debug('Schedule ret:', ret, serviceNode);
callback('callback', ret);
} catch (e) {
+ if (serviceNode) {
+ self._handleCheckFailure(serviceNode);
+ }
callback('callback', 'error', e?.message || e);
}
};
}
return api;
}
+
+ _handleCheckFailure(id) {
+ if (!this.failureCounts.has(id)) {
+ return;
+ }
+ let count = this.failureCounts.get(id) + 1;
+ if (count >= MAX_RPC_FAILS) {
+ this.stateStores.delete('streamEngineNodes', {id})
+        .catch((e) => log.info('Failed to clean service node:', id));
+ this.failureCounts.delete(id);
+ } else {
+      this.failureCounts.set(id, count);
+ }
+ }
}
module.exports.ServiceScheduler = ServiceScheduler;
diff --git a/source/stream_service/stateTypes.js b/source/stream_service/stateTypes.js
index fa1cc3ff5..5b27a487f 100644
--- a/source/stream_service/stateTypes.js
+++ b/source/stream_service/stateTypes.js
@@ -6,6 +6,8 @@
'use strict';
+const {calcResolution} = require('./formatUtil');
+
// Domain and its controller node
class Domain {
constructor(id, node) {
@@ -139,15 +141,12 @@ class Publication {
}
}
const videoOptional = info.optional?.video;
+ const params = videoOptional?.parameters;
if (videoOptional) {
// Check info optional format
if (Array.isArray(videoOptional.format)) {
optional.video.format = videoOptional.format;
}
- const params = videoOptional.params;
- if (Array.isArray(params?.resolution)) {
- optional.video.parameters.resolution = params.resolution;
- }
if (Array.isArray(params?.bitrate)) {
optional.video.parameters.bitrate = params.bitrate;
}
@@ -158,11 +157,26 @@ class Publication {
optional.video.parameters.keyFrameInterval = params.keyFrameInterval;
}
}
+ const generateResolutions = (base) => {
+ if (Array.isArray(params?.resolution)) {
+ const resolutions = params.resolution.map((res) => {
+ return calcResolution(res, base);
+ }).filter((res) => {
+ return res.width <= base.width && res.height <= base.height;
+ });
+ return resolutions;
+ }
+ return [];
+ };
this.source.audio.forEach((track) => {
tracks.push(Object.assign(
{type: 'audio', optional: optional.audio}, track));
});
this.source.video.forEach((track) => {
+ if (track.parameters?.resolution) {
+ optional.video.parameters.resolution =
+ generateResolutions(track.parameters.resolution);
+ }
tracks.push(Object.assign(
{type: 'video', optional: optional.video}, track));
});
diff --git a/source/stream_service/streamService.js b/source/stream_service/streamService.js
index 7844ee650..39e1a5a8e 100644
--- a/source/stream_service/streamService.js
+++ b/source/stream_service/streamService.js
@@ -259,7 +259,7 @@ function streamEngine(rpcClient) {
// Link subscription tracks to their subscribed source
const linkSubscription = async function (subscription) {
// Linkup
- log.debug('linkSubscription:', JSON.stringify(subscription));
+ log.debug('linkSubscription:', subscription.id);
// SubTrack => SubSource {audio, video, data}
const links = new Map();
const updatePros = [];
@@ -710,6 +710,8 @@ function streamEngine(rpcClient) {
// Ongoing session
const req = publishings.get(id) || subscribings.get(id);
controllers[req.type].onSessionProgress(id, name, data);
+ } else if (controllers[data?.type]) {
+ controllers[data?.type].onSessionProgress(id, name, data);
} else { //
log.warn('Unknown SessionProgress:', id, name, data);
}
@@ -719,7 +721,7 @@ function streamEngine(rpcClient) {
// Interface for portal signaling
that.onSessionSignaling = function (req, callback) {
log.debug('onSessionSignaling:', req);
- const type = (publishings.get(req.id) || subscribings.get(req.id))?.type;
+ const type = req.type || 'webrtc';
controllers[type].onClientTransportSignaling(req.id, req.signaling)
.then(() => {
callback('callback', 'ok');
@@ -772,6 +774,16 @@ function streamEngine(rpcClient) {
callback('callback', 'error', err && err.message);
});
};
+ that.getParticipants = function (filter, callback) {
+ log.debug('getParticipants:', filter, callback);
+ const query = filter?.query || {};
+ stateStores.readMany('participants', query).then((ret) => {
+ callback('callback', ret);
+ }).catch((e) => {
+ log.debug('Get participants error:', e, e?.stack);
+ callback('callback', 'error', e?.message);
+ });
+ };
// Interfaces for publication
that.publish = function(req, callback) {
log.debug('publish:', req.type, req);
@@ -973,7 +985,7 @@ function streamEngine(rpcClient) {
}
const removed = await stateStores.delete('processors', {id: procId});
if (removed) {
- await controllers[req.type].removeProcessor(procId);
+ await controllers[proc.type].removeProcessor(procId);
}
callback('callback', 'ok');
}).catch((err) => {
@@ -1014,6 +1026,7 @@ function streamEngine(rpcClient) {
await stateStores.delete('publications', {});
await stateStores.delete('subscriptions', {});
await stateStores.delete('sourceTracks', {});
+ await stateStores.delete('processors', {});
}
} catch (e) {
log.debug('Clean state stores:', e);