From efc82e19eb24a603fca78e4fa7136f5a8585bb4f Mon Sep 17 00:00:00 2001 From: Maxim Sharabayko Date: Fri, 20 Nov 2020 12:33:54 +0100 Subject: [PATCH] Changed SRT Group stats handling --- xtransmit/generate.cpp | 23 +++-------- xtransmit/receive.cpp | 24 +++-------- xtransmit/srt_socket_group.cpp | 73 +++++++++++++++++++++++++++------- 3 files changed, 71 insertions(+), 49 deletions(-) diff --git a/xtransmit/generate.cpp b/xtransmit/generate.cpp index 0dbb4a9..698e085 100644 --- a/xtransmit/generate.cpp +++ b/xtransmit/generate.cpp @@ -37,13 +37,7 @@ using shared_sock = std::shared_ptr; int srt_gen_listen_callback(void* opaq, SRTSOCKET sock, int hsversion, const struct sockaddr* peeraddr, const char* streamid) { - socket::stats_writer* stats = reinterpret_cast(opaq); - if (stats == nullptr) - return 0; - - // Member sockets are always non-blocking - stats->add_socket(make_shared(sock, false)); - spdlog::trace(LOG_SC_GENERATE "Accepted member socket {}, added to stats.", sock); + spdlog::trace(LOG_SC_GENERATE "Accepted member socket 0x{:X}.", sock); return 0; } @@ -51,18 +45,13 @@ void srt_gen_connect_callback(void* opaq, SRTSOCKET sock, int error, const socka { if (error != SRT_SUCCESS) { - spdlog::warn(LOG_SC_GENERATE "Member socket {} (token {}) connection failed: ({}) {}.", sock, token, error, + spdlog::warn(LOG_SC_GENERATE "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error, srt_strerror(error, 0)); return; } - socket::stats_writer* stats = reinterpret_cast(opaq); - if (stats == nullptr) - return; - - // Member sockets are always non-blocking - stats->add_socket(make_shared(sock, false)); - spdlog::trace(LOG_SC_GENERATE "Member socket {} (token {}) added to stats.", sock, token); + // After SRT v1.4.2 connection callback is no longer called on connection success. + spdlog::trace(LOG_SC_GENERATE "Member socket connected 0x{:X} (token {}).", sock, token); } void run_pipe(shared_sock dst, const config& cfg, const atomic_bool& force_break) @@ -162,12 +151,12 @@ void xtransmit::generate::run(const vector& dst_urls, const config& cfg, const bool accept = s->mode() == socket::srt_group::LISTENER; if (accept) { - s->set_listen_callback(srt_gen_listen_callback, stats.get()); + s->set_listen_callback(srt_gen_listen_callback, nullptr); s->listen(); } else { - s->set_connect_callback(srt_gen_connect_callback, stats.get()); + s->set_connect_callback(srt_gen_connect_callback, nullptr); } connection = accept ? s->accept() : s->connect(); } diff --git a/xtransmit/receive.cpp b/xtransmit/receive.cpp index 60e8d51..d977a12 100644 --- a/xtransmit/receive.cpp +++ b/xtransmit/receive.cpp @@ -141,14 +141,7 @@ void run_pipe(shared_sock src, const config &cfg, const atomic_bool &force_break int srt_listen_callback(void* opaq, SRTSOCKET sock, int hsversion, const struct sockaddr* peeraddr, const char* streamid) { - spdlog::trace(LOG_SC_RECEIVE "Accepted member socket {}.", sock); - socket::stats_writer* stats = reinterpret_cast(opaq); - if (stats == nullptr) - return 0; - - // Member sockets are always non-blocking - stats->add_socket(make_shared(sock, false)); - spdlog::trace(LOG_SC_RECEIVE "Member socket {} added to stats.", sock); + spdlog::trace(LOG_SC_RECEIVE "Accepted member socket 0x{:X}.", sock); return 0; } @@ -156,18 +149,13 @@ void srt_connect_callback(void* opaq, SRTSOCKET sock, int error, const sockaddr* { if (error != SRT_SUCCESS) { - spdlog::warn(LOG_SC_RECEIVE "Member socket {} (token {}) connection failed: ({}) {}.", sock, token, error, + spdlog::warn(LOG_SC_RECEIVE "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error, srt_strerror(error, 0)); return; } - socket::stats_writer* stats = reinterpret_cast(opaq); - if (stats == nullptr) - return; - - // Member sockets are always non-blocking - stats->add_socket(make_shared(sock, false)); - spdlog::trace(LOG_SC_RECEIVE "Member socket {} (token {}) added to stats.", sock, token); + // After SRT v1.4.2 connection callback is no longer called on connection success. + spdlog::trace(LOG_SC_RECEIVE "Member socket connected 0x{:X} (token {}).", sock, token); } void xtransmit::receive::run(const vector &src_urls, const config &cfg, const atomic_bool &force_break) @@ -218,11 +206,11 @@ void xtransmit::receive::run(const vector &src_urls, const config &cfg, socket::srt_group* s = static_cast(socket.get()); const bool accept = s->mode() == socket::srt_group::LISTENER; if (accept) { - s->set_listen_callback(&srt_listen_callback, stats.get()); + s->set_listen_callback(&srt_listen_callback, nullptr); s->listen(); } else { - s->set_connect_callback(&srt_connect_callback, stats.get()); + s->set_connect_callback(&srt_connect_callback, nullptr); } connection = accept ? s->accept() : s->connect(); } diff --git a/xtransmit/srt_socket_group.cpp b/xtransmit/srt_socket_group.cpp index d7bedb3..5203d2b 100644 --- a/xtransmit/srt_socket_group.cpp +++ b/xtransmit/srt_socket_group.cpp @@ -6,6 +6,7 @@ #include "spdlog/spdlog.h" // xtransmit +#include "misc.hpp" // HAS_PUTTIME #include "srt_socket_group.hpp" #include "srt_socket.hpp" @@ -13,12 +14,13 @@ #include "verbose.hpp" #include "socketoptions.hpp" #include "apputil.hpp" +#include "common.h" // SRT library's SockStatusStr(..) using namespace std; using namespace xtransmit; using shared_srt_group = shared_ptr; -#define LOG_SOCK_SRT "SOCKET::SRT_GROUP " +#define LOG_SRT_GROUP "SOCKET::SRT_GROUP " SocketOption::Mode detect_srt_mode(const UriParser& uri) { @@ -48,20 +50,20 @@ SocketOption::Mode validate_srt_group(const vector& urls) { if (url.type() != UriParser::SRT) { - spdlog::error(LOG_SOCK_SRT "URI {} is not SRT.", url.uri()); + spdlog::error(LOG_SRT_GROUP "URI {} is not SRT.", url.uri()); return SocketOption::FAILURE; } const auto mode = detect_srt_mode(url); if (mode <= SocketOption::FAILURE || mode > SocketOption::RENDEZVOUS) { - spdlog::error(LOG_SOCK_SRT "Failed to detect SRT mode for URI {}.", url.uri()); + spdlog::error(LOG_SRT_GROUP "Failed to detect SRT mode for URI {}.", url.uri()); return SocketOption::FAILURE; } if (prev_mode != SocketOption::FAILURE && mode != prev_mode) { - spdlog::error(LOG_SOCK_SRT + spdlog::error(LOG_SRT_GROUP "Failed to match SRT modes for provided URIs. URI {} has mode {}. Previous mode is {}", url.uri(), SocketOption::mode_names[mode], @@ -137,13 +139,13 @@ socket::srt_group::~srt_group() { if (!m_blocking_mode) { - spdlog::debug(LOG_SOCK_SRT "0x{:X} Closing. Releasing epolls", m_bind_socket); + spdlog::debug(LOG_SRT_GROUP "0x{:X} Closing. Releasing epolls", m_bind_socket); if (m_epoll_connect != -1) srt_epoll_release(m_epoll_connect); if (m_epoll_io != -1) srt_epoll_release(m_epoll_io); } - spdlog::debug(LOG_SOCK_SRT "0x{:X} Closing SRT group", m_bind_socket); + spdlog::debug(LOG_SRT_GROUP "0x{:X} Closing SRT group", m_bind_socket); release_targets(); srt_close(m_bind_socket); } @@ -232,7 +234,7 @@ void socket::srt_group::listen() shared_srt_group socket::srt_group::accept() { - // spdlog::debug(LOG_SOCK_SRT "0x{:X} (srt://{}:{:d}) {} Waiting for incoming connection", + // spdlog::debug(LOG_SRT_GROUP "0x{:X} (srt://{}:{:d}) {} Waiting for incoming connection", // m_bind_socket, m_host, m_port, m_blocking_mode ? "SYNC" : "ASYNC"); SRTSOCKET accepted_sock = SRT_INVALID_SOCK; @@ -247,7 +249,7 @@ shared_srt_group socket::srt_group::accept() if (srt_epoll_wait(m_epoll_connect, 0, 0, ready, &len, timeout_ms, 0, 0, 0, 0) == SRT_ERROR) raise_exception("accept::epoll_wait"); - spdlog::trace(LOG_SOCK_SRT "Epoll read-ready sock 0x{:X}, 0x{:X}", ready[0], ready[1]); + spdlog::trace(LOG_SRT_GROUP "Epoll read-ready sock 0x{:X}, 0x{:X}", ready[0], ready[1]); sockaddr_in scl; int sclen = sizeof scl; @@ -289,13 +291,13 @@ void socket::srt_group::raise_exception(const string&& place, SRTSOCKET sock) co const int udt_result = srt_getlasterror(nullptr); const string message = srt_getlasterror_str(); spdlog::debug( - LOG_SOCK_SRT "0x{:X} {} ERROR {} {}", sock != SRT_INVALID_SOCK ? sock : m_bind_socket, place, udt_result, message); + LOG_SRT_GROUP "0x{:X} {} ERROR {} {}", sock != SRT_INVALID_SOCK ? sock : m_bind_socket, place, udt_result, message); throw socket::exception(place + ": " + message); } void socket::srt_group::raise_exception(const string&& place, const string&& reason) const { - spdlog::debug(LOG_SOCK_SRT "0x{:X} {} ERROR {}", m_bind_socket, place, reason); + spdlog::debug(LOG_SRT_GROUP "0x{:X} {} ERROR {}", m_bind_socket, place, reason); throw socket::exception(place + ": " + reason); } @@ -309,7 +311,7 @@ void socket::srt_group::release_targets() shared_srt_group socket::srt_group::connect() { spdlog::debug( - LOG_SOCK_SRT "0x{:X} {} Connecting group to remote srt.", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); + LOG_SRT_GROUP "0x{:X} {} Connecting group to remote srt.", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); if (!m_blocking_mode) { @@ -345,7 +347,7 @@ shared_srt_group socket::srt_group::connect() } spdlog::debug( - LOG_SOCK_SRT "0x{:X} {} Group member connected to remote", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); + LOG_SRT_GROUP "0x{:X} {} Group member connected to remote", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC"); release_targets(); return shared_from_this(); } @@ -436,7 +438,7 @@ size_t socket::srt_group::read(const mutable_buffer& buffer, int timeout_ms) if (srt_getlasterror(nullptr) != SRT_EASYNCRCV) raise_exception("read::recv"); - spdlog::warn(LOG_SOCK_SRT "recvmsg returned error 6002: read error, try again"); + spdlog::warn(LOG_SRT_GROUP "recvmsg returned error 6002: read error, try again"); return 0; } @@ -564,9 +566,52 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS const string socket::srt_group::statistics_csv(bool print_header) { + //SRT_ASSERT(m_bind_socket != SRT_INVALID_SOCK); SRT_TRACEBSTATS stats; if (SRT_ERROR == srt_bstats(m_bind_socket, &stats, true)) raise_exception("statistics"); - return stats_to_csv(m_bind_socket, stats, print_header); + string csv_stats = stats_to_csv(m_bind_socket, stats, print_header); + + if (print_header) + return csv_stats; + + size_t group_size = 0; + if (srt_group_data(m_bind_socket, NULL, &group_size) != SRT_SUCCESS) + { + // Not throwing an exception as group stats was retrieved. + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve the number of group members", m_bind_socket); + return csv_stats; + } + + SRT_SOCKGROUPDATA group_data[group_size]; + const int num_members = srt_group_data(m_bind_socket, group_data, &group_size); + if (num_members == SRT_ERROR) + { + // Not throwing an exception as group stats was retrieved. + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group data", m_bind_socket); + return csv_stats; + } + + for (int i = 0; i < num_members; ++i) + { + const int id = group_data[i].id; + const SRT_SOCKSTATUS status = group_data[i].sockstate; + + if (group_data[i].sockstate != SRTS_CONNECTED) + { + spdlog::trace(LOG_SRT_GROUP "0x{:X} statistics_csv: Socket state is {}, skipping.", id, srt_logging::SockStatusStr(status)); + continue; + } + + if (SRT_ERROR == srt_bstats(id, &stats, true)) + { + spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group member stats. {}", id, srt_getlasterror_str()); + break; + } + + csv_stats += stats_to_csv(id, stats, false); + } + + return csv_stats; }