Skip to content

Commit

Permalink
Changed SRT Group stats handling
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Nov 20, 2020
1 parent 06a795b commit efc82e1
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 49 deletions.
23 changes: 6 additions & 17 deletions xtransmit/generate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,21 @@ using shared_sock = std::shared_ptr<socket::isocket>;
int srt_gen_listen_callback(void* opaq, SRTSOCKET sock, int hsversion,
const struct sockaddr* peeraddr, const char* streamid)
{
socket::stats_writer* stats = reinterpret_cast<socket::stats_writer*>(opaq);
if (stats == nullptr)
return 0;

// Member sockets are always non-blocking
stats->add_socket(make_shared<socket::srt>(sock, false));
spdlog::trace(LOG_SC_GENERATE "Accepted member socket {}, added to stats.", sock);
spdlog::trace(LOG_SC_GENERATE "Accepted member socket 0x{:X}.", sock);
return 0;
}

void srt_gen_connect_callback(void* opaq, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
{
if (error != SRT_SUCCESS)
{
spdlog::warn(LOG_SC_GENERATE "Member socket {} (token {}) connection failed: ({}) {}.", sock, token, error,
spdlog::warn(LOG_SC_GENERATE "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error,
srt_strerror(error, 0));
return;
}

socket::stats_writer* stats = reinterpret_cast<socket::stats_writer*>(opaq);
if (stats == nullptr)
return;

// Member sockets are always non-blocking
stats->add_socket(make_shared<socket::srt>(sock, false));
spdlog::trace(LOG_SC_GENERATE "Member socket {} (token {}) added to stats.", sock, token);
// After SRT v1.4.2 connection callback is no longer called on connection success.
spdlog::trace(LOG_SC_GENERATE "Member socket connected 0x{:X} (token {}).", sock, token);
}

void run_pipe(shared_sock dst, const config& cfg, const atomic_bool& force_break)
Expand Down Expand Up @@ -162,12 +151,12 @@ void xtransmit::generate::run(const vector<string>& dst_urls, const config& cfg,
const bool accept = s->mode() == socket::srt_group::LISTENER;
if (accept)
{
s->set_listen_callback(srt_gen_listen_callback, stats.get());
s->set_listen_callback(srt_gen_listen_callback, nullptr);
s->listen();
}
else
{
s->set_connect_callback(srt_gen_connect_callback, stats.get());
s->set_connect_callback(srt_gen_connect_callback, nullptr);
}
connection = accept ? s->accept() : s->connect();
}
Expand Down
24 changes: 6 additions & 18 deletions xtransmit/receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,33 +141,21 @@ void run_pipe(shared_sock src, const config &cfg, const atomic_bool &force_break
int srt_listen_callback(void* opaq, SRTSOCKET sock, int hsversion,
const struct sockaddr* peeraddr, const char* streamid)
{
spdlog::trace(LOG_SC_RECEIVE "Accepted member socket {}.", sock);
socket::stats_writer* stats = reinterpret_cast<socket::stats_writer*>(opaq);
if (stats == nullptr)
return 0;

// Member sockets are always non-blocking
stats->add_socket(make_shared<socket::srt>(sock, false));
spdlog::trace(LOG_SC_RECEIVE "Member socket {} added to stats.", sock);
spdlog::trace(LOG_SC_RECEIVE "Accepted member socket 0x{:X}.", sock);
return 0;
}

void srt_connect_callback(void* opaq, SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
{
if (error != SRT_SUCCESS)
{
spdlog::warn(LOG_SC_RECEIVE "Member socket {} (token {}) connection failed: ({}) {}.", sock, token, error,
spdlog::warn(LOG_SC_RECEIVE "Member socket 0x{:X} (token {}) connection failed: ({}) {}.", sock, token, error,
srt_strerror(error, 0));
return;
}

socket::stats_writer* stats = reinterpret_cast<socket::stats_writer*>(opaq);
if (stats == nullptr)
return;

// Member sockets are always non-blocking
stats->add_socket(make_shared<socket::srt>(sock, false));
spdlog::trace(LOG_SC_RECEIVE "Member socket {} (token {}) added to stats.", sock, token);
// After SRT v1.4.2 connection callback is no longer called on connection success.
spdlog::trace(LOG_SC_RECEIVE "Member socket connected 0x{:X} (token {}).", sock, token);
}

void xtransmit::receive::run(const vector<string> &src_urls, const config &cfg, const atomic_bool &force_break)
Expand Down Expand Up @@ -218,11 +206,11 @@ void xtransmit::receive::run(const vector<string> &src_urls, const config &cfg,
socket::srt_group* s = static_cast<socket::srt_group*>(socket.get());
const bool accept = s->mode() == socket::srt_group::LISTENER;
if (accept) {
s->set_listen_callback(&srt_listen_callback, stats.get());
s->set_listen_callback(&srt_listen_callback, nullptr);
s->listen();
}
else {
s->set_connect_callback(&srt_connect_callback, stats.get());
s->set_connect_callback(&srt_connect_callback, nullptr);
}
connection = accept ? s->accept() : s->connect();
}
Expand Down
73 changes: 59 additions & 14 deletions xtransmit/srt_socket_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@
#include "spdlog/spdlog.h"

// xtransmit
#include "misc.hpp" // HAS_PUTTIME
#include "srt_socket_group.hpp"
#include "srt_socket.hpp"

// srt utils
#include "verbose.hpp"
#include "socketoptions.hpp"
#include "apputil.hpp"
#include "common.h" // SRT library's SockStatusStr(..)

using namespace std;
using namespace xtransmit;
using shared_srt_group = shared_ptr<socket::srt_group>;

#define LOG_SOCK_SRT "SOCKET::SRT_GROUP "
#define LOG_SRT_GROUP "SOCKET::SRT_GROUP "

SocketOption::Mode detect_srt_mode(const UriParser& uri)
{
Expand Down Expand Up @@ -48,20 +50,20 @@ SocketOption::Mode validate_srt_group(const vector<UriParser>& urls)
{
if (url.type() != UriParser::SRT)
{
spdlog::error(LOG_SOCK_SRT "URI {} is not SRT.", url.uri());
spdlog::error(LOG_SRT_GROUP "URI {} is not SRT.", url.uri());
return SocketOption::FAILURE;
}

const auto mode = detect_srt_mode(url);
if (mode <= SocketOption::FAILURE || mode > SocketOption::RENDEZVOUS)
{
spdlog::error(LOG_SOCK_SRT "Failed to detect SRT mode for URI {}.", url.uri());
spdlog::error(LOG_SRT_GROUP "Failed to detect SRT mode for URI {}.", url.uri());
return SocketOption::FAILURE;
}

if (prev_mode != SocketOption::FAILURE && mode != prev_mode)
{
spdlog::error(LOG_SOCK_SRT
spdlog::error(LOG_SRT_GROUP
"Failed to match SRT modes for provided URIs. URI {} has mode {}. Previous mode is {}",
url.uri(),
SocketOption::mode_names[mode],
Expand Down Expand Up @@ -137,13 +139,13 @@ socket::srt_group::~srt_group()
{
if (!m_blocking_mode)
{
spdlog::debug(LOG_SOCK_SRT "0x{:X} Closing. Releasing epolls", m_bind_socket);
spdlog::debug(LOG_SRT_GROUP "0x{:X} Closing. Releasing epolls", m_bind_socket);
if (m_epoll_connect != -1)
srt_epoll_release(m_epoll_connect);
if (m_epoll_io != -1)
srt_epoll_release(m_epoll_io);
}
spdlog::debug(LOG_SOCK_SRT "0x{:X} Closing SRT group", m_bind_socket);
spdlog::debug(LOG_SRT_GROUP "0x{:X} Closing SRT group", m_bind_socket);
release_targets();
srt_close(m_bind_socket);
}
Expand Down Expand Up @@ -232,7 +234,7 @@ void socket::srt_group::listen()

shared_srt_group socket::srt_group::accept()
{
// spdlog::debug(LOG_SOCK_SRT "0x{:X} (srt://{}:{:d}) {} Waiting for incoming connection",
// spdlog::debug(LOG_SRT_GROUP "0x{:X} (srt://{}:{:d}) {} Waiting for incoming connection",
// m_bind_socket, m_host, m_port, m_blocking_mode ? "SYNC" : "ASYNC");

SRTSOCKET accepted_sock = SRT_INVALID_SOCK;
Expand All @@ -247,7 +249,7 @@ shared_srt_group socket::srt_group::accept()
if (srt_epoll_wait(m_epoll_connect, 0, 0, ready, &len, timeout_ms, 0, 0, 0, 0) == SRT_ERROR)
raise_exception("accept::epoll_wait");

spdlog::trace(LOG_SOCK_SRT "Epoll read-ready sock 0x{:X}, 0x{:X}", ready[0], ready[1]);
spdlog::trace(LOG_SRT_GROUP "Epoll read-ready sock 0x{:X}, 0x{:X}", ready[0], ready[1]);

sockaddr_in scl;
int sclen = sizeof scl;
Expand Down Expand Up @@ -289,13 +291,13 @@ void socket::srt_group::raise_exception(const string&& place, SRTSOCKET sock) co
const int udt_result = srt_getlasterror(nullptr);
const string message = srt_getlasterror_str();
spdlog::debug(
LOG_SOCK_SRT "0x{:X} {} ERROR {} {}", sock != SRT_INVALID_SOCK ? sock : m_bind_socket, place, udt_result, message);
LOG_SRT_GROUP "0x{:X} {} ERROR {} {}", sock != SRT_INVALID_SOCK ? sock : m_bind_socket, place, udt_result, message);
throw socket::exception(place + ": " + message);
}

void socket::srt_group::raise_exception(const string&& place, const string&& reason) const
{
spdlog::debug(LOG_SOCK_SRT "0x{:X} {} ERROR {}", m_bind_socket, place, reason);
spdlog::debug(LOG_SRT_GROUP "0x{:X} {} ERROR {}", m_bind_socket, place, reason);
throw socket::exception(place + ": " + reason);
}

Expand All @@ -309,7 +311,7 @@ void socket::srt_group::release_targets()
shared_srt_group socket::srt_group::connect()
{
spdlog::debug(
LOG_SOCK_SRT "0x{:X} {} Connecting group to remote srt.", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC");
LOG_SRT_GROUP "0x{:X} {} Connecting group to remote srt.", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC");

if (!m_blocking_mode)
{
Expand Down Expand Up @@ -345,7 +347,7 @@ shared_srt_group socket::srt_group::connect()
}

spdlog::debug(
LOG_SOCK_SRT "0x{:X} {} Group member connected to remote", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC");
LOG_SRT_GROUP "0x{:X} {} Group member connected to remote", m_bind_socket, m_blocking_mode ? "SYNC" : "ASYNC");
release_targets();
return shared_from_this();
}
Expand Down Expand Up @@ -436,7 +438,7 @@ size_t socket::srt_group::read(const mutable_buffer& buffer, int timeout_ms)
if (srt_getlasterror(nullptr) != SRT_EASYNCRCV)
raise_exception("read::recv");

spdlog::warn(LOG_SOCK_SRT "recvmsg returned error 6002: read error, try again");
spdlog::warn(LOG_SRT_GROUP "recvmsg returned error 6002: read error, try again");
return 0;
}

Expand Down Expand Up @@ -564,9 +566,52 @@ const string socket::srt_group::stats_to_csv(int socketid, const SRT_TRACEBSTATS

const string socket::srt_group::statistics_csv(bool print_header)
{
//SRT_ASSERT(m_bind_socket != SRT_INVALID_SOCK);
SRT_TRACEBSTATS stats;
if (SRT_ERROR == srt_bstats(m_bind_socket, &stats, true))
raise_exception("statistics");

return stats_to_csv(m_bind_socket, stats, print_header);
string csv_stats = stats_to_csv(m_bind_socket, stats, print_header);

if (print_header)
return csv_stats;

size_t group_size = 0;
if (srt_group_data(m_bind_socket, NULL, &group_size) != SRT_SUCCESS)
{
// Not throwing an exception as group stats was retrieved.
spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve the number of group members", m_bind_socket);
return csv_stats;
}

SRT_SOCKGROUPDATA group_data[group_size];
const int num_members = srt_group_data(m_bind_socket, group_data, &group_size);
if (num_members == SRT_ERROR)
{
// Not throwing an exception as group stats was retrieved.
spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group data", m_bind_socket);
return csv_stats;
}

for (int i = 0; i < num_members; ++i)
{
const int id = group_data[i].id;
const SRT_SOCKSTATUS status = group_data[i].sockstate;

if (group_data[i].sockstate != SRTS_CONNECTED)
{
spdlog::trace(LOG_SRT_GROUP "0x{:X} statistics_csv: Socket state is {}, skipping.", id, srt_logging::SockStatusStr(status));
continue;
}

if (SRT_ERROR == srt_bstats(id, &stats, true))
{
spdlog::warn(LOG_SRT_GROUP "0x{:X} statistics_csv: Failed to retrieve group member stats. {}", id, srt_getlasterror_str());
break;
}

csv_stats += stats_to_csv(id, stats, false);
}

return csv_stats;
}

0 comments on commit efc82e1

Please sign in to comment.