diff --git a/Program/uWS.dll b/Program/uWS.dll new file mode 100644 index 0000000000..c7fb5da742 Binary files /dev/null and b/Program/uWS.dll differ diff --git a/StepmaniaCore.cmake b/StepmaniaCore.cmake index 601f2ae80b..990fefcc98 100644 --- a/StepmaniaCore.cmake +++ b/StepmaniaCore.cmake @@ -254,6 +254,18 @@ if(WIN32) link_libraries(${SM_EXTERN_DIR}/LuaJIT/lua51.lib) include_directories(${SM_EXTERN_DIR}/LuaJIT/include) + find_library(LIB_UWS NAMES "uWS" + PATHS "${SM_EXTERN_DIR}/uWebSocket" NO_DEFAULT_PATH + ) + find_library(LIB_EAY NAMES "libeay32" + PATHS "${SM_EXTERN_DIR}/uWebSocket" NO_DEFAULT_PATH + ) + find_library(LIB_SSL NAMES "ssleay32" + PATHS "${SM_EXTERN_DIR}/uWebSocket" NO_DEFAULT_PATH + ) + find_library(LIB_UV NAMES "libuv" + PATHS "${SM_EXTERN_DIR}/uWebSocket" NO_DEFAULT_PATH + ) if (MINGW AND WITH_FFMPEG) include("${SM_CMAKE_DIR}/SetupFfmpeg.cmake") set(HAS_FFMPEG TRUE) diff --git a/extern/uWebSocket/include/Asio.h b/extern/uWebSocket/include/Asio.h new file mode 100644 index 0000000000..f204eff6e4 --- /dev/null +++ b/extern/uWebSocket/include/Asio.h @@ -0,0 +1,192 @@ +#ifndef ASIO_H +#define ASIO_H + +#include + +typedef boost::asio::ip::tcp::socket::native_handle_type uv_os_sock_t; +static const int UV_READABLE = 1; +static const int UV_WRITABLE = 2; + +namespace uS { + +struct Loop : boost::asio::io_service { + + static Loop *createLoop(bool defaultLoop = true) { + return new Loop; + } + + void destroy() { + delete this; + } + + void run() { + boost::asio::io_service::run(); + } + + void poll() { + boost::asio::io_service::poll(); + } +}; + +struct Timer { + boost::asio::deadline_timer asio_timer; + void *data; + + Timer(Loop *loop) : asio_timer(*loop) { + + } + + void start(void (*cb)(Timer *), int first, int repeat) { + asio_timer.expires_from_now(boost::posix_time::milliseconds(first)); + asio_timer.async_wait([this, cb, repeat](const boost::system::error_code &ec) { + if (ec != boost::asio::error::operation_aborted) { + if (repeat) { + start(cb, repeat, repeat); + } + cb(this); + } + }); + } + + void setData(void *data) { + this->data = data; + } + + void *getData() { + return data; + } + + // bug: cancel does not cancel expired timers! + // it has to guarantee that the timer is not called after + // stop is called! ffs boost! + void stop() { + asio_timer.cancel(); + } + + void close() { + asio_timer.get_io_service().post([this]() { + delete this; + }); + } +}; + +struct Async { + Loop *loop; + void (*cb)(Async *); + void *data; + + boost::asio::io_service::work asio_work; + + Async(Loop *loop) : loop(loop), asio_work(*loop) { + } + + void start(void (*cb)(Async *)) { + this->cb = cb; + } + + void send() { + loop->post([this]() { + cb(this); + }); + } + + void close() { + loop->post([this]() { + delete this; + }); + } + + void setData(void *data) { + this->data = data; + } + + void *getData() { + return data; + } +}; + +struct Poll { + boost::asio::posix::stream_descriptor *socket; + void (*cb)(Poll *p, int status, int events); + + Poll(Loop *loop, uv_os_sock_t fd) { + socket = new boost::asio::posix::stream_descriptor(*loop, fd); + socket->non_blocking(true); + } + + bool isClosed() { + return !socket; + } + + boost::asio::ip::tcp::socket::native_handle_type getFd() { + return socket ? socket->native_handle() : -1; + } + + void setCb(void (*cb)(Poll *p, int status, int events)) { + this->cb = cb; + } + + void (*getCb())(Poll *, int, int) { + return cb; + } + + void reInit(Loop *loop, uv_os_sock_t fd) { + delete socket; + socket = new boost::asio::posix::stream_descriptor(*loop, fd); + socket->non_blocking(true); + } + + void start(Loop *, Poll *self, int events) { + if (events & UV_READABLE) { + socket->async_read_some(boost::asio::null_buffers(), [self](boost::system::error_code ec, std::size_t) { + if (ec != boost::asio::error::operation_aborted) { + self->start(nullptr, self, UV_READABLE); + self->cb(self, ec ? -1 : 0, UV_READABLE); + } + }); + } + + if (events & UV_WRITABLE) { + socket->async_write_some(boost::asio::null_buffers(), [self](boost::system::error_code ec, std::size_t) { + if (ec != boost::asio::error::operation_aborted) { + self->start(nullptr, self, UV_WRITABLE); + self->cb(self, ec ? -1 : 0, UV_WRITABLE); + } + }); + } + } + + void change(Loop *, Poll *self, int events) { + socket->cancel(); + start(nullptr, self, events); + } + + bool fastTransfer(Loop *loop, Loop *newLoop, int events) { + return false; + } + + // todo: asio is thread safe, use it! + bool threadSafeChange(Loop *loop, Poll *self, int events) { + return false; + } + + void stop(Loop *) { + socket->cancel(); + } + + // this is not correct, but it works for now + // think about transfer - should allow one to not delete + // but in this case it doesn't matter at all + void close(Loop *loop, void (*cb)(Poll *)) { + socket->release(); + socket->get_io_service().post([cb, this]() { + cb(this); + }); + delete socket; + socket = nullptr; + } +}; + +} + +#endif // ASIO_H diff --git a/extern/uWebSocket/include/Backend.h b/extern/uWebSocket/include/Backend.h new file mode 100644 index 0000000000..fbcdffbea4 --- /dev/null +++ b/extern/uWebSocket/include/Backend.h @@ -0,0 +1,17 @@ +#ifndef BACKEND_H +#define BACKEND_H + +// Default to Epoll if nothing specified and on Linux +// Default to Libuv if nothing specified and not on Linux +#ifdef USE_ASIO +#include "Asio.h" +#elif !defined(__linux__) || defined(USE_LIBUV) +#include "Libuv.h" +#else +#ifndef USE_EPOLL +#define USE_EPOLL +#endif +#include "Epoll.h" +#endif + +#endif // BACKEND_H diff --git a/extern/uWebSocket/include/Epoll.h b/extern/uWebSocket/include/Epoll.h new file mode 100644 index 0000000000..2d69a121ef --- /dev/null +++ b/extern/uWebSocket/include/Epoll.h @@ -0,0 +1,265 @@ +#ifndef EPOLL_H +#define EPOLL_H + +#include +#include +#include +#include +#include +#include +#include +#include + +typedef int uv_os_sock_t; +static const int UV_READABLE = EPOLLIN; +static const int UV_WRITABLE = EPOLLOUT; + +namespace uS { + +struct Poll; +struct Timer; + +extern std::recursive_mutex cbMutex; +extern void (*callbacks[16])(Poll *, int, int); +extern int cbHead; + +struct Timepoint { + void (*cb)(Timer *); + Timer *timer; + std::chrono::system_clock::time_point timepoint; + int nextDelay; +}; + +struct Loop { + int epfd; + int numPolls = 0; + bool cancelledLastTimer; + int delay = -1; // delay to next timer expiry, or -1 if no timers pending + epoll_event readyEvents[1024]; + std::chrono::system_clock::time_point timepoint; + std::vector timers; + std::vector> closing; + + void (*preCb)(void *) = nullptr; + void (*postCb)(void *) = nullptr; + void *preCbData, *postCbData; + + Loop(bool defaultLoop) { + epfd = epoll_create1(EPOLL_CLOEXEC); + timepoint = std::chrono::system_clock::now(); + } + + static Loop *createLoop(bool defaultLoop = true) { + return new Loop(defaultLoop); + } + + void destroy() { + ::close(epfd); + delete this; + } + + void doEpoll(int epollTimeout); + + void run(); + + void poll(); + + int getEpollFd() { + return epfd; + } +}; + +struct Timer { + Loop *loop; + void *data; + + Timer(Loop *loop) { + this->loop = loop; + } + + void start(void (*cb)(Timer *), int timeout, int repeat) { + loop->timepoint = std::chrono::system_clock::now(); + std::chrono::system_clock::time_point timepoint = loop->timepoint + std::chrono::milliseconds(timeout); + + Timepoint t = {cb, this, timepoint, repeat}; + loop->timers.insert( + std::upper_bound(loop->timers.begin(), loop->timers.end(), t, [](const Timepoint &a, const Timepoint &b) { + return a.timepoint < b.timepoint; + }), + t + ); + + loop->delay = -1; + if (loop->timers.size()) { + loop->delay = std::max(std::chrono::duration_cast(loop->timers[0].timepoint - loop->timepoint).count(), 0); + } + } + + void setData(void *data) { + this->data = data; + } + + void *getData() { + return data; + } + + // always called before destructor + void stop() { + auto pos = loop->timers.begin(); + for (Timepoint &t : loop->timers) { + if (t.timer == this) { + loop->timers.erase(pos); + break; + } + pos++; + } + loop->cancelledLastTimer = true; + + loop->delay = -1; + if (loop->timers.size()) { + loop->delay = std::max(std::chrono::duration_cast(loop->timers[0].timepoint - loop->timepoint).count(), 0); + } + } + + void close() { + delete this; + } +}; + +// 4 bytes +struct Poll { +protected: + struct { + int fd : 28; + unsigned int cbIndex : 4; + } state = {-1, 0}; + + Poll(Loop *loop, uv_os_sock_t fd) { + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); + state.fd = fd; + loop->numPolls++; + } + + // todo: pre-set all of callbacks up front and remove mutex + void setCb(void (*cb)(Poll *p, int status, int events)) { + cbMutex.lock(); + state.cbIndex = cbHead; + for (int i = 0; i < cbHead; i++) { + if (callbacks[i] == cb) { + state.cbIndex = i; + break; + } + } + if (state.cbIndex == cbHead) { + callbacks[cbHead++] = cb; + } + cbMutex.unlock(); + } + + void (*getCb())(Poll *, int, int) { + return callbacks[state.cbIndex]; + } + + void reInit(Loop *loop, uv_os_sock_t fd) { + state.fd = fd; + loop->numPolls++; + } + + void start(Loop *loop, Poll *self, int events) { + epoll_event event; + event.events = events; + event.data.ptr = self; + epoll_ctl(loop->epfd, EPOLL_CTL_ADD, state.fd, &event); + } + + void change(Loop *loop, Poll *self, int events) { + epoll_event event; + event.events = events; + event.data.ptr = self; + epoll_ctl(loop->epfd, EPOLL_CTL_MOD, state.fd, &event); + } + + void stop(Loop *loop) { + epoll_event event; + epoll_ctl(loop->epfd, EPOLL_CTL_DEL, state.fd, &event); + } + + bool fastTransfer(Loop *loop, Loop *newLoop, int events) { + stop(loop); + start(newLoop, this, events); + loop->numPolls--; + // needs to lock the newLoop's numPolls! + newLoop->numPolls++; + return true; + } + + bool threadSafeChange(Loop *loop, Poll *self, int events) { + change(loop, self, events); + return true; + } + + void close(Loop *loop, void (*cb)(Poll *)) { + state.fd = -1; + loop->closing.push_back({this, cb}); + } + +public: + bool isClosed() { + return state.fd == -1; + } + + uv_os_sock_t getFd() { + return state.fd; + } + + friend struct Loop; +}; + +// this should be put in the Loop as a general "post" function always available +struct Async : Poll { + void (*cb)(Async *); + Loop *loop; + void *data; + + Async(Loop *loop) : Poll(loop, ::eventfd(0, EFD_CLOEXEC)) { + this->loop = loop; + } + + void start(void (*cb)(Async *)) { + this->cb = cb; + Poll::setCb([](Poll *p, int, int) { + uint64_t val; + if (::read(((Async *) p)->state.fd, &val, 8) == 8) { + ((Async *) p)->cb((Async *) p); + } + }); + Poll::start(loop, this, UV_READABLE); + } + + void send() { + uint64_t one = 1; + if (::write(state.fd, &one, 8) != 8) { + return; + } + } + + void close() { + Poll::stop(loop); + ::close(state.fd); + Poll::close(loop, [](Poll *p) { + delete (Async *) p; + }); + } + + void setData(void *data) { + this->data = data; + } + + void *getData() { + return data; + } +}; + +} + +#endif // EPOLL_H diff --git a/extern/uWebSocket/include/Extensions.h b/extern/uWebSocket/include/Extensions.h new file mode 100644 index 0000000000..3f95325194 --- /dev/null +++ b/extern/uWebSocket/include/Extensions.h @@ -0,0 +1,29 @@ +#ifndef EXTENSIONS_UWS_H +#define EXTENSIONS_UWS_H + +#include + +namespace uWS { + +enum Options : unsigned int { + NO_OPTIONS = 0, + PERMESSAGE_DEFLATE = 1, + SERVER_NO_CONTEXT_TAKEOVER = 2, + CLIENT_NO_CONTEXT_TAKEOVER = 4, + NO_DELAY = 8 +}; + +template +class ExtensionsNegotiator { +protected: + int options; +public: + ExtensionsNegotiator(int wantedOptions); + std::string generateOffer(); + void readOffer(std::string offer); + int getNegotiatedOptions(); +}; + +} + +#endif // EXTENSIONS_UWS_H diff --git a/extern/uWebSocket/include/Group.h b/extern/uWebSocket/include/Group.h new file mode 100644 index 0000000000..8159182ecd --- /dev/null +++ b/extern/uWebSocket/include/Group.h @@ -0,0 +1,144 @@ +#ifndef GROUP_UWS_H +#define GROUP_UWS_H + +#include "WebSocket.h" +#include "HTTPSocket.h" +#include "Extensions.h" +#include +#include + +namespace uWS { + +enum ListenOptions { + TRANSFERS +}; + +struct Hub; + +template +struct WIN32_EXPORT Group : protected uS::NodeData { +protected: + friend struct Hub; + friend struct WebSocket; + friend struct HttpSocket; + friend struct HttpSocket; + + std::function *, HttpRequest)> connectionHandler; + std::function *)> transferHandler; + std::function *, char *message, size_t length, OpCode opCode)> messageHandler; + std::function *, int code, char *message, size_t length)> disconnectionHandler; + std::function *, char *, size_t)> pingHandler; + std::function *, char *, size_t)> pongHandler; + std::function *)> httpConnectionHandler; + std::function httpRequestHandler; + std::function httpDataHandler; + std::function httpCancelledRequestHandler; + std::function *)> httpDisconnectionHandler; + std::function *, HttpRequest)> httpUpgradeHandler; + + using errorType = typename std::conditional::type; + std::function errorHandler; + + unsigned int maxPayload; + Hub *hub; + int extensionOptions; + uS::Timer *timer = nullptr, *httpTimer = nullptr; + std::string userPingMessage; + std::stack iterators; + + // todo: cannot be named user, collides with parent! + void *userData = nullptr; + static void timerCallback(uS::Timer *timer); + + WebSocket *webSocketHead = nullptr; + HttpSocket *httpSocketHead = nullptr; + + void addWebSocket(WebSocket *webSocket); + void removeWebSocket(WebSocket *webSocket); + + // todo: remove these, template + void addHttpSocket(HttpSocket *httpSocket); + void removeHttpSocket(HttpSocket *httpSocket); + + Group(int extensionOptions, unsigned int maxPayload, Hub *hub, uS::NodeData *nodeData); + void stopListening(); + +public: + void onConnection(std::function *, HttpRequest)> handler); + void onTransfer(std::function *)> handler); + void onMessage(std::function *, char *, size_t, OpCode)> handler); + void onDisconnection(std::function *, int code, char *message, size_t length)> handler); + void onPing(std::function *, char *, size_t)> handler); + void onPong(std::function *, char *, size_t)> handler); + void onError(std::function handler); + void onHttpConnection(std::function *)> handler); + void onHttpRequest(std::function handler); + void onHttpData(std::function handler); + void onHttpDisconnection(std::function *)> handler); + void onCancelledHttpRequest(std::function handler); + void onHttpUpgrade(std::function *, HttpRequest)> handler); + + // Thread safe + void broadcast(const char *message, size_t length, OpCode opCode); + void setUserData(void *user); + void *getUserData(); + + // Not thread safe + void terminate(); + void close(int code = 1000, char *message = nullptr, size_t length = 0); + void startAutoPing(int intervalMs, std::string userMessage = ""); + + // same as listen(TRANSFERS), backwards compatible API for now + void addAsync() { + if (!async) { + NodeData::addAsync(); + } + } + + void listen(ListenOptions listenOptions) { + if (listenOptions == TRANSFERS && !async) { + addAsync(); + } + } + + template + void forEach(const F &cb) { + uS::Poll *iterator = webSocketHead; + iterators.push(iterator); + while (iterator) { + uS::Poll *lastIterator = iterator; + cb((WebSocket *) iterator); + iterator = iterators.top(); + if (lastIterator == iterator) { + iterator = ((uS::Socket *) iterator)->next; + iterators.top() = iterator; + } + } + iterators.pop(); + } + + // duplicated code for now! + template + void forEachHttpSocket(const F &cb) { + uS::Poll *iterator = httpSocketHead; + iterators.push(iterator); + while (iterator) { + uS::Poll *lastIterator = iterator; + cb((HttpSocket *) iterator); + iterator = iterators.top(); + if (lastIterator == iterator) { + iterator = ((uS::Socket *) iterator)->next; + iterators.top() = iterator; + } + } + iterators.pop(); + } + + static Group *from(uS::Socket *s) { + return static_cast *>(s->getNodeData()); + } +}; + +} + +#endif // GROUP_UWS_H diff --git a/extern/uWebSocket/include/HTTPSocket.h b/extern/uWebSocket/include/HTTPSocket.h new file mode 100644 index 0000000000..ac1de60a3a --- /dev/null +++ b/extern/uWebSocket/include/HTTPSocket.h @@ -0,0 +1,285 @@ +#ifndef HTTPSOCKET_UWS_H +#define HTTPSOCKET_UWS_H + +#include "Socket.h" +#include +// #include + +namespace uWS { + +struct Header { + char *key, *value; + unsigned int keyLength, valueLength; + + operator bool() { + return key; + } + + // slow without string_view! + std::string toString() { + return std::string(value, valueLength); + } +}; + +enum HttpMethod { + METHOD_GET, + METHOD_POST, + METHOD_PUT, + METHOD_DELETE, + METHOD_PATCH, + METHOD_OPTIONS, + METHOD_HEAD, + METHOD_TRACE, + METHOD_CONNECT, + METHOD_INVALID +}; + +struct HttpRequest { + Header *headers; + Header getHeader(const char *key) { + return getHeader(key, strlen(key)); + } + + HttpRequest(Header *headers = nullptr) : headers(headers) {} + + Header getHeader(const char *key, size_t length) { + if (headers) { + for (Header *h = headers; *++h; ) { + if (h->keyLength == length && !strncmp(h->key, key, length)) { + return *h; + } + } + } + return {nullptr, nullptr, 0, 0}; + } + + Header getUrl() { + if (headers->key) { + return *headers; + } + return {nullptr, nullptr, 0, 0}; + } + + HttpMethod getMethod() { + if (!headers->key) { + return METHOD_INVALID; + } + switch (headers->keyLength) { + case 3: + if (!strncmp(headers->key, "get", 3)) { + return METHOD_GET; + } else if (!strncmp(headers->key, "put", 3)) { + return METHOD_PUT; + } + break; + case 4: + if (!strncmp(headers->key, "post", 4)) { + return METHOD_POST; + } else if (!strncmp(headers->key, "head", 4)) { + return METHOD_HEAD; + } + break; + case 5: + if (!strncmp(headers->key, "patch", 5)) { + return METHOD_PATCH; + } else if (!strncmp(headers->key, "trace", 5)) { + return METHOD_TRACE; + } + break; + case 6: + if (!strncmp(headers->key, "delete", 6)) { + return METHOD_DELETE; + } + break; + case 7: + if (!strncmp(headers->key, "options", 7)) { + return METHOD_OPTIONS; + } else if (!strncmp(headers->key, "connect", 7)) { + return METHOD_CONNECT; + } + break; + } + return METHOD_INVALID; + } +}; + +struct HttpResponse; + +template +struct WIN32_EXPORT HttpSocket : uS::Socket { + void *httpUser; // remove this later, setTimeout occupies user for now + HttpResponse *outstandingResponsesHead = nullptr; + HttpResponse *outstandingResponsesTail = nullptr; + HttpResponse *preAllocatedResponse = nullptr; + + std::string httpBuffer; + size_t contentLength = 0; + bool missedDeadline = false; + + HttpSocket(uS::Socket *socket) : uS::Socket(std::move(*socket)) {} + + void terminate() { + onEnd(this); + } + + void upgrade(const char *secKey, const char *extensions, + size_t extensionsLength, const char *subprotocol, + size_t subprotocolLength, bool *perMessageDeflate); + +protected: + friend struct uS::Socket; + friend struct HttpResponse; + friend struct Hub; + static uS::Socket *onData(uS::Socket *s, char *data, size_t length); + static void onEnd(uS::Socket *s); +}; + +struct HttpResponse { + HttpSocket *httpSocket; + HttpResponse *next = nullptr; + void *userData = nullptr; + void *extraUserData = nullptr; + HttpSocket::Queue::Message *messageQueue = nullptr; + bool hasEnded = false; + bool hasHead = false; + + HttpResponse(HttpSocket *httpSocket) : httpSocket(httpSocket) { + + } + + template + static HttpResponse *allocateResponse(HttpSocket *httpSocket) { + if (httpSocket->preAllocatedResponse) { + HttpResponse *ret = httpSocket->preAllocatedResponse; + httpSocket->preAllocatedResponse = nullptr; + return ret; + } else { + return new HttpResponse((HttpSocket *) httpSocket); + } + } + + //template + void freeResponse(HttpSocket *httpData) { + if (httpData->preAllocatedResponse) { + delete this; + } else { + httpData->preAllocatedResponse = this; + } + } + + void write(const char *message, size_t length = 0, + void(*callback)(void *httpSocket, void *data, bool cancelled, void *reserved) = nullptr, + void *callbackData = nullptr) { + + struct NoopTransformer { + static size_t estimate(const char *data, size_t length) { + return length; + } + + static size_t transform(const char *src, char *dst, size_t length, int transformData) { + memcpy(dst, src, length); + return length; + } + }; + + httpSocket->sendTransformed(message, length, callback, callbackData, 0); + hasHead = true; + } + + // todo: maybe this function should have a fast path for 0 length? + void end(const char *message = nullptr, size_t length = 0, + void(*callback)(void *httpResponse, void *data, bool cancelled, void *reserved) = nullptr, + void *callbackData = nullptr) { + + struct TransformData { + bool hasHead; + } transformData = {hasHead}; + + struct HttpTransformer { + + // todo: this should get TransformData! + static size_t estimate(const char *data, size_t length) { + return length + 128; + } + + static size_t transform(const char *src, char *dst, size_t length, TransformData transformData) { + // todo: sprintf is extremely slow + int offset = transformData.hasHead ? 0 : std::sprintf(dst, "HTTP/1.1 200 OK\r\nContent-Length: %u\r\n\r\n", (unsigned int) length); + memcpy(dst + offset, src, length); + return length + offset; + } + }; + + if (httpSocket->outstandingResponsesHead != this) { + HttpSocket::Queue::Message *messagePtr = httpSocket->allocMessage(HttpTransformer::estimate(message, length)); + messagePtr->length = HttpTransformer::transform(message, (char *) messagePtr->data, length, transformData); + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + messagePtr->nextMessage = messageQueue; + messageQueue = messagePtr; + hasEnded = true; + } else { + httpSocket->sendTransformed(message, length, callback, callbackData, transformData); + // move head as far as possible + HttpResponse *head = next; + while (head) { + // empty message queue + HttpSocket::Queue::Message *messagePtr = head->messageQueue; + while (messagePtr) { + HttpSocket::Queue::Message *nextMessage = messagePtr->nextMessage; + + bool wasTransferred; + if (httpSocket->write(messagePtr, wasTransferred)) { + if (!wasTransferred) { + httpSocket->freeMessage(messagePtr); + if (callback) { + callback(this, callbackData, false, nullptr); + } + } else { + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + } + } else { + httpSocket->freeMessage(messagePtr); + if (callback) { + callback(this, callbackData, true, nullptr); + } + goto updateHead; + } + messagePtr = nextMessage; + } + // cannot go beyond unfinished responses + if (!head->hasEnded) { + break; + } else { + HttpResponse *next = head->next; + head->freeResponse(httpSocket); + head = next; + } + } + updateHead: + httpSocket->outstandingResponsesHead = head; + if (!head) { + httpSocket->outstandingResponsesTail = nullptr; + } + + freeResponse(httpSocket); + } + } + + void setUserData(void *userData) { + this->userData = userData; + } + + void *getUserData() { + return userData; + } + + HttpSocket *getHttpSocket() { + return httpSocket; + } +}; + +} + +#endif // HTTPSOCKET_UWS_H diff --git a/extern/uWebSocket/include/Hub.h b/extern/uWebSocket/include/Hub.h new file mode 100644 index 0000000000..4b56313706 --- /dev/null +++ b/extern/uWebSocket/include/Hub.h @@ -0,0 +1,99 @@ +#ifndef HUB_UWS_H +#define HUB_UWS_H + +#include "Group.h" +#include "Node.h" +#include +#include +#include +#include + +namespace uWS { + +struct WIN32_EXPORT Hub : protected uS::Node, public Group, public Group { +protected: + struct ConnectionData { + std::string path; + void *user; + Group *group; + }; + + z_stream inflationStream = {}; + char *inflationBuffer; + char *inflate(char *data, size_t &length, size_t maxPayload); + std::string dynamicInflationBuffer; + static const int LARGE_BUFFER_SIZE = 300 * 1024; + + static void onServerAccept(uS::Socket *s); + static void onClientConnection(uS::Socket *s, bool error); + +public: + template + Group *createGroup(int extensionOptions = 0, unsigned int maxPayload = 16777216) { + return new Group(extensionOptions, maxPayload, this, nodeData); + } + + template + Group &getDefaultGroup() { + return static_cast &>(*this); + } + + bool listen(int port, uS::TLS::Context sslContext = nullptr, int options = 0, Group *eh = nullptr); + bool listen(const char *host, int port, uS::TLS::Context sslContext = nullptr, int options = 0, Group *eh = nullptr); + void connect(std::string uri, void *user = nullptr, std::map extraHeaders = {}, int timeoutMs = 5000, Group *eh = nullptr); + void upgrade(uv_os_sock_t fd, const char *secKey, SSL *ssl, const char *extensions, size_t extensionsLength, const char *subprotocol, size_t subprotocolLength, Group *serverGroup = nullptr); + + Hub(int extensionOptions = 0, bool useDefaultLoop = false, unsigned int maxPayload = 16777216) : uS::Node(LARGE_BUFFER_SIZE, WebSocketProtocol>::CONSUME_PRE_PADDING, WebSocketProtocol>::CONSUME_POST_PADDING, useDefaultLoop), + Group(extensionOptions, maxPayload, this, nodeData), Group(0, maxPayload, this, nodeData) { + inflateInit2(&inflationStream, -15); + inflationBuffer = new char[LARGE_BUFFER_SIZE]; + +#ifdef UWS_THREADSAFE + getLoop()->preCbData = nodeData; + getLoop()->preCb = [](void *nodeData) { + static_cast(nodeData)->asyncMutex->lock(); + }; + + getLoop()->postCbData = nodeData; + getLoop()->postCb = [](void *nodeData) { + static_cast(nodeData)->asyncMutex->unlock(); + }; +#endif + } + + ~Hub() { + inflateEnd(&inflationStream); + delete [] inflationBuffer; + } + + using uS::Node::run; + using uS::Node::poll; + using uS::Node::getLoop; + using Group::onConnection; + using Group::onConnection; + using Group::onTransfer; + using Group::onTransfer; + using Group::onMessage; + using Group::onMessage; + using Group::onDisconnection; + using Group::onDisconnection; + using Group::onPing; + using Group::onPing; + using Group::onPong; + using Group::onPong; + using Group::onError; + using Group::onError; + using Group::onHttpRequest; + using Group::onHttpData; + using Group::onHttpConnection; + using Group::onHttpDisconnection; + using Group::onHttpUpgrade; + using Group::onCancelledHttpRequest; + + friend struct WebSocket; + friend struct WebSocket; +}; + +} + +#endif // HUB_UWS_H diff --git a/extern/uWebSocket/include/Libuv.h b/extern/uWebSocket/include/Libuv.h new file mode 100644 index 0000000000..017e25b9c9 --- /dev/null +++ b/extern/uWebSocket/include/Libuv.h @@ -0,0 +1,183 @@ +#ifndef LIBUV_H +#define LIBUV_H + +#include +static_assert (UV_VERSION_MINOR >= 3, "µWebSockets requires libuv >=1.3.0"); + +namespace uS { + +struct Loop : uv_loop_t { + static Loop *createLoop(bool defaultLoop = true) { + if (defaultLoop) { + return (Loop *) uv_default_loop(); + } else { + return (Loop *) uv_loop_new(); + } + } + + void destroy() { + if (this != uv_default_loop()) { + uv_loop_delete(this); + } + } + + void run() { + uv_run(this, UV_RUN_DEFAULT); + } + + void poll() { + uv_run(this, UV_RUN_NOWAIT); + } +}; + +struct Async { + uv_async_t uv_async; + + Async(Loop *loop) { + uv_async.loop = loop; + } + + void start(void (*cb)(Async *)) { + uv_async_init(uv_async.loop, &uv_async, (uv_async_cb) cb); + } + + void send() { + uv_async_send(&uv_async); + } + + void close() { + uv_close((uv_handle_t *) &uv_async, [](uv_handle_t *a) { + delete (Async *) a; + }); + } + + void setData(void *data) { + uv_async.data = data; + } + + void *getData() { + return uv_async.data; + } +}; + +struct Timer { + uv_timer_t uv_timer; + + Timer(Loop *loop) { + uv_timer_init(loop, &uv_timer); + } + + void start(void (*cb)(Timer *), int first, int repeat) { + uv_timer_start(&uv_timer, (uv_timer_cb) cb, first, repeat); + } + + void setData(void *data) { + uv_timer.data = data; + } + + void *getData() { + return uv_timer.data; + } + + void stop() { + uv_timer_stop(&uv_timer); + } + + void close() { + uv_close((uv_handle_t *) &uv_timer, [](uv_handle_t *t) { + delete (Timer *) t; + }); + } + +private: + ~Timer() { + + } +}; + +struct Poll { + uv_poll_t *uv_poll; + void (*cb)(Poll *p, int status, int events); + + Poll(Loop *loop, uv_os_sock_t fd) { + uv_poll = new uv_poll_t; + uv_poll_init_socket(loop, uv_poll, fd); + } + + Poll(Poll &&other) { + uv_poll = other.uv_poll; + cb = other.cb; + other.uv_poll = nullptr; + } + + Poll(const Poll &other) = delete; + + ~Poll() { + delete uv_poll; + } + + bool isClosed() { + return uv_is_closing((uv_handle_t *) uv_poll); + } + + uv_os_sock_t getFd() { +#ifdef _WIN32 + uv_os_sock_t fd; + uv_fileno((uv_handle_t *) uv_poll, (uv_os_fd_t *) &fd); + return fd; +#else + return uv_poll->io_watcher.fd; +#endif + } + + void setCb(void (*cb)(Poll *p, int status, int events)) { + this->cb = cb; + } + + void (*getCb())(Poll *, int, int) { + return cb; + } + + void reInit(Loop *loop, uv_os_sock_t fd) { + delete uv_poll; + uv_poll = new uv_poll_t; + uv_poll_init_socket(loop, uv_poll, fd); + } + + void start(Loop *, Poll *self, int events) { + uv_poll->data = self; + uv_poll_start(uv_poll, events, [](uv_poll_t *p, int status, int events) { + Poll *self = (Poll *) p->data; + self->cb(self, status, events); + }); + } + + void change(Loop *, Poll *self, int events) { + start(nullptr, self, events); + } + + void stop(Loop *loop) { + uv_poll_stop(uv_poll); + } + + bool fastTransfer(Loop *loop, Loop *newLoop, int events) { + return false; + } + + bool threadSafeChange(Loop *, Poll *self, int events) { + return false; + } + + void close(Loop *loop, void (*cb)(Poll *)) { + this->cb = (void(*)(Poll *, int, int)) cb; + uv_close((uv_handle_t *) uv_poll, [](uv_handle_t *p) { + Poll *poll = (Poll *) p->data; + void (*cb)(Poll *) = (void(*)(Poll *)) poll->cb; + cb(poll); + }); + } +}; + +} + +#endif // LIBUV_H diff --git a/extern/uWebSocket/include/Networking.h b/extern/uWebSocket/include/Networking.h new file mode 100644 index 0000000000..e26888cc77 --- /dev/null +++ b/extern/uWebSocket/include/Networking.h @@ -0,0 +1,259 @@ +// the purpose of this header should be to provide SSL and networking wrapped in a common interface +// it should allow cross-platform networking and SSL and also easy usage of mTCP and similar tech + +#ifndef NETWORKING_UWS_H +#define NETWORKING_UWS_H + +#include +#if OPENSSL_VERSION_NUMBER < 0x10100000L +#define SSL_CTX_up_ref(x) x->references++ +#define SSL_up_ref(x) x->references++ +#endif + +#ifndef __linux +#define MSG_NOSIGNAL 0 +#else +#include +#endif + +#ifdef __APPLE__ +#include +#define htobe64(x) OSSwapHostToBigInt64(x) +#define be64toh(x) OSSwapBigToHostInt64(x) +#endif + +#ifdef _WIN32 +#define NOMINMAX +#include +#include +#pragma comment(lib, "ws2_32.lib") +#define SHUT_WR SD_SEND +#ifdef __MINGW32__ +// Windows has always been tied to LE +#define htobe64(x) __builtin_bswap64(x) +#define be64toh(x) __builtin_bswap64(x) +#else +#define __thread __declspec(thread) +#define htobe64(x) htonll(x) +#define be64toh(x) ntohll(x) +#define pthread_t DWORD +#define pthread_self GetCurrentThreadId +#endif +#define WIN32_EXPORT __declspec(dllexport) + +inline void close(SOCKET fd) {closesocket(fd);} +inline int setsockopt(SOCKET fd, int level, int optname, const void *optval, socklen_t optlen) { + return setsockopt(fd, level, optname, (const char *) optval, optlen); +} + +inline SOCKET dup(SOCKET socket) { + WSAPROTOCOL_INFOW pi; + if (WSADuplicateSocketW(socket, GetCurrentProcessId(), &pi) == SOCKET_ERROR) { + return INVALID_SOCKET; + } + return WSASocketW(pi.iAddressFamily, pi.iSocketType, pi.iProtocol, &pi, 0, WSA_FLAG_OVERLAPPED); +} +#else +#include +#include +#include +#include +#include +#include +#include +#define SOCKET_ERROR -1 +#define INVALID_SOCKET -1 +#define WIN32_EXPORT +#endif + +#include "Backend.h" +#include +#include +#include +#include +#include +#include +#include + +namespace uS { + +// todo: mark sockets nonblocking in these functions +// todo: probably merge this Context with the TLS::Context for same interface for SSL and non-SSL! +struct Context { + +#ifdef USE_MTCP + mtcp_context *mctx; +#endif + + Context() { + // mtcp_create_context +#ifdef USE_MTCP + mctx = mtcp_create_context(0); // cpu index? +#endif + } + + ~Context() { +#ifdef USE_MTCP + mtcp_destroy_context(mctx); +#endif + } + + // returns INVALID_SOCKET on error + uv_os_sock_t acceptSocket(uv_os_sock_t fd) { + uv_os_sock_t acceptedFd; +#if defined(SOCK_CLOEXEC) && defined(SOCK_NONBLOCK) + // Linux, FreeBSD + acceptedFd = accept4(fd, nullptr, nullptr, SOCK_CLOEXEC | SOCK_NONBLOCK); +#else + // Windows, OS X + acceptedFd = accept(fd, nullptr, nullptr); +#endif + +#ifdef __APPLE__ + if (acceptedFd != INVALID_SOCKET) { + int noSigpipe = 1; + setsockopt(acceptedFd, SOL_SOCKET, SO_NOSIGPIPE, &noSigpipe, sizeof(int)); + } +#endif + return acceptedFd; + } + + // returns INVALID_SOCKET on error + uv_os_sock_t createSocket(int domain, int type, int protocol) { + int flags = 0; +#if defined(SOCK_CLOEXEC) && defined(SOCK_NONBLOCK) + flags = SOCK_CLOEXEC | SOCK_NONBLOCK; +#endif + + uv_os_sock_t createdFd = socket(domain, type | flags, protocol); + +#ifdef __APPLE__ + if (createdFd != INVALID_SOCKET) { + int noSigpipe = 1; + setsockopt(createdFd, SOL_SOCKET, SO_NOSIGPIPE, &noSigpipe, sizeof(int)); + } +#endif + + return createdFd; + } + + void closeSocket(uv_os_sock_t fd) { +#ifdef _WIN32 + closesocket(fd); +#else + close(fd); +#endif + } + + bool wouldBlock() { +#ifdef _WIN32 + return WSAGetLastError() == WSAEWOULDBLOCK; +#else + return errno == EWOULDBLOCK;// || errno == EAGAIN; +#endif + } +}; + +namespace TLS { + +class WIN32_EXPORT Context { +protected: + SSL_CTX *context = nullptr; + std::shared_ptr password; + + static int passwordCallback(char *buf, int size, int rwflag, void *u) + { + std::string *password = (std::string *) u; + int length = std::min(size, (int) password->length()); + memcpy(buf, password->data(), length); + buf[length] = '\0'; + return length; + } + +public: + friend Context WIN32_EXPORT createContext(std::string certChainFileName, std::string keyFileName, std::string keyFilePassword); + Context(SSL_CTX *context) : context(context) { + + } + + Context() = default; + Context(const Context &other); + Context &operator=(const Context &other); + ~Context(); + operator bool() { + return context; + } + + SSL_CTX *getNativeContext() { + return context; + } +}; + +Context WIN32_EXPORT createContext(std::string certChainFileName, std::string keyFileName, std::string keyFilePassword = std::string()); + +} + +struct Socket; + +// NodeData is like a Context, maybe merge them? +struct WIN32_EXPORT NodeData { + char *recvBufferMemoryBlock; + char *recvBuffer; + int recvLength; + Loop *loop; + uS::Context *netContext; + void *user = nullptr; + static const int preAllocMaxSize = 1024; + char **preAlloc; + SSL_CTX *clientContext; + + Async *async = nullptr; + pthread_t tid; + + std::recursive_mutex *asyncMutex; + std::vector transferQueue; + std::vector changePollQueue; + static void asyncCallback(Async *async); + + static int getMemoryBlockIndex(size_t length) { + return (int) ((length >> 4) + bool(length & 15)); + } + + char *getSmallMemoryBlock(int index) { + if (preAlloc[index]) { + char *memory = preAlloc[index]; + preAlloc[index] = nullptr; + return memory; + } else { + return new char[index << 4]; + } + } + + void freeSmallMemoryBlock(char *memory, int index) { + if (!preAlloc[index]) { + preAlloc[index] = memory; + } else { + delete [] memory; + } + } + +public: + void addAsync() { + async = new Async(loop); + async->setData(this); + async->start(NodeData::asyncCallback); + } + + void clearPendingPollChanges(Poll *p) { + asyncMutex->lock(); + changePollQueue.erase( + std::remove(changePollQueue.begin(), changePollQueue.end(), p), + changePollQueue.end() + ); + asyncMutex->unlock(); + } +}; + +} + +#endif // NETWORKING_UWS_H diff --git a/extern/uWebSocket/include/Node.h b/extern/uWebSocket/include/Node.h new file mode 100644 index 0000000000..5c31cafa2e --- /dev/null +++ b/extern/uWebSocket/include/Node.h @@ -0,0 +1,202 @@ +#ifndef NODE_UWS_H +#define NODE_UWS_H + +#include "Socket.h" +#include +#include + +namespace uS { + +enum ListenOptions : int { + REUSE_PORT = 1, + ONLY_IPV4 = 2 +}; + +class WIN32_EXPORT Node { +protected: + template + static void connect_cb(Poll *p, int status, int events) { + C((Socket *) p, status < 0); + } + + template + static void accept_poll_cb(Poll *p, int status, int events) { + ListenSocket *listenData = (ListenSocket *) p; + accept_cb(listenData); + } + + template + static void accept_timer_cb(Timer *p) { + ListenSocket *listenData = (ListenSocket *) p->getData(); + accept_cb(listenData); + } + + template + static void accept_cb(ListenSocket *listenSocket) { + uv_os_sock_t serverFd = listenSocket->getFd(); + Context *netContext = listenSocket->nodeData->netContext; + uv_os_sock_t clientFd = netContext->acceptSocket(serverFd); + if (clientFd == INVALID_SOCKET) { + /* + * If accept is failing, the pending connection won't be removed and the + * polling will cause the server to spin, using 100% cpu. Switch to a timer + * event instead to avoid this. + */ + if (!TIMER && !netContext->wouldBlock()) { + listenSocket->stop(listenSocket->nodeData->loop); + + listenSocket->timer = new Timer(listenSocket->nodeData->loop); + listenSocket->timer->setData(listenSocket); + listenSocket->timer->start(accept_timer_cb, 1000, 1000); + } + return; + } else if (TIMER) { + listenSocket->timer->stop(); + listenSocket->timer->close(); + listenSocket->timer = nullptr; + + listenSocket->setCb(accept_poll_cb); + listenSocket->start(listenSocket->nodeData->loop, listenSocket, UV_READABLE); + } + do { + SSL *ssl = nullptr; + if (listenSocket->sslContext) { + ssl = SSL_new(listenSocket->sslContext.getNativeContext()); + SSL_set_accept_state(ssl); + } + + Socket *socket = new Socket(listenSocket->nodeData, listenSocket->nodeData->loop, clientFd, ssl); + socket->setPoll(UV_READABLE); + A(socket); + } while ((clientFd = netContext->acceptSocket(serverFd)) != INVALID_SOCKET); + } + + Loop *loop; + NodeData *nodeData; + std::recursive_mutex asyncMutex; + +public: + Node(int recvLength = 1024, int prePadding = 0, int postPadding = 0, bool useDefaultLoop = false); + ~Node(); + + /* Blocking */ + void run(); + + /* Non-blocking */ + void poll(); + + Loop *getLoop() { + return loop; + } + + template + Socket *connect(const char *hostname, int port, bool secure, NodeData *nodeData) { + Context *netContext = nodeData->netContext; + + addrinfo hints, *result; + memset(&hints, 0, sizeof(addrinfo)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + if (getaddrinfo(hostname, std::to_string(port).c_str(), &hints, &result) != 0) { + return nullptr; + } + + uv_os_sock_t fd = netContext->createSocket(result->ai_family, result->ai_socktype, result->ai_protocol); + if (fd == INVALID_SOCKET) { + freeaddrinfo(result); + return nullptr; + } + + ::connect(fd, result->ai_addr, result->ai_addrlen); + freeaddrinfo(result); + + SSL *ssl = nullptr; + if (secure) { + ssl = SSL_new(nodeData->clientContext); + SSL_set_connect_state(ssl); + SSL_set_tlsext_host_name(ssl, hostname); + } + + Socket initialSocket(nodeData, getLoop(), fd, ssl); + uS::Socket *socket = I(&initialSocket); + + socket->setCb(connect_cb); + socket->start(loop, socket, socket->setPoll(UV_WRITABLE)); + return socket; + } + + // todo: hostname, backlog + template + bool listen(const char *host, int port, uS::TLS::Context sslContext, int options, uS::NodeData *nodeData, void *user) { + addrinfo hints, *result; + memset(&hints, 0, sizeof(addrinfo)); + + hints.ai_flags = AI_PASSIVE; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + Context *netContext = nodeData->netContext; + + if (getaddrinfo(host, std::to_string(port).c_str(), &hints, &result)) { + return true; + } + + uv_os_sock_t listenFd = SOCKET_ERROR; + addrinfo *listenAddr; + if ((options & uS::ONLY_IPV4) == 0) { + for (addrinfo *a = result; a && listenFd == SOCKET_ERROR; a = a->ai_next) { + if (a->ai_family == AF_INET6) { + listenFd = netContext->createSocket(a->ai_family, a->ai_socktype, a->ai_protocol); + listenAddr = a; + } + } + } + + for (addrinfo *a = result; a && listenFd == SOCKET_ERROR; a = a->ai_next) { + if (a->ai_family == AF_INET) { + listenFd = netContext->createSocket(a->ai_family, a->ai_socktype, a->ai_protocol); + listenAddr = a; + } + } + + if (listenFd == SOCKET_ERROR) { + freeaddrinfo(result); + return true; + } + +#ifdef __linux +#ifdef SO_REUSEPORT + if (options & REUSE_PORT) { + int optval = 1; + setsockopt(listenFd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval)); + } +#endif +#endif + + int enabled = true; + setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &enabled, sizeof(enabled)); + + if (::bind(listenFd, listenAddr->ai_addr, listenAddr->ai_addrlen) || ::listen(listenFd, 512)) { + netContext->closeSocket(listenFd); + freeaddrinfo(result); + return true; + } + + ListenSocket *listenSocket = new ListenSocket(nodeData, loop, listenFd, nullptr); + listenSocket->sslContext = sslContext; + listenSocket->nodeData = nodeData; + + listenSocket->setCb(accept_poll_cb); + listenSocket->start(loop, listenSocket, UV_READABLE); + + // should be vector of listen data! one group can have many listeners! + nodeData->user = listenSocket; + + freeaddrinfo(result); + return false; + } +}; + +} + +#endif // NODE_UWS_H diff --git a/extern/uWebSocket/include/Socket.h b/extern/uWebSocket/include/Socket.h new file mode 100644 index 0000000000..673ee25d76 --- /dev/null +++ b/extern/uWebSocket/include/Socket.h @@ -0,0 +1,507 @@ +#ifndef SOCKET_UWS_H +#define SOCKET_UWS_H + +#include "Networking.h" + +namespace uS { + +struct TransferData { + // Connection state + uv_os_sock_t fd; + SSL *ssl; + + // Poll state + void (*pollCb)(Poll *, int, int); + int pollEvents; + + // User state + void *userData; + + // Destination + NodeData *destination; + void (*transferCb)(Poll *); +}; + +// perfectly 64 bytes (4 + 60) +struct WIN32_EXPORT Socket : Poll { +protected: + struct { + int poll : 4; + int shuttingDown : 4; + } state = {0, false}; + + SSL *ssl; + void *user = nullptr; + NodeData *nodeData; + + // this is not needed by HttpSocket! + struct Queue { + struct Message { + const char *data; + size_t length; + Message *nextMessage = nullptr; + void (*callback)(void *socket, void *data, bool cancelled, void *reserved) = nullptr; + void *callbackData = nullptr, *reserved = nullptr; + }; + + Message *head = nullptr, *tail = nullptr; + void pop() + { + Message *nextMessage; + if ((nextMessage = head->nextMessage)) { + delete [] (char *) head; + head = nextMessage; + } else { + delete [] (char *) head; + head = tail = nullptr; + } + } + + bool empty() {return head == nullptr;} + Message *front() {return head;} + + void push(Message *message) + { + message->nextMessage = nullptr; + if (tail) { + tail->nextMessage = message; + tail = message; + } else { + head = message; + tail = message; + } + } + } messageQueue; + + int getPoll() { + return state.poll; + } + + int setPoll(int poll) { + state.poll = poll; + return poll; + } + + void setShuttingDown(bool shuttingDown) { + state.shuttingDown = shuttingDown; + } + + void transfer(NodeData *nodeData, void (*cb)(Poll *)) { + // userData is invalid from now on till onTransfer + setUserData(new TransferData({getFd(), ssl, getCb(), getPoll(), getUserData(), nodeData, cb})); + stop(this->nodeData->loop); + close(this->nodeData->loop, [](Poll *p) { + Socket *s = (Socket *) p; + TransferData *transferData = (TransferData *) s->getUserData(); + + transferData->destination->asyncMutex->lock(); + bool wasEmpty = transferData->destination->transferQueue.empty(); + transferData->destination->transferQueue.push_back(s); + transferData->destination->asyncMutex->unlock(); + + if (wasEmpty) { + transferData->destination->async->send(); + } + }); + } + + void changePoll(Socket *socket) { + if (!threadSafeChange(nodeData->loop, this, socket->getPoll())) { + if (socket->nodeData->tid != pthread_self()) { + socket->nodeData->asyncMutex->lock(); + socket->nodeData->changePollQueue.push_back(socket); + socket->nodeData->asyncMutex->unlock(); + socket->nodeData->async->send(); + } else { + change(socket->nodeData->loop, socket, socket->getPoll()); + } + } + } + + // clears user data! + template + void startTimeout(int timeoutMs = 15000) { + Timer *timer = new Timer(nodeData->loop); + timer->setData(this); + timer->start([](Timer *timer) { + Socket *s = (Socket *) timer->getData(); + s->cancelTimeout(); + onTimeout(s); + }, timeoutMs, 0); + + user = timer; + } + + void cancelTimeout() { + Timer *timer = (Timer *) getUserData(); + if (timer) { + timer->stop(); + timer->close(); + user = nullptr; + } + } + + template + static void sslIoHandler(Poll *p, int status, int events) { + Socket *socket = (Socket *) p; + + if (status < 0) { + STATE::onEnd((Socket *) p); + return; + } + + if (!socket->messageQueue.empty() && ((events & UV_WRITABLE) || SSL_want(socket->ssl) == SSL_READING)) { + socket->cork(true); + while (true) { + Queue::Message *messagePtr = socket->messageQueue.front(); + int sent = SSL_write(socket->ssl, messagePtr->data, (int) messagePtr->length); + if (sent == (ssize_t) messagePtr->length) { + if (messagePtr->callback) { + messagePtr->callback(p, messagePtr->callbackData, false, messagePtr->reserved); + } + socket->messageQueue.pop(); + if (socket->messageQueue.empty()) { + if ((socket->state.poll & UV_WRITABLE) && SSL_want(socket->ssl) != SSL_WRITING) { + socket->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE)); + } + break; + } + } else if (sent <= 0) { + switch (SSL_get_error(socket->ssl, sent)) { + case SSL_ERROR_WANT_READ: + break; + case SSL_ERROR_WANT_WRITE: + if ((socket->getPoll() & UV_WRITABLE) == 0) { + socket->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE)); + } + break; + default: + STATE::onEnd((Socket *) p); + return; + } + break; + } + } + socket->cork(false); + } + + if (events & UV_READABLE) { + do { + int length = SSL_read(socket->ssl, socket->nodeData->recvBuffer, socket->nodeData->recvLength); + if (length <= 0) { + switch (SSL_get_error(socket->ssl, length)) { + case SSL_ERROR_WANT_READ: + break; + case SSL_ERROR_WANT_WRITE: + if ((socket->getPoll() & UV_WRITABLE) == 0) { + socket->change(socket->nodeData->loop, socket, socket->setPoll(socket->getPoll() | UV_WRITABLE)); + } + break; + default: + STATE::onEnd((Socket *) p); + return; + } + break; + } else { + // Warning: onData can delete the socket! Happens when HttpSocket upgrades + socket = STATE::onData((Socket *) p, socket->nodeData->recvBuffer, length); + if (socket->isClosed() || socket->isShuttingDown()) { + return; + } + } + } while (SSL_pending(socket->ssl)); + } + } + + template + static void ioHandler(Poll *p, int status, int events) { + Socket *socket = (Socket *) p; + NodeData *nodeData = socket->nodeData; + Context *netContext = nodeData->netContext; + + if (status < 0) { + STATE::onEnd((Socket *) p); + return; + } + + if (events & UV_WRITABLE) { + if (!socket->messageQueue.empty() && (events & UV_WRITABLE)) { + socket->cork(true); + while (true) { + Queue::Message *messagePtr = socket->messageQueue.front(); + ssize_t sent = ::send(socket->getFd(), messagePtr->data, messagePtr->length, MSG_NOSIGNAL); + if (sent == (ssize_t) messagePtr->length) { + if (messagePtr->callback) { + messagePtr->callback(p, messagePtr->callbackData, false, messagePtr->reserved); + } + socket->messageQueue.pop(); + if (socket->messageQueue.empty()) { + // todo, remove bit, don't set directly + socket->change(socket->nodeData->loop, socket, socket->setPoll(UV_READABLE)); + break; + } + } else if (sent == SOCKET_ERROR) { + if (!netContext->wouldBlock()) { + STATE::onEnd((Socket *) p); + return; + } + break; + } else { + messagePtr->length -= sent; + messagePtr->data += sent; + break; + } + } + socket->cork(false); + } + } + + if (events & UV_READABLE) { + int length = (int) recv(socket->getFd(), nodeData->recvBuffer, nodeData->recvLength, 0); + if (length > 0) { + STATE::onData((Socket *) p, nodeData->recvBuffer, length); + } else if (length <= 0 || (length == SOCKET_ERROR && !netContext->wouldBlock())) { + STATE::onEnd((Socket *) p); + } + } + + } + + template + void setState() { + if (ssl) { + setCb(sslIoHandler); + } else { + setCb(ioHandler); + } + } + + bool hasEmptyQueue() { + return messageQueue.empty(); + } + + void enqueue(Queue::Message *message) { + messageQueue.push(message); + } + + Queue::Message *allocMessage(size_t length, const char *data = 0) { + Queue::Message *messagePtr = (Queue::Message *) new char[sizeof(Queue::Message) + length]; + messagePtr->length = length; + messagePtr->data = ((char *) messagePtr) + sizeof(Queue::Message); + messagePtr->nextMessage = nullptr; + + if (data) { + memcpy((char *) messagePtr->data, data, messagePtr->length); + } + + return messagePtr; + } + + void freeMessage(Queue::Message *message) { + delete [] (char *) message; + } + + bool write(Queue::Message *message, bool &wasTransferred) { + ssize_t sent = 0; + if (messageQueue.empty()) { + + if (ssl) { + sent = SSL_write(ssl, message->data, (int) message->length); + if (sent == (ssize_t) message->length) { + wasTransferred = false; + return true; + } else if (sent < 0) { + switch (SSL_get_error(ssl, (int) sent)) { + case SSL_ERROR_WANT_READ: + break; + case SSL_ERROR_WANT_WRITE: + if ((getPoll() & UV_WRITABLE) == 0) { + setPoll(getPoll() | UV_WRITABLE); + changePoll(this); + } + break; + default: + return false; + } + } + } else { + sent = ::send(getFd(), message->data, message->length, MSG_NOSIGNAL); + if (sent == (ssize_t) message->length) { + wasTransferred = false; + return true; + } else if (sent == SOCKET_ERROR) { + if (!nodeData->netContext->wouldBlock()) { + return false; + } + } else { + message->length -= sent; + message->data += sent; + } + + if ((getPoll() & UV_WRITABLE) == 0) { + setPoll(getPoll() | UV_WRITABLE); + changePoll(this); + } + } + } + messageQueue.push(message); + wasTransferred = true; + return true; + } + + template + void sendTransformed(const char *message, size_t length, void(*callback)(void *socket, void *data, bool cancelled, void *reserved), void *callbackData, D transformData) { + size_t estimatedLength = T::estimate(message, length) + sizeof(Queue::Message); + + if (hasEmptyQueue()) { + if (estimatedLength <= uS::NodeData::preAllocMaxSize) { + int memoryLength = (int) estimatedLength; + int memoryIndex = nodeData->getMemoryBlockIndex(memoryLength); + + Queue::Message *messagePtr = (Queue::Message *) nodeData->getSmallMemoryBlock(memoryIndex); + messagePtr->data = ((char *) messagePtr) + sizeof(Queue::Message); + messagePtr->length = T::transform(message, (char *) messagePtr->data, length, transformData); + + bool wasTransferred; + if (write(messagePtr, wasTransferred)) { + if (!wasTransferred) { + nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex); + if (callback) { + callback(this, callbackData, false, nullptr); + } + } else { + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + } + } else { + nodeData->freeSmallMemoryBlock((char *) messagePtr, memoryIndex); + if (callback) { + callback(this, callbackData, true, nullptr); + } + } + } else { + Queue::Message *messagePtr = allocMessage(estimatedLength - sizeof(Queue::Message)); + messagePtr->length = T::transform(message, (char *) messagePtr->data, length, transformData); + + bool wasTransferred; + if (write(messagePtr, wasTransferred)) { + if (!wasTransferred) { + freeMessage(messagePtr); + if (callback) { + callback(this, callbackData, false, nullptr); + } + } else { + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + } + } else { + freeMessage(messagePtr); + if (callback) { + callback(this, callbackData, true, nullptr); + } + } + } + } else { + Queue::Message *messagePtr = allocMessage(estimatedLength - sizeof(Queue::Message)); + messagePtr->length = T::transform(message, (char *) messagePtr->data, length, transformData); + messagePtr->callback = callback; + messagePtr->callbackData = callbackData; + enqueue(messagePtr); + } + } + +public: + Socket(NodeData *nodeData, Loop *loop, uv_os_sock_t fd, SSL *ssl) : Poll(loop, fd), ssl(ssl), nodeData(nodeData) { + if (ssl) { + // OpenSSL treats SOCKETs as int + SSL_set_fd(ssl, (int) fd); + SSL_set_mode(ssl, SSL_MODE_RELEASE_BUFFERS); + } + } + + NodeData *getNodeData() { + return nodeData; + } + + Poll *next = nullptr, *prev = nullptr; + + void *getUserData() { + return user; + } + + void setUserData(void *user) { + this->user = user; + } + + struct Address { + unsigned int port; + const char *address; + const char *family; + }; + + Address getAddress(); + + void setNoDelay(int enable) { + setsockopt(getFd(), IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(int)); + } + + void cork(int enable) { +#if defined(TCP_CORK) + // Linux & SmartOS have proper TCP_CORK + setsockopt(getFd(), IPPROTO_TCP, TCP_CORK, &enable, sizeof(int)); +#elif defined(TCP_NOPUSH) + // Mac OS X & FreeBSD have TCP_NOPUSH + setsockopt(getFd(), IPPROTO_TCP, TCP_NOPUSH, &enable, sizeof(int)); + if (!enable) { + // Tested on OS X, FreeBSD situation is unclear + ::send(getFd(), "", 0, MSG_NOSIGNAL); + } +#endif + } + + void shutdown() { + if (ssl) { + //todo: poll in/out - have the io_cb recall shutdown if failed + SSL_shutdown(ssl); + } else { + ::shutdown(getFd(), SHUT_WR); + } + } + + template + void closeSocket() { + uv_os_sock_t fd = getFd(); + Context *netContext = nodeData->netContext; + stop(nodeData->loop); + netContext->closeSocket(fd); + + if (ssl) { + SSL_free(ssl); + } + + Poll::close(nodeData->loop, [](Poll *p) { + delete (T *) p; + }); + } + + bool isShuttingDown() { + return state.shuttingDown; + } + + friend class Node; + friend struct NodeData; +}; + +struct ListenSocket : Socket { + + ListenSocket(NodeData *nodeData, Loop *loop, uv_os_sock_t fd, SSL *ssl) : Socket(nodeData, loop, fd, ssl) { + + } + + Timer *timer = nullptr; + uS::TLS::Context sslContext; +}; + +} + +#endif // SOCKET_UWS_H diff --git a/extern/uWebSocket/include/WebSocket.h b/extern/uWebSocket/include/WebSocket.h new file mode 100644 index 0000000000..9e7f547366 --- /dev/null +++ b/extern/uWebSocket/include/WebSocket.h @@ -0,0 +1,89 @@ +#ifndef WEBSOCKET_UWS_H +#define WEBSOCKET_UWS_H + +#include "WebSocketProtocol.h" +#include "Socket.h" + +namespace uWS { + +template +struct Group; + +template +struct HttpSocket; + +template +struct WIN32_EXPORT WebSocket : uS::Socket, WebSocketState { +protected: + std::string fragmentBuffer; + enum CompressionStatus : char { + DISABLED, + ENABLED, + COMPRESSED_FRAME + } compressionStatus; + unsigned char controlTipLength = 0, hasOutstandingPong = false; + + WebSocket(bool perMessageDeflate, uS::Socket *socket) : uS::Socket(std::move(*socket)) { + compressionStatus = perMessageDeflate ? CompressionStatus::ENABLED : CompressionStatus::DISABLED; + } + + static uS::Socket *onData(uS::Socket *s, char *data, size_t length); + static void onEnd(uS::Socket *s); + using uS::Socket::closeSocket; + + static bool refusePayloadLength(uint64_t length, WebSocketState *webSocketState) { + WebSocket *webSocket = static_cast *>(webSocketState); + return length > Group::from(webSocket)->maxPayload; + } + + static bool setCompressed(WebSocketState *webSocketState) { + WebSocket *webSocket = static_cast *>(webSocketState); + + if (webSocket->compressionStatus == WebSocket::CompressionStatus::ENABLED) { + webSocket->compressionStatus = WebSocket::CompressionStatus::COMPRESSED_FRAME; + return true; + } else { + return false; + } + } + + static void forceClose(WebSocketState *webSocketState) { + WebSocket *webSocket = static_cast *>(webSocketState); + webSocket->terminate(); + } + + static bool handleFragment(char *data, size_t length, unsigned int remainingBytes, int opCode, bool fin, WebSocketState *webSocketState); + +public: + struct PreparedMessage { + char *buffer; + size_t length; + int references; + void(*callback)(void *webSocket, void *data, bool cancelled, void *reserved); + }; + + // Not thread safe + void sendPrepared(PreparedMessage *preparedMessage, void *callbackData = nullptr); + static void finalizeMessage(PreparedMessage *preparedMessage); + void close(int code = 1000, const char *message = nullptr, size_t length = 0); + void transfer(Group *group); + + // Thread safe + void terminate(); + void ping(const char *message) {send(message, OpCode::PING);} + void send(const char *message, OpCode opCode = OpCode::TEXT) {send(message, strlen(message), opCode);} + void send(const char *message, size_t length, OpCode opCode, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved) = nullptr, void *callbackData = nullptr); + static PreparedMessage *prepareMessage(char *data, size_t length, OpCode opCode, bool compressed, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved) = nullptr); + static PreparedMessage *prepareMessageBatch(std::vector &messages, std::vector &excludedMessages, + OpCode opCode, bool compressed, void(*callback)(WebSocket *webSocket, void *data, bool cancelled, void *reserved) = nullptr); + + friend struct Hub; + friend struct Group; + friend struct HttpSocket; + friend struct uS::Socket; + friend class WebSocketProtocol>; +}; + +} + +#endif // WEBSOCKET_UWS_H diff --git a/extern/uWebSocket/include/WebSocketProtocol.h b/extern/uWebSocket/include/WebSocketProtocol.h new file mode 100644 index 0000000000..56db71ffa9 --- /dev/null +++ b/extern/uWebSocket/include/WebSocketProtocol.h @@ -0,0 +1,377 @@ +#ifndef WEBSOCKETPROTOCOL_UWS_H +#define WEBSOCKETPROTOCOL_UWS_H + +// we do need to include this for htobe64, should be moved from networking! +#include "Networking.h" + +#include +#include + +namespace uWS { + +enum OpCode : unsigned char { + TEXT = 1, + BINARY = 2, + CLOSE = 8, + PING = 9, + PONG = 10 +}; + +enum { + CLIENT, + SERVER +}; + +// 24 bytes perfectly +template +struct WebSocketState { +public: + static const unsigned int SHORT_MESSAGE_HEADER = isServer ? 6 : 2; + static const unsigned int MEDIUM_MESSAGE_HEADER = isServer ? 8 : 4; + static const unsigned int LONG_MESSAGE_HEADER = isServer ? 14 : 10; + + // 16 bytes + struct State { + unsigned int wantsHead : 1; + unsigned int spillLength : 4; + int opStack : 2; // -1, 0, 1 + unsigned int lastFin : 1; + + // 15 bytes + unsigned char spill[LONG_MESSAGE_HEADER - 1]; + OpCode opCode[2]; + + State() { + wantsHead = true; + spillLength = 0; + opStack = -1; + lastFin = true; + } + + } state; + + // 8 bytes + unsigned int remainingBytes = 0; + char mask[isServer ? 4 : 1]; +}; + +template +class WIN32_EXPORT WebSocketProtocol { +public: + static const unsigned int SHORT_MESSAGE_HEADER = isServer ? 6 : 2; + static const unsigned int MEDIUM_MESSAGE_HEADER = isServer ? 8 : 4; + static const unsigned int LONG_MESSAGE_HEADER = isServer ? 14 : 10; + +protected: + static inline bool isFin(char *frame) {return *((unsigned char *) frame) & 128;} + static inline unsigned char getOpCode(char *frame) {return *((unsigned char *) frame) & 15;} + static inline unsigned char payloadLength(char *frame) {return ((unsigned char *) frame)[1] & 127;} + static inline bool rsv23(char *frame) {return *((unsigned char *) frame) & 48;} + static inline bool rsv1(char *frame) {return *((unsigned char *) frame) & 64;} + + static inline void unmaskImprecise(char *dst, char *src, char *mask, unsigned int length) { + for (unsigned int n = (length >> 2) + 1; n; n--) { + *(dst++) = *(src++) ^ mask[0]; + *(dst++) = *(src++) ^ mask[1]; + *(dst++) = *(src++) ^ mask[2]; + *(dst++) = *(src++) ^ mask[3]; + } + } + + static inline void unmaskImpreciseCopyMask(char *dst, char *src, char *maskPtr, unsigned int length) { + char mask[4] = {maskPtr[0], maskPtr[1], maskPtr[2], maskPtr[3]}; + unmaskImprecise(dst, src, mask, length); + } + + static inline void rotateMask(unsigned int offset, char *mask) { + char originalMask[4] = {mask[0], mask[1], mask[2], mask[3]}; + mask[(0 + offset) % 4] = originalMask[0]; + mask[(1 + offset) % 4] = originalMask[1]; + mask[(2 + offset) % 4] = originalMask[2]; + mask[(3 + offset) % 4] = originalMask[3]; + } + + static inline void unmaskInplace(char *data, char *stop, char *mask) { + while (data < stop) { + *(data++) ^= mask[0]; + *(data++) ^= mask[1]; + *(data++) ^= mask[2]; + *(data++) ^= mask[3]; + } + } + + enum { + SND_CONTINUATION = 1, + SND_NO_FIN = 2, + SND_COMPRESSED = 64 + }; + + template + static inline bool consumeMessage(T payLength, char *&src, unsigned int &length, WebSocketState *wState) { + if (getOpCode(src)) { + if (wState->state.opStack == 1 || (!wState->state.lastFin && getOpCode(src) < 2)) { + Impl::forceClose(wState); + return true; + } + wState->state.opCode[++wState->state.opStack] = (OpCode) getOpCode(src); + } else if (wState->state.opStack == -1) { + Impl::forceClose(wState); + return true; + } + wState->state.lastFin = isFin(src); + + if (Impl::refusePayloadLength(payLength, wState)) { + Impl::forceClose(wState); + return true; + } + + if (payLength + MESSAGE_HEADER <= length) { + if (isServer) { + unmaskImpreciseCopyMask(src + MESSAGE_HEADER - 4, src + MESSAGE_HEADER, src + MESSAGE_HEADER - 4, (unsigned int) payLength); + if (Impl::handleFragment(src + MESSAGE_HEADER - 4, payLength, 0, wState->state.opCode[wState->state.opStack], isFin(src), wState)) { + return true; + } + } else { + if (Impl::handleFragment(src + MESSAGE_HEADER, payLength, 0, wState->state.opCode[wState->state.opStack], isFin(src), wState)) { + return true; + } + } + + if (isFin(src)) { + wState->state.opStack--; + } + + src += payLength + MESSAGE_HEADER; + length -= payLength + MESSAGE_HEADER; + wState->state.spillLength = 0; + return false; + } else { + wState->state.spillLength = 0; + wState->state.wantsHead = false; + wState->remainingBytes = (unsigned int) (payLength - length + MESSAGE_HEADER); + bool fin = isFin(src); + if (isServer) { + memcpy(wState->mask, src + MESSAGE_HEADER - 4, 4); + unmaskImprecise(src, src + MESSAGE_HEADER, wState->mask, length - MESSAGE_HEADER); + rotateMask(4 - (length - MESSAGE_HEADER) % 4, wState->mask); + } else { + src += MESSAGE_HEADER; + } + Impl::handleFragment(src, length - MESSAGE_HEADER, wState->remainingBytes, wState->state.opCode[wState->state.opStack], fin, wState); + return true; + } + } + + static inline bool consumeContinuation(char *&src, unsigned int &length, WebSocketState *wState) { + if (wState->remainingBytes <= length) { + if (isServer) { + int n = wState->remainingBytes >> 2; + unmaskInplace(src, src + n * 4, wState->mask); + for (int i = 0, s = wState->remainingBytes % 4; i < s; i++) { + src[n * 4 + i] ^= wState->mask[i]; + } + } + + if (Impl::handleFragment(src, wState->remainingBytes, 0, wState->state.opCode[wState->state.opStack], wState->state.lastFin, wState)) { + return false; + } + + if (wState->state.lastFin) { + wState->state.opStack--; + } + + src += wState->remainingBytes; + length -= wState->remainingBytes; + wState->state.wantsHead = true; + return true; + } else { + if (isServer) { + unmaskInplace(src, src + ((length >> 2) + 1) * 4, wState->mask); + } + + wState->remainingBytes -= length; + if (Impl::handleFragment(src, length, wState->remainingBytes, wState->state.opCode[wState->state.opStack], wState->state.lastFin, wState)) { + return false; + } + + if (isServer && length % 4) { + rotateMask(4 - (length % 4), wState->mask); + } + return false; + } + } + +public: + WebSocketProtocol() { + + } + + // Based on utf8_check.c by Markus Kuhn, 2005 + // https://www.cl.cam.ac.uk/~mgk25/ucs/utf8_check.c + // Optimized for predominantly 7-bit content by Alex Hultman, 2016 + // Licensed as Zlib, like the rest of this project + static bool isValidUtf8(unsigned char *s, size_t length) + { + for (unsigned char *e = s + length; s != e; ) { + if (s + 4 <= e && ((*(uint32_t *) s) & 0x80808080) == 0) { + s += 4; + } else { + while (!(*s & 0x80)) { + if (++s == e) { + return true; + } + } + + if ((s[0] & 0x60) == 0x40) { + if (s + 1 >= e || (s[1] & 0xc0) != 0x80 || (s[0] & 0xfe) == 0xc0) { + return false; + } + s += 2; + } else if ((s[0] & 0xf0) == 0xe0) { + if (s + 2 >= e || (s[1] & 0xc0) != 0x80 || (s[2] & 0xc0) != 0x80 || + (s[0] == 0xe0 && (s[1] & 0xe0) == 0x80) || (s[0] == 0xed && (s[1] & 0xe0) == 0xa0)) { + return false; + } + s += 3; + } else if ((s[0] & 0xf8) == 0xf0) { + if (s + 3 >= e || (s[1] & 0xc0) != 0x80 || (s[2] & 0xc0) != 0x80 || (s[3] & 0xc0) != 0x80 || + (s[0] == 0xf0 && (s[1] & 0xf0) == 0x80) || (s[0] == 0xf4 && s[1] > 0x8f) || s[0] > 0xf4) { + return false; + } + s += 4; + } else { + return false; + } + } + } + return true; + } + + struct CloseFrame { + uint16_t code; + char *message; + size_t length; + }; + + static inline CloseFrame parseClosePayload(char *src, size_t length) { + CloseFrame cf = {}; + if (length >= 2) { + memcpy(&cf.code, src, 2); + cf = {ntohs(cf.code), src + 2, length - 2}; + if (cf.code < 1000 || cf.code > 4999 || (cf.code > 1011 && cf.code < 4000) || + (cf.code >= 1004 && cf.code <= 1006) || !isValidUtf8((unsigned char *) cf.message, cf.length)) { + return {}; + } + } + return cf; + } + + static inline size_t formatClosePayload(char *dst, uint16_t code, const char *message, size_t length) { + if (code) { + code = htons(code); + memcpy(dst, &code, 2); + memcpy(dst + 2, message, length); + return length + 2; + } + return 0; + } + + static inline size_t formatMessage(char *dst, const char *src, size_t length, OpCode opCode, size_t reportedLength, bool compressed) { + size_t messageLength; + size_t headerLength; + if (reportedLength < 126) { + headerLength = 2; + dst[1] = reportedLength; + } else if (reportedLength <= UINT16_MAX) { + headerLength = 4; + dst[1] = 126; + *((uint16_t *) &dst[2]) = htons(reportedLength); + } else { + headerLength = 10; + dst[1] = 127; + *((uint64_t *) &dst[2]) = htobe64(reportedLength); + } + + int flags = 0; + dst[0] = (flags & SND_NO_FIN ? 0 : 128) | (compressed ? SND_COMPRESSED : 0); + if (!(flags & SND_CONTINUATION)) { + dst[0] |= opCode; + } + + char mask[4]; + if (!isServer) { + dst[1] |= 0x80; + uint32_t random = rand(); + memcpy(mask, &random, 4); + memcpy(dst + headerLength, &random, 4); + headerLength += 4; + } + + messageLength = headerLength + length; + memcpy(dst + headerLength, src, length); + + if (!isServer) { + + // overwrites up to 3 bytes outside of the given buffer! + //WebSocketProtocol::unmaskInplace(dst + headerLength, dst + headerLength + length, mask); + + // this is not optimal + char *start = dst + headerLength; + char *stop = start + length; + int i = 0; + while (start != stop) { + (*start++) ^= mask[i++ % 4]; + } + } + return messageLength; + } + + static inline void consume(char *src, unsigned int length, WebSocketState *wState) { + if (wState->state.spillLength) { + src -= wState->state.spillLength; + length += wState->state.spillLength; + memcpy(src, wState->state.spill, wState->state.spillLength); + } + if (wState->state.wantsHead) { + parseNext: + while (length >= SHORT_MESSAGE_HEADER) { + + // invalid reserved bits / invalid opcodes / invalid control frames / set compressed frame + if ((rsv1(src) && !Impl::setCompressed(wState)) || rsv23(src) || (getOpCode(src) > 2 && getOpCode(src) < 8) || + getOpCode(src) > 10 || (getOpCode(src) > 2 && (!isFin(src) || payloadLength(src) > 125))) { + Impl::forceClose(wState); + return; + } + + if (payloadLength(src) < 126) { + if (consumeMessage(payloadLength(src), src, length, wState)) { + return; + } + } else if (payloadLength(src) == 126) { + if (length < MEDIUM_MESSAGE_HEADER) { + break; + } else if(consumeMessage(ntohs(*(uint16_t *) &src[2]), src, length, wState)) { + return; + } + } else if (length < LONG_MESSAGE_HEADER) { + break; + } else if (consumeMessage(be64toh(*(uint64_t *) &src[2]), src, length, wState)) { + return; + } + } + if (length) { + memcpy(wState->state.spill, src, length); + wState->state.spillLength = length; + } + } else if (consumeContinuation(src, length, wState)) { + goto parseNext; + } + } + + static const int CONSUME_POST_PADDING = 4; + static const int CONSUME_PRE_PADDING = LONG_MESSAGE_HEADER - 1; +}; + +} + +#endif // WEBSOCKETPROTOCOL_UWS_H diff --git a/extern/uWebSocket/include/uWS.h b/extern/uWebSocket/include/uWS.h new file mode 100644 index 0000000000..40a0e403df --- /dev/null +++ b/extern/uWebSocket/include/uWS.h @@ -0,0 +1,6 @@ +#ifndef UWS_UWS_H +#define UWS_UWS_H + +#include "Hub.h" + +#endif // UWS_UWS_H diff --git a/extern/uWebSocket/libeay32.lib b/extern/uWebSocket/libeay32.lib new file mode 100644 index 0000000000..deb2bc1a01 Binary files /dev/null and b/extern/uWebSocket/libeay32.lib differ diff --git a/extern/uWebSocket/libuv.lib b/extern/uWebSocket/libuv.lib new file mode 100644 index 0000000000..30aa827f88 Binary files /dev/null and b/extern/uWebSocket/libuv.lib differ diff --git a/extern/uWebSocket/ssleay32.lib b/extern/uWebSocket/ssleay32.lib new file mode 100644 index 0000000000..0f7780b39c Binary files /dev/null and b/extern/uWebSocket/ssleay32.lib differ diff --git a/extern/uWebSocket/uWS.lib b/extern/uWebSocket/uWS.lib new file mode 100644 index 0000000000..2412ac4104 Binary files /dev/null and b/extern/uWebSocket/uWS.lib differ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 488064c884..7add162148 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -157,6 +157,7 @@ list(APPEND SM_WINDOWS_PROGRAM_DLLS "${SM_PROGRAM_DIR}/avutil-52.dll" "${SM_PROGRAM_DIR}/parallel_lights_io.dll" "${SM_PROGRAM_DIR}/swscale-2.dll" + "${SM_PROGRAM_DIR}/uWS.dll" ) if(WIN32) @@ -355,6 +356,10 @@ if (WIN32) "${LIB_AVUTIL}" "${LIB_CURL}" "${LIB_WLDAP32}" + "${LIB_UWS}" + "${LIB_EAY}" + "${LIB_SSL}" + "${LIB_UV}" ) if (WITH_OGG) @@ -524,6 +529,9 @@ list(APPEND SM_INCLUDE_DIRS ${CMAKE_CURRENT_SOURCE_DIR} "${SM_SRC_DIR}/generated" ) +list(APPEND SM_INCLUDE_DIRS +"${SM_EXTERN_DIR}/uWebSocket/include" +) if(NOT APPLE) list(APPEND SM_INCLUDE_DIRS "${SM_EXTERN_DIR}/glew-1.5.8/include" @@ -635,6 +643,7 @@ if(NOT APPLE) install(FILES "${SM_PROGRAM_DIR}/parallel_lights_io.dll" DESTINATION "${SM_FULL_INSTALLATION_PATH}") install(FILES "${SM_PROGRAM_DIR}/Etterna.vdi" DESTINATION "${SM_FULL_INSTALLATION_PATH}") install(FILES "${SM_PROGRAM_DIR}/swscale-2.dll" DESTINATION "${SM_FULL_INSTALLATION_PATH}") + install(FILES "${SM_PROGRAM_DIR}/uWS.dll" DESTINATION "${SM_FULL_INSTALLATION_PATH}") # foreach(SM_WINDOW_DLL "${SM_WINDOWS_PROGRAM_DLLS}") # install(FILES "${SM_WINDOW_DLL}" DESTINATION "${SM_INSTALL_DESTINATION}") diff --git a/stepmania.nsi b/stepmania.nsi index 148aee0ed4..38af848a2a 100644 --- a/stepmania.nsi +++ b/stepmania.nsi @@ -487,6 +487,8 @@ Section "Main Section" SecMain File "Program\avformat-55.dll" File "Program\avutil-52.dll" File "Program\swscale-2.dll" + ;uwebsocket + File "Program\uWS.dll" ; parallel lights File "Program\parallel_lights_io.dll" ; others @@ -919,6 +921,7 @@ Section "Uninstall" Delete "$INSTDIR\Program\avutil-50.dll" Delete "$INSTDIR\Program\swscale-2.dll" Delete "$INSTDIR\Program\swscale-0.dll" + Delete "$INSTDIR\Program\uWS.dll" ; others Delete "$INSTDIR\Program\dbghelp.dll" Delete "$INSTDIR\Program\jpeg.dll"