Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Use tickets for internal connection (#932)
Browse files Browse the repository at this point in the history
* Add connect ticket for RawTransport
  • Loading branch information
starwarfan authored Mar 17, 2021
1 parent 525cabb commit 7c5ccf5
Show file tree
Hide file tree
Showing 17 changed files with 204 additions and 22 deletions.
13 changes: 8 additions & 5 deletions source/agent/InternalConnectionFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ try {
}

// Wrapper object for sctp-connection and tcp/udp-connection
function InConnection(prot, minport, maxport) {
function InConnection(prot, minport, maxport, ticket) {
var conn = null;
var protocol = "quic";

switch (prot) {
case 'tcp':
case 'udp':
protocol = prot;
conn = new InternalIn(prot, minport, maxport);
conn = new InternalIn(prot, minport, maxport, ticket);
break;
case 'quic':
conn = new quicIO.in(cf, kf);
Expand Down Expand Up @@ -78,7 +78,7 @@ function InConnection(prot, minport, maxport) {
}

// Wrapper object for sctp-connection and tcp/udp-connection
function OutConnection(prot, minport, maxport) {
function OutConnection(prot, minport, maxport, ticket) {
var that = {};
var conn = null;
var protocol = "quic";
Expand Down Expand Up @@ -111,7 +111,7 @@ function OutConnection(prot, minport, maxport) {
} else if (protocol === 'quic') {
conn = new quicIO.out(connectOpt.ip, connectOpt.port);
} else {
conn = new InternalOut(protocol, connectOpt.ip, connectOpt.port);
conn = new InternalOut(protocol, connectOpt.ip, connectOpt.port, ticket);
}
};

Expand Down Expand Up @@ -139,6 +139,7 @@ module.exports = function() {
var prot = internalOpt.protocol;
var minport = internalOpt.minport || 0;
var maxport = internalOpt.maxport || 0;
var ticket = internalOpt.ticket;

if (preparedSet[connId]) {
log.warn('Internal Connection already prepared:', connId);
Expand All @@ -147,7 +148,9 @@ module.exports = function() {
// right call sequence in upper layer.
return preparedSet[connId].connection.getListeningPort();
}
var conn = (direction === 'in')? InConnection(prot, minport, maxport) : OutConnection(prot, minport, maxport);
var conn = (direction === 'in')
? InConnection(prot, minport, maxport, ticket)
: OutConnection(prot, minport, maxport, ticket);

preparedSet[connId] = {connection: conn, direction: direction};
return conn.getListeningPort();
Expand Down
11 changes: 10 additions & 1 deletion source/agent/addons/internalIO/InternalInWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,23 @@ void InternalIn::New(const FunctionCallbackInfo<Value>& args) {
String::Utf8Value param0(args[0]->ToString());
std::string protocol = std::string(*param0);
unsigned int minPort = 0, maxPort = 0;
std::string ticket;

if (args.Length() >= 3) {
minPort = args[1]->Uint32Value();
maxPort = args[2]->Uint32Value();
}
if (args.Length() > 3) {
String::Utf8Value param3(args[3]->ToString());
ticket = std::string(*param3);
}

InternalIn* obj = new InternalIn();
obj->me = new owt_base::InternalIn(protocol, minPort, maxPort);
if (ticket.empty()) {
obj->me = new owt_base::InternalIn(protocol, minPort, maxPort);
} else {
obj->me = new owt_base::InternalIn(protocol, ticket, minPort, maxPort);
}
obj->src = obj->me;

obj->Wrap(args.This());
Expand Down
11 changes: 10 additions & 1 deletion source/agent/addons/internalIO/InternalOutWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,18 @@ void InternalOut::New(const v8::FunctionCallbackInfo<v8::Value>& args) {
String::Utf8Value param1(args[1]->ToString());
std::string dest_ip = std::string(*param1);
unsigned int dest_port = args[2]->Uint32Value();
std::string ticket;
if (args.Length() > 3) {
String::Utf8Value param3(args[3]->ToString());
ticket = std::string(*param3);
}

InternalOut* obj = new InternalOut();
obj->me = new owt_base::InternalOut(protocol, dest_ip, dest_port);
if (ticket.empty()) {
obj->me = new owt_base::InternalOut(protocol, dest_ip, dest_port);
} else {
obj->me = new owt_base::InternalOut(protocol, ticket, dest_ip, dest_port);
}
obj->dest = obj->me;

obj->Wrap(args.This());
Expand Down
4 changes: 3 additions & 1 deletion source/agent/analytics/analytics-agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ class AnalyticsAgent extends BaseAgent {
this.engine = new VideoAnalyzer();

this.flag = 0;
this.ticket = null;
}

// override
createInternalConnection(connectionId, direction, internalOpt) {
internalOpt.minport = global.config.internal.minport;
internalOpt.maxport = global.config.internal.maxport;
this.ticket = internalOpt.ticket;
if (direction == 'in') {
this.engine.emitListenTo(internalOpt.minport,internalOpt.maxport);
this.engine.emitListenTo(internalOpt.minport,internalOpt.maxport, this.ticket);
const portInfo = this.engine.getListeningPort();
// Create internal connection always success
return Promise.resolve({ip: global.config.internal.ip_address, port: portInfo});
Expand Down
3 changes: 2 additions & 1 deletion source/agent/analytics/videoGstPipeline/GstInternalIn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ static void dump(void* index, uint8_t* buf, int len)
}

DEFINE_LOGGER(GstInternalIn, "GstInternalIn");
GstInternalIn::GstInternalIn(GstAppSrc *data, unsigned int minPort, unsigned int maxPort)
GstInternalIn::GstInternalIn(GstAppSrc *data, unsigned int minPort, unsigned int maxPort, std::string ticket)
{
m_transport.reset(new owt_base::RawTransport<owt_base::TCP>(this));

m_transport->initTicket(ticket);
if (minPort > 0 && minPort <= maxPort) {
m_transport->listenTo(minPort, maxPort);
} else {
Expand Down
2 changes: 1 addition & 1 deletion source/agent/analytics/videoGstPipeline/GstInternalIn.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
class GstInternalIn : public owt_base::RawTransportListener{
DECLARE_LOGGER();
public:
GstInternalIn(GstAppSrc *data, unsigned int minPort = 0, unsigned int maxPort = 0);
GstInternalIn(GstAppSrc *data, unsigned int minPort = 0, unsigned int maxPort = 0, std::string ticket = NULL);
virtual ~GstInternalIn();

unsigned int getListeningPort();
Expand Down
5 changes: 2 additions & 3 deletions source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,9 @@ int VideoGstAnalyzer::setPlaying()
return 0;
}

void VideoGstAnalyzer::emitListenTo(int minPort, int maxPort)
{
void VideoGstAnalyzer::emitListenTo(int minPort, int maxPort, std::string ticket) {
ELOG_DEBUG("Listening\n");
m_internalin.reset(new GstInternalIn((GstAppSrc*)source, minPort, maxPort));
m_internalin.reset(new GstInternalIn((GstAppSrc*)source, minPort, maxPort, ticket));
}

void VideoGstAnalyzer::addOutput(int connectionID, owt_base::FrameDestination* out)
Expand Down
2 changes: 1 addition & 1 deletion source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class VideoGstAnalyzer : public EventRegistry {
void clearPipeline();
void destroyPipeline();
int getListeningPort();
void emitListenTo(int minPort,int maxPort);
void emitListenTo(int minPort, int maxPort, std::string ticket);
int setPlaying();

int addElementMany();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,17 @@ void VideoGstAnalyzerWrap::emitListenTo(const FunctionCallbackInfo<Value>& args)
mcu::VideoGstAnalyzer* me = obj->me;

unsigned int minPort = 0, maxPort = 0;
std::string ticket;

if (args.Length() >= 3) {
minPort = args[0]->Uint32Value();
maxPort = args[1]->Uint32Value();

me->emitListenTo(minPort,maxPort);
String::Utf8Value param3(args[2]->ToString());
ticket = std::string(*param3);
}
me->emitListenTo(minPort, maxPort, ticket);

}


Expand Down
1 change: 1 addition & 0 deletions source/agent/conference/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"node-getopt": "*",
"toml": "*",
"mongoose": "^5.9.6",
"uuid": "^8.0.0",
"fraction.js": "^4.0.12"
},
"devDependencies": {
Expand Down
9 changes: 7 additions & 2 deletions source/agent/conference/roomController.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ var makeRPC = require('./makeRPC').makeRPC;
// Logger
var log = logger.getLogger('RoomController');

const { v4 : uuid } = require('uuid');

const {
isVideoFmtCompatible,
isResolutionEqual,
Expand Down Expand Up @@ -40,12 +42,14 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {
rpcClient = spec.rpcClient,
config = spec.config,
room_id = spec.room,
origin = spec.origin,
origin = spec.origin,
selfRpcId = spec.selfRpcId,
enable_audio_transcoding = config.transcoding && !!config.transcoding.audio,
enable_video_transcoding = config.transcoding && !!config.transcoding.video,
internal_conn_protocol = config.internalConnProtocol;

var internalTicket = uuid().replace(/-/g, '');

/*
mix_views = {
view: {
Expand Down Expand Up @@ -524,7 +528,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) {

// Transport protocol for creating internal connection
var internalOpt = {
protocol: internal_conn_protocol
protocol: internal_conn_protocol,
ticket: internalTicket,
};
var from, to, has_published, has_subscribed;

Expand Down
19 changes: 19 additions & 0 deletions source/core/owt_base/InternalIn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,25 @@ InternalIn::InternalIn(const std::string& protocol, unsigned int minPort, unsign
}
}

InternalIn::InternalIn(
const std::string& protocol,
const std::string& ticket,
unsigned int minPort,
unsigned int maxPort)
{
if (protocol == "tcp")
m_transport.reset(new owt_base::RawTransport<TCP>(this));
else
m_transport.reset(new owt_base::RawTransport<UDP>(this, 64 * 1024));

m_transport->initTicket(ticket);
if (minPort > 0 && minPort <= maxPort) {
m_transport->listenTo(minPort, maxPort);
} else {
m_transport->listenTo(0);
}
}

InternalIn::~InternalIn()
{
m_transport->close();
Expand Down
2 changes: 2 additions & 0 deletions source/core/owt_base/InternalIn.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace owt_base {
class InternalIn : public FrameSource, public RawTransportListener {
public:
InternalIn(const std::string& protocol, unsigned int minPort = 0, unsigned int maxPort = 0);
InternalIn(const std::string& protocol, const std::string& ticket,
unsigned int minPort = 0, unsigned int maxPort = 0);
virtual ~InternalIn();

unsigned int getListeningPort();
Expand Down
16 changes: 15 additions & 1 deletion source/core/owt_base/InternalOut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@ InternalOut::InternalOut(const std::string& protocol, const std::string& dest_ip
m_transport->createConnection(dest_ip, dest_port);
}

InternalOut::InternalOut(
const std::string& protocol,
const std::string& ticket,
const std::string& dest_ip,
unsigned int dest_port)
{
if (protocol == "tcp")
m_transport.reset(new owt_base::RawTransport<TCP>(this));
else
m_transport.reset(new owt_base::RawTransport<UDP>(this));

m_transport->initTicket(ticket);
m_transport->createConnection(dest_ip, dest_port);
}

InternalOut::~InternalOut()
{
m_transport->close();
Expand Down Expand Up @@ -51,6 +66,5 @@ void InternalOut::onTransportData(char* buf, int len)
}
}


} /* namespace owt_base */

2 changes: 2 additions & 0 deletions source/core/owt_base/InternalOut.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ namespace owt_base {
class InternalOut : public FrameDestination, public RawTransportListener {
public:
InternalOut(const std::string& protocol, const std::string& dest_ip, unsigned int dest_port);
InternalOut(const std::string& protocol, const std::string& ticket,
const std::string& dest_ip, unsigned int dest_port);
virtual ~InternalOut();

void onFrame(const Frame&);
Expand Down
Loading

0 comments on commit 7c5ccf5

Please sign in to comment.