From eb706ae86ee496be6208de55698abdd5ef3513ff Mon Sep 17 00:00:00 2001 From: mussonero Date: Mon, 12 Oct 2020 12:46:17 +0200 Subject: [PATCH] Update binance_websocket.cpp added additional notes and fixed "IF ELSE" statement + missing header --- src/binance_websocket.cpp | 51 ++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/src/binance_websocket.cpp b/src/binance_websocket.cpp index eb0a366..a94f01a 100644 --- a/src/binance_websocket.cpp +++ b/src/binance_websocket.cpp @@ -12,6 +12,7 @@ #include #include #include +#include using namespace binance; using namespace std; @@ -22,8 +23,12 @@ static lws_sorted_usec_list_t _sul; static atomic lws_service_cancelled(0); static void connect_client(lws_sorted_usec_list_t *sul); +/* + * This "contains" the endpoint connection proprty and has + * the connection bound to it + */ struct endpoint_connection { - lws_sorted_usec_list_t sul; /* schedule connection retry */ + lws_sorted_usec_list_t sul; /* schedule connection retry */ struct lws *wsi; /* related wsi if any */ uint16_t retry_count; /* count of consequetive retries */ lws* conn; @@ -34,14 +39,20 @@ struct endpoint_connection { /* * The retry and backoff policy we want to use for our client connections */ -static const uint32_t backoff_ms[] = { 1000, 1000*2, 1000*3, 1000*4, 1000*5, 1000*6, 1000*7, 1000*8, 1000*9, 1000*10}; +static const uint32_t backoff_ms[] = { 1000, 1000*2, 1000*3, 1000*4, 1000*5}; +/* + * This struct sets the policy for delays between retries, + * and for how long a connection may be 'idle' + * before it first tries to ping / pong on it to confirm it's up, + * or drops the connection if still idle. + */ static const lws_retry_bo_t retry = { .retry_ms_table = backoff_ms, .retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms), - .conceal_count = LWS_ARRAY_SIZE(backoff_ms), - .secs_since_valid_ping = 30, /* force PINGs after secs idle */ - .secs_since_valid_hangup = 100, /* hangup after secs idle */ + .conceal_count = LWS_ARRAY_SIZE(backoff_ms)*2, + .secs_since_valid_ping = 30, /* force PINGs after secs idle */ + .secs_since_valid_hangup = 60, /* hangup after secs idle */ .jitter_percent = 15, /* * jitter_percent controls how much additional random delay is @@ -80,8 +91,10 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED : - lws_callback_on_writable(wsi); - break; + lwsl_user("%s: established\n", __func__); + lws_callback_on_writable(wsi); + endpoint_prop->wsi = wsi; + break; case LWS_CALLBACK_CLIENT_RECEIVE : { @@ -116,6 +129,7 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void Logger::write_log(" Error LWS_CALLBACK_CLOSED message : %s\n", e.what()); } goto do_retry; + case LWS_CALLBACK_GET_THREAD_ID: { #ifdef __APPLE__ @@ -142,11 +156,8 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void Logger::write_log(" Error LWS_CALLBACK_CLIENT_CONNECTION_ERROR message : %s\n", e.what()); } goto do_retry; - break; case LWS_CALLBACK_CLIENT_CLOSED: - /*lwsl_err("LWS_CALLBACK_CLIENT_CLOSED : %s\n", - in ? (char *)in : "(null)");*/ try{ if (handles.find(wsi) != handles.end()) handles.erase(wsi); @@ -155,7 +166,6 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void Logger::write_log(" Error LWS_CALLBACK_CLIENT_CLOSED message : %s\n", e.what()); } goto do_retry; - break; default : // Make compiler happy regarding unhandled enums. @@ -198,6 +208,7 @@ const lws_protocols protocols[] = static void sigint_handler(int sig) { + Logger::write_log(" Interactive attention signal : %d\n", sig); atomic_store(&lws_service_cancelled, 1); } @@ -206,7 +217,7 @@ sigint_handler(int sig) */ static void connect_client(lws_sorted_usec_list_t *sul) { - struct endpoint_connection *endpoint_prop = lws_container_of(sul, struct endpoint_connection, sul); + struct endpoint_connection *endpoint_prop = lws_container_of(&_sul, struct endpoint_connection, sul); struct lws_client_connect_info ccinfo; memset(&ccinfo, 0, sizeof(ccinfo)); @@ -222,6 +233,12 @@ static void connect_client(lws_sorted_usec_list_t *sul) ccinfo.local_protocol_name = protocols[0].name; ccinfo.retry_and_idle_policy = &retry; ccinfo.userdata = endpoint_prop; + /* + * We store the new wsi here early in the connection process, + * this gives the callback a way to identify which wsi faced the error + * even before the new wsi is returned and even if ultimately no wsi is returned. + */ + ccinfo.pwsi = &endpoint_prop->wsi; endpoint_prop->conn = lws_client_connect_via_info(&ccinfo); if (!endpoint_prop->conn) @@ -236,8 +253,14 @@ static void connect_client(lws_sorted_usec_list_t *sul) { lwsl_err("%s: connection attempts exhausted\n", __func__); atomic_store(&lws_service_cancelled, 1); + assert(endpoint_prop->wsi); + if ((endpoint_prop->wsi) && handles.find(endpoint_prop->wsi) != handles.end()) + handles.erase(endpoint_prop->wsi); + return; + } + else{ + handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func; } - handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func; }else{ handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func; } @@ -264,7 +287,7 @@ void binance::Websocket::init() atomic_store(&lws_service_cancelled, 1); return; } else{ - atomic_store(&lws_service_cancelled, 0); + atomic_store(&lws_service_cancelled, 0); } }