diff --git a/include/binance_websocket.h b/include/binance_websocket.h index 15b3ba8..bb72469 100644 --- a/include/binance_websocket.h +++ b/include/binance_websocket.h @@ -23,7 +23,7 @@ namespace binance public : static void connect_endpoint(CB user_cb, const char* path); static void init(); - static void enter_event_loop(std::chrono::hours hours = std::chrono::hours(24)); + static void enter_event_loop(const std::chrono::hours &hours = std::chrono::hours(24)); }; } diff --git a/src/binance_websocket.cpp b/src/binance_websocket.cpp index c72df76..c07f2a1 100644 --- a/src/binance_websocket.cpp +++ b/src/binance_websocket.cpp @@ -12,14 +12,15 @@ #include #include #include +#include +#include using namespace binance; using namespace std; static struct lws_context *context; static atomic lws_service_cancelled(0); -void connect_client(lws_sorted_usec_list_t *sul); - +static void connect_client(lws_sorted_usec_list_t *sul); /* * This "contains" the endpoint connection proprty and has * the connection bound to it @@ -33,9 +34,8 @@ struct endpoint_connection { char* ws_path; }; -static std::map concurrent; -static std::map endpoints_prop; -static pthread_mutex_t lock_concurrent; /* serialize access */ +static std::map endpoints_prop; /* serialize access */ +static pthread_mutex_t lock_concurrent; /* lock serialize access */ /* * The retry and backoff policy we want to use for our client connections @@ -61,20 +61,22 @@ const lws_retry_bo_t retry = { */ }; -int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) +static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { - int m; + std::atomic idx(-1); + auto *current_data = static_cast< endpoint_connection *>(user); switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED : pthread_mutex_lock(&lock_concurrent); - for (std::pair n : concurrent) { - if (endpoints_prop[n.second].wsi == wsi) { + for (std::pair n : endpoints_prop) { + if (endpoints_prop[n.first].wsi == wsi && current_data->ws_path == endpoints_prop[n.first].ws_path) { + idx = n.first; lws_callback_on_writable(wsi); - m = n.second; - lwsl_user("%s: connection established with success concurrent:%d ws_path::%s\n", - __func__, n.second, endpoints_prop[n.second].ws_path); + endpoints_prop[idx].wsi = wsi; + lwsl_user("%s: connection established with success endpoint#:%d ws_path::%s\n", + __func__, idx.load(), endpoints_prop[n.first].ws_path); pthread_mutex_unlock(&lock_concurrent); break; } @@ -85,26 +87,23 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s case LWS_CALLBACK_CLIENT_RECEIVE : { // Handle incomming messages here. + pthread_mutex_lock(&lock_concurrent); try { - pthread_mutex_lock(&lock_concurrent); - string str_result = string(reinterpret_cast(in), len); - Json::Reader reader; - Json::Value json_result; - reader.parse(str_result , json_result); - - for (std::pair n : concurrent) { - if (endpoints_prop[n.second].wsi == wsi) { - m = n.second; - endpoints_prop[n.second].json_cb(json_result); - endpoints_prop[m].retry_count = 0; - lwsl_user("%s: incomming messages from %s\n", - __func__ , endpoints_prop[n.second].ws_path); - pthread_mutex_unlock(&lock_concurrent); + for (std::pair n : endpoints_prop) { + if (endpoints_prop[n.first].wsi == wsi && current_data->ws_path == endpoints_prop[n.first].ws_path) { + string str_result = string(reinterpret_cast(in), len); + Json::Reader reader; + Json::Value json_result; + reader.parse(str_result , json_result); + assert(!json_result.isNull()); + idx = n.first; + endpoints_prop[idx].json_cb(json_result); + endpoints_prop[idx].retry_count = 0; + pthread_mutex_unlock(&lock_concurrent); break; } } - pthread_mutex_unlock(&lock_concurrent); } catch (exception &e) { @@ -112,6 +111,7 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s Logger::write_log(" Error parsing incoming message : %s\n", e.what()); return 1; } + pthread_mutex_unlock(&lock_concurrent); } break; @@ -119,12 +119,12 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s break; case LWS_CALLBACK_CLOSED : - pthread_mutex_lock(&lock_concurrent); - lwsl_err("CALLBACK_CLOSED: %s\n", - in ? (char *)in : ""); - for (std::pair n : concurrent) { - if (endpoints_prop[n.second].wsi == wsi) { - m = n.second; + pthread_mutex_lock(&lock_concurrent); + for (std::pair n : endpoints_prop) { + if (endpoints_prop[n.first].wsi == wsi && current_data->ws_path == endpoints_prop[n.first].ws_path) { + idx = n.first; + lwsl_user("CALLBACK_CLOSED: %s\n", + in ? (char *)in : ""); pthread_mutex_unlock(&lock_concurrent); goto do_retry; } @@ -147,28 +147,37 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s break; case LWS_CALLBACK_CLIENT_CONNECTION_ERROR : - pthread_mutex_lock(&lock_concurrent); - for (std::pair n : concurrent) { - if (endpoints_prop[n.second].wsi == wsi) { - atomic_store(&lws_service_cancelled, 1); - endpoints_prop.erase(n.second); - concurrent.erase(n.second); - lwsl_err("CLIENT_CONNECTION_ERROR Unknown WIS: %s\n", + pthread_mutex_lock(&lock_concurrent); + for (std::pair n : endpoints_prop) { + if (endpoints_prop[n.first].wsi == wsi && current_data->ws_path == endpoints_prop[n.first].ws_path) { + lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char *)in : "(null)"); - pthread_mutex_unlock(&lock_concurrent); + idx = n.first; + endpoints_prop.erase(idx); + lws_cancel_service(lws_get_context(wsi)); + lws_context_destroy(lws_get_context(wsi)); + atomic_store(&lws_service_cancelled, 1); + pthread_mutex_unlock(&lock_concurrent); return -1; } } + { + lwsl_err("CLIENT_CONNECTION_ERROR Unknown WIS: %s\n", + in ? (char *)in : "(null)"); + lws_cancel_service(lws_get_context(wsi)); + lws_context_destroy(lws_get_context(wsi)); + atomic_store(&lws_service_cancelled, 1); + } pthread_mutex_unlock(&lock_concurrent); - break; + return -1; case LWS_CALLBACK_CLIENT_CLOSED: - pthread_mutex_lock(&lock_concurrent); - lwsl_err("CLIENT_CALLBACK_CLIENT_CLOSED: %s\n", - in ? (char *)in : ""); - for (std::pair n : concurrent) { - if (endpoints_prop[n.second].wsi == wsi) { - m = n.second; + pthread_mutex_lock(&lock_concurrent); + for (std::pair n : endpoints_prop) { + if (endpoints_prop[n.first].wsi == wsi && current_data->ws_path == endpoints_prop[n.first].ws_path) { + idx = n.first; + lwsl_user("CLIENT_CALLBACK_CLIENT_CLOSED: %s\n", + in ? (char *)in : ""); pthread_mutex_unlock(&lock_concurrent); goto do_retry; } @@ -185,27 +194,30 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s do_retry: try{ - if (lws_retry_sul_schedule_retry_wsi(endpoints_prop[m].wsi, &endpoints_prop[m]._sul, connect_client, - &endpoints_prop[m].retry_count)) + if (lws_retry_sul_schedule_retry_wsi(endpoints_prop[idx].wsi, &endpoints_prop[idx]._sul, connect_client, + &endpoints_prop[idx].retry_count)) { - if(endpoints_prop[m].retry_count > (LWS_ARRAY_SIZE(backoff_ms))){ + if(endpoints_prop[idx].retry_count > 2*(LWS_ARRAY_SIZE(backoff_ms))){ pthread_mutex_lock(&lock_concurrent); - endpoints_prop.erase(m); - concurrent.erase(m); + lwsl_err("%s: connection attempts exhausted, after [%d] retry, ws_path:%s\n", + __func__, endpoints_prop[idx].retry_count, endpoints_prop[idx].ws_path); + endpoints_prop.erase(idx); + lws_cancel_service(lws_get_context(wsi)); + lws_context_destroy(lws_get_context(wsi)); atomic_store(&lws_service_cancelled, 1); pthread_mutex_unlock(&lock_concurrent); return -1; } { - lwsl_err("%s: connection attempts exhausted,we will keep retrying count:%d ws_path:%s\n", - __func__, endpoints_prop[m].retry_count, endpoints_prop[m].ws_path); + lwsl_err("%s: connection attempts exhausted,we will keep retrying [%d] ws_path:%s\n", + __func__, endpoints_prop[idx].retry_count, endpoints_prop[idx].ws_path); atomic_store(&lws_service_cancelled, 0); - lws_sul_schedule(lws_get_context(endpoints_prop[m].wsi), 0, &endpoints_prop[m]._sul, connect_client, 10 * LWS_US_PER_SEC); + lws_sul_schedule(lws_get_context(endpoints_prop[idx].wsi), 0, &endpoints_prop[idx]._sul, connect_client, 1 * LWS_US_PER_MS); return 0; } } - lwsl_user("%s: connection attempts success, retrying count:%d ws_path:%s\n", - __func__, endpoints_prop[m].retry_count, endpoints_prop[m].ws_path); + lwsl_user("%s: connection attempts success, after [%d] retry, ws_path:%s\n", + __func__, endpoints_prop[idx].retry_count, endpoints_prop[idx].ws_path); }catch (exception &e) { Logger::write_log(" Error do_retry message : %s\n", e.what()); @@ -216,19 +228,19 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s return 0; } -const lws_protocols protocols[] = +static const lws_protocols protocols[] = { { .name = "binance-websocket-api", .callback = event_cb, .per_session_data_size = 0, - .rx_buffer_size = 65536, + .rx_buffer_size = 0, }, { NULL, NULL, 0, 0 } /* end */ }; -void +static void sigint_handler(int sig) { Logger::write_log(" Interactive attention signal : %d\n", sig); @@ -238,12 +250,13 @@ sigint_handler(int sig) /* * Scheduled sul callback that starts the connection attempt */ -void connect_client(lws_sorted_usec_list_t *sul) +static void connect_client(lws_sorted_usec_list_t *sul) { - for (std::pair n : concurrent) { - if (&endpoints_prop[n.second]._sul == sul) { + pthread_mutex_lock(&lock_concurrent); + for (std::pair n : endpoints_prop) { + if (&endpoints_prop[n.first]._sul == sul) { lwsl_user("%s: success ws_path::%s\n", - __func__, endpoints_prop[n.second].ws_path); + __func__, endpoints_prop[n.first].ws_path); struct lws_client_connect_info ccinfo; memset(&ccinfo, 0, sizeof(ccinfo)); @@ -251,48 +264,50 @@ void connect_client(lws_sorted_usec_list_t *sul) ccinfo.context = context; ccinfo.port = BINANCE_WS_PORT; ccinfo.address = BINANCE_WS_HOST; - ccinfo.path = endpoints_prop[n.second].ws_path; + ccinfo.path = endpoints_prop[n.first].ws_path; ccinfo.host = lws_canonical_hostname(context); ccinfo.origin = "origin"; ccinfo.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_PIPELINE; ccinfo.protocol = protocols[0].name; ccinfo.local_protocol_name = protocols[0].name; ccinfo.retry_and_idle_policy = &retry; - ccinfo.userdata = &endpoints_prop[n.second]; + ccinfo.userdata = &endpoints_prop[n.first]; /* * We store the new wsi here early in the connection process, * this gives the callback a way to identify which wsi faced the error * even before the new wsi is returned and even if ultimately no wsi is returned. */ - ccinfo.pwsi = &endpoints_prop[n.second].wsi; + ccinfo.pwsi = &endpoints_prop[n.first].wsi; - endpoints_prop[n.second].conn = lws_client_connect_via_info(&ccinfo); - if (!endpoints_prop[n.second].conn) + endpoints_prop[n.first].conn = lws_client_connect_via_info(&ccinfo); + if (!endpoints_prop[n.first].conn) { /* * Failed... schedule a retry... we can't use the _retry_wsi() * convenience wrapper api here because no valid wsi at this * point. */ - if (lws_retry_sul_schedule(context, 0, &endpoints_prop[n.second]._sul, &retry, - connect_client, &endpoints_prop[n.second].retry_count)) + if (lws_retry_sul_schedule(context, 0, &endpoints_prop[n.first]._sul, &retry, + connect_client, &endpoints_prop[n.first].retry_count)) { - lwsl_err("%s: connection attempts exhausted\n", __func__); - endpoints_prop.erase(n.second); - concurrent.erase(n.second); + lwsl_err("%s: Failed schedule a retry, we can't use the _retry_wsi():%s\n", + __func__, endpoints_prop[n.first].ws_path); atomic_store(&lws_service_cancelled, 1); + pthread_mutex_unlock(&lock_concurrent); return; } } break; } } - + pthread_mutex_unlock(&lock_concurrent); } void binance::Websocket::init() { pthread_mutex_init(&lock_concurrent, NULL); + endpoints_prop.clear(); + struct lws_context_creation_info info; signal(SIGINT, sigint_handler); memset(&info, 0, sizeof(info)); @@ -303,7 +318,8 @@ void binance::Websocket::init() info.gid = -1; info.uid = -1; info.protocols = protocols; - info.fd_limit_per_thread = 0; + info.fd_limit_per_thread = 1024; + info.max_http_header_pool = 1024; context = lws_create_context(&info); if (!context) { @@ -319,46 +335,53 @@ void binance::Websocket::init() void binance::Websocket::connect_endpoint(CB cb, const char* path) { pthread_mutex_lock(&lock_concurrent); - if(concurrent.size() > 1024){ + if(endpoints_prop.size() > 1024){ lwsl_err("%s: maximum of 1024 connect_endpoints reached,\n", - __func__); + __func__); pthread_mutex_unlock(&lock_concurrent); return; } - int n = concurrent.size(); - concurrent.emplace(std::pair(n,n)); + int n = endpoints_prop.size(); endpoints_prop[n].ws_path = const_cast(path); endpoints_prop[n].json_cb = cb; + pthread_mutex_unlock(&lock_concurrent); connect_client(&endpoints_prop[n]._sul); if (!lws_service_cancelled) { /* schedule the first client connection attempt to happen immediately */ - lws_sul_schedule(context, 0, &endpoints_prop[n]._sul, connect_client, 1); - lwsl_user("%s: concurrent:%d ws_path::%s\n", - __func__, n, endpoints_prop[n].ws_path); + lws_sul_schedule(context, 0, &endpoints_prop[n]._sul, connect_client, 1 * LWS_US_PER_MS); + lwsl_user("%s: schedule the first client connection for endpoint#:%d ws_path::%s\n", + __func__, n, endpoints_prop[n].ws_path); } - pthread_mutex_unlock(&lock_concurrent); } // Entering event loop -void binance::Websocket::enter_event_loop(std::chrono::hours hours) +void binance::Websocket::enter_event_loop(const std::chrono::hours &hours) { auto start = std::chrono::steady_clock::now(); auto end = start + hours; auto n = 0; do { - n = lws_service(context, 1000); - if (lws_service_cancelled) - { + try{ + n = lws_service(context, 1000*2); + if (lws_service_cancelled) + { + lws_cancel_service(context); + break; + } + lws_cancel_service(context); + }catch ( exception &e ) { + lwsl_err("%s:::%s\n", + __func__, e.what()); + Logger::write_log( " Error ! %s", e.what() ); lws_cancel_service(context); break; } } while (n >= 0 && std::chrono::steady_clock::now() < end); - concurrent.clear(); + endpoints_prop.clear(); atomic_store(&lws_service_cancelled, 1); - lws_context_destroy(context); pthread_mutex_destroy(&lock_concurrent); -} +} \ No newline at end of file