Skip to content

Commit

Permalink
Update binance_websocket.cpp
Browse files Browse the repository at this point in the history
added additional notes and fixed "IF ELSE" statement +  missing header
  • Loading branch information
mussonero committed Oct 13, 2020
1 parent 2ecf781 commit 0b00db1
Showing 1 changed file with 62 additions and 51 deletions.
113 changes: 62 additions & 51 deletions src/binance_websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <libwebsockets.h>
#include <map>
#include <csignal>
#include <cassert>

using namespace binance;
using namespace std;
Expand All @@ -22,8 +23,12 @@ static lws_sorted_usec_list_t _sul;
static atomic<int> lws_service_cancelled(0);
static void connect_client(lws_sorted_usec_list_t *sul);

struct endpoint_connection {
lws_sorted_usec_list_t sul; /* schedule connection retry */
/*
* This "contains" the endpoint connection proprty and has
* the connection bound to it
*/
static struct endpoint_connection {
lws_sorted_usec_list_t sul; /* schedule connection retry */
struct lws *wsi; /* related wsi if any */
uint16_t retry_count; /* count of consequetive retries */
lws* conn;
Expand All @@ -34,14 +39,20 @@ struct endpoint_connection {
/*
* The retry and backoff policy we want to use for our client connections
*/
static const uint32_t backoff_ms[] = { 1000, 1000*2, 1000*3, 1000*4, 1000*5, 1000*6, 1000*7, 1000*8, 1000*9, 1000*10};
static const uint32_t backoff_ms[] = { 1000, 1000*2, 1000*3, 1000*4, 1000*5};

/*
* This struct sets the policy for delays between retries,
* and for how long a connection may be 'idle'
* before it first tries to ping / pong on it to confirm it's up,
* or drops the connection if still idle.
*/
static const lws_retry_bo_t retry = {
.retry_ms_table = backoff_ms,
.retry_ms_table_count = LWS_ARRAY_SIZE(backoff_ms),
.conceal_count = LWS_ARRAY_SIZE(backoff_ms),
.secs_since_valid_ping = 30, /* force PINGs after secs idle */
.secs_since_valid_hangup = 100, /* hangup after secs idle */
.conceal_count = LWS_ARRAY_SIZE(backoff_ms)*2,
.secs_since_valid_ping = 30, /* force PINGs after secs idle */
.secs_since_valid_hangup = 60, /* hangup after secs idle */
.jitter_percent = 15,
/*
* jitter_percent controls how much additional random delay is
Expand Down Expand Up @@ -80,8 +91,10 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void
switch (reason)
{
case LWS_CALLBACK_CLIENT_ESTABLISHED :
lws_callback_on_writable(wsi);
break;
lwsl_user("%s: established\n", __func__);
lws_callback_on_writable(wsi);
endpoint_prop->wsi = wsi;
break;

case LWS_CALLBACK_CLIENT_RECEIVE :
{
Expand All @@ -107,15 +120,9 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void
case LWS_CALLBACK_CLIENT_WRITEABLE :
break;

case LWS_CALLBACK_CLOSED :
try{
if (handles.find(wsi) != handles.end())
handles.erase(wsi);
}catch (exception &e)
{
Logger::write_log("<binance::Websocket::event_cb> Error LWS_CALLBACK_CLOSED message : %s\n", e.what());
}
goto do_retry;
case LWS_CALLBACK_CLOSED :
goto do_retry;

case LWS_CALLBACK_GET_THREAD_ID:
{
#ifdef __APPLE__
Expand All @@ -133,29 +140,17 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR :
lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
in ? (char *)in : "(null)");
try{
if (handles.find(wsi) != handles.end())
handles.erase(wsi);
Logger::write_log("<binance::Websocket::event_cb> LWS_CALLBACK_CLIENT_CONNECTION_ERROR\n");
}catch (exception &e)
{
Logger::write_log("<binance::Websocket::event_cb> Error LWS_CALLBACK_CLIENT_CONNECTION_ERROR message : %s\n", e.what());
}
goto do_retry;
if (handles.find(wsi) != handles.end())
handles.erase(wsi);
lws_cancel_service(lws_get_context(wsi));
atomic_store(&lws_service_cancelled, 1);
return -1;
break;

case LWS_CALLBACK_CLIENT_CLOSED:
/*lwsl_err("LWS_CALLBACK_CLIENT_CLOSED : %s\n",
in ? (char *)in : "(null)");*/
try{
if (handles.find(wsi) != handles.end())
handles.erase(wsi);
}catch (exception &e)
{
Logger::write_log("<binance::Websocket::event_cb> Error LWS_CALLBACK_CLIENT_CLOSED message : %s\n", e.what());
}
lwsl_err("CLIENT_CALLBACK_CLIENT_CLOSED: %s\n",
in ? (char *)in : "");
goto do_retry;
break;

default :
// Make compiler happy regarding unhandled enums.
Expand All @@ -169,9 +164,12 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void
if (lws_retry_sul_schedule_retry_wsi(wsi, &endpoint_prop->sul, connect_client,
&endpoint_prop->retry_count))
{
if (handles.find(wsi) != handles.end())
handles.erase(wsi);
lws_cancel_service(lws_get_context(wsi));
lwsl_err("%s: connection attempts exhausted\n", __func__);
atomic_store(&lws_service_cancelled, 1);
return -1;
atomic_store(&lws_service_cancelled, 0);
return 0;
}
}catch (exception &e)
{
Expand All @@ -183,21 +181,22 @@ static int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void
return 0;
}

const lws_protocols protocols[] =
{
static const lws_protocols protocols[] =
{
.name = "binance-websocket-api",
.callback = event_cb,
.per_session_data_size = 0,
.rx_buffer_size = 65536,
},
{
.name = "binance-websocket-api",
.callback = event_cb,
.per_session_data_size = 0,
.rx_buffer_size = 65536,
},

{ NULL, NULL, 0, 0 } /* end */
};
{ NULL, NULL, 0, 0 } /* end */
};

static void
sigint_handler(int sig)
{
Logger::write_log("<binance::Websocket::sigint_handler> Interactive attention signal : %d\n", sig);
atomic_store(&lws_service_cancelled, 1);
}

Expand All @@ -206,7 +205,7 @@ sigint_handler(int sig)
*/
static void connect_client(lws_sorted_usec_list_t *sul)
{
struct endpoint_connection *endpoint_prop = lws_container_of(sul, struct endpoint_connection, sul);
struct endpoint_connection *endpoint_prop = lws_container_of(&_sul, struct endpoint_connection, sul);
struct lws_client_connect_info ccinfo;

memset(&ccinfo, 0, sizeof(ccinfo));
Expand All @@ -222,6 +221,12 @@ static void connect_client(lws_sorted_usec_list_t *sul)
ccinfo.local_protocol_name = protocols[0].name;
ccinfo.retry_and_idle_policy = &retry;
ccinfo.userdata = endpoint_prop;
/*
* We store the new wsi here early in the connection process,
* this gives the callback a way to identify which wsi faced the error
* even before the new wsi is returned and even if ultimately no wsi is returned.
*/
ccinfo.pwsi = &endpoint_prop->wsi;

endpoint_prop->conn = lws_client_connect_via_info(&ccinfo);
if (!endpoint_prop->conn)
Expand All @@ -236,8 +241,14 @@ static void connect_client(lws_sorted_usec_list_t *sul)
{
lwsl_err("%s: connection attempts exhausted\n", __func__);
atomic_store(&lws_service_cancelled, 1);
assert(endpoint_prop->wsi);
if ((endpoint_prop->wsi) && handles.find(endpoint_prop->wsi) != handles.end())
handles.erase(endpoint_prop->wsi);
return;
}
else{
handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func;
}
handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func;
}else{
handles[endpoint_prop->conn] = endpoint_prop->callback_jason_func;
}
Expand All @@ -264,7 +275,7 @@ void binance::Websocket::init()
atomic_store(&lws_service_cancelled, 1);
return;
} else{
atomic_store(&lws_service_cancelled, 0);
atomic_store(&lws_service_cancelled, 0);
}
}

Expand All @@ -287,7 +298,7 @@ void binance::Websocket::enter_event_loop(std::chrono::hours hours)
auto end = start + hours;
auto n = 0;
do {
n = lws_service(context, 10);
n = lws_service(context, 1000);
if (lws_service_cancelled)
{
lws_cancel_service(context);
Expand Down

0 comments on commit 0b00db1

Please sign in to comment.