Skip to content

Commit

Permalink
Added 'close-listener' CLI option (#89).
Browse files Browse the repository at this point in the history
Changed the 'reconnect' default to false.
  • Loading branch information
maxsharabayko authored Apr 26, 2024
1 parent bbae166 commit 7e8858d
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 14 deletions.
5 changes: 3 additions & 2 deletions xtransmit/generate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void xtransmit::generate::run(const std::vector<std::string>& dst_urls, const co
{
using namespace std::placeholders;
processing_fn_t process_fn = std::bind(run_pipe, _1, cfg, _2);
common_run(dst_urls, cfg, cfg.reconnect, force_break, process_fn);
common_run(dst_urls, cfg, cfg.reconnect, cfg.close_listener, force_break, process_fn);
}

CLI::App* xtransmit::generate::add_subcommand(CLI::App& app, config& cfg, std::vector<std::string>& dst_urls)
Expand All @@ -119,7 +119,8 @@ CLI::App* xtransmit::generate::add_subcommand(CLI::App& app, config& cfg, std::v
sc_generate->add_option("--statsfreq", cfg.stats_freq_ms, fmt::format("Output stats report frequency, ms (default {})", cfg.stats_freq_ms))
->transform(CLI::AsNumberWithUnit(to_ms, CLI::AsNumberWithUnit::CASE_SENSITIVE));
sc_generate->add_flag("--twoway", cfg.two_way, "Both send and receive data");
sc_generate->add_flag("--reconnect", cfg.reconnect, "Reconnect automatically");
sc_generate->add_flag("--reconnect,!--no-reconnect", cfg.reconnect, "Reconnect automatically");
sc_generate->add_flag("--close-listener,!--no-close-listener", cfg.close_listener, "Close listener once connection is established");
sc_generate->add_flag("--enable-metrics", cfg.enable_metrics, "Enable embeding metrics: latency, loss, reordering, jitter, etc.");
sc_generate->add_option("--playback-csv", cfg.playback_csv, "Input CSV file with timestamp of every packet");
sc_generate->add_flag("--spin-wait", cfg.spin_wait, "Use CPU-expensive spin waiting for better sending accuracy");
Expand Down
3 changes: 2 additions & 1 deletion xtransmit/generate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ struct config : public stats_config
int duration = 0;
int message_size = 1316; ////8 * 1024 * 1024;
bool two_way = false;
bool reconnect = true;
bool reconnect = false;
bool close_listener = false;
bool enable_metrics = false;
bool spin_wait = false;
std::string playback_csv;
Expand Down
4 changes: 2 additions & 2 deletions xtransmit/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ shared_sock_t create_connection(const vector<UriParser>& parsed_urls, shared_soc


// Use std::bind to pass the run_pipe function, and bind arguments to it.
void common_run(const vector<string>& urls, const stats_config& cfg, bool reconnect, const atomic_bool& force_break,
void common_run(const vector<string>& urls, const stats_config& cfg, bool reconnect, bool close_listener, const atomic_bool& force_break,
processing_fn_t& processing_fn)
{
if (urls.empty())
Expand Down Expand Up @@ -138,7 +138,7 @@ void common_run(const vector<string>& urls, const stats_config& cfg, bool reconn
}

// Closing a listener socket (if any) will not allow further connections.
if (!reconnect)
if (close_listener)
listening_sock.reset();

if (stats)
Expand Down
13 changes: 9 additions & 4 deletions xtransmit/misc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,16 @@ typedef std::function<void(shared_sock_t, const std::atomic_bool&)> processing_f
/// @brief Creates stats writer if needed, establishes a connection, and runs `processing_fn`.
/// @param urls a list of URLs to to establish a connection
/// @param cfg
/// @param reconnect
/// @param reconnect whether to reconnect after existing connection was broken
/// @param close_listener whether to close a listener once a connection has been established
/// @param force_break
/// @param processing_fn
void common_run(const std::vector<std::string>& urls, const stats_config& cfg, bool reconnect, const std::atomic_bool& force_break,
processing_fn_t& processing_fn);
/// @param processing_fn
void common_run(const std::vector<std::string>& urls,
const stats_config& cfg,
bool reconnect,
bool close_listener,
const std::atomic_bool& force_break,
processing_fn_t& processing_fn);

/// @brief Create netaddr_any from host and port values.
netaddr_any create_addr(const std::string& host, unsigned short port, int pref_family = AF_UNSPEC);
Expand Down
5 changes: 3 additions & 2 deletions xtransmit/receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ void xtransmit::receive::run(const std::vector<std::string>& src_urls,
{
using namespace std::placeholders;
processing_fn_t process_fn = std::bind(run_pipe, _1, cfg, _2);
common_run(src_urls, cfg, cfg.reconnect, force_break, process_fn);
common_run(src_urls, cfg, cfg.reconnect, cfg.close_listener, force_break, process_fn);
}

CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, std::vector<std::string>& src_urls)
Expand All @@ -191,7 +191,8 @@ CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, std::ve
sc_receive->add_option("--statsfreq", cfg.stats_freq_ms, fmt::format("Output stats report frequency, ms (default {})", cfg.stats_freq_ms))
->transform(CLI::AsNumberWithUnit(to_ms, CLI::AsNumberWithUnit::CASE_SENSITIVE));
sc_receive->add_flag("--printmsg", cfg.print_notifications, "Print message to stdout");
sc_receive->add_flag("--reconnect", cfg.reconnect, "Reconnect automatically");
sc_receive->add_flag("--reconnect,!--no-reconnect", cfg.reconnect, "Reconnect automatically");
sc_receive->add_flag("--close-listener,!--no-close-listener", cfg.close_listener, "Close listener once connection is established");
sc_receive->add_flag("--enable-metrics", cfg.enable_metrics, "Enable checking metrics: jitter, latency, etc.");
sc_receive->add_option("--metricsfile", cfg.metrics_file, "Metrics output filename (default stdout)");
sc_receive->add_option("--metricsfreq", cfg.metrics_freq_ms, fmt::format("Metrics report frequency, ms (default {})", cfg.metrics_freq_ms))
Expand Down
3 changes: 2 additions & 1 deletion xtransmit/receive.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ struct config : stats_config
{
bool print_notifications = false; // Print notifications about the messages received
bool send_reply = false;
bool reconnect = true;
bool reconnect = false;
bool close_listener = false;
bool enable_metrics = false;
unsigned metrics_freq_ms = 1000;
std::string metrics_file;
Expand Down
9 changes: 7 additions & 2 deletions xtransmit/route.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ void xtransmit::route::run(const vector<string>& src_urls, const vector<string>&

shared_sock_t listening_sock_a; // A shared pointer to store a listening socket for multiple connections.
shared_sock_t listening_sock_b; // A shared pointer to store a listening socket for multiple connections.
shared_sock dst = create_connection(parsed_dst_urls, listening_sock_a);
shared_sock src = create_connection(parsed_src_urls, listening_sock_b);
shared_sock dst = cfg.close_listener
? create_connection(parsed_dst_urls)
: create_connection(parsed_dst_urls, listening_sock_a);;
shared_sock src = cfg.close_listener
? create_connection(parsed_src_urls)
: create_connection(parsed_src_urls, listening_sock_b);;

if (stats)
{
Expand Down Expand Up @@ -127,6 +131,7 @@ CLI::App* xtransmit::route::add_subcommand(CLI::App& app, config& cfg, vector<st
sc_route->add_option("-o,--output", dst_urls, "Destination URIs");
sc_route->add_option("--msgsize", cfg.message_size, "Size of a buffer to receive message payload");
sc_route->add_flag("--bidir", cfg.bidir, "Enable bidirectional transmission");
sc_route->add_flag("--close-listener,!--no-close-listener", cfg.close_listener, "Close listener once connection is established");
sc_route->add_option("--statsfile", cfg.stats_file, "output stats report filename");
sc_route->add_option("--statsformat", cfg.stats_format, "output stats report format (json, csv)");
sc_route->add_option("--statsfreq", cfg.stats_freq_ms, "output stats report frequency (ms)")
Expand Down
1 change: 1 addition & 0 deletions xtransmit/route.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace xtransmit {
{
int message_size = 1456;
bool bidir = false;
bool close_listener = false;
int stats_freq_ms = 0;
std::string stats_file;
std::string stats_format = "csv";
Expand Down

0 comments on commit 7e8858d

Please sign in to comment.