From 1bc781ecd2f374f3d26462c81e90a2523a082875 Mon Sep 17 00:00:00 2001 From: Crypto Chassis Date: Wed, 13 Sep 2023 07:02:32 -0700 Subject: [PATCH 1/7] dev: add draft for streaming exchange provided candlestick --- include/ccapi_cpp/ccapi_macro.h | 39 ++++-- include/ccapi_cpp/ccapi_market_data_message.h | 32 +++++ include/ccapi_cpp/ccapi_message.h | 4 + include/ccapi_cpp/ccapi_session_configs.h | 1 + include/ccapi_cpp/ccapi_session_options.h | 5 +- include/ccapi_cpp/ccapi_subscription.h | 3 +- .../service/ccapi_market_data_service.h | 121 ++++++++++++++---- .../service/ccapi_market_data_service_okx.h | 40 ++++++ 8 files changed, 207 insertions(+), 38 deletions(-) diff --git a/include/ccapi_cpp/ccapi_macro.h b/include/ccapi_cpp/ccapi_macro.h index 0f1655aa..8c02ffe6 100644 --- a/include/ccapi_cpp/ccapi_macro.h +++ b/include/ccapi_cpp/ccapi_macro.h @@ -27,6 +27,9 @@ #ifndef CCAPI_AGG_TRADE #define CCAPI_AGG_TRADE "AGG_TRADE" #endif +#ifndef CCAPI_CANDLESTICK +#define CCAPI_CANDLESTICK "CANDLESTICK" +#endif #ifndef CCAPI_MARKET_DEPTH_MAX #define CCAPI_MARKET_DEPTH_MAX CCAPI_MARKET_DEPTH "_MAX" #endif @@ -49,6 +52,12 @@ #define CCAPI_FETCH_MARKET_DEPTH_INITIAL_SNAPSHOT_DELAY_MILLISECONDS "FETCH_MARKET_DEPTH_INITIAL_SNAPSHOT_DELAY_MILLISECONDS" #endif #define CCAPI_FETCH_MARKET_DEPTH_INITIAL_SNAPSHOT_DELAY_MILLISECONDS_DEFAULT "0" +#ifndef CCAPI_CANDLESTICK_INTERVAL_SECONDS +#define CCAPI_CANDLESTICK_INTERVAL_SECONDS CCAPI_CANDLESTICK "_INTERVAL_SECONDS" +#endif +#ifndef CCAPI_CANDLESTICK_INTERVAL_SECONDS_DEFAULT +#define CCAPI_CANDLESTICK_INTERVAL_SECONDS_DEFAULT "60" +#endif #ifndef CCAPI_EXCHANGE_NAME_COINBASE #define CCAPI_EXCHANGE_NAME_COINBASE "coinbase" #endif @@ -196,20 +205,26 @@ #ifndef CCAPI_BEST_ASK_N_SIZE_EMPTY #define CCAPI_BEST_ASK_N_SIZE_EMPTY "" #endif -#ifndef CCAPI_OHLC_EMPTY -#define CCAPI_OHLC_EMPTY "" +#ifndef CCAPI_CANDLESTICK_EMPTY +#define CCAPI_CANDLESTICK_EMPTY "" #endif -#ifndef CCAPI_OPEN -#define CCAPI_OPEN "OPEN" +#ifndef CCAPI_OPEN_PRICE +#define CCAPI_OPEN_PRICE "OPEN" #endif -#ifndef CCAPI_HIGH -#define CCAPI_HIGH "HIGH" +#ifndef CCAPI_HIGH_PRICE +#define CCAPI_HIGH_PRICE "HIGH" #endif -#ifndef CCAPI_LOW -#define CCAPI_LOW "LOW" +#ifndef CCAPI_LOW_PRICE +#define CCAPI_LOW_PRICE "LOW" #endif -#ifndef CCAPI_CLOSE -#define CCAPI_CLOSE "CLOSE" +#ifndef CCAPI_CLOSE_PRICE +#define CCAPI_CLOSE_PRICE "CLOSE" +#endif +#ifndef CCAPI_VOLUME +#define CCAPI_VOLUME "VOLUME" +#endif +#ifndef CCAPI_QUOTE_VOLUME +#define CCAPI_QUOTE_VOLUME "QUOTE_VOLUME" #endif #ifndef CCAPI_LIMIT #define CCAPI_LIMIT "LIMIT" @@ -281,6 +296,7 @@ #define CCAPI_WEBSOCKET_OKX_CHANNEL_PUBLIC_DEPTH5 "books5" #define CCAPI_WEBSOCKET_OKX_CHANNEL_PUBLIC_DEPTH400 "books" #define CCAPI_WEBSOCKET_OKX_CHANNEL_PUBLIC_DEPTH1_L2_TBT "bbo-tbt" +#define CCAPI_WEBSOCKET_OKX_CHANNEL_CANDLESTICK "candle" #define CCAPI_WEBSOCKET_ERISX_CHANNEL_MARKET_DATA_SUBSCRIBE "MarketDataSubscribe" #define CCAPI_WEBSOCKET_ERISX_CHANNEL_TOP_OF_BOOK_MARKET_DATA_SUBSCRIBE "TopOfBookMarketDataSubscribe" #define CCAPI_WEBSOCKET_KUCOIN_CHANNEL_MARKET_MATCH "/market/match" @@ -802,6 +818,9 @@ #ifndef CCAPI_OKX_PRIVATE_WS_PATH #define CCAPI_OKX_PRIVATE_WS_PATH "/ws/v5/private" #endif +#ifndef CCAPI_OKX_BUSINESS_WS_PATH +#define CCAPI_OKX_BUSINESS_WS_PATH "/ws/v5/business" +#endif #ifndef CCAPI_ERISX_URL_WS_BASE #define CCAPI_ERISX_URL_WS_BASE "wss://publicmd-api.erisx.com" #endif diff --git a/include/ccapi_cpp/ccapi_market_data_message.h b/include/ccapi_cpp/ccapi_market_data_message.h index f9a8a12d..721098ed 100644 --- a/include/ccapi_cpp/ccapi_market_data_message.h +++ b/include/ccapi_cpp/ccapi_market_data_message.h @@ -16,6 +16,7 @@ class MarketDataMessage CCAPI_FINAL { MARKET_DATA_EVENTS_MARKET_DEPTH, MARKET_DATA_EVENTS_TRADE, MARKET_DATA_EVENTS_AGG_TRADE, + MARKET_DATA_EVENTS_CANDLESTICK, }; enum class RecapType { UNKNOWN, @@ -44,6 +45,7 @@ class MarketDataMessage CCAPI_FINAL { ASK = 1, TRADE = 2, AGG_TRADE = 3, + CANDLESTICK = 4, }; static std::string dataTypeToString(DataType dataType) { std::string output; @@ -60,6 +62,9 @@ class MarketDataMessage CCAPI_FINAL { case DataType::AGG_TRADE: output = "AGG_TRADE"; break; + case DataType::CANDLESTICK: + output = "CANDLESTICK"; + break; default: CCAPI_LOGGER_FATAL(CCAPI_UNSUPPORTED_VALUE); } @@ -72,6 +77,12 @@ class MarketDataMessage CCAPI_FINAL { AGG_TRADE_ID = 3, IS_BUYER_MAKER = 4, SEQUENCE_NUMBER = 5, + OPEN_PRICE = 6, + HIGH_PRICE = 7, + LOW_PRICE = 8, + CLOSE_PRICE = 9, + VOLUME = 10, + QUOTE_VOLUME = 11, }; static std::string dataFieldTypeToString(DataFieldType dataFieldType) { std::string output; @@ -94,6 +105,24 @@ class MarketDataMessage CCAPI_FINAL { case DataFieldType::SEQUENCE_NUMBER: output = "SEQUENCE_NUMBER"; break; + case DataFieldType::OPEN_PRICE: + output = "OPEN_PRICE"; + break; + case DataFieldType::HIGH_PRICE: + output = "HIGH_PRICE"; + break; + case DataFieldType::LOW_PRICE: + output = "LOW_PRICE"; + break; + case DataFieldType::CLOSE_PRICE: + output = "CLOSE_PRICE"; + break; + case DataFieldType::VOLUME: + output = "VOLUME"; + break; + case DataFieldType::QUOTE_VOLUME: + output = "QUOTE_VOLUME"; + break; default: CCAPI_LOGGER_FATAL(CCAPI_UNSUPPORTED_VALUE); } @@ -156,6 +185,9 @@ class MarketDataMessage CCAPI_FINAL { case Type::MARKET_DATA_EVENTS_AGG_TRADE: output = "MARKET_DATA_EVENTS_AGG_TRADE"; break; + case Type::MARKET_DATA_EVENTS_CANDLESTICK: + output = "MARKET_DATA_EVENTS_CANDLESTICK"; + break; default: CCAPI_LOGGER_FATAL(CCAPI_UNSUPPORTED_VALUE); } diff --git a/include/ccapi_cpp/ccapi_message.h b/include/ccapi_cpp/ccapi_message.h index 4fb3c26d..eab02372 100644 --- a/include/ccapi_cpp/ccapi_message.h +++ b/include/ccapi_cpp/ccapi_message.h @@ -45,6 +45,7 @@ class Message CCAPI_FINAL { MARKET_DATA_EVENTS_MARKET_DEPTH, MARKET_DATA_EVENTS_TRADE, MARKET_DATA_EVENTS_AGG_TRADE, + MARKET_DATA_EVENTS_CANDLESTICK, EXECUTION_MANAGEMENT_EVENTS_ORDER_UPDATE, EXECUTION_MANAGEMENT_EVENTS_PRIVATE_TRADE, SUBSCRIPTION_STARTED, @@ -89,6 +90,9 @@ class Message CCAPI_FINAL { case Type::MARKET_DATA_EVENTS_AGG_TRADE: output = "MARKET_DATA_EVENTS_AGG_TRADE"; break; + case Type::MARKET_DATA_EVENTS_CANDLESTICK: + output = "MARKET_DATA_EVENTS_CANDLESTICK"; + break; case Type::AUTHORIZATION_SUCCESS: output = "AUTHORIZATION_SUCCESS"; break; diff --git a/include/ccapi_cpp/ccapi_session_configs.h b/include/ccapi_cpp/ccapi_session_configs.h index dc77aa6b..f4383166 100644 --- a/include/ccapi_cpp/ccapi_session_configs.h +++ b/include/ccapi_cpp/ccapi_session_configs.h @@ -97,6 +97,7 @@ class SessionConfigs CCAPI_FINAL { std::map fieldWebsocketChannelMapOkx = { {CCAPI_TRADE, CCAPI_WEBSOCKET_OKX_CHANNEL_TRADE}, {CCAPI_MARKET_DEPTH, CCAPI_WEBSOCKET_OKX_CHANNEL_PUBLIC_DEPTH400}, + {CCAPI_CANDLESTICK, CCAPI_WEBSOCKET_OKX_CHANNEL_CANDLESTICK}, }; std::map fieldWebsocketChannelMapErisx = { {CCAPI_TRADE, CCAPI_WEBSOCKET_ERISX_CHANNEL_MARKET_DATA_SUBSCRIBE}, diff --git a/include/ccapi_cpp/ccapi_session_options.h b/include/ccapi_cpp/ccapi_session_options.h index 4574ce0f..e7b71eed 100644 --- a/include/ccapi_cpp/ccapi_session_options.h +++ b/include/ccapi_cpp/ccapi_session_options.h @@ -12,8 +12,7 @@ namespace ccapi { class SessionOptions CCAPI_FINAL { public: std::string toString() const { - std::string output = "SessionOptions [warnLateEventMaxMilliSeconds = " + ccapi::toString(warnLateEventMaxMilliSeconds) + - ", enableCheckSequence = " + ccapi::toString(enableCheckSequence) + + std::string output = "SessionOptions [enableCheckSequence = " + ccapi::toString(enableCheckSequence) + ", enableCheckOrderBookChecksum = " + ccapi::toString(enableCheckOrderBookChecksum) + ", enableCheckOrderBookCrossed = " + ccapi::toString(enableCheckOrderBookCrossed) + ", enableCheckPingPongWebsocketProtocolLevel = " + ccapi::toString(enableCheckPingPongWebsocketProtocolLevel) + @@ -33,7 +32,7 @@ class SessionOptions CCAPI_FINAL { ", enableOneHttpConnectionPerRequest = " + ccapi::toString(enableOneHttpConnectionPerRequest) + "]"; return output; } - long warnLateEventMaxMilliSeconds{}; // used to print a warning log message if en event arrives late + // long warnLateEventMaxMilliSeconds{}; // used to print a warning log message if en event arrives late bool enableCheckSequence{}; // used to check sequence number discontinuity bool enableCheckOrderBookChecksum{}; // used to check order book checksum bool enableCheckOrderBookCrossed{true}; // used to check order book cross, usually this should be set to true diff --git a/include/ccapi_cpp/ccapi_subscription.h b/include/ccapi_cpp/ccapi_subscription.h index 44e53bad..16aaa8b8 100644 --- a/include/ccapi_cpp/ccapi_subscription.h +++ b/include/ccapi_cpp/ccapi_subscription.h @@ -35,6 +35,7 @@ class Subscription CCAPI_FINAL { this->optionMap[CCAPI_CONFLATE_GRACE_PERIOD_MILLISECONDS] = CCAPI_CONFLATE_GRACE_PERIOD_MILLISECONDS_DEFAULT; this->optionMap[CCAPI_MARKET_DEPTH_RETURN_UPDATE] = CCAPI_MARKET_DEPTH_RETURN_UPDATE_DEFAULT; this->optionMap[CCAPI_FETCH_MARKET_DEPTH_INITIAL_SNAPSHOT_DELAY_MILLISECONDS] = CCAPI_FETCH_MARKET_DEPTH_INITIAL_SNAPSHOT_DELAY_MILLISECONDS_DEFAULT; + this->optionMap[CCAPI_CANDLESTICK_INTERVAL_SECONDS] = CCAPI_CANDLESTICK_INTERVAL_SECONDS_DEFAULT; for (const auto& option : optionList) { auto optionKeyValue = UtilString::split(option, "="); this->optionMap[optionKeyValue.at(0)] = optionKeyValue.at(1); @@ -48,7 +49,7 @@ class Subscription CCAPI_FINAL { } else if (std::includes(executionManagementSubscriptionFieldSet.begin(), executionManagementSubscriptionFieldSet.end(), this->fieldSet.begin(), this->fieldSet.end())) { this->serviceName = CCAPI_EXECUTION_MANAGEMENT; - } else if (field == CCAPI_MARKET_DEPTH || field == CCAPI_TRADE || field == CCAPI_AGG_TRADE) { + } else if (field == CCAPI_MARKET_DEPTH || field == CCAPI_TRADE || field == CCAPI_AGG_TRADE || field == CCAPI_CANDLESTICK) { this->serviceName = CCAPI_MARKET_DATA; } CCAPI_LOGGER_TRACE("this->serviceName = " + this->serviceName); diff --git a/include/ccapi_cpp/service/ccapi_market_data_service.h b/include/ccapi_cpp/service/ccapi_market_data_service.h index fdbc54d5..68ba5a67 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service.h @@ -459,14 +459,16 @@ class MarketDataService : public Service { for (auto& marketDataMessage : marketDataMessageList) { if (marketDataMessage.type == MarketDataMessage::Type::MARKET_DATA_EVENTS_MARKET_DEPTH || marketDataMessage.type == MarketDataMessage::Type::MARKET_DATA_EVENTS_TRADE || - marketDataMessage.type == MarketDataMessage::Type::MARKET_DATA_EVENTS_AGG_TRADE) { - if (this->sessionOptions.warnLateEventMaxMilliSeconds > 0 && - std::chrono::duration_cast(timeReceived - marketDataMessage.tp).count() > - this->sessionOptions.warnLateEventMaxMilliSeconds && - marketDataMessage.recapType == MarketDataMessage::RecapType::NONE) { - CCAPI_LOGGER_WARN("late websocket message: timeReceived = " + toString(timeReceived) + ", marketDataMessage.tp = " + toString(marketDataMessage.tp) + - ", wsConnection = " + toString(wsConnection)); - } + marketDataMessage.type == MarketDataMessage::Type::MARKET_DATA_EVENTS_AGG_TRADE || + marketDataMessage.type == MarketDataMessage::Type::MARKET_DATA_EVENTS_CANDLESTICK) { + // if (this->sessionOptions.warnLateEventMaxMilliSeconds > 0 && + // std::chrono::duration_cast(timeReceived - marketDataMessage.tp).count() > + // this->sessionOptions.warnLateEventMaxMilliSeconds && + // marketDataMessage.recapType == MarketDataMessage::RecapType::NONE) { + // CCAPI_LOGGER_WARN("late websocket message: timeReceived = " + toString(timeReceived) + ", marketDataMessage.tp = " + toString(marketDataMessage.tp) + // + + // ", wsConnection = " + toString(wsConnection)); + // } std::string& exchangeSubscriptionId = marketDataMessage.exchangeSubscriptionId; CCAPI_LOGGER_TRACE("this->channelIdSymbolIdByConnectionIdExchangeSubscriptionIdMap = " + @@ -531,6 +533,11 @@ class MarketDataService : public Service { this->processTrade(wsConnection, channelId, symbolId, event, marketDataMessage.tp, timeReceived, marketDataMessage.data, field, optionMap, correlationIdList, isSolicited); } + if (marketDataMessage.data.find(MarketDataMessage::DataType::CANDLESTICK) != marketDataMessage.data.end()) { + bool isSolicited = marketDataMessage.recapType == MarketDataMessage::RecapType::SOLICITED; + this->processExchangeProvidedCandlestick(wsConnection, channelId, symbolId, event, marketDataMessage.tp, timeReceived, marketDataMessage.data, field, + optionMap, correlationIdList, isSolicited); + } } else { CCAPI_LOGGER_WARN("websocket event type is unknown for " + toString(marketDataMessage)); CCAPI_LOGGER_WARN("textMessage is " + std::string(textMessage)); @@ -968,20 +975,62 @@ class MarketDataService : public Service { } } } - void updateElementListWithOhlc(const WsConnection& wsConnection, const std::string& channelId, const std::string& symbolId, const std::string& field, - std::vector& elementList) { + void updateElementListWithExchangeProvidedCandlestick(const std::string& field, MarketDataMessage::TypeForData& input, std::vector& elementList) { + if (field == CCAPI_CANDLESTICK) { + for (auto& x : input) { + auto& type = x.first; + auto& detail = x.second; + if (type == MarketDataMessage::DataType::CANDLESTICK) { + for (auto& y : detail) { + auto& openPrice = y.at(MarketDataMessage::DataFieldType::OPEN_PRICE); + auto& highPrice = y.at(MarketDataMessage::DataFieldType::HIGH_PRICE); + auto& lowPrice = y.at(MarketDataMessage::DataFieldType::LOW_PRICE); + auto& closePrice = y.at(MarketDataMessage::DataFieldType::CLOSE_PRICE); + Element element; + std::string k1(CCAPI_OPEN_PRICE); + std::string k2(CCAPI_HIGH_PRICE); + std::string k3(CCAPI_LOW_PRICE); + std::string k4(CCAPI_CLOSE_PRICE); + element.emplace(k1, y.at(MarketDataMessage::DataFieldType::OPEN_PRICE)); + element.emplace(k2, y.at(MarketDataMessage::DataFieldType::HIGH_PRICE)); + element.emplace(k3, y.at(MarketDataMessage::DataFieldType::LOW_PRICE)); + element.emplace(k4, y.at(MarketDataMessage::DataFieldType::CLOSE_PRICE)); + { + auto it = y.find(MarketDataMessage::DataFieldType::VOLUME); + if (it != y.end()) { + std::string k(CCAPI_VOLUME); + element.emplace(k, it->second); + } + } + { + auto it = y.find(MarketDataMessage::DataFieldType::QUOTE_VOLUME); + if (it != y.end()) { + std::string k(CCAPI_QUOTE_VOLUME); + element.emplace(k, it->second); + } + } + elementList.emplace_back(std::move(element)); + } + } else { + CCAPI_LOGGER_WARN("extra type " + MarketDataMessage::dataTypeToString(type)); + } + } + } + } + void updateElementListWithCalculatedCandlestick(const WsConnection& wsConnection, const std::string& channelId, const std::string& symbolId, + const std::string& field, std::vector& elementList) { if (field == CCAPI_TRADE || field == CCAPI_AGG_TRADE) { Element element; if (this->openByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId].empty()) { - element.insert(CCAPI_OPEN, CCAPI_OHLC_EMPTY); - element.insert(CCAPI_HIGH, CCAPI_OHLC_EMPTY); - element.insert(CCAPI_LOW, CCAPI_OHLC_EMPTY); - element.insert(CCAPI_CLOSE, CCAPI_OHLC_EMPTY); + element.insert(CCAPI_OPEN_PRICE, CCAPI_CANDLESTICK_EMPTY); + element.insert(CCAPI_HIGH_PRICE, CCAPI_CANDLESTICK_EMPTY); + element.insert(CCAPI_LOW_PRICE, CCAPI_CANDLESTICK_EMPTY); + element.insert(CCAPI_CLOSE_PRICE, CCAPI_CANDLESTICK_EMPTY); } else { - element.insert(CCAPI_OPEN, this->openByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId]); - element.insert(CCAPI_HIGH, this->highByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId].toString()); - element.insert(CCAPI_LOW, this->lowByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId].toString()); - element.insert(CCAPI_CLOSE, this->closeByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId]); + element.insert(CCAPI_OPEN_PRICE, this->openByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId]); + element.insert(CCAPI_HIGH_PRICE, this->highByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId].toString()); + element.insert(CCAPI_LOW_PRICE, this->lowByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId].toString()); + element.insert(CCAPI_CLOSE_PRICE, this->closeByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId]); } elementList.emplace_back(std::move(element)); this->openByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId] = ""; @@ -1220,7 +1269,7 @@ class MarketDataService : public Service { std::vector messageList; std::vector elementList; if (shouldConflate && intervalChanged) { - this->updateElementListWithOhlc(wsConnection, channelId, symbolId, field, elementList); + this->updateElementListWithCalculatedCandlestick(wsConnection, channelId, symbolId, field, elementList); } else { this->updateElementListWithTrade(field, input, elementList); } @@ -1242,14 +1291,38 @@ class MarketDataService : public Service { } if (shouldConflate) { this->previousConflateTimeMapByConnectionIdChannelIdSymbolIdMap.at(wsConnection.id).at(channelId).at(symbolId) = conflateTp; - this->updateOhlc(wsConnection, channelId, symbolId, field, input); + this->updateCalculatedCandlestick(wsConnection, channelId, symbolId, field, input); } } else { - this->updateOhlc(wsConnection, channelId, symbolId, field, input); + this->updateCalculatedCandlestick(wsConnection, channelId, symbolId, field, input); + } + } + void processExchangeProvidedCandlestick(const WsConnection& wsConnection, const std::string& channelId, const std::string& symbolId, Event& event, + const TimePoint& tp, const TimePoint& timeReceived, MarketDataMessage::TypeForData& input, const std::string& field, + const std::map& optionMap, const std::vector& correlationIdList, + bool isSolicited) { + CCAPI_LOGGER_TRACE("input = " + MarketDataMessage::dataToString(input)); + CCAPI_LOGGER_TRACE("optionMap = " + toString(optionMap)); + std::vector messageList; + std::vector elementList; + this->updateElementListWithExchangeProvidedCandlestick(field, input, elementList); + CCAPI_LOGGER_TRACE("elementList = " + toString(elementList)); + if (!elementList.empty()) { + Message message; + message.setTimeReceived(timeReceived); + message.setType(Message::Type::MARKET_DATA_EVENTS_CANDLESTICK); + message.setRecapType(isSolicited ? Message::RecapType::SOLICITED : Message::RecapType::NONE); + message.setTime(tp); + message.setElementList(elementList); + message.setCorrelationIdList(correlationIdList); + messageList.emplace_back(std::move(message)); + } + if (!messageList.empty()) { + event.addMessages(messageList); } } - void updateOhlc(const WsConnection& wsConnection, const std::string& channelId, const std::string& symbolId, const std::string& field, - const MarketDataMessage::TypeForData& input) { + void updateCalculatedCandlestick(const WsConnection& wsConnection, const std::string& channelId, const std::string& symbolId, const std::string& field, + const MarketDataMessage::TypeForData& input) { if (field == CCAPI_TRADE || field == CCAPI_AGG_TRADE) { for (const auto& x : input) { auto type = x.first; @@ -1390,7 +1463,7 @@ class MarketDataService : public Service { this->updateElementListWithUpdateMarketDepth(field, optionMap, snapshotBid, std::map(), snapshotAsk, std::map(), elementList, true); } else if (field == CCAPI_TRADE || field == CCAPI_AGG_TRADE) { - this->updateElementListWithOhlc(wsConnection, channelId, symbolId, field, elementList); + this->updateElementListWithCalculatedCandlestick(wsConnection, channelId, symbolId, field, elementList); } CCAPI_LOGGER_TRACE("elementList = " + toString(elementList)); this->previousConflateTimeMapByConnectionIdChannelIdSymbolIdMap.at(wsConnection.id).at(channelId).at(symbolId) = conflateTp; diff --git a/include/ccapi_cpp/service/ccapi_market_data_service_okx.h b/include/ccapi_cpp/service/ccapi_market_data_service_okx.h index eab320d9..8428e222 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service_okx.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service_okx.h @@ -40,6 +40,14 @@ class MarketDataServiceOkx : public MarketDataService { private: #endif + std::string getInstrumentGroup(const Subscription& subscription) override { + std::string baseUrlWsGivenSubscription(this->baseUrlWs); + if (subscription.getField() == CCAPI_CANDLESTICK) { + baseUrlWsGivenSubscription = this->sessionConfigs.getUrlWebsocketBase().at(this->exchangeName) + CCAPI_OKX_BUSINESS_WS_PATH; + } + return baseUrlWsGivenSubscription + "|" + subscription.getField() + "|" + subscription.getSerializedOptions() + "|" + + subscription.getSerializedCredential(); + } bool doesHttpBodyContainError(const std::string& body) override { return !std::regex_search(body, std::regex("\"code\":\\s*\"0\"")); } void prepareSubscriptionDetail(std::string& channelId, std::string& symbolId, const std::string& field, const WsConnection& wsConnection, const Subscription& subscription, const std::map optionMap) override { @@ -61,6 +69,21 @@ class MarketDataServiceOkx : public MarketDataService { channelId = CCAPI_WEBSOCKET_OKX_CHANNEL_PUBLIC_DEPTH400; } } + } else if (field == CCAPI_CANDLESTICK) { + int intervalSeconds = std::stoi(optionMap.at(CCAPI_CANDLESTICK_INTERVAL_SECONDS)); + std::string interval; + if (intervalSeconds < 60) { + interval = std::to_string(intervalSeconds) + "s"; + } else if (intervalSeconds < 3600) { + interval = std::to_string(intervalSeconds / 60) + "m"; + } else if (intervalSeconds < 86400) { + interval = std::to_string(intervalSeconds / 3600) + "H"; + } else if (intervalSeconds < 604800) { + interval = std::to_string(intervalSeconds / 86400) + "D"; + } else { + interval = std::to_string(intervalSeconds / 604800) + "W"; + } + channelId = CCAPI_WEBSOCKET_OKX_CHANNEL_CANDLESTICK + interval; } } #ifdef CCAPI_LEGACY_USE_WEBSOCKETPP @@ -249,6 +272,23 @@ class MarketDataServiceOkx : public MarketDataService { marketDataMessage.data[MarketDataMessage::DataType::TRADE].emplace_back(std::move(dataPoint)); marketDataMessageList.emplace_back(std::move(marketDataMessage)); } + } else if (channelId.rfind(CCAPI_WEBSOCKET_OKX_CHANNEL_CANDLESTICK, 0) == 0) { + for (const auto& datum : document["data"].GetArray()) { + MarketDataMessage marketDataMessage; + marketDataMessage.type = MarketDataMessage::Type::MARKET_DATA_EVENTS_CANDLESTICK; + marketDataMessage.recapType = MarketDataMessage::RecapType::NONE; + marketDataMessage.tp = TimePoint(std::chrono::milliseconds(std::stoll(datum[0].GetString()))); + marketDataMessage.exchangeSubscriptionId = exchangeSubscriptionId; + MarketDataMessage::TypeForDataPoint dataPoint; + dataPoint.insert({MarketDataMessage::DataFieldType::OPEN_PRICE, datum[1].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::HIGH_PRICE, datum[2].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::LOW_PRICE, datum[3].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::CLOSE_PRICE, datum[4].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::VOLUME, datum[5].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::QUOTE_VOLUME, datum[7].GetString()}); + marketDataMessage.data[MarketDataMessage::DataType::CANDLESTICK].emplace_back(std::move(dataPoint)); + marketDataMessageList.emplace_back(std::move(marketDataMessage)); + } } } } From 2a9ccdfcdb97b95fec0a7d882f0cd9619231ef78 Mon Sep 17 00:00:00 2001 From: Crypto Chassis Date: Wed, 13 Sep 2023 21:32:55 -0700 Subject: [PATCH 2/7] dev: add draft for streaming exchange provided candlestick - binance --- include/ccapi_cpp/ccapi_macro.h | 1 + include/ccapi_cpp/ccapi_session_configs.h | 4 ++++ .../service/ccapi_market_data_service.h | 16 +++++++++++++++ .../ccapi_market_data_service_binance_base.h | 20 +++++++++++++++++++ ...et_data_service_binance_derivatives_base.h | 4 ++++ .../service/ccapi_market_data_service_okx.h | 16 +++------------ 6 files changed, 48 insertions(+), 13 deletions(-) diff --git a/include/ccapi_cpp/ccapi_macro.h b/include/ccapi_cpp/ccapi_macro.h index 8c02ffe6..be97fd04 100644 --- a/include/ccapi_cpp/ccapi_macro.h +++ b/include/ccapi_cpp/ccapi_macro.h @@ -282,6 +282,7 @@ #define CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_BOOK_TICKER "bookTicker" #define CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_PARTIAL_BOOK_DEPTH "depth" #define CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_AGG_TRADE "aggTrade" +#define CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_KLINE "kline" #define CCAPI_WEBSOCKET_HUOBI_CHANNEL_TRADE_DETAIL "market.$symbol.trade.detail" #define CCAPI_WEBSOCKET_HUOBI_CHANNEL_TRADE_DETAIL_REGEX "market\\.(.+)\\.trade\\.detail" #define CCAPI_WEBSOCKET_HUOBI_CHANNEL_MARKET_BBO "market.$symbol.bbo" diff --git a/include/ccapi_cpp/ccapi_session_configs.h b/include/ccapi_cpp/ccapi_session_configs.h index f4383166..454ceed8 100644 --- a/include/ccapi_cpp/ccapi_session_configs.h +++ b/include/ccapi_cpp/ccapi_session_configs.h @@ -68,19 +68,23 @@ class SessionConfigs CCAPI_FINAL { {CCAPI_TRADE, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_TRADE}, {CCAPI_AGG_TRADE, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_AGG_TRADE}, {CCAPI_MARKET_DEPTH, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_PARTIAL_BOOK_DEPTH}, + {CCAPI_CANDLESTICK, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_KLINE}, }; std::map fieldWebsocketChannelMapBinance = { {CCAPI_TRADE, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_TRADE}, {CCAPI_AGG_TRADE, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_AGG_TRADE}, {CCAPI_MARKET_DEPTH, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_PARTIAL_BOOK_DEPTH}, + {CCAPI_CANDLESTICK, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_KLINE}, }; std::map fieldWebsocketChannelMapBinanceUsdsFutures = { {CCAPI_AGG_TRADE, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_AGG_TRADE}, {CCAPI_MARKET_DEPTH, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_PARTIAL_BOOK_DEPTH}, + {CCAPI_CANDLESTICK, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_KLINE}, }; std::map fieldWebsocketChannelMapBinanceCoinFutures = { {CCAPI_AGG_TRADE, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_AGG_TRADE}, {CCAPI_MARKET_DEPTH, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_PARTIAL_BOOK_DEPTH}, + {CCAPI_CANDLESTICK, CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_KLINE}, }; std::map fieldWebsocketChannelMapHuobi = { {CCAPI_TRADE, CCAPI_WEBSOCKET_HUOBI_CHANNEL_TRADE_DETAIL}, diff --git a/include/ccapi_cpp/service/ccapi_market_data_service.h b/include/ccapi_cpp/service/ccapi_market_data_service.h index 68ba5a67..5906eaef 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service.h @@ -1808,6 +1808,22 @@ class MarketDataService : public Service { }, this->sessionOptions.httpRequestTimeoutMilliSeconds); } + std::string convertCandlestickIntervalSecondsToInterval(int intervalSeconds, const std::string& secondStr, const std::string& minuteStr, + const std::string& hourStr, const std::string& dayStr, const std::string& weekStr) { + std::string interval; + if (intervalSeconds < 60) { + interval = std::to_string(intervalSeconds) + secondStr; + } else if (intervalSeconds < 3600) { + interval = std::to_string(intervalSeconds / 60) + minuteStr; + } else if (intervalSeconds < 86400) { + interval = std::to_string(intervalSeconds / 3600) + hourStr; + } else if (intervalSeconds < 604800) { + interval = std::to_string(intervalSeconds / 86400) + dayStr; + } else { + interval = std::to_string(intervalSeconds / 604800) + weekStr; + } + return interval; + } virtual std::vector createSendStringListFromSubscriptionList(const WsConnection& wsConnection, const std::vector& subscriptionList, const TimePoint& now, const std::map& credential) { return {}; diff --git a/include/ccapi_cpp/service/ccapi_market_data_service_binance_base.h b/include/ccapi_cpp/service/ccapi_market_data_service_binance_base.h index ba9585b3..088d25ed 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service_binance_base.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service_binance_base.h @@ -53,6 +53,10 @@ class MarketDataServiceBinanceBase : public MarketDataService { } this->marketDepthSubscribedToExchangeByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId] = marketDepthSubscribedToExchange; } + } else if (field == CCAPI_CANDLESTICK) { + std::string interval = + this->convertCandlestickIntervalSecondsToInterval(std::stoi(optionMap.at(CCAPI_CANDLESTICK_INTERVAL_SECONDS)), "s", "m", "h", "d", "w"); + channelId = channelId + "_" + interval; } } std::vector createSendStringList(const WsConnection& wsConnection) override { @@ -240,6 +244,22 @@ class MarketDataServiceBinanceBase : public MarketDataService { dataPoint.insert({MarketDataMessage::DataFieldType::IS_BUYER_MAKER, data["m"].GetBool() ? "1" : "0"}); marketDataMessage.data[MarketDataMessage::DataType::AGG_TRADE].emplace_back(std::move(dataPoint)); marketDataMessageList.emplace_back(std::move(marketDataMessage)); + } else if (channelId.find(CCAPI_WEBSOCKET_BINANCE_BASE_CHANNEL_KLINE) != std::string::npos) { + MarketDataMessage marketDataMessage; + marketDataMessage.type = MarketDataMessage::Type::MARKET_DATA_EVENTS_CANDLESTICK; + marketDataMessage.recapType = MarketDataMessage::RecapType::NONE; + const rj::Value& k = data["k"]; + marketDataMessage.tp = TimePoint(std::chrono::milliseconds(std::stoll(k["t"].GetString()))); + marketDataMessage.exchangeSubscriptionId = exchangeSubscriptionId; + MarketDataMessage::TypeForDataPoint dataPoint; + dataPoint.insert({MarketDataMessage::DataFieldType::OPEN_PRICE, k["o"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::HIGH_PRICE, k["h"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::LOW_PRICE, k["l"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::CLOSE_PRICE, k["c"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::VOLUME, k["v"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::QUOTE_VOLUME, k["q"].GetString()}); + marketDataMessage.data[MarketDataMessage::DataType::CANDLESTICK].emplace_back(std::move(dataPoint)); + marketDataMessageList.emplace_back(std::move(marketDataMessage)); } } } diff --git a/include/ccapi_cpp/service/ccapi_market_data_service_binance_derivatives_base.h b/include/ccapi_cpp/service/ccapi_market_data_service_binance_derivatives_base.h index 053a4469..26154a45 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service_binance_derivatives_base.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service_binance_derivatives_base.h @@ -38,6 +38,10 @@ class MarketDataServiceBinanceDerivativesBase : public MarketDataServiceBinanceB } this->marketDepthSubscribedToExchangeByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId] = marketDepthSubscribedToExchange; } + } else if (field == CCAPI_CANDLESTICK) { + std::string interval = + this->convertCandlestickIntervalSecondsToInterval(std::stoi(optionMap.at(CCAPI_CANDLESTICK_INTERVAL_SECONDS)), "s", "m", "h", "d", "w"); + channelId = channelId + "_" + interval; } } void extractInstrumentInfo(Element& element, const rj::Value& x) { diff --git a/include/ccapi_cpp/service/ccapi_market_data_service_okx.h b/include/ccapi_cpp/service/ccapi_market_data_service_okx.h index 8428e222..b1b9f2c0 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service_okx.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service_okx.h @@ -70,19 +70,9 @@ class MarketDataServiceOkx : public MarketDataService { } } } else if (field == CCAPI_CANDLESTICK) { - int intervalSeconds = std::stoi(optionMap.at(CCAPI_CANDLESTICK_INTERVAL_SECONDS)); - std::string interval; - if (intervalSeconds < 60) { - interval = std::to_string(intervalSeconds) + "s"; - } else if (intervalSeconds < 3600) { - interval = std::to_string(intervalSeconds / 60) + "m"; - } else if (intervalSeconds < 86400) { - interval = std::to_string(intervalSeconds / 3600) + "H"; - } else if (intervalSeconds < 604800) { - interval = std::to_string(intervalSeconds / 86400) + "D"; - } else { - interval = std::to_string(intervalSeconds / 604800) + "W"; - } + ; + std::string interval = + this->convertCandlestickIntervalSecondsToInterval(std::stoi(optionMap.at(CCAPI_CANDLESTICK_INTERVAL_SECONDS)), "s", "m", "H", "D", "W"); channelId = CCAPI_WEBSOCKET_OKX_CHANNEL_CANDLESTICK + interval; } } From 7f0845a47d60e529b4596c31ddc195e1436d6efc Mon Sep 17 00:00:00 2001 From: Crypto Chassis Date: Wed, 13 Sep 2023 22:34:01 -0700 Subject: [PATCH 3/7] dev: add draft for streaming exchange provided candlestick - bybit --- include/ccapi_cpp/ccapi_macro.h | 4 +++ include/ccapi_cpp/ccapi_session_configs.h | 2 ++ .../service/ccapi_market_data_service_bybit.h | 19 +++++++++++ ...pi_market_data_service_bybit_derivatives.h | 33 +++++++++++++++++-- 4 files changed, 55 insertions(+), 3 deletions(-) diff --git a/include/ccapi_cpp/ccapi_macro.h b/include/ccapi_cpp/ccapi_macro.h index be97fd04..2a9c378b 100644 --- a/include/ccapi_cpp/ccapi_macro.h +++ b/include/ccapi_cpp/ccapi_macro.h @@ -328,8 +328,12 @@ #define CCAPI_WEBSOCKET_BYBIT_CHANNEL_TRADE "trade.{symbol}" #define CCAPI_WEBSOCKET_BYBIT_CHANNEL_BOOK_TICKER "bookticker.{symbol}" #define CCAPI_WEBSOCKET_BYBIT_CHANNEL_DEPTH "orderbook.40.{symbol}" +#define CCAPI_WEBSOCKET_BYBIT_CHANNEL_KLINE "kline.{interval}.{symbol}" +#define CCAPI_WEBSOCKET_BYBIT_CHANNEL_KLINE_2 "kline" #define CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_TRADE "publicTrade.{symbol}" #define CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_ORDERBOOK "orderbook.{depth}.{symbol}" +#define CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_KLINE "kline.{interval}.{symbol}" +#define CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_KLINE_2 "kline" #define CCAPI_WEBSOCKET_ASCENDEX_CHANNEL_TRADES "trades" #define CCAPI_WEBSOCKET_ASCENDEX_CHANNEL_BBO "bbo" #define CCAPI_WEBSOCKET_ASCENDEX_CHANNEL_DEPTH "depth" diff --git a/include/ccapi_cpp/ccapi_session_configs.h b/include/ccapi_cpp/ccapi_session_configs.h index 454ceed8..c62a5e45 100644 --- a/include/ccapi_cpp/ccapi_session_configs.h +++ b/include/ccapi_cpp/ccapi_session_configs.h @@ -142,10 +142,12 @@ class SessionConfigs CCAPI_FINAL { std::map fieldWebsocketChannelMapBybit = { {CCAPI_TRADE, CCAPI_WEBSOCKET_BYBIT_CHANNEL_TRADE}, {CCAPI_MARKET_DEPTH, CCAPI_WEBSOCKET_BYBIT_CHANNEL_DEPTH}, + {CCAPI_CANDLESTICK, CCAPI_WEBSOCKET_BYBIT_CHANNEL_KLINE}, }; std::map fieldWebsocketChannelMapBybitDerivatives = { {CCAPI_TRADE, CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_TRADE}, {CCAPI_MARKET_DEPTH, CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_ORDERBOOK}, + {CCAPI_CANDLESTICK, CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_KLINE}, }; std::map fieldWebsocketChannelMapAscendex = { {CCAPI_TRADE, CCAPI_WEBSOCKET_ASCENDEX_CHANNEL_TRADES}, diff --git a/include/ccapi_cpp/service/ccapi_market_data_service_bybit.h b/include/ccapi_cpp/service/ccapi_market_data_service_bybit.h index 2ea449f5..1db33737 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service_bybit.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service_bybit.h @@ -44,6 +44,11 @@ class MarketDataServiceBybit : public MarketDataServiceBybitBase { } else { channelId = CCAPI_WEBSOCKET_BYBIT_CHANNEL_DEPTH; } + } else if (field == CCAPI_CANDLESTICK) { + std::string interval = + this->convertCandlestickIntervalSecondsToInterval(std::stoi(optionMap.at(CCAPI_CANDLESTICK_INTERVAL_SECONDS)), "s", "m", "h", "d", "w"); + std::string toReplace = "{interval}"; + channelId.replace(channelId.find(toReplace), toReplace.length(), interval); } } std::vector createSendStringList(const WsConnection& wsConnection) override { @@ -204,6 +209,20 @@ class MarketDataServiceBybit : public MarketDataServiceBybitBase { dataPoint.insert({MarketDataMessage::DataFieldType::IS_BUYER_MAKER, data["m"].GetBool() ? "0" : "1"}); marketDataMessage.data[MarketDataMessage::DataType::TRADE].emplace_back(std::move(dataPoint)); marketDataMessageList.emplace_back(std::move(marketDataMessage)); + } else if (channelId.rfind(CCAPI_WEBSOCKET_BYBIT_CHANNEL_KLINE_2, 0) == 0) { + MarketDataMessage marketDataMessage; + marketDataMessage.type = MarketDataMessage::Type::MARKET_DATA_EVENTS_CANDLESTICK; + marketDataMessage.recapType = MarketDataMessage::RecapType::NONE; + marketDataMessage.tp = TimePoint(std::chrono::milliseconds(std::stoll(data["t"].GetString()))); + marketDataMessage.exchangeSubscriptionId = exchangeSubscriptionId; + MarketDataMessage::TypeForDataPoint dataPoint; + dataPoint.insert({MarketDataMessage::DataFieldType::OPEN_PRICE, data["o"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::HIGH_PRICE, data["h"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::LOW_PRICE, data["l"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::CLOSE_PRICE, data["c"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::VOLUME, data["v"].GetString()}); + marketDataMessage.data[MarketDataMessage::DataType::CANDLESTICK].emplace_back(std::move(dataPoint)); + marketDataMessageList.emplace_back(std::move(marketDataMessage)); } } } diff --git a/include/ccapi_cpp/service/ccapi_market_data_service_bybit_derivatives.h b/include/ccapi_cpp/service/ccapi_market_data_service_bybit_derivatives.h index b3b9f9a6..7c7d6079 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service_bybit_derivatives.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service_bybit_derivatives.h @@ -66,6 +66,18 @@ class MarketDataServiceBybitDerivatives : public MarketDataServiceBybitBase { marketDepthSubscribedToExchange = this->calculateMarketDepthSubscribedToExchange(marketDepthRequested, depths); channelId += std::string("?") + CCAPI_MARKET_DEPTH_SUBSCRIBED_TO_EXCHANGE + "=" + std::to_string(marketDepthSubscribedToExchange); this->marketDepthSubscribedToExchangeByConnectionIdChannelIdSymbolIdMap[wsConnection.id][channelId][symbolId] = marketDepthSubscribedToExchange; + } else if (field == CCAPI_CANDLESTICK) { + std::string interval; + int intervalSeconds = std::stoi(optionMap.at(CCAPI_CANDLESTICK_INTERVAL_SECONDS)); + if (intervalSeconds < 86400) { + interval = std::to_string(intervalSeconds / 60); + } else if (intervalSeconds == 86400) { + interval = "D"; + } else { + interval = "W"; + } + std::string toReplace = "{interval}"; + channelId.replace(channelId.find(toReplace), toReplace.length(), interval); } } std::vector createSendStringList(const WsConnection& wsConnection) override { @@ -80,15 +92,13 @@ class MarketDataServiceBybitDerivatives : public MarketDataServiceBybitBase { auto channelId = subscriptionListByChannelIdSymbolId.first; for (const auto& subscriptionListByInstrument : subscriptionListByChannelIdSymbolId.second) { auto symbolId = subscriptionListByInstrument.first; - std::string exchangeSubscriptionId; + std::string exchangeSubscriptionId = channelId; if (channelId.rfind(CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_ORDERBOOK, 0) == 0) { int marketDepthSubscribedToExchange = this->marketDepthSubscribedToExchangeByConnectionIdChannelIdSymbolIdMap.at(wsConnection.id).at(channelId).at(symbolId); exchangeSubscriptionId = CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_ORDERBOOK; std::string toReplace = "{depth}"; exchangeSubscriptionId.replace(exchangeSubscriptionId.find(toReplace), toReplace.length(), std::to_string(marketDepthSubscribedToExchange)); - } else if (channelId == CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_TRADE) { - exchangeSubscriptionId = CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_TRADE; } std::string toReplace = "{symbol}"; exchangeSubscriptionId.replace(exchangeSubscriptionId.find(toReplace), toReplace.length(), symbolId); @@ -205,6 +215,23 @@ class MarketDataServiceBybitDerivatives : public MarketDataServiceBybitBase { marketDataMessage.data[MarketDataMessage::DataType::TRADE].emplace_back(std::move(dataPoint)); marketDataMessageList.emplace_back(std::move(marketDataMessage)); } + } else if (channelId.rfind(CCAPI_WEBSOCKET_BYBIT_DERIVATIVES_CHANNEL_KLINE_2, 0) == 0) { + for (const auto& x : data.GetArray()) { + MarketDataMessage marketDataMessage; + marketDataMessage.type = MarketDataMessage::Type::MARKET_DATA_EVENTS_CANDLESTICK; + marketDataMessage.recapType = MarketDataMessage::RecapType::NONE; + marketDataMessage.tp = TimePoint(std::chrono::milliseconds(std::stoll(x["start"].GetString()))); + marketDataMessage.exchangeSubscriptionId = exchangeSubscriptionId; + MarketDataMessage::TypeForDataPoint dataPoint; + dataPoint.insert({MarketDataMessage::DataFieldType::OPEN_PRICE, x["open"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::HIGH_PRICE, x["high"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::LOW_PRICE, x["low"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::CLOSE_PRICE, x["close"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::VOLUME, x["volume"].GetString()}); + dataPoint.insert({MarketDataMessage::DataFieldType::QUOTE_VOLUME, x["turnover"].GetString()}); + marketDataMessage.data[MarketDataMessage::DataType::CANDLESTICK].emplace_back(std::move(dataPoint)); + marketDataMessageList.emplace_back(std::move(marketDataMessage)); + } } } } From 69451f9d3d2507d4045834f9e2c9107916536315 Mon Sep 17 00:00:00 2001 From: Crypto Chassis Date: Thu, 14 Sep 2023 07:55:09 -0700 Subject: [PATCH 4/7] fix: python binding CMakeLists.txt regression in commands --- binding/python/CMakeLists.txt | 1 + .../python/example/market_data_simple_subscription/main.py | 7 +------ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/binding/python/CMakeLists.txt b/binding/python/CMakeLists.txt index 2ccec4a3..e508c0c7 100644 --- a/binding/python/CMakeLists.txt +++ b/binding/python/CMakeLists.txt @@ -44,6 +44,7 @@ add_custom_target(${PYTHON_PACKAGING_TARGET_NAME} ALL COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_BINARY_DIR}/setup.py ${PACKAGING_DIR_FULL} COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_BINARY_DIR}/${SWIG_TARGET_NAME}/ccapi.py ${PACKAGING_DIR_FULL} COMMAND ${CMAKE_COMMAND} -E copy $ ${PACKAGING_DIR_FULL} + COMMAND ${Python_EXECUTABLE} -m pip install --upgrade setuptools wheel COMMAND ${Python_EXECUTABLE} setup.py bdist_wheel COMMAND ${Python_EXECUTABLE} -m pip install --upgrade . WORKING_DIRECTORY ${PACKAGING_DIR_FULL} diff --git a/binding/python/example/market_data_simple_subscription/main.py b/binding/python/example/market_data_simple_subscription/main.py index cccee76c..a0b10e4a 100644 --- a/binding/python/example/market_data_simple_subscription/main.py +++ b/binding/python/example/market_data_simple_subscription/main.py @@ -4,7 +4,7 @@ class MyEventHandler(EventHandler): def __init__(self): super().__init__() def processEvent(self, event: Event, session: Session) -> bool: - if event.getType() == Event.SUBSCRIPTION_STATUS: + if event.getType() == Event.Type_SUBSCRIPTION_STATUS: print(f'Received an event of type SUBSCRIPTION_STATUS:\n{event.toStringPretty(2, 2)}') elif event.getType() == Event.Type_SUBSCRIPTION_DATA: for message in event.getMessageList(): @@ -21,11 +21,6 @@ def processEvent(self, event: Event, session: Session) -> bool: session = Session(option, config, eventHandler) subscription = Subscription('okx', 'BTC-USDT', 'MARKET_DEPTH') session.subscribe(subscription) - request = Request(Request.Operation_GET_RECENT_TRADES, 'okx', 'BTC-USDT') - request.appendParam({ - 'LIMIT':'1', - }) - session.sendRequest(request) time.sleep(10) session.stop() print('Bye') From 4fbd9e4c433431cc77bbba857d8a9a60b97addc2 Mon Sep 17 00:00:00 2001 From: Crypto Chassis Date: Thu, 14 Sep 2023 11:03:45 -0700 Subject: [PATCH 5/7] docs: add example for data visualization --- .../python/example/data_visualization/main.py | 49 +++++++++++++++++++ .../example/enable_library_logging/main.py | 5 ++ .../main.py | 11 +++-- .../main.py | 27 +++++----- binding/python/example/fix_simple/main.py | 21 +++++--- .../python/example/handle_exception/main.py | 5 ++ .../market_data_multiple_subscription/main.py | 5 ++ .../market_data_simple_request/main.py | 7 ++- .../market_data_simple_subscription/main.py | 5 ++ binding/python/test/main.py | 7 ++- format.sh | 1 + format_python.sh | 2 + 12 files changed, 121 insertions(+), 24 deletions(-) create mode 100644 binding/python/example/data_visualization/main.py create mode 100755 format_python.sh diff --git a/binding/python/example/data_visualization/main.py b/binding/python/example/data_visualization/main.py new file mode 100644 index 00000000..50c677dc --- /dev/null +++ b/binding/python/example/data_visualization/main.py @@ -0,0 +1,49 @@ +import time +from ccapi import EventHandler, SessionOptions, SessionConfigs, Session, Subscription, Event +import matplotlib.pyplot as plt +import seaborn as sns + +if __name__ == '__main__': + option = SessionOptions() + config = SessionConfigs() + session = Session(option, config) + exchange = 'okx' + instrument = 'BTC-USDT' + subscription = Subscription(exchange, instrument, 'MARKET_DEPTH', 'MARKET_DEPTH_MAX=400&CONFLATE_INTERVAL_MILLISECONDS=100') + session.subscribe(subscription) + fig, ax = plt.subplots() + while True: + bids = { + 'price': [], + 'size': [] + } + asks = { + 'price': [], + 'size': [] + } + eventList = session.getEventQueue().purge() + if eventList: + event = eventList[-1] + if event.getType() == Event.Type_SUBSCRIPTION_DATA: + for message in event.getMessageList(): + for element in message.getElementList(): + elementNameValueMap = element.getNameValueMap() + for name, value in elementNameValueMap.items(): + if name == 'BID_PRICE': + bids['price'].append(float(value)) + if name == 'BID_SIZE': + bids['size'].append(float(value)) + if name == 'ASK_PRICE': + asks['price'].append(float(value)) + if name == 'ASK_SIZE': + asks['size'].append(float(value)) + ax.clear() + ax.set_title(f'{instrument} Order Book On {exchange.title()} at {message.getTimeISO()}') + ax.set_xlabel('Price') + ax.set_ylabel('Amount') + sns.ecdfplot(x='price', weights='size', legend=False, stat='count', complementary=True, + data={'price': bids['price'], 'size': bids['size']}, ax=ax, color='g') + sns.ecdfplot(x='price', weights='size', legend=False, stat='count', data={'price': asks['price'], 'size': asks['size']}, ax=ax, color='r') + plt.pause(0.1) + session.stop() + print('Bye') diff --git a/binding/python/example/enable_library_logging/main.py b/binding/python/example/enable_library_logging/main.py index 6201a174..d6bce2b9 100644 --- a/binding/python/example/enable_library_logging/main.py +++ b/binding/python/example/enable_library_logging/main.py @@ -2,14 +2,19 @@ from threading import Lock import time from ccapi import Logger, Session, Subscription + + class MyLogger(Logger): def __init__(self): super().__init__() self._lock = Lock() + def logMessage(self, severity: str, threadId: str, timeISO: str, fileName: str, lineNumber: str, message: str) -> None: self._lock.acquire() print(f'{threadId}: [{timeISO}] {{{fileName}:{lineNumber}}} {severity}{" " * 8}{message}') self._lock.release() + + myLogger = MyLogger() Logger.logger = myLogger if __name__ == '__main__': diff --git a/binding/python/example/execution_management_simple_request/main.py b/binding/python/example/execution_management_simple_request/main.py index e172011d..d09909be 100644 --- a/binding/python/example/execution_management_simple_request/main.py +++ b/binding/python/example/execution_management_simple_request/main.py @@ -2,12 +2,17 @@ import sys import time from ccapi import Event, SessionOptions, SessionConfigs, Session, EventHandler, Request + + class MyEventHandler(EventHandler): def __init__(self): super().__init__() + def processEvent(self, event: Event, session: Session) -> bool: print(f'Received an event:\n{event.toStringPretty(2, 2)}') return True # This line is needed. + + if __name__ == '__main__': if not os.environ.get('BINANCE_US_API_KEY'): print('Please set environment variable BINANCE_US_API_KEY', file=sys.stderr) @@ -21,9 +26,9 @@ def processEvent(self, event: Event, session: Session) -> bool: session = Session(option, config, eventHandler) request = Request(Request.Operation_CREATE_ORDER, 'binance-us', 'BTCUSD') request.appendParam({ - 'SIDE':'BUY', - 'QUANTITY':'0.0005', - 'LIMIT_PRICE':'20000', + 'SIDE': 'BUY', + 'QUANTITY': '0.0005', + 'LIMIT_PRICE': '20000', }) session.sendRequest(request) time.sleep(10) diff --git a/binding/python/example/execution_management_simple_subscription/main.py b/binding/python/example/execution_management_simple_subscription/main.py index fb8b2f8b..8658f92a 100644 --- a/binding/python/example/execution_management_simple_subscription/main.py +++ b/binding/python/example/execution_management_simple_subscription/main.py @@ -2,24 +2,29 @@ import sys import time from ccapi import EventHandler, SessionOptions, SessionConfigs, Session, Subscription, Request, Event, Message + + class MyEventHandler(EventHandler): def __init__(self): super().__init__() + def processEvent(self, event: Event, session: Session) -> bool: if event.getType() == Event.Type_SUBSCRIPTION_STATUS: - print(f'Received an event of type SUBSCRIPTION_STATUS:\n{event.toStringPretty(2, 2)}') - message = event.getMessageList()[0] - if message.getType() == Message.Type_SUBSCRIPTION_STARTED: - request = Request(Request.Operation_CREATE_ORDER, 'coinbase', 'BTC-USD') - request.appendParam({ - 'SIDE':'BUY', - 'LIMIT_PRICE':'20000', - 'QUANTITY':'0.001', - }) - session.sendRequest(request) + print(f'Received an event of type SUBSCRIPTION_STATUS:\n{event.toStringPretty(2, 2)}') + message = event.getMessageList()[0] + if message.getType() == Message.Type_SUBSCRIPTION_STARTED: + request = Request(Request.Operation_CREATE_ORDER, 'coinbase', 'BTC-USD') + request.appendParam({ + 'SIDE': 'BUY', + 'LIMIT_PRICE': '20000', + 'QUANTITY': '0.001', + }) + session.sendRequest(request) elif event.getType() == Event.Type_SUBSCRIPTION_DATA: - print(f'Received an event of type SUBSCRIPTION_DATA:\n{event.toStringPretty(2, 2)}') + print(f'Received an event of type SUBSCRIPTION_DATA:\n{event.toStringPretty(2, 2)}') return True # This line is needed. + + if __name__ == '__main__': if not os.environ.get('COINBASE_API_KEY'): print('Please set environment variable COINBASE_API_KEY', file=sys.stderr) diff --git a/binding/python/example/fix_simple/main.py b/binding/python/example/fix_simple/main.py index 95373ca0..96676260 100644 --- a/binding/python/example/fix_simple/main.py +++ b/binding/python/example/fix_simple/main.py @@ -2,9 +2,12 @@ import sys import time from ccapi import EventHandler, SessionOptions, SessionConfigs, Session, Subscription, Request, Event, Message + + class MyEventHandler(EventHandler): def __init__(self): super().__init__() + def processEvent(self, event: Event, session: Session) -> bool: if event.getType() == Event.Type_AUTHORIZATION_STATUS: print(f'Received an event of type AUTHORIZATION_STATUS:\n{event.toStringPretty(2, 2)}') @@ -12,19 +15,21 @@ def processEvent(self, event: Event, session: Session) -> bool: if message.getType() == Message.Type_AUTHORIZATION_SUCCESS: request = Request(Request.Operation_FIX, 'coinbase', '', 'same correlation id for subscription and request') request.appendParamFix([ - (35,'D'), - (11,'6d4eb0fb-2229-469f-873e-557dd78ac11e'), - (55,'BTC-USD'), - (54,'1'), - (44,'20000'), - (38,'0.001'), - (40,'2'), - (59,'1'), + (35, 'D'), + (11, '6d4eb0fb-2229-469f-873e-557dd78ac11e'), + (55, 'BTC-USD'), + (54, '1'), + (44, '20000'), + (38, '0.001'), + (40, '2'), + (59, '1'), ]) session.sendRequestByFix(request) elif event.getType() == Event.Type_FIX: print(f'Received an event of type FIX:\n{event.toStringPretty(2, 2)}') return True # This line is needed. + + if __name__ == '__main__': if not os.environ.get('COINBASE_API_KEY'): print('Please set environment variable COINBASE_API_KEY', file=sys.stderr) diff --git a/binding/python/example/handle_exception/main.py b/binding/python/example/handle_exception/main.py index f0da8e73..3c1a0909 100644 --- a/binding/python/example/handle_exception/main.py +++ b/binding/python/example/handle_exception/main.py @@ -2,9 +2,12 @@ import time import traceback from ccapi import EventHandler, SessionOptions, SessionConfigs, Session, Subscription, Event + + class MyEventHandler(EventHandler): def __init__(self): super().__init__() + def processEvent(self, event: Event, session: Session) -> bool: try: raise Exception('oops') @@ -12,6 +15,8 @@ def processEvent(self, event: Event, session: Session) -> bool: print(traceback.format_exc()) sys.exit(1) return True # This line is needed. + + if __name__ == '__main__': eventHandler = MyEventHandler() option = SessionOptions() diff --git a/binding/python/example/market_data_multiple_subscription/main.py b/binding/python/example/market_data_multiple_subscription/main.py index 1daea3e6..dafaf5f1 100644 --- a/binding/python/example/market_data_multiple_subscription/main.py +++ b/binding/python/example/market_data_multiple_subscription/main.py @@ -1,8 +1,11 @@ import time from ccapi import EventHandler, SessionOptions, SessionConfigs, Session, Subscription, Event, SubscriptionList + + class MyEventHandler(EventHandler): def __init__(self): super().__init__() + def processEvent(self, event: Event, session: Session) -> bool: if event.getType() == Event.Type_SUBSCRIPTION_DATA: for message in event.getMessageList(): @@ -13,6 +16,8 @@ def processEvent(self, event: Event, session: Session) -> bool: for name, value in elementNameValueMap.items(): print(f' {name} = {value}') return True # This line is needed. + + if __name__ == '__main__': eventHandler = MyEventHandler() option = SessionOptions() diff --git a/binding/python/example/market_data_simple_request/main.py b/binding/python/example/market_data_simple_request/main.py index 6e4a8ed0..cc801044 100644 --- a/binding/python/example/market_data_simple_request/main.py +++ b/binding/python/example/market_data_simple_request/main.py @@ -1,11 +1,16 @@ import time from ccapi import EventHandler, SessionOptions, SessionConfigs, Session, Request, Event + + class MyEventHandler(EventHandler): def __init__(self): super().__init__() + def processEvent(self, event: Event, session: Session) -> bool: print(f'Received an event:\n{event.toStringPretty(2, 2)}') return True # This line is needed. + + if __name__ == '__main__': eventHandler = MyEventHandler() option = SessionOptions() @@ -13,7 +18,7 @@ def processEvent(self, event: Event, session: Session) -> bool: session = Session(option, config, eventHandler) request = Request(Request.Operation_GET_RECENT_TRADES, 'coinbase', 'BTC-USD') request.appendParam({ - 'LIMIT':'1', + 'LIMIT': '1', }) session.sendRequest(request) time.sleep(10) diff --git a/binding/python/example/market_data_simple_subscription/main.py b/binding/python/example/market_data_simple_subscription/main.py index a0b10e4a..94cd7982 100644 --- a/binding/python/example/market_data_simple_subscription/main.py +++ b/binding/python/example/market_data_simple_subscription/main.py @@ -1,8 +1,11 @@ import time from ccapi import EventHandler, SessionOptions, SessionConfigs, Session, Subscription, Event + + class MyEventHandler(EventHandler): def __init__(self): super().__init__() + def processEvent(self, event: Event, session: Session) -> bool: if event.getType() == Event.Type_SUBSCRIPTION_STATUS: print(f'Received an event of type SUBSCRIPTION_STATUS:\n{event.toStringPretty(2, 2)}') @@ -14,6 +17,8 @@ def processEvent(self, event: Event, session: Session) -> bool: for name, value in elementNameValueMap.items(): print(f' {name} = {value}') return True # This line is needed. + + if __name__ == '__main__': eventHandler = MyEventHandler() option = SessionOptions() diff --git a/binding/python/test/main.py b/binding/python/test/main.py index 6756854d..4a40d989 100644 --- a/binding/python/test/main.py +++ b/binding/python/test/main.py @@ -1,10 +1,15 @@ import time from ccapi import EventHandler, SessionOptions, SessionConfigs, Session, Subscription, Event, Request + + class MyEventHandler(EventHandler): def __init__(self): super().__init__() + def processEvent(self, event: Event, session: Session) -> bool: return True # This line is needed. + + if __name__ == '__main__': eventHandler = MyEventHandler() option = SessionOptions() @@ -14,7 +19,7 @@ def processEvent(self, event: Event, session: Session) -> bool: session.subscribe(subscription) request = Request(Request.Operation_GET_RECENT_TRADES, 'okx', 'BTC-USDT') request.appendParam({ - 'LIMIT':'1', + 'LIMIT': '1', }) session.sendRequest(request) session.stop() diff --git a/format.sh b/format.sh index 1c57883e..8c56cc90 100755 --- a/format.sh +++ b/format.sh @@ -4,3 +4,4 @@ ./format_go.sh ./format_java.sh ./format_javascript.sh +./format_python.sh diff --git a/format_python.sh b/format_python.sh new file mode 100755 index 00000000..dbe610b5 --- /dev/null +++ b/format_python.sh @@ -0,0 +1,2 @@ +#!/usr/bin/env bash +find . -type f -path "*/binding/*" -not -path "*/build/*" -not -path "*/.venv/*" -name "*.py" -exec autopep8 --in-place --max-line-length 160 {} \+ From 05acec8873b0f23092c8f89a36219805108cd11e Mon Sep 17 00:00:00 2001 From: Crypto Chassis Date: Thu, 14 Sep 2023 11:35:47 -0700 Subject: [PATCH 6/7] chore: format python code --- .../python/example/data_visualization/main.py | 56 ++++++++++--------- .../example/enable_library_logging/main.py | 6 +- .../main.py | 28 +++++----- .../main.py | 36 ++++++------ binding/python/example/fix_simple/main.py | 46 +++++++-------- .../python/example/handle_exception/main.py | 8 +-- .../market_data_multiple_subscription/main.py | 12 ++-- .../market_data_simple_request/main.py | 16 +++--- .../market_data_simple_subscription/main.py | 12 ++-- binding/python/test/main.py | 16 +++--- format_python.sh | 2 +- 11 files changed, 125 insertions(+), 113 deletions(-) diff --git a/binding/python/example/data_visualization/main.py b/binding/python/example/data_visualization/main.py index 50c677dc..74ec5313 100644 --- a/binding/python/example/data_visualization/main.py +++ b/binding/python/example/data_visualization/main.py @@ -3,24 +3,18 @@ import matplotlib.pyplot as plt import seaborn as sns -if __name__ == '__main__': +if __name__ == "__main__": option = SessionOptions() config = SessionConfigs() session = Session(option, config) - exchange = 'okx' - instrument = 'BTC-USDT' - subscription = Subscription(exchange, instrument, 'MARKET_DEPTH', 'MARKET_DEPTH_MAX=400&CONFLATE_INTERVAL_MILLISECONDS=100') + exchange = "okx" + instrument = "BTC-USDT" + subscription = Subscription(exchange, instrument, "MARKET_DEPTH", "MARKET_DEPTH_MAX=400&CONFLATE_INTERVAL_MILLISECONDS=100") session.subscribe(subscription) fig, ax = plt.subplots() while True: - bids = { - 'price': [], - 'size': [] - } - asks = { - 'price': [], - 'size': [] - } + bids = {"price": [], "size": []} + asks = {"price": [], "size": []} eventList = session.getEventQueue().purge() if eventList: event = eventList[-1] @@ -29,21 +23,29 @@ for element in message.getElementList(): elementNameValueMap = element.getNameValueMap() for name, value in elementNameValueMap.items(): - if name == 'BID_PRICE': - bids['price'].append(float(value)) - if name == 'BID_SIZE': - bids['size'].append(float(value)) - if name == 'ASK_PRICE': - asks['price'].append(float(value)) - if name == 'ASK_SIZE': - asks['size'].append(float(value)) + if name == "BID_PRICE": + bids["price"].append(float(value)) + if name == "BID_SIZE": + bids["size"].append(float(value)) + if name == "ASK_PRICE": + asks["price"].append(float(value)) + if name == "ASK_SIZE": + asks["size"].append(float(value)) ax.clear() - ax.set_title(f'{instrument} Order Book On {exchange.title()} at {message.getTimeISO()}') - ax.set_xlabel('Price') - ax.set_ylabel('Amount') - sns.ecdfplot(x='price', weights='size', legend=False, stat='count', complementary=True, - data={'price': bids['price'], 'size': bids['size']}, ax=ax, color='g') - sns.ecdfplot(x='price', weights='size', legend=False, stat='count', data={'price': asks['price'], 'size': asks['size']}, ax=ax, color='r') + ax.set_title(f"{instrument} Order Book On {exchange.title()} at {message.getTimeISO()}") + ax.set_xlabel("Price") + ax.set_ylabel("Amount") + sns.ecdfplot( + x="price", + weights="size", + legend=False, + stat="count", + complementary=True, + data={"price": bids["price"], "size": bids["size"]}, + ax=ax, + color="g", + ) + sns.ecdfplot(x="price", weights="size", legend=False, stat="count", data={"price": asks["price"], "size": asks["size"]}, ax=ax, color="r") plt.pause(0.1) session.stop() - print('Bye') + print("Bye") diff --git a/binding/python/example/enable_library_logging/main.py b/binding/python/example/enable_library_logging/main.py index d6bce2b9..e4273081 100644 --- a/binding/python/example/enable_library_logging/main.py +++ b/binding/python/example/enable_library_logging/main.py @@ -17,11 +17,11 @@ def logMessage(self, severity: str, threadId: str, timeISO: str, fileName: str, myLogger = MyLogger() Logger.logger = myLogger -if __name__ == '__main__': +if __name__ == "__main__": session = Session() - subscription = Subscription('coinbase', 'BTC-USD', 'MARKET_DEPTH') + subscription = Subscription("coinbase", "BTC-USD", "MARKET_DEPTH") session.subscribe(subscription) time.sleep(10) session.stop() - print('Bye') + print("Bye") Logger.logger = None diff --git a/binding/python/example/execution_management_simple_request/main.py b/binding/python/example/execution_management_simple_request/main.py index d09909be..f32414c6 100644 --- a/binding/python/example/execution_management_simple_request/main.py +++ b/binding/python/example/execution_management_simple_request/main.py @@ -9,28 +9,30 @@ def __init__(self): super().__init__() def processEvent(self, event: Event, session: Session) -> bool: - print(f'Received an event:\n{event.toStringPretty(2, 2)}') + print(f"Received an event:\n{event.toStringPretty(2, 2)}") return True # This line is needed. -if __name__ == '__main__': - if not os.environ.get('BINANCE_US_API_KEY'): - print('Please set environment variable BINANCE_US_API_KEY', file=sys.stderr) +if __name__ == "__main__": + if not os.environ.get("BINANCE_US_API_KEY"): + print("Please set environment variable BINANCE_US_API_KEY", file=sys.stderr) sys.exit(1) - if not os.environ.get('BINANCE_US_API_SECRET'): - print('Please set environment variable BINANCE_US_API_SECRET', file=sys.stderr) + if not os.environ.get("BINANCE_US_API_SECRET"): + print("Please set environment variable BINANCE_US_API_SECRET", file=sys.stderr) sys.exit(1) eventHandler = MyEventHandler() option = SessionOptions() config = SessionConfigs() session = Session(option, config, eventHandler) - request = Request(Request.Operation_CREATE_ORDER, 'binance-us', 'BTCUSD') - request.appendParam({ - 'SIDE': 'BUY', - 'QUANTITY': '0.0005', - 'LIMIT_PRICE': '20000', - }) + request = Request(Request.Operation_CREATE_ORDER, "binance-us", "BTCUSD") + request.appendParam( + { + "SIDE": "BUY", + "QUANTITY": "0.0005", + "LIMIT_PRICE": "20000", + } + ) session.sendRequest(request) time.sleep(10) session.stop() - print('Bye') + print("Bye") diff --git a/binding/python/example/execution_management_simple_subscription/main.py b/binding/python/example/execution_management_simple_subscription/main.py index 8658f92a..5f8898bd 100644 --- a/binding/python/example/execution_management_simple_subscription/main.py +++ b/binding/python/example/execution_management_simple_subscription/main.py @@ -10,37 +10,39 @@ def __init__(self): def processEvent(self, event: Event, session: Session) -> bool: if event.getType() == Event.Type_SUBSCRIPTION_STATUS: - print(f'Received an event of type SUBSCRIPTION_STATUS:\n{event.toStringPretty(2, 2)}') + print(f"Received an event of type SUBSCRIPTION_STATUS:\n{event.toStringPretty(2, 2)}") message = event.getMessageList()[0] if message.getType() == Message.Type_SUBSCRIPTION_STARTED: - request = Request(Request.Operation_CREATE_ORDER, 'coinbase', 'BTC-USD') - request.appendParam({ - 'SIDE': 'BUY', - 'LIMIT_PRICE': '20000', - 'QUANTITY': '0.001', - }) + request = Request(Request.Operation_CREATE_ORDER, "coinbase", "BTC-USD") + request.appendParam( + { + "SIDE": "BUY", + "LIMIT_PRICE": "20000", + "QUANTITY": "0.001", + } + ) session.sendRequest(request) elif event.getType() == Event.Type_SUBSCRIPTION_DATA: - print(f'Received an event of type SUBSCRIPTION_DATA:\n{event.toStringPretty(2, 2)}') + print(f"Received an event of type SUBSCRIPTION_DATA:\n{event.toStringPretty(2, 2)}") return True # This line is needed. -if __name__ == '__main__': - if not os.environ.get('COINBASE_API_KEY'): - print('Please set environment variable COINBASE_API_KEY', file=sys.stderr) +if __name__ == "__main__": + if not os.environ.get("COINBASE_API_KEY"): + print("Please set environment variable COINBASE_API_KEY", file=sys.stderr) sys.exit(1) - if not os.environ.get('COINBASE_API_SECRET'): - print('Please set environment variable COINBASE_API_SECRET', file=sys.stderr) + if not os.environ.get("COINBASE_API_SECRET"): + print("Please set environment variable COINBASE_API_SECRET", file=sys.stderr) sys.exit(1) - if not os.environ.get('COINBASE_API_PASSPHRASE'): - print('Please set environment variable COINBASE_API_PASSPHRASE', file=sys.stderr) + if not os.environ.get("COINBASE_API_PASSPHRASE"): + print("Please set environment variable COINBASE_API_PASSPHRASE", file=sys.stderr) sys.exit(1) eventHandler = MyEventHandler() option = SessionOptions() config = SessionConfigs() session = Session(option, config, eventHandler) - subscription = Subscription('coinbase', 'BTC-USD', 'ORDER_UPDATE') + subscription = Subscription("coinbase", "BTC-USD", "ORDER_UPDATE") session.subscribe(subscription) time.sleep(10) session.stop() - print('Bye') + print("Bye") diff --git a/binding/python/example/fix_simple/main.py b/binding/python/example/fix_simple/main.py index 96676260..81d7dd36 100644 --- a/binding/python/example/fix_simple/main.py +++ b/binding/python/example/fix_simple/main.py @@ -10,42 +10,44 @@ def __init__(self): def processEvent(self, event: Event, session: Session) -> bool: if event.getType() == Event.Type_AUTHORIZATION_STATUS: - print(f'Received an event of type AUTHORIZATION_STATUS:\n{event.toStringPretty(2, 2)}') + print(f"Received an event of type AUTHORIZATION_STATUS:\n{event.toStringPretty(2, 2)}") message = event.getMessageList()[0] if message.getType() == Message.Type_AUTHORIZATION_SUCCESS: - request = Request(Request.Operation_FIX, 'coinbase', '', 'same correlation id for subscription and request') - request.appendParamFix([ - (35, 'D'), - (11, '6d4eb0fb-2229-469f-873e-557dd78ac11e'), - (55, 'BTC-USD'), - (54, '1'), - (44, '20000'), - (38, '0.001'), - (40, '2'), - (59, '1'), - ]) + request = Request(Request.Operation_FIX, "coinbase", "", "same correlation id for subscription and request") + request.appendParamFix( + [ + (35, "D"), + (11, "6d4eb0fb-2229-469f-873e-557dd78ac11e"), + (55, "BTC-USD"), + (54, "1"), + (44, "20000"), + (38, "0.001"), + (40, "2"), + (59, "1"), + ] + ) session.sendRequestByFix(request) elif event.getType() == Event.Type_FIX: - print(f'Received an event of type FIX:\n{event.toStringPretty(2, 2)}') + print(f"Received an event of type FIX:\n{event.toStringPretty(2, 2)}") return True # This line is needed. -if __name__ == '__main__': - if not os.environ.get('COINBASE_API_KEY'): - print('Please set environment variable COINBASE_API_KEY', file=sys.stderr) +if __name__ == "__main__": + if not os.environ.get("COINBASE_API_KEY"): + print("Please set environment variable COINBASE_API_KEY", file=sys.stderr) sys.exit(1) - if not os.environ.get('COINBASE_API_SECRET'): - print('Please set environment variable COINBASE_API_SECRET', file=sys.stderr) + if not os.environ.get("COINBASE_API_SECRET"): + print("Please set environment variable COINBASE_API_SECRET", file=sys.stderr) sys.exit(1) - if not os.environ.get('COINBASE_API_PASSPHRASE'): - print('Please set environment variable COINBASE_API_PASSPHRASE', file=sys.stderr) + if not os.environ.get("COINBASE_API_PASSPHRASE"): + print("Please set environment variable COINBASE_API_PASSPHRASE", file=sys.stderr) sys.exit(1) eventHandler = MyEventHandler() option = SessionOptions() config = SessionConfigs() session = Session(option, config, eventHandler) - subscription = Subscription('coinbase', '', 'FIX', '', 'same correlation id for subscription and request') + subscription = Subscription("coinbase", "", "FIX", "", "same correlation id for subscription and request") session.subscribeByFix(subscription) time.sleep(10) session.stop() - print('Bye') + print("Bye") diff --git a/binding/python/example/handle_exception/main.py b/binding/python/example/handle_exception/main.py index 3c1a0909..23f8ab25 100644 --- a/binding/python/example/handle_exception/main.py +++ b/binding/python/example/handle_exception/main.py @@ -10,20 +10,20 @@ def __init__(self): def processEvent(self, event: Event, session: Session) -> bool: try: - raise Exception('oops') + raise Exception("oops") except Exception: print(traceback.format_exc()) sys.exit(1) return True # This line is needed. -if __name__ == '__main__': +if __name__ == "__main__": eventHandler = MyEventHandler() option = SessionOptions() config = SessionConfigs() session = Session(option, config, eventHandler) - subscription = Subscription('coinbase', 'BTC-USD', 'MARKET_DEPTH') + subscription = Subscription("coinbase", "BTC-USD", "MARKET_DEPTH") session.subscribe(subscription) time.sleep(10) session.stop() - print('Bye') + print("Bye") diff --git a/binding/python/example/market_data_multiple_subscription/main.py b/binding/python/example/market_data_multiple_subscription/main.py index dafaf5f1..96c2b0b9 100644 --- a/binding/python/example/market_data_multiple_subscription/main.py +++ b/binding/python/example/market_data_multiple_subscription/main.py @@ -10,23 +10,23 @@ def processEvent(self, event: Event, session: Session) -> bool: if event.getType() == Event.Type_SUBSCRIPTION_DATA: for message in event.getMessageList(): correlationId = message.getCorrelationIdList()[0] - print(f'{correlationId}: Best bid and ask at {message.getTimeISO()} are:') + print(f"{correlationId}: Best bid and ask at {message.getTimeISO()} are:") for element in message.getElementList(): elementNameValueMap = element.getNameValueMap() for name, value in elementNameValueMap.items(): - print(f' {name} = {value}') + print(f" {name} = {value}") return True # This line is needed. -if __name__ == '__main__': +if __name__ == "__main__": eventHandler = MyEventHandler() option = SessionOptions() config = SessionConfigs() session = Session(option, config, eventHandler) subscriptionList = SubscriptionList() - subscriptionList.append(Subscription('coinbase', 'BTC-USD', 'MARKET_DEPTH', '', 'BTC')) - subscriptionList.append(Subscription('coinbase', 'ETH-USD', 'MARKET_DEPTH', '', 'ETH')) + subscriptionList.append(Subscription("coinbase", "BTC-USD", "MARKET_DEPTH", "", "BTC")) + subscriptionList.append(Subscription("coinbase", "ETH-USD", "MARKET_DEPTH", "", "ETH")) session.subscribe(subscriptionList) time.sleep(10) session.stop() - print('Bye') + print("Bye") diff --git a/binding/python/example/market_data_simple_request/main.py b/binding/python/example/market_data_simple_request/main.py index cc801044..be1c3719 100644 --- a/binding/python/example/market_data_simple_request/main.py +++ b/binding/python/example/market_data_simple_request/main.py @@ -7,20 +7,22 @@ def __init__(self): super().__init__() def processEvent(self, event: Event, session: Session) -> bool: - print(f'Received an event:\n{event.toStringPretty(2, 2)}') + print(f"Received an event:\n{event.toStringPretty(2, 2)}") return True # This line is needed. -if __name__ == '__main__': +if __name__ == "__main__": eventHandler = MyEventHandler() option = SessionOptions() config = SessionConfigs() session = Session(option, config, eventHandler) - request = Request(Request.Operation_GET_RECENT_TRADES, 'coinbase', 'BTC-USD') - request.appendParam({ - 'LIMIT': '1', - }) + request = Request(Request.Operation_GET_RECENT_TRADES, "coinbase", "BTC-USD") + request.appendParam( + { + "LIMIT": "1", + } + ) session.sendRequest(request) time.sleep(10) session.stop() - print('Bye') + print("Bye") diff --git a/binding/python/example/market_data_simple_subscription/main.py b/binding/python/example/market_data_simple_subscription/main.py index 94cd7982..ebfa7be2 100644 --- a/binding/python/example/market_data_simple_subscription/main.py +++ b/binding/python/example/market_data_simple_subscription/main.py @@ -8,24 +8,24 @@ def __init__(self): def processEvent(self, event: Event, session: Session) -> bool: if event.getType() == Event.Type_SUBSCRIPTION_STATUS: - print(f'Received an event of type SUBSCRIPTION_STATUS:\n{event.toStringPretty(2, 2)}') + print(f"Received an event of type SUBSCRIPTION_STATUS:\n{event.toStringPretty(2, 2)}") elif event.getType() == Event.Type_SUBSCRIPTION_DATA: for message in event.getMessageList(): - print(f'Best bid and ask at {message.getTimeISO()} are:') + print(f"Best bid and ask at {message.getTimeISO()} are:") for element in message.getElementList(): elementNameValueMap = element.getNameValueMap() for name, value in elementNameValueMap.items(): - print(f' {name} = {value}') + print(f" {name} = {value}") return True # This line is needed. -if __name__ == '__main__': +if __name__ == "__main__": eventHandler = MyEventHandler() option = SessionOptions() config = SessionConfigs() session = Session(option, config, eventHandler) - subscription = Subscription('okx', 'BTC-USDT', 'MARKET_DEPTH') + subscription = Subscription("okx", "BTC-USDT", "MARKET_DEPTH") session.subscribe(subscription) time.sleep(10) session.stop() - print('Bye') + print("Bye") diff --git a/binding/python/test/main.py b/binding/python/test/main.py index 4a40d989..03a54924 100644 --- a/binding/python/test/main.py +++ b/binding/python/test/main.py @@ -10,17 +10,19 @@ def processEvent(self, event: Event, session: Session) -> bool: return True # This line is needed. -if __name__ == '__main__': +if __name__ == "__main__": eventHandler = MyEventHandler() option = SessionOptions() config = SessionConfigs() session = Session(option, config, eventHandler) - subscription = Subscription('okx', 'BTC-USDT', 'MARKET_DEPTH') + subscription = Subscription("okx", "BTC-USDT", "MARKET_DEPTH") session.subscribe(subscription) - request = Request(Request.Operation_GET_RECENT_TRADES, 'okx', 'BTC-USDT') - request.appendParam({ - 'LIMIT': '1', - }) + request = Request(Request.Operation_GET_RECENT_TRADES, "okx", "BTC-USDT") + request.appendParam( + { + "LIMIT": "1", + } + ) session.sendRequest(request) session.stop() - print('Bye') + print("Bye") diff --git a/format_python.sh b/format_python.sh index dbe610b5..b2e08401 100755 --- a/format_python.sh +++ b/format_python.sh @@ -1,2 +1,2 @@ #!/usr/bin/env bash -find . -type f -path "*/binding/*" -not -path "*/build/*" -not -path "*/.venv/*" -name "*.py" -exec autopep8 --in-place --max-line-length 160 {} \+ +find . -type f -path "*/binding/*" -not -path "*/build/*" -not -path "*/.venv/*" -name "*.py" -exec black --line-length 160 {} \+ From b7825ba0e898f29b6db39f3ab85798a6367ef58c Mon Sep 17 00:00:00 2001 From: Crypto Chassis Date: Thu, 14 Sep 2023 13:34:06 -0700 Subject: [PATCH 7/7] chore: make graceful exit in data_visualization example --- binding/python/example/data_visualization/main.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/binding/python/example/data_visualization/main.py b/binding/python/example/data_visualization/main.py index 74ec5313..2e5ac4e7 100644 --- a/binding/python/example/data_visualization/main.py +++ b/binding/python/example/data_visualization/main.py @@ -2,6 +2,7 @@ from ccapi import EventHandler, SessionOptions, SessionConfigs, Session, Subscription, Event import matplotlib.pyplot as plt import seaborn as sns +import time if __name__ == "__main__": option = SessionOptions() @@ -12,6 +13,7 @@ subscription = Subscription(exchange, instrument, "MARKET_DEPTH", "MARKET_DEPTH_MAX=400&CONFLATE_INTERVAL_MILLISECONDS=100") session.subscribe(subscription) fig, ax = plt.subplots() + startTime = time.time() while True: bids = {"price": [], "size": []} asks = {"price": [], "size": []} @@ -47,5 +49,7 @@ ) sns.ecdfplot(x="price", weights="size", legend=False, stat="count", data={"price": asks["price"], "size": asks["size"]}, ax=ax, color="r") plt.pause(0.1) + if time.time() - startTime > 10: + break session.stop() print("Bye")