Skip to content

Commit

Permalink
Merge pull request #299 from crypto-chassis/develop
Browse files Browse the repository at this point in the history
Release
  • Loading branch information
cryptochassis authored Aug 24, 2022
2 parents 50931bc + b4bcbae commit 67f5c30
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ session.serviceByServiceNameExchangeMap[CCAPI_EXECUTION_MANAGEMENT][CCAPI_EXCHAN
* Only enable the services and exchanges that you need.
* Use FIX API instead of REST API.
* Handle events in ["batching" mode](#handle-events-in-immediate-vs-batching-mode) if your application (e.g. market data archiver) isn't latency sensitive.
* Define macro `CCAPI_USE_SINGLE_THREAD`. It reduces locking overhead for single threaded applications.

## Applications

Expand Down
14 changes: 14 additions & 0 deletions include/ccapi_cpp/ccapi_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ class Queue {
std::string EXCEPTION_QUEUE_EMPTY = "queue is empty";
explicit Queue(const size_t maxSize = 0) : maxSize(maxSize) {}
void pushBack(T&& t) {
#ifndef CCAPI_USE_SINGLE_THREAD
std::lock_guard<std::mutex> lock(this->m);
#endif
if (this->maxSize <= 0 || this->queue.size() < this->maxSize) {
CCAPI_LOGGER_TRACE("this->queue.size() = " + size_tToString(this->queue.size()));
this->queue.push_back(t);
Expand All @@ -25,7 +27,9 @@ class Queue {
}
}
T popBack() {
#ifndef CCAPI_USE_SINGLE_THREAD
std::lock_guard<std::mutex> lock(this->m);
#endif
if (this->queue.empty()) {
throw std::runtime_error(EXCEPTION_QUEUE_EMPTY);
} else {
Expand All @@ -35,13 +39,17 @@ class Queue {
}
}
std::vector<T> purge() {
#ifndef CCAPI_USE_SINGLE_THREAD
std::lock_guard<std::mutex> lock(this->m);
#endif
std::vector<T> p;
std::swap(p, this->queue);
return p;
}
void removeAll(std::vector<T>& c) {
#ifndef CCAPI_USE_SINGLE_THREAD
std::lock_guard<std::mutex> lock(this->m);
#endif
if (c.empty()) {
c = std::move(this->queue);
} else {
Expand All @@ -51,19 +59,25 @@ class Queue {
this->queue.clear();
}
size_t size() const {
#ifndef CCAPI_USE_SINGLE_THREAD
std::lock_guard<std::mutex> lock(this->m);
#endif
return this->queue.size();
}
bool empty() const {
#ifndef CCAPI_USE_SINGLE_THREAD
std::lock_guard<std::mutex> lock(this->m);
#endif
return this->queue.empty();
}
#ifndef CCAPI_EXPOSE_INTERNAL

private:
#endif
std::vector<T> queue;
#ifndef CCAPI_USE_SINGLE_THREAD
mutable std::mutex m;
#endif
size_t maxSize{};
};
} /* namespace ccapi */
Expand Down
13 changes: 13 additions & 0 deletions include/ccapi_cpp/ccapi_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,18 @@ class Session {
} else {
if (this->eventHandler) {
CCAPI_LOGGER_TRACE("handle event in immediate mode");
#ifdef CCAPI_USE_SINGLE_THREAD
bool shouldContinue = true;
try {
shouldContinue = this->eventHandler->processEvent(event, this);
} catch (const std::runtime_error& e) {
CCAPI_LOGGER_ERROR(e.what());
}
if (!shouldContinue) {
CCAPI_LOGGER_DEBUG("about to pause the event dispatcher");
this->eventDispatcher->pause();
}
#else
this->eventDispatcher->dispatch([that = this, event = std::move(event)] {
bool shouldContinue = true;
try {
Expand All @@ -632,6 +644,7 @@ class Session {
that->eventDispatcher->pause();
}
});
#endif
} else {
CCAPI_LOGGER_TRACE("handle event in batching mode");
this->eventQueue.pushBack(std::move(event));
Expand Down
2 changes: 1 addition & 1 deletion include/ccapi_cpp/service/ccapi_service_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class ServiceContext CCAPI_FINAL {
typedef wspp::lib::asio::io_service IoContext;
typedef wspp::lib::shared_ptr<wspp::lib::asio::io_service> IoContextPtr;
struct CustomClientConfig : public wspp::config::asio_tls_client {
#ifdef WEBSOCKETPP_ENABLE_SINGLE_THREADING
#ifdef CCAPI_USE_SINGLE_THREAD
typedef wspp::config::asio_tls_client base;
static bool const enable_multithreading = false;
struct transport_config : public base::transport_config {
Expand Down

0 comments on commit 67f5c30

Please sign in to comment.