Skip to content

Commit

Permalink
remove unused lws_extesion & more mutex luck to event-callback
Browse files Browse the repository at this point in the history
remove unused lws_extesion & more  mutex luck to event-callback
  • Loading branch information
mussonero committed Oct 18, 2020
1 parent 4c3e88a commit 649627d
Showing 1 changed file with 18 additions and 26 deletions.
44 changes: 18 additions & 26 deletions src/binance_websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,54 +61,33 @@ const lws_retry_bo_t retry = {
*/
};

/*
* If we don't enable permessage-deflate ws extension, during times when there
* are many ws messages per second the server coalesces them inside a smaller
* number of larger ssl records, for >100 mps typically >2048 records.
*
* This is a problem, because the coalesced record cannot be send nor decrypted
* until the last part of the record is received, meaning additional latency
* for the earlier members of the coalesced record that have just been sitting
* there waiting for the last one to go out and be decrypted.
*
* permessage-deflate reduces the data size before the tls layer, for >100mps
* reducing the colesced records to ~1.2KB.
*/
const struct lws_extension extensions[] = {
{
"permessage-deflate",
lws_extension_callback_pm_deflate,
"permessage-deflate"
"; client_no_context_takeover"
"; client_max_window_bits"
},
{ NULL, NULL, NULL /* terminator */ }
};

int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{
int m;

switch (reason)
{
case LWS_CALLBACK_CLIENT_ESTABLISHED :
pthread_mutex_lock(&lock_concurrent);
for (std::pair<int, int> n : concurrent) {
if (endpoints_prop[n.second].wsi == wsi) {
lws_callback_on_writable(wsi);
m = n.second;
endpoints_prop[m].wsi = wsi;
lwsl_user("%s: connection established with success concurrent:%d ws_path::%s\n",
__func__, n.second, endpoints_prop[n.second].ws_path);
pthread_mutex_unlock(&lock_concurrent);
break;
}
}
pthread_mutex_unlock(&lock_concurrent);
break;

case LWS_CALLBACK_CLIENT_RECEIVE :
{
// Handle incomming messages here.
try
{
pthread_mutex_lock(&lock_concurrent);
string str_result = string(reinterpret_cast<char*>(in), len);
Json::Reader reader;
Json::Value json_result;
Expand All @@ -121,12 +100,15 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s
endpoints_prop[m].retry_count = 0;
lwsl_user("%s: incomming messages from %s\n",
__func__ , endpoints_prop[n.second].ws_path);
pthread_mutex_unlock(&lock_concurrent);
break;
}
}
pthread_mutex_unlock(&lock_concurrent);
}
catch (exception &e)
{
pthread_mutex_unlock(&lock_concurrent);
Logger::write_log("<binance::Websocket::event_cb> Error parsing incoming message : %s\n", e.what());
return 1;
}
Expand All @@ -137,14 +119,17 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s
break;

case LWS_CALLBACK_CLOSED :
pthread_mutex_lock(&lock_concurrent);
lwsl_err("CALLBACK_CLOSED: %s\n",
in ? (char *)in : "");
for (std::pair<int, int> n : concurrent) {
if (endpoints_prop[n.second].wsi == wsi) {
m = n.second;
pthread_mutex_unlock(&lock_concurrent);
goto do_retry;
}
}
pthread_mutex_unlock(&lock_concurrent);
break;

case LWS_CALLBACK_GET_THREAD_ID:
Expand All @@ -162,27 +147,33 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s
break;

case LWS_CALLBACK_CLIENT_CONNECTION_ERROR :
pthread_mutex_lock(&lock_concurrent);
for (std::pair<int, int> n : concurrent) {
if (endpoints_prop[n.second].wsi == wsi) {
atomic_store(&lws_service_cancelled, 1);
endpoints_prop.erase(n.second);
concurrent.erase(n.second);
lwsl_err("CLIENT_CONNECTION_ERROR Unknown WIS: %s\n",
in ? (char *)in : "(null)");
pthread_mutex_unlock(&lock_concurrent);
return -1;
}
}
pthread_mutex_unlock(&lock_concurrent);
break;

case LWS_CALLBACK_CLIENT_CLOSED:
pthread_mutex_lock(&lock_concurrent);
lwsl_err("CLIENT_CALLBACK_CLIENT_CLOSED: %s\n",
in ? (char *)in : "");
for (std::pair<int, int> n : concurrent) {
if (endpoints_prop[n.second].wsi == wsi) {
m = n.second;
pthread_mutex_unlock(&lock_concurrent);
goto do_retry;
}
}
pthread_mutex_unlock(&lock_concurrent);
break;

default :
Expand All @@ -198,9 +189,11 @@ int event_cb(lws *wsi, enum lws_callback_reasons reason, void *user, void *in, s
&endpoints_prop[m].retry_count))
{
if(endpoints_prop[m].retry_count > (LWS_ARRAY_SIZE(backoff_ms))){
pthread_mutex_lock(&lock_concurrent);
endpoints_prop.erase(m);
concurrent.erase(m);
atomic_store(&lws_service_cancelled, 1);
pthread_mutex_unlock(&lock_concurrent);
return -1;
}
{
Expand Down Expand Up @@ -311,7 +304,6 @@ void binance::Websocket::init()
info.uid = -1;
info.protocols = protocols;
info.fd_limit_per_thread = 0;
info.extensions = extensions;

context = lws_create_context(&info);
if (!context) {
Expand Down

0 comments on commit 649627d

Please sign in to comment.