Skip to content

Commit

Permalink
print network status
Browse files Browse the repository at this point in the history
  • Loading branch information
cnbatch committed Jun 2, 2024
1 parent c947498 commit db1f8df
Show file tree
Hide file tree
Showing 14 changed files with 304 additions and 55 deletions.
2 changes: 2 additions & 0 deletions sln/udphop/udphop.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<ClCompile Include="..\..\src\main.cpp" />
<ClCompile Include="..\..\src\networks\client.cpp" />
<ClCompile Include="..\..\src\networks\connections.cpp" />
<ClCompile Include="..\..\src\networks\relay.cpp" />
<ClCompile Include="..\..\src\networks\server.cpp" />
<ClCompile Include="..\..\src\networks\stun.cpp" />
<ClCompile Include="..\..\src\shares\configurations.cpp" />
Expand All @@ -35,6 +36,7 @@
<ClInclude Include="..\..\src\3rd_party\thread_pool.hpp" />
<ClInclude Include="..\..\src\networks\client.hpp" />
<ClInclude Include="..\..\src\networks\connections.hpp" />
<ClInclude Include="..\..\src\networks\relay.hpp" />
<ClInclude Include="..\..\src\networks\server.hpp" />
<ClInclude Include="..\..\src\networks\stun.hpp" />
<ClInclude Include="..\..\src\shares\aead.hpp" />
Expand Down
6 changes: 6 additions & 0 deletions sln/udphop/udphop.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
<ClCompile Include="..\..\src\3rd_party\fecpp_ssse3.cpp">
<Filter>Source Files\3rd_party</Filter>
</ClCompile>
<ClCompile Include="..\..\src\networks\relay.cpp">
<Filter>Source Files\networks</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\shares\share_defines.hpp">
Expand Down Expand Up @@ -101,5 +104,8 @@
<ClInclude Include="..\..\src\3rd_party\fecpp.hpp">
<Filter>Header Files\3rd_party</Filter>
</ClInclude>
<ClInclude Include="..\..\src\networks\relay.hpp">
<Filter>Header Files\networks</Filter>
</ClInclude>
</ItemGroup>
</Project>
26 changes: 22 additions & 4 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@

int main(int argc, char *argv[])
{
#ifdef __cpp_lib_format
std::cout << std::format("{} version 20240602\n", app_name);
if (argc <= 1)
{
printf("%.*s version 20240316\n", (int)app_name.length(), app_name.data());
printf("Usage: %.*s config1.conf\n", (int)app_name.length(), app_name.data());
printf(" %.*s config1.conf config2.conf...\n", (int)app_name.length(), app_name.data());
std::cout << std::format("Usage: {} config1.conf\n", app_name);
std::cout << std::format(" {} config1.conf config2.conf...\n", (int)app_name.length(), app_name.data());
return 0;
}
#else
std::cout << app_name << " version 20240602\n";
if (argc <= 1)
{
std::cout << "Usage: " << app_name << " config1.conf\n";
std::cout << " " << app_name << " config1.conf config2.conf...\n";
return 0;
}
#endif

constexpr size_t task_count_limit = 8192u;
uint16_t thread_group_count = 1;
Expand Down Expand Up @@ -63,10 +73,18 @@ int main(int argc, char *argv[])
std::back_inserter(lines));

std::vector<std::string> error_msg;
profile_settings.emplace_back(parse_from_args(lines, error_msg));
user_settings current_settings = parse_from_args(lines, error_msg);
std::filesystem::path config_input_name = argv[i];
current_settings.config_filename = argv[i];
current_settings.log_status = current_settings.log_directory / (config_input_name.filename().string() + "_status.log");
profile_settings.emplace_back(std::move(current_settings));
if (error_msg.size() > 0)
{
#ifdef __cpp_lib_format
std::cout << std::format("Error(s) found in setting file {}\n", argv[i]);
#else
printf("Error(s) found in setting file %s\n", argv[i]);
#endif
for (const std::string &each_one : error_msg)
{
std::cerr << "\t" << each_one << "\n";
Expand Down
39 changes: 37 additions & 2 deletions src/networks/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ client_mode::~client_mode()
timer_find_timeout.cancel();
timer_expiring_sessions.cancel();
timer_keep_alive.cancel();
timer_status_log.cancel();
}

bool client_mode::start()
{
printf("%.*s is running in client mode\n", (int)app_name.length(), app_name.data());
std::cout << app_name << " is running in client mode\n";

uint16_t port_number = current_settings.listen_port;
if (port_number == 0)
Expand Down Expand Up @@ -66,6 +67,12 @@ bool client_mode::start()
timer_keep_alive.expires_after(seconds{ current_settings.keep_alive });
timer_keep_alive.async_wait([this](const asio::error_code &e) { keep_alive(e); });
}

if (!current_settings.log_status.empty())
{
timer_status_log.expires_after(LOGGING_GAP);
timer_status_log.async_wait([this](const asio::error_code& e) { log_status(e); });
}
}
catch (std::exception &ex)
{
Expand Down Expand Up @@ -416,6 +423,7 @@ void client_mode::fec_find_missings(udp_mappings *udp_session_ptr, fec_control_d
{
auto [missed_data_ptr, missed_data_size] = extract_from_container(data);
udp_access_point->async_send_out(std::move(data), missed_data_ptr, missed_data_size, udp_session_ptr->ingress_source_endpoint);
fec_recovery_count++;
}

fec_controllor.fec_rcv_restored.insert(sn);
Expand Down Expand Up @@ -611,4 +619,31 @@ void client_mode::keep_alive(const asio::error_code& e)

timer_keep_alive.expires_after(seconds{ current_settings.keep_alive });
timer_keep_alive.async_wait([this](const asio::error_code &e) { keep_alive(e); });
}
}

void client_mode::log_status(const asio::error_code & e)
{
if (e == asio::error::operation_aborted)
return;

loop_get_status();

timer_status_log.expires_after(LOGGING_GAP);
timer_status_log.async_wait([this](const asio::error_code& e) { log_status(e); });
}

void client_mode::loop_get_status()
{
std::string output_text = time_to_string_with_square_brackets() + "Summary of " + current_settings.config_filename + "\n";
#ifdef __cpp_lib_format
output_text += std::format("fec recovery: {}\n", fec_recovery_count.exchange(0));
#else
std::ostringstream oss;
oss << "fec recovery: " << fec_recovery_count.exchange(0) << "\n";
output_text += oss.str();
#endif

if (!current_settings.log_status.empty())
print_status_to_file(output_text, current_settings.log_status);
std::cout << output_text << std::endl;
}
6 changes: 6 additions & 0 deletions src/networks/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ class client_mode

std::shared_mutex mutex_target_address;
std::unique_ptr<asio::ip::address> target_address;
std::atomic<size_t> fec_recovery_count;

asio::steady_timer timer_find_timeout;
asio::steady_timer timer_expiring_sessions;
asio::steady_timer timer_keep_alive;
asio::steady_timer timer_status_log;
ttp::task_group_pool &sequence_task_pool_local;
ttp::task_group_pool &sequence_task_pool_peer;
const size_t task_limit;
Expand All @@ -52,6 +54,8 @@ class client_mode
void expiring_wrapper_loops(const asio::error_code &e);
void change_new_port(std::shared_ptr<udp_mappings> udp_mappings_ptr);
void keep_alive(const asio::error_code &e);
void log_status(const asio::error_code &e);
void loop_get_status();

public:
client_mode() = delete;
Expand All @@ -64,6 +68,7 @@ class client_mode
timer_find_timeout(io_context),
timer_expiring_sessions(io_context),
timer_keep_alive(io_context),
timer_status_log(io_context),
sequence_task_pool_local(seq_task_pool_local),
sequence_task_pool_peer(seq_task_pool_peer),
task_limit(task_count_limit),
Expand All @@ -75,6 +80,7 @@ class client_mode
timer_find_timeout(std::move(existing_client.timer_find_timeout)),
timer_expiring_sessions(std::move(existing_client.timer_expiring_sessions)),
timer_keep_alive(std::move(existing_client.timer_keep_alive)),
timer_status_log(std::move(existing_client.timer_status_log)),
sequence_task_pool_local(existing_client.sequence_task_pool_local),
sequence_task_pool_peer(existing_client.sequence_task_pool_peer),
task_limit(existing_client.task_limit),
Expand Down
1 change: 1 addition & 0 deletions src/networks/connections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ constexpr auto STUN_RESEND = std::chrono::seconds(30);
constexpr auto FINDER_TIMEOUT_INTERVAL = std::chrono::seconds(1);
constexpr auto CHANGEPORT_UPDATE_INTERVAL = std::chrono::seconds(1);
constexpr auto KEEP_ALIVE_UPDATE_INTERVAL = std::chrono::seconds(1);
constexpr auto LOGGING_GAP = std::chrono::seconds(30);
constexpr auto EXPRING_UPDATE_INTERVAL = std::chrono::seconds(2);
const asio::ip::udp::endpoint local_empty_target_v4(asio::ip::make_address_v4("127.0.0.1"), 70);
const asio::ip::udp::endpoint local_empty_target_v6(asio::ip::make_address_v6("::1"), 70);
Expand Down
48 changes: 44 additions & 4 deletions src/networks/relay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ void relay_mode::fec_find_missings_via_listener(std::shared_ptr<udp_mappings> ud
{
data_sender_via_forwarder(udp_session_ptr, std::move(data), data_size);
};
fec_find_missings(udp_session_ptr, fec_controllor, fec_sn, max_fec_data_count, data_sender);
fec_recovery_count_ingress += fec_find_missings(udp_session_ptr, fec_controllor, fec_sn, max_fec_data_count, data_sender);
}

void relay_mode::fec_find_missings_via_forwarder(std::shared_ptr<udp_mappings> udp_session_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count)
Expand All @@ -520,12 +520,13 @@ void relay_mode::fec_find_missings_via_forwarder(std::shared_ptr<udp_mappings> u
{
data_sender_via_listener(udp_session_ptr.get(), udp_session_ptr->ingress_source_endpoint, std::move(data), data_size);
};
fec_find_missings(udp_session_ptr, fec_controllor, fec_sn, max_fec_data_count, data_sender);
fec_recovery_count_egress += fec_find_missings(udp_session_ptr, fec_controllor, fec_sn, max_fec_data_count, data_sender);
}

void relay_mode::fec_find_missings(std::shared_ptr<udp_mappings> udp_session_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count,
size_t relay_mode::fec_find_missings(std::shared_ptr<udp_mappings> udp_session_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count,
std::function<void(std::shared_ptr<udp_mappings>, std::unique_ptr<uint8_t[]>, size_t)> sender_func)
{
size_t fec_recovery_count = 0;
for (auto iter = fec_controllor.fec_rcv_cache.begin(), next_iter = iter; iter != fec_controllor.fec_rcv_cache.end(); iter = next_iter)
{
++next_iter;
Expand Down Expand Up @@ -561,10 +562,13 @@ void relay_mode::fec_find_missings(std::shared_ptr<udp_mappings> udp_session_ptr
std::unique_ptr<uint8_t[]> new_data = std::make_unique<uint8_t[]>(missed_data_size + BUFFER_EXPAND_SIZE);
std::copy(missed_data_ptr, missed_data_ptr + missed_data_size, new_data.get());
sender_func(udp_session_ptr, std::move(new_data), missed_data_size);
fec_recovery_count++;
}

fec_controllor.fec_rcv_restored.insert(sn);
}

return fec_recovery_count;
}

void relay_mode::cleanup_expiring_data_connections()
Expand Down Expand Up @@ -805,18 +809,48 @@ void relay_mode::keep_alive_egress(const asio::error_code & e)
timer_keep_alive_egress.async_wait([this](const asio::error_code &e) { keep_alive_egress(e); });
}

void relay_mode::log_status(const asio::error_code & e)
{
if (e == asio::error::operation_aborted)
return;

loop_get_status();

timer_status_log.expires_after(LOGGING_GAP);
timer_status_log.async_wait([this](const asio::error_code& e) { log_status(e); });
}

void relay_mode::loop_get_status()
{
std::string output_text = time_to_string_with_square_brackets() + "Summary of " + current_settings.config_filename + "\n";
#ifdef __cpp_lib_format
output_text += std::format("[Client <-> This] FEC recover: {}\t [This <-> Remote] FEC recover: {}\n",
fec_recovery_count_ingress.exchange(0), fec_recovery_count_egress.exchange(0));
#else
std::ostringstream oss;
oss << "[Client <-> This] FEC recover: " << fec_recovery_count_ingress.exchange(0) <<
"\t [This <-> Remote] FEC recover: " << fec_recovery_count_egress.exchange(0) << "\n";
output_text += oss.str();
#endif

if (!current_settings.log_status.empty())
print_status_to_file(output_text, current_settings.log_status);
std::cout << output_text << std::endl;
}

relay_mode::~relay_mode()
{
timer_expiring_sessions.cancel();
timer_find_timeout.cancel();
timer_stun.cancel();
timer_keep_alive_ingress.cancel();
timer_keep_alive_egress.cancel();
timer_status_log.cancel();
}

bool relay_mode::start()
{
printf("%.*s is running in relay mode\n", (int)app_name.length(), app_name.data());
std::cout << app_name << " is running in relay mode\n";

udp_callback_t func = std::bind(&relay_mode::udp_listener_incoming, this, _1, _2, _3, _4);
std::set<uint16_t> listen_ports;
Expand Down Expand Up @@ -898,6 +932,12 @@ bool relay_mode::start()
timer_keep_alive_egress.expires_after(KEEP_ALIVE_UPDATE_INTERVAL);
timer_keep_alive_egress.async_wait([this](const asio::error_code& e) { keep_alive_egress(e); });
}

if (!current_settings.log_status.empty())
{
timer_status_log.expires_after(LOGGING_GAP);
timer_status_log.async_wait([this](const asio::error_code& e) { log_status(e); });
}
}
catch (std::exception &ex)
{
Expand Down
9 changes: 8 additions & 1 deletion src/networks/relay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ class relay_mode
asio::steady_timer timer_stun;
asio::steady_timer timer_keep_alive_ingress;
asio::steady_timer timer_keep_alive_egress;
asio::steady_timer timer_status_log;
ttp::task_group_pool &sequence_task_pool_local;
ttp::task_group_pool &sequence_task_pool_peer;
const size_t task_limit;

std::unique_ptr<udp::endpoint> udp_target;
std::atomic<size_t> fec_recovery_count_ingress;
std::atomic<size_t> fec_recovery_count_egress;

void udp_listener_incoming(std::unique_ptr<uint8_t[]> data, size_t data_size, const udp::endpoint &peer, asio::ip::port_type port_number);
void udp_listener_incoming_unpack(std::unique_ptr<uint8_t[]> data, size_t plain_size, const udp::endpoint &peer, asio::ip::port_type port_number);
Expand All @@ -61,7 +64,7 @@ class relay_mode
void fec_maker_via_forwarder(std::shared_ptr<udp_mappings> udp_session_ptr, std::unique_ptr<uint8_t[]> data, size_t data_size);
void fec_find_missings_via_listener(std::shared_ptr<udp_mappings> udp_session_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count);
void fec_find_missings_via_forwarder(std::shared_ptr<udp_mappings> udp_session_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count);
void fec_find_missings(std::shared_ptr<udp_mappings> udp_session_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count,
size_t fec_find_missings(std::shared_ptr<udp_mappings> udp_session_ptr, fec_control_data &fec_controllor, uint32_t fec_sn, uint8_t max_fec_data_count,
std::function<void(std::shared_ptr<udp_mappings>, std::unique_ptr<uint8_t[]>, size_t)> sender_func);

void cleanup_expiring_data_connections();
Expand All @@ -74,6 +77,8 @@ class relay_mode
void change_new_port(std::shared_ptr<udp_mappings> udp_mappings_ptr);
void keep_alive_ingress(const asio::error_code& e);
void keep_alive_egress(const asio::error_code& e);
void log_status(const asio::error_code &e);
void loop_get_status();

public:
relay_mode() = delete;
Expand All @@ -88,6 +93,7 @@ class relay_mode
timer_stun(io_context),
timer_keep_alive_ingress(io_context),
timer_keep_alive_egress(io_context),
timer_status_log(io_context),
sequence_task_pool_local(seq_task_pool_local),
sequence_task_pool_peer(seq_task_pool_peer),
task_limit(task_count_limit),
Expand All @@ -106,6 +112,7 @@ class relay_mode
timer_stun(std::move(existing_server.timer_stun)),
timer_keep_alive_ingress(std::move(existing_server.timer_keep_alive_ingress)),
timer_keep_alive_egress(std::move(existing_server.timer_keep_alive_egress)),
timer_status_log(std::move(existing_server.timer_status_log)),
sequence_task_pool_local(existing_server.sequence_task_pool_local),
sequence_task_pool_peer(existing_server.sequence_task_pool_peer),
task_limit(existing_server.task_limit),
Expand Down
Loading

0 comments on commit db1f8df

Please sign in to comment.