Skip to content

Commit

Permalink
Update binance_websocket.cpp
Browse files Browse the repository at this point in the history
added additional notes and fixed "IF ELSE" statement +  missing header
  • Loading branch information
mussonero committed Oct 13, 2020
1 parent 2ecf781 commit eb706ae
Showing 1 changed file with 37 additions and 14 deletions.
51 changes: 37 additions & 14 deletions src/binance_websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <libwebsockets.h>
#include <map>
#include <csignal>
#include <cassert>

using namespace binance;
using namespace std;
Expand All @@ -22,8 +23,12 @@ static lws_sorted_usec_list_t _sul;
static atomic<int> lws_service_cancelled(0);
static void connect_client(lws_sorted_usec_list_t *sul);

/*
* This "contains" the endpoint connection proprty and has
* the connection bound to it
*/
struct endpoint_connection {
lws_sorted_usec_list_t sul; /* schedule connection retry */
lws_sorted_usec_list_t sul; /* schedule connection retry */
struct lws *wsi; /* related wsi if any */
uint16_t retry_count; /* count of consequetive retries */
lws* conn;
Expand All @@ -34,14 +39,20 @@ struct endpoint_connection {
/*
* The retry and backoff policy we want to use for our client connections
*/
static const uint32_t backoff_ms[] = { 1000, 1000*2, 1000*3, 1000*4, 1000*5, 1000*6, 1000*7, 1000*8, 1000*9, 1000*10};
static const uint32_t backoff_ms[] = { 1000, 1000*2, 1000*3, 1000*4, 1000*5};

/*
* This struct sets the policy for delays between retries,
* and for how long a connection may be 'idle'
* before it first tries to ping / pong on it to confirm it's up,
* or drops the connection if still idle.
*/
static const lws_retry_bo_t retry = {
.retry_ms_table = backoff_ms,
.retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
.conceal_count = LWS_ARRAY_SIZE(backoff_ms),
.secs_since_valid_ping = 30, /* force PINGs after secs idle */
.secs_since_valid_hangup = 100, /* hangup after secs idle */
.conceal_count = LWS_ARRAY_SIZE(backoff_ms)*2,
.secs_since_valid_ping = 30, /* force PINGs after secs idle */
.secs_since_valid_hangup = 60, /* hangup after secs idle */
.jitter_percent = 15,
/*
* jitter_percent controls how much additional random delay is
Expand Down Expand Up @@ -80,8 +91,10 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void
switch (reason)
{
case LWS_CALLBACK_CLIENT_ESTABLISHED :
lws_callback_on_writable(wsi);
break;
lwsl_user("%s: established\n", __func__);
lws_callback_on_writable(wsi);
endpoint_prop->wsi = wsi;
break;

case LWS_CALLBACK_CLIENT_RECEIVE :
{
Expand Down Expand Up @@ -116,6 +129,7 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void
Logger::write_log("<binance::Websocket::event_cb> Error LWS_CALLBACK_CLOSED message : %s\n", e.what());
}
goto do_retry;

case LWS_CALLBACK_GET_THREAD_ID:
{
#ifdef __APPLE__
Expand All @@ -142,11 +156,8 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void
Logger::write_log("<binance::Websocket::event_cb> Error LWS_CALLBACK_CLIENT_CONNECTION_ERROR message : %s\n", e.what());
}
goto do_retry;
break;

case LWS_CALLBACK_CLIENT_CLOSED:
/*lwsl_err("LWS_CALLBACK_CLIENT_CLOSED : %s\n",
in ? (char *)in : "(null)");*/
try{
if (handles.find(wsi) != handles.end())
handles.erase(wsi);
Expand All @@ -155,7 +166,6 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void
Logger::write_log("<binance::Websocket::event_cb> Error LWS_CALLBACK_CLIENT_CLOSED message : %s\n", e.what());
}
goto do_retry;
break;

default :
// Make compiler happy regarding unhandled enums.
Expand Down Expand Up @@ -198,6 +208,7 @@ const lws_protocols protocols[] =
static void
sigint_handler(int sig)
{
Logger::write_log("<binance::Websocket::sigint_handler> Interactive attention signal : %d\n", sig);
atomic_store(&lws_service_cancelled, 1);
}

Expand All @@ -206,7 +217,7 @@ sigint_handler(int sig)
*/
static void connect_client(lws_sorted_usec_list_t *sul)
{
struct endpoint_connection *endpoint_prop = lws_container_of(sul, struct endpoint_connection, sul);
struct endpoint_connection *endpoint_prop = lws_container_of(&_sul, struct endpoint_connection, sul);
struct lws_client_connect_info ccinfo;

memset(&ccinfo, 0, sizeof(ccinfo));
Expand All @@ -222,6 +233,12 @@ static void connect_client(lws_sorted_usec_list_t *sul)
ccinfo.local_protocol_name = protocols[0].name;
ccinfo.retry_and_idle_policy = &retry;
ccinfo.userdata = endpoint_prop;
/*
* We store the new wsi here early in the connection process,
* this gives the callback a way to identify which wsi faced the error
* even before the new wsi is returned and even if ultimately no wsi is returned.
*/
ccinfo.pwsi = &endpoint_prop->wsi;

endpoint_prop->conn = lws_client_connect_via_info(&ccinfo);
if (!endpoint_prop->conn)
Expand All @@ -236,8 +253,14 @@ static void connect_client(lws_sorted_usec_list_t *sul)
{
lwsl_err("%s: connection attempts exhausted\n", __func__);
atomic_store(&lws_service_cancelled, 1);
assert(endpoint_prop->wsi);
if ((endpoint_prop->wsi) && handles.find(endpoint_prop->wsi) != handles.end())
handles.erase(endpoint_prop->wsi);
return;
}
else{
handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func;
}
handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func;
}else{
handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func;
}
Expand All @@ -264,7 +287,7 @@ void binance::Websocket::init()
atomic_store(&lws_service_cancelled, 1);
return;
} else{
atomic_store(&lws_service_cancelled, 0);
atomic_store(&lws_service_cancelled, 0);
}
}

Expand Down

0 comments on commit eb706ae

Please sign in to comment.