From 7c5ccf5287b9330184a12f2d8aa6bed60ee5cf46 Mon Sep 17 00:00:00 2001 From: ChenLi Date: Wed, 17 Mar 2021 15:07:43 +0800 Subject: [PATCH] Use tickets for internal connection (#932) * Add connect ticket for RawTransport --- source/agent/InternalConnectionFactory.js | 13 ++- .../addons/internalIO/InternalInWrapper.cc | 11 +- .../addons/internalIO/InternalOutWrapper.cc | 11 +- source/agent/analytics/analytics-agent.js | 4 +- .../videoGstPipeline/GstInternalIn.cpp | 3 +- .../videoGstPipeline/GstInternalIn.h | 2 +- .../videoGstPipeline/VideoGstAnalyzer.cc | 5 +- .../videoGstPipeline/VideoGstAnalyzer.h | 2 +- .../videoGstPipeline/VideoGstAnalyzerWrap.cc | 8 +- source/agent/conference/package.json | 1 + source/agent/conference/roomController.js | 9 +- source/core/owt_base/InternalIn.cpp | 19 +++ source/core/owt_base/InternalIn.h | 2 + source/core/owt_base/InternalOut.cpp | 16 ++- source/core/owt_base/InternalOut.h | 2 + source/core/owt_base/RawTransport.cpp | 110 +++++++++++++++++- source/core/owt_base/RawTransport.h | 8 ++ 17 files changed, 204 insertions(+), 22 deletions(-) diff --git a/source/agent/InternalConnectionFactory.js b/source/agent/InternalConnectionFactory.js index b36c1f91d..f4c3ee693 100644 --- a/source/agent/InternalConnectionFactory.js +++ b/source/agent/InternalConnectionFactory.js @@ -39,7 +39,7 @@ try { } // Wrapper object for sctp-connection and tcp/udp-connection -function InConnection(prot, minport, maxport) { +function InConnection(prot, minport, maxport, ticket) { var conn = null; var protocol = "quic"; @@ -47,7 +47,7 @@ function InConnection(prot, minport, maxport) { case 'tcp': case 'udp': protocol = prot; - conn = new InternalIn(prot, minport, maxport); + conn = new InternalIn(prot, minport, maxport, ticket); break; case 'quic': conn = new quicIO.in(cf, kf); @@ -78,7 +78,7 @@ function InConnection(prot, minport, maxport) { } // Wrapper object for sctp-connection and tcp/udp-connection -function OutConnection(prot, minport, maxport) { +function OutConnection(prot, minport, maxport, ticket) { var that = {}; var conn = null; var protocol = "quic"; @@ -111,7 +111,7 @@ function OutConnection(prot, minport, maxport) { } else if (protocol === 'quic') { conn = new quicIO.out(connectOpt.ip, connectOpt.port); } else { - conn = new InternalOut(protocol, connectOpt.ip, connectOpt.port); + conn = new InternalOut(protocol, connectOpt.ip, connectOpt.port, ticket); } }; @@ -139,6 +139,7 @@ module.exports = function() { var prot = internalOpt.protocol; var minport = internalOpt.minport || 0; var maxport = internalOpt.maxport || 0; + var ticket = internalOpt.ticket; if (preparedSet[connId]) { log.warn('Internal Connection already prepared:', connId); @@ -147,7 +148,9 @@ module.exports = function() { // right call sequence in upper layer. return preparedSet[connId].connection.getListeningPort(); } - var conn = (direction === 'in')? InConnection(prot, minport, maxport) : OutConnection(prot, minport, maxport); + var conn = (direction === 'in') + ? InConnection(prot, minport, maxport, ticket) + : OutConnection(prot, minport, maxport, ticket); preparedSet[connId] = {connection: conn, direction: direction}; return conn.getListeningPort(); diff --git a/source/agent/addons/internalIO/InternalInWrapper.cc b/source/agent/addons/internalIO/InternalInWrapper.cc index 6bd7311ac..a5c7e63a1 100644 --- a/source/agent/addons/internalIO/InternalInWrapper.cc +++ b/source/agent/addons/internalIO/InternalInWrapper.cc @@ -37,14 +37,23 @@ void InternalIn::New(const FunctionCallbackInfo& args) { String::Utf8Value param0(args[0]->ToString()); std::string protocol = std::string(*param0); unsigned int minPort = 0, maxPort = 0; + std::string ticket; if (args.Length() >= 3) { minPort = args[1]->Uint32Value(); maxPort = args[2]->Uint32Value(); } + if (args.Length() > 3) { + String::Utf8Value param3(args[3]->ToString()); + ticket = std::string(*param3); + } InternalIn* obj = new InternalIn(); - obj->me = new owt_base::InternalIn(protocol, minPort, maxPort); + if (ticket.empty()) { + obj->me = new owt_base::InternalIn(protocol, minPort, maxPort); + } else { + obj->me = new owt_base::InternalIn(protocol, ticket, minPort, maxPort); + } obj->src = obj->me; obj->Wrap(args.This()); diff --git a/source/agent/addons/internalIO/InternalOutWrapper.cc b/source/agent/addons/internalIO/InternalOutWrapper.cc index c32b36ea1..f0a72096f 100644 --- a/source/agent/addons/internalIO/InternalOutWrapper.cc +++ b/source/agent/addons/internalIO/InternalOutWrapper.cc @@ -36,9 +36,18 @@ void InternalOut::New(const v8::FunctionCallbackInfo& args) { String::Utf8Value param1(args[1]->ToString()); std::string dest_ip = std::string(*param1); unsigned int dest_port = args[2]->Uint32Value(); + std::string ticket; + if (args.Length() > 3) { + String::Utf8Value param3(args[3]->ToString()); + ticket = std::string(*param3); + } InternalOut* obj = new InternalOut(); - obj->me = new owt_base::InternalOut(protocol, dest_ip, dest_port); + if (ticket.empty()) { + obj->me = new owt_base::InternalOut(protocol, dest_ip, dest_port); + } else { + obj->me = new owt_base::InternalOut(protocol, ticket, dest_ip, dest_port); + } obj->dest = obj->me; obj->Wrap(args.This()); diff --git a/source/agent/analytics/analytics-agent.js b/source/agent/analytics/analytics-agent.js index a66d84d75..204d736d0 100644 --- a/source/agent/analytics/analytics-agent.js +++ b/source/agent/analytics/analytics-agent.js @@ -25,14 +25,16 @@ class AnalyticsAgent extends BaseAgent { this.engine = new VideoAnalyzer(); this.flag = 0; + this.ticket = null; } // override createInternalConnection(connectionId, direction, internalOpt) { internalOpt.minport = global.config.internal.minport; internalOpt.maxport = global.config.internal.maxport; + this.ticket = internalOpt.ticket; if (direction == 'in') { - this.engine.emitListenTo(internalOpt.minport,internalOpt.maxport); + this.engine.emitListenTo(internalOpt.minport,internalOpt.maxport, this.ticket); const portInfo = this.engine.getListeningPort(); // Create internal connection always success return Promise.resolve({ip: global.config.internal.ip_address, port: portInfo}); diff --git a/source/agent/analytics/videoGstPipeline/GstInternalIn.cpp b/source/agent/analytics/videoGstPipeline/GstInternalIn.cpp index 1bf7728ad..4eec8a242 100644 --- a/source/agent/analytics/videoGstPipeline/GstInternalIn.cpp +++ b/source/agent/analytics/videoGstPipeline/GstInternalIn.cpp @@ -19,10 +19,11 @@ static void dump(void* index, uint8_t* buf, int len) } DEFINE_LOGGER(GstInternalIn, "GstInternalIn"); -GstInternalIn::GstInternalIn(GstAppSrc *data, unsigned int minPort, unsigned int maxPort) +GstInternalIn::GstInternalIn(GstAppSrc *data, unsigned int minPort, unsigned int maxPort, std::string ticket) { m_transport.reset(new owt_base::RawTransport(this)); + m_transport->initTicket(ticket); if (minPort > 0 && minPort <= maxPort) { m_transport->listenTo(minPort, maxPort); } else { diff --git a/source/agent/analytics/videoGstPipeline/GstInternalIn.h b/source/agent/analytics/videoGstPipeline/GstInternalIn.h index dd14cffd2..bb4612ded 100644 --- a/source/agent/analytics/videoGstPipeline/GstInternalIn.h +++ b/source/agent/analytics/videoGstPipeline/GstInternalIn.h @@ -16,7 +16,7 @@ class GstInternalIn : public owt_base::RawTransportListener{ DECLARE_LOGGER(); public: - GstInternalIn(GstAppSrc *data, unsigned int minPort = 0, unsigned int maxPort = 0); + GstInternalIn(GstAppSrc *data, unsigned int minPort = 0, unsigned int maxPort = 0, std::string ticket = NULL); virtual ~GstInternalIn(); unsigned int getListeningPort(); diff --git a/source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.cc b/source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.cc index 4d8e8af86..3d2b079f6 100644 --- a/source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.cc +++ b/source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.cc @@ -352,10 +352,9 @@ int VideoGstAnalyzer::setPlaying() return 0; } -void VideoGstAnalyzer::emitListenTo(int minPort, int maxPort) -{ +void VideoGstAnalyzer::emitListenTo(int minPort, int maxPort, std::string ticket) { ELOG_DEBUG("Listening\n"); - m_internalin.reset(new GstInternalIn((GstAppSrc*)source, minPort, maxPort)); + m_internalin.reset(new GstInternalIn((GstAppSrc*)source, minPort, maxPort, ticket)); } void VideoGstAnalyzer::addOutput(int connectionID, owt_base::FrameDestination* out) diff --git a/source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.h b/source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.h index 58d4f70be..b7da82ebe 100644 --- a/source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.h +++ b/source/agent/analytics/videoGstPipeline/VideoGstAnalyzer.h @@ -30,7 +30,7 @@ class VideoGstAnalyzer : public EventRegistry { void clearPipeline(); void destroyPipeline(); int getListeningPort(); - void emitListenTo(int minPort,int maxPort); + void emitListenTo(int minPort, int maxPort, std::string ticket); int setPlaying(); int addElementMany(); diff --git a/source/agent/analytics/videoGstPipeline/VideoGstAnalyzerWrap.cc b/source/agent/analytics/videoGstPipeline/VideoGstAnalyzerWrap.cc index cb200a632..a1ba65428 100644 --- a/source/agent/analytics/videoGstPipeline/VideoGstAnalyzerWrap.cc +++ b/source/agent/analytics/videoGstPipeline/VideoGstAnalyzerWrap.cc @@ -97,11 +97,17 @@ void VideoGstAnalyzerWrap::emitListenTo(const FunctionCallbackInfo& args) mcu::VideoGstAnalyzer* me = obj->me; unsigned int minPort = 0, maxPort = 0; + std::string ticket; + if (args.Length() >= 3) { minPort = args[0]->Uint32Value(); maxPort = args[1]->Uint32Value(); - me->emitListenTo(minPort,maxPort); + String::Utf8Value param3(args[2]->ToString()); + ticket = std::string(*param3); + } + me->emitListenTo(minPort, maxPort, ticket); + } diff --git a/source/agent/conference/package.json b/source/agent/conference/package.json index b12b2d93c..a959bc25d 100644 --- a/source/agent/conference/package.json +++ b/source/agent/conference/package.json @@ -7,6 +7,7 @@ "node-getopt": "*", "toml": "*", "mongoose": "^5.9.6", + "uuid": "^8.0.0", "fraction.js": "^4.0.12" }, "devDependencies": { diff --git a/source/agent/conference/roomController.js b/source/agent/conference/roomController.js index fc5a4b545..052a6143a 100644 --- a/source/agent/conference/roomController.js +++ b/source/agent/conference/roomController.js @@ -11,6 +11,8 @@ var makeRPC = require('./makeRPC').makeRPC; // Logger var log = logger.getLogger('RoomController'); +const { v4 : uuid } = require('uuid'); + const { isVideoFmtCompatible, isResolutionEqual, @@ -40,12 +42,14 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { rpcClient = spec.rpcClient, config = spec.config, room_id = spec.room, - origin = spec.origin, + origin = spec.origin, selfRpcId = spec.selfRpcId, enable_audio_transcoding = config.transcoding && !!config.transcoding.audio, enable_video_transcoding = config.transcoding && !!config.transcoding.video, internal_conn_protocol = config.internalConnProtocol; + var internalTicket = uuid().replace(/-/g, ''); + /* mix_views = { view: { @@ -524,7 +528,8 @@ module.exports.create = function (spec, on_init_ok, on_init_failed) { // Transport protocol for creating internal connection var internalOpt = { - protocol: internal_conn_protocol + protocol: internal_conn_protocol, + ticket: internalTicket, }; var from, to, has_published, has_subscribed; diff --git a/source/core/owt_base/InternalIn.cpp b/source/core/owt_base/InternalIn.cpp index 886be21b1..c627bb803 100644 --- a/source/core/owt_base/InternalIn.cpp +++ b/source/core/owt_base/InternalIn.cpp @@ -20,6 +20,25 @@ InternalIn::InternalIn(const std::string& protocol, unsigned int minPort, unsign } } +InternalIn::InternalIn( + const std::string& protocol, + const std::string& ticket, + unsigned int minPort, + unsigned int maxPort) +{ + if (protocol == "tcp") + m_transport.reset(new owt_base::RawTransport(this)); + else + m_transport.reset(new owt_base::RawTransport(this, 64 * 1024)); + + m_transport->initTicket(ticket); + if (minPort > 0 && minPort <= maxPort) { + m_transport->listenTo(minPort, maxPort); + } else { + m_transport->listenTo(0); + } +} + InternalIn::~InternalIn() { m_transport->close(); diff --git a/source/core/owt_base/InternalIn.h b/source/core/owt_base/InternalIn.h index 8751f0170..b542afcbd 100644 --- a/source/core/owt_base/InternalIn.h +++ b/source/core/owt_base/InternalIn.h @@ -13,6 +13,8 @@ namespace owt_base { class InternalIn : public FrameSource, public RawTransportListener { public: InternalIn(const std::string& protocol, unsigned int minPort = 0, unsigned int maxPort = 0); + InternalIn(const std::string& protocol, const std::string& ticket, + unsigned int minPort = 0, unsigned int maxPort = 0); virtual ~InternalIn(); unsigned int getListeningPort(); diff --git a/source/core/owt_base/InternalOut.cpp b/source/core/owt_base/InternalOut.cpp index e1322416b..8509a3266 100644 --- a/source/core/owt_base/InternalOut.cpp +++ b/source/core/owt_base/InternalOut.cpp @@ -16,6 +16,21 @@ InternalOut::InternalOut(const std::string& protocol, const std::string& dest_ip m_transport->createConnection(dest_ip, dest_port); } +InternalOut::InternalOut( + const std::string& protocol, + const std::string& ticket, + const std::string& dest_ip, + unsigned int dest_port) +{ + if (protocol == "tcp") + m_transport.reset(new owt_base::RawTransport(this)); + else + m_transport.reset(new owt_base::RawTransport(this)); + + m_transport->initTicket(ticket); + m_transport->createConnection(dest_ip, dest_port); +} + InternalOut::~InternalOut() { m_transport->close(); @@ -51,6 +66,5 @@ void InternalOut::onTransportData(char* buf, int len) } } - } /* namespace owt_base */ diff --git a/source/core/owt_base/InternalOut.h b/source/core/owt_base/InternalOut.h index ab4420018..d054814db 100644 --- a/source/core/owt_base/InternalOut.h +++ b/source/core/owt_base/InternalOut.h @@ -13,6 +13,8 @@ namespace owt_base { class InternalOut : public FrameDestination, public RawTransportListener { public: InternalOut(const std::string& protocol, const std::string& dest_ip, unsigned int dest_port); + InternalOut(const std::string& protocol, const std::string& ticket, + const std::string& dest_ip, unsigned int dest_port); virtual ~InternalOut(); void onFrame(const Frame&); diff --git a/source/core/owt_base/RawTransport.cpp b/source/core/owt_base/RawTransport.cpp index 294da0da5..197e4c73c 100644 --- a/source/core/owt_base/RawTransport.cpp +++ b/source/core/owt_base/RawTransport.cpp @@ -13,6 +13,7 @@ using boost::asio::ip::tcp; DEFINE_TEMPLATE_LOGGER(template, RawTransport, "owt.RawTransport"); +static constexpr const uint32_t kMaxTicketLen = 64; static constexpr const char kServerCrt[] = "cert/server.crt"; static constexpr const char kServerKey[] = "cert/server.key"; static constexpr const char kDHParams[] = "cert/dh2048.pem"; @@ -35,6 +36,8 @@ RawTransport::RawTransport(RawTransportListener* listener, size_t initialB , m_listener(listener) , m_receivedBytes(0) , m_ssl(false) + , m_isListener(false) + , m_verified(false) { } @@ -79,12 +82,80 @@ void RawTransport::close() ELOG_DEBUG("Closed"); } +template +bool RawTransport::initTicket(const std::string& ticket) +{ + ELOG_DEBUG("initTicket"); + m_connectTicket = ticket; + if (m_connectTicket.length() > kMaxTicketLen) { + m_connectTicket.resize(kMaxTicketLen); + return false; + } + return true; +} + +template +void RawTransport::sendTicket() +{ + if (!m_verified) { + ELOG_DEBUG("Send ticket"); + int len = m_connectTicket.length(); + TransportData data; + if (m_tag) { + data.buffer.reset(new char[len + 4]); + *(reinterpret_cast(data.buffer.get())) = htonl(len); + memcpy(data.buffer.get() + 4, m_connectTicket.c_str(), len); + data.length = len + 4; + } else { + data.buffer.reset(new char[len]); + memcpy(data.buffer.get(), m_connectTicket.c_str(), len); + data.length = len; + } + boost::lock_guard lock(m_sendQueueMutex); + m_sendQueue.push(data); + assert(m_sendQueue.size() == 1); + doSend(); + m_verified = true; + } +} + +template +void RawTransport::receiveTicket(char* data, int len) +{ + ELOG_DEBUG("Receive ticket"); + std::string receivedTicket(data, len); + + if (m_connectTicket == receivedTicket) { + m_verified = true; + } else { + boost::system::error_code ec; + if (m_socket.tcp.socket) { + m_socket.tcp.socket->shutdown(tcp::socket::shutdown_both, ec); + m_socket.tcp.socket->close(); + } + if (m_socket.ssl.socket) { + m_socket.ssl.socket->shutdown(); + } + if (m_socket.udp.socket) { + m_socket.udp.socket->shutdown(udp::socket::shutdown_both, ec); + m_socket.udp.socket->close(); + } + ELOG_WARN("Wrong connect ticket"); + if (ec) { + ELOG_DEBUG("receiveTicket shutdown error: %s", ec.message().c_str()); + } + } +} + template void RawTransport::createConnection(const std::string& ip, uint32_t port) { if (std::fstream{kServerCrt}) { m_ssl = true; } + if (m_connectTicket.empty()) { + m_verified = true; + } switch (prot) { case TCP: { if (m_ssl) { @@ -177,6 +248,7 @@ void RawTransport::connectHandler(const boost::system::error_code& ec) break; } + sendTicket(); m_listener->onTransportConnected(); receiveData(); } else { @@ -231,6 +303,9 @@ void RawTransport::handshakeHandler(const boost::system::error_code& ec) { if (!ec) { ELOG_DEBUG("Handshake completed"); + if (!m_isListener) { + sendTicket(); + } m_listener->onTransportConnected(); receiveData(); } else { @@ -243,11 +318,15 @@ void RawTransport::handshakeHandler(const boost::system::error_code& ec) template void RawTransport::listenTo(uint32_t port) { + m_isListener = true; if (std::fstream{kServerCrt} && std::fstream{kServerKey} && std::fstream{kDHParams}) { m_ssl = true; } + if (m_connectTicket.empty()) { + m_verified = true; + } switch (prot) { case TCP: { if (m_ssl) { @@ -304,11 +383,15 @@ void RawTransport::listenTo(uint32_t port) template void RawTransport::listenTo(uint32_t minPort, uint32_t maxPort) { + m_isListener = true; if (std::fstream{kServerCrt} && std::fstream{kServerKey} && std::fstream{kDHParams}) { m_ssl = true; } + if (m_connectTicket.empty()) { + m_verified = true; + } switch (prot) { case TCP: { if (m_socket.tcp.socket) { @@ -405,7 +488,11 @@ void RawTransport::readHandler(const boost::system::error_code& ec, std::s if (!ec || ec == boost::asio::error::message_size) { if (!m_tag) { - m_listener->onTransportData(m_receiveData.buffer.get(), bytes); + if (!m_verified && m_isListener) { + receiveTicket(m_receiveData.buffer.get(), bytes); + } else { + m_listener->onTransportData(m_receiveData.buffer.get(), bytes); + } receiveData(); return; } @@ -467,9 +554,12 @@ void RawTransport::readHandler(const boost::system::error_code& ec, std::s } else { unsigned char *p = reinterpret_cast(&(m_receiveData.buffer.get())[4]); ELOG_DEBUG("readHandler(%zu): [%x,%x,%x,%x,%x,%x,%x,%x,%x,%x,%x,%x,%x,%x,%x,%x...%x,%x,%x,%x]", bytes, p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7], p[8], p[9], p[10], p[11], p[12], p[13], p[14], p[15], p[payloadlen-4], p[payloadlen-3], p[payloadlen-2], p[payloadlen-1]); - m_listener->onTransportData(m_receiveData.buffer.get() + 4, payloadlen); + if (!m_verified && m_isListener) { + receiveTicket(m_receiveData.buffer.get() + 4, payloadlen); + } else { + m_listener->onTransportData(m_receiveData.buffer.get() + 4, payloadlen); + } } - receiveData(); break; default: @@ -512,7 +602,11 @@ void RawTransport::readPacketHandler(const boost::system::error_code& ec, } } else { m_receivedBytes = 0; - m_listener->onTransportData(m_receiveData.buffer.get(), expectedLen); + if (!m_verified && m_isListener) { + receiveTicket(m_receiveData.buffer.get(), expectedLen); + } else { + m_listener->onTransportData(m_receiveData.buffer.get(), expectedLen); + } receiveData(); } break; @@ -633,6 +727,10 @@ void RawTransport::dumpTcpSSLv3Header(const char* buf, int len) template void RawTransport::sendData(const char* buf, int len) { + if (!m_verified) { + return; + } + TransportData data; if (m_tag) { data.buffer.reset(new char[len + 4]); @@ -654,6 +752,10 @@ void RawTransport::sendData(const char* buf, int len) template void RawTransport::sendData(const char* header, int headerLength, const char* payload, int payloadLength) { + if (!m_verified) { + return; + } + TransportData data; if (m_tag) { data.buffer.reset(new char[headerLength + payloadLength + 4]); diff --git a/source/core/owt_base/RawTransport.h b/source/core/owt_base/RawTransport.h index 6ee830e78..920be0a74 100644 --- a/source/core/owt_base/RawTransport.h +++ b/source/core/owt_base/RawTransport.h @@ -43,6 +43,7 @@ class RawTransportInterface { virtual void sendData(const char*, int len) = 0; virtual void sendData(const char* header, int headerLength, const char* payload, int payloadLength) = 0; virtual void close() = 0; + virtual bool initTicket(const std::string& ticket) = 0; virtual unsigned short getListeningPort() = 0; }; @@ -60,6 +61,8 @@ class RawTransport : public RawTransportInterface { void sendData(const char*, int len); void sendData(const char* header, int headerLength, const char* payload, int payloadLength); void close(); + bool initTicket(const std::string& ticket); + unsigned short getListeningPort(); @@ -80,6 +83,8 @@ class RawTransport : public RawTransportInterface { void acceptHandler(const boost::system::error_code&); void handshakeHandler(const boost::system::error_code&); void dumpTcpSSLv3Header(const char*, int len); + void sendTicket(); + void receiveTicket(char*, int len); bool m_isClosing; bool m_tag; @@ -134,6 +139,9 @@ class RawTransport : public RawTransportInterface { RawTransportListener* m_listener; uint32_t m_receivedBytes; bool m_ssl; + bool m_isListener; + bool m_verified; + std::string m_connectTicket; }; } /* namespace owt_base */