Skip to content

Commit

Permalink
change the way we get recent ticks to clean up and hopefully should a…
Browse files Browse the repository at this point in the history
…lso work without market data subscription
  • Loading branch information
rob committed Jul 13, 2023
1 parent 9e8ea35 commit 88a5a84
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 79 deletions.
31 changes: 24 additions & 7 deletions sysbrokers/IB/client/ib_price_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from sysobjects.contracts import futuresContract
from sysexecution.trade_qty import tradeQuantity
from sysexecution.tick_data import get_next_n_ticks_from_ticker_object

TIMEOUT_SECONDS_ON_HISTORICAL_DATA = 20

Expand Down Expand Up @@ -68,12 +69,26 @@ def broker_get_historical_futures_data_for_contract(

return price_data

def get_ticker_object(
def get_ticker_object_with_BS(
self,
contract_object_with_ib_data: futuresContract,
trade_list_for_multiple_legs: tradeQuantity = None,
) -> tickerWithBS:

ib_ticker = self.get_ib_ticker_object(contract_object_with_ib_data)

ib_BS_str, ib_qty = resolveBS_for_list(trade_list_for_multiple_legs)

ticker_with_bs = tickerWithBS(ib_ticker, ib_BS_str)

return ticker_with_bs

def get_ib_ticker_object(
self,
contract_object_with_ib_data: futuresContract,
trade_list_for_multiple_legs: tradeQuantity = None,
) -> "ib.ticker":

specific_log = contract_object_with_ib_data.specific_log(self.log)

try:
Expand All @@ -92,13 +107,14 @@ def get_ticker_object(
self.ib.reqMktData(ibcontract, "", False, False)
ticker = self.ib.ticker(ibcontract)

ib_BS_str, ib_qty = resolveBS_for_list(trade_list_for_multiple_legs)
return ticker

ticker_with_bs = tickerWithBS(ticker, ib_BS_str)

return ticker_with_bs
def cancel_market_data_for_contract(
self, contract_object_with_ib_data: futuresContract
):
self.cancel_market_data_for_contract_and_trade_qty(contract_object_with_ib_data)

def cancel_market_data_for_contract_object(
def cancel_market_data_for_contract_and_trade_qty(
self,
contract_object_with_ib_data: futuresContract,
trade_list_for_multiple_legs: tradeQuantity = None,
Expand All @@ -120,11 +136,12 @@ def cancel_market_data_for_contract_object(

self.ib.cancelMktData(ibcontract)

def ib_get_recent_bid_ask_tick_data(
def _ib_get_recent_bid_ask_tick_data_using_reqHistoricalTicks(
self,
contract_object_with_ib_data: futuresContract,
tick_count=200,
) -> list:
## FIXME DEPRECATE AS DOESN'T WORK WITH DELAYED DATA
"""
:param contract_object_with_ib_data:
Expand Down
98 changes: 39 additions & 59 deletions sysbrokers/IB/ib_futures_contract_price_data.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from syscore.dateutils import Frequency, DAILY_PRICE_FREQ, MIXED_FREQ
from syscore.exceptions import missingContract, missingData
from sysdata.data_blob import dataBlob

from sysbrokers.IB.ib_futures_contracts_data import ibFuturesContractData
from sysbrokers.IB.ib_instruments_data import ibFuturesInstrumentData
from sysbrokers.IB.ib_translate_broker_order_objects import sign_from_BS, ibBrokerOrder
from sysbrokers.IB.ib_connection import connectionIB
from sysbrokers.IB.client.ib_price_client import tickerWithBS, ibPriceClient
from sysbrokers.broker_futures_contract_price_data import brokerFuturesContractPriceData


from sysexecution.tick_data import tickerObject, dataFrameOfRecentTicks
from sysexecution.orders.contract_orders import contractOrder

from sysexecution.trade_qty import tradeQuantity

from sysobjects.futures_per_contract_prices import futuresContractPrices
from sysobjects.contracts import futuresContract, listOfFuturesContracts
Expand Down Expand Up @@ -45,25 +45,6 @@ def ask_size(self):
return self.ticker.askSize


def from_ib_bid_ask_tick_data_to_dataframe(tick_data) -> dataFrameOfRecentTicks:
"""
:param tick_data: list of HistoricalTickBidAsk()
:return: pd.DataFrame,['priceBid', 'priceAsk', 'sizeAsk', 'sizeBid']
"""
time_index = [tick_item.time for tick_item in tick_data]
fields = ["priceBid", "priceAsk", "sizeAsk", "sizeBid"]

value_dict = {}
for field_name in fields:
field_values = [getattr(tick_item, field_name) for tick_item in tick_data]
value_dict[field_name] = field_values

output = dataFrameOfRecentTicks(value_dict, time_index)

return output


class ibFuturesContractPriceData(brokerFuturesContractPriceData):
"""
Extends the baseData object to a data source that reads in and writes prices for specific futures contracts
Expand Down Expand Up @@ -272,22 +253,41 @@ def _get_prices_at_frequency_for_ibcontract_object_no_checking(
return futuresContractPrices(price_data)

def get_ticker_object_for_order(self, order: contractOrder) -> tickerObject:
contract_object = order.futures_contract
futures_contract = order.futures_contract
trade_list_for_multiple_legs = order.trade

new_log = order.log_with_attributes(self.log)
ticker = self.get_ticker_object_for_contract_and_trade_qty(
futures_contract=futures_contract,
trade_list_for_multiple_legs=trade_list_for_multiple_legs,
)

return ticker

def get_ticker_object_for_contract(
self, futures_contract: futuresContract
) -> tickerObject:
return self.get_ticker_object_for_contract_and_trade_qty(
futures_contract=futures_contract
)

def get_ticker_object_for_contract_and_trade_qty(
self,
futures_contract: futuresContract,
trade_list_for_multiple_legs: tradeQuantity = None,
) -> tickerObject:
new_log = futures_contract.specific_log(self.log)

try:
contract_object_with_ib_data = (
self.futures_contract_data.get_contract_object_with_IB_data(
contract_object
futures_contract
)
)
except missingContract:
new_log.warning("Can't get data for %s" % str(contract_object))
new_log.warning("Can't get data for %s" % str(futures_contract))
return futuresContractPrices.create_empty()

ticker_with_bs = self.ib_client.get_ticker_object(
ticker_with_bs = self.ib_client.get_ticker_object_with_BS(
contract_object_with_ib_data,
trade_list_for_multiple_legs=trade_list_for_multiple_legs,
)
Expand All @@ -296,6 +296,18 @@ def get_ticker_object_for_order(self, order: contractOrder) -> tickerObject:

return ticker_object

def cancel_market_data_for_contract(self, contract: futuresContract):
new_log = contract.specific_log(self.log)
try:
contract_object_with_ib_data = (
self.futures_contract_data.get_contract_object_with_IB_data(contract)
)
except missingContract:
new_log.warning("Can't get data for %s" % str(contract))
return futuresContractPrices.create_empty()

self.ib_client.cancel_market_data_for_contract(contract_object_with_ib_data)

def cancel_market_data_for_order(self, order: ibBrokerOrder):
contract_object = order.futures_contract
trade_list_for_multiple_legs = order.trade
Expand All @@ -312,39 +324,7 @@ def cancel_market_data_for_order(self, order: ibBrokerOrder):
new_log.warning("Can't get data for %s" % str(contract_object))
return futuresContractPrices.create_empty()

self.ib_client.cancel_market_data_for_contract_object(
self.ib_client.cancel_market_data_for_contract_and_trade_qty(
contract_object_with_ib_data,
trade_list_for_multiple_legs=trade_list_for_multiple_legs,
)

def get_recent_bid_ask_tick_data_for_contract_object(
self, contract_object: futuresContract
) -> dataFrameOfRecentTicks:
"""
Get last few price ticks
:param contract_object: futuresContract
:return:
"""
new_log = contract_object.log(self.log)

try:
contract_object_with_ib_data = (
self.futures_contract_data.get_contract_object_with_IB_data(
contract_object
)
)
except missingContract:
new_log.warning("Can't get data for %s" % str(contract_object))
return dataFrameOfRecentTicks.create_empty()

try:
tick_data = self.ib_client.ib_get_recent_bid_ask_tick_data(
contract_object_with_ib_data
)
except missingContract:
raise missingData

tick_data_as_df = from_ib_bid_ask_tick_data_to_dataframe(tick_data)

return tick_data_as_df
8 changes: 5 additions & 3 deletions sysbrokers/broker_futures_contract_price_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sysexecution.tick_data import tickerObject, dataFrameOfRecentTicks
from sysexecution.orders.contract_orders import contractOrder
from sysexecution.orders.broker_orders import brokerOrder
from sysexecution.trade_qty import tradeQuantity

from sysobjects.futures_per_contract_prices import futuresContractPrices
from sysobjects.contracts import futuresContract, listOfFuturesContracts
Expand Down Expand Up @@ -35,15 +36,16 @@ def get_prices_at_frequency_for_potentially_expired_contract_object(
) -> futuresContractPrices:
raise NotImplementedError

def get_ticker_object_for_contract(self, contract: futuresContract) -> tickerObject:
raise NotImplementedError

def get_ticker_object_for_order(self, order: contractOrder) -> tickerObject:
raise NotImplementedError

def cancel_market_data_for_order(self, order: brokerOrder):
raise NotImplementedError

def get_recent_bid_ask_tick_data_for_contract_object(
self, contract_object: futuresContract
) -> dataFrameOfRecentTicks:
def cancel_market_data_for_contract(self, contract: futuresContract):
raise NotImplementedError

def _write_merged_prices_for_contract_object_no_checking(self, *args, **kwargs):
Expand Down
58 changes: 50 additions & 8 deletions sysexecution/tick_data.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import List
from copy import copy
import numpy as np
import pandas as pd
Expand All @@ -7,7 +8,7 @@
from syscore.exceptions import missingData
from syscore.constants import arg_not_supplied

TICK_REQUIRED_COLUMNS = ["priceAsk", "priceBid", "sizeAsk", "sizeBid"]
TICK_REQUIRED_COLUMNS = ["bid_price", "ask_price", "bid_size", "ask_size"]


class dataFrameOfRecentTicks(pd.DataFrame):
Expand Down Expand Up @@ -50,7 +51,7 @@ def analyse_tick_data_frame(
return results


oneTick = namedtuple("oneTick", ["bid_price", "ask_price", "bid_size", "ask_size"])
oneTick = namedtuple("oneTick", TICK_REQUIRED_COLUMNS)
analysisTick = namedtuple(
"analysisTick",
[
Expand Down Expand Up @@ -87,10 +88,10 @@ def extract_nth_row_of_tick_data_frame(
else:
filled_data = copy(tick_data)

bid_price = filled_data.priceBid[row_id]
ask_price = filled_data.priceAsk[row_id]
bid_size = filled_data.sizeBid[row_id]
ask_size = filled_data.sizeAsk[row_id]
bid_price = filled_data.bid_price[row_id]
ask_price = filled_data.ask_price[row_id]
bid_size = filled_data.bid_size[row_id]
ask_size = filled_data.ask_size[row_id]

return oneTick(bid_price, ask_price, bid_size, ask_size)

Expand All @@ -100,7 +101,7 @@ def average_bid_offer_spread(
) -> float:
if tick_data.is_empty():
raise missingData("Tick data is empty")
all_spreads = tick_data.priceAsk - tick_data.priceBid
all_spreads = tick_data.ask_price - tick_data.bid_price
if remove_negative:
all_spreads[all_spreads < 0] = np.nan
average_spread = all_spreads.mean(skipna=True)
Expand Down Expand Up @@ -190,7 +191,7 @@ def qty(self) -> int:
return qty

@property
def ticks(self) -> list:
def ticks(self) -> List[oneTick]:
return self._ticks

@property
Expand Down Expand Up @@ -364,3 +365,44 @@ def adverse_price_movement(qty: int, price_old: float, price_new: float) -> bool
return True
else:
return False


def get_df_of_ticks_from_ticker_object(
ticker_object: tickerObject, n_ticks: int = 200, time_out_seconds=10
) -> dataFrameOfRecentTicks:
list_of_ticks = get_next_n_ticks_from_ticker_object(
ticker_object=ticker_object, n_ticks=n_ticks, time_out_seconds=time_out_seconds
)
df_of_recent_ticks = from_list_of_ticks_to_dataframe(list_of_ticks)

return df_of_recent_ticks


def get_next_n_ticks_from_ticker_object(
ticker_object: tickerObject, n_ticks: int, time_out_seconds=10
) -> List[oneTick]:
timer = quickTimer(time_out_seconds)
list_of_ticks = []
while len(list_of_ticks) < n_ticks:
tick = ticker_object.current_tick(require_refresh=True)
list_of_ticks.append(tick)
if timer.finished: ## life is too short
break

return ticker_object.ticks


def from_list_of_ticks_to_dataframe(
list_of_ticks: List[oneTick],
) -> dataFrameOfRecentTicks:

fields = TICK_REQUIRED_COLUMNS

value_dict = {}
for field_name in fields:
field_values = [getattr(tick_item, field_name) for tick_item in list_of_ticks]
value_dict[field_name] = field_values

output = dataFrameOfRecentTicks(value_dict)

return output
17 changes: 15 additions & 2 deletions sysproduction/data/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

from sysexecution.orders.broker_orders import brokerOrder
from sysexecution.orders.list_of_orders import listOfOrders
from sysexecution.tick_data import dataFrameOfRecentTicks
from sysexecution.tick_data import (
dataFrameOfRecentTicks,
get_df_of_ticks_from_ticker_object,
)
from sysexecution.tick_data import analyse_tick_data_frame, tickerObject, analysisTick
from sysexecution.orders.contract_orders import contractOrder
from sysexecution.trade_qty import tradeQuantity
Expand Down Expand Up @@ -160,9 +163,14 @@ def get_prices_at_frequency_for_contract_object(
def get_recent_bid_ask_tick_data_for_contract_object(
self, contract: futuresContract
) -> dataFrameOfRecentTicks:
return self.broker_futures_contract_price_data.get_recent_bid_ask_tick_data_for_contract_object(

ticker = self.broker_futures_contract_price_data.get_ticker_object_for_contract(
contract
)
ticker_df = get_df_of_ticks_from_ticker_object(ticker)
self.cancel_market_data_for_contract(contract)

return ticker_df

def get_actual_expiry_date_for_single_contract(
self, contract_object: futuresContract
Expand Down Expand Up @@ -250,6 +258,11 @@ def get_ticker_object_for_order(self, order: contractOrder) -> tickerObject:
def cancel_market_data_for_order(self, order: brokerOrder):
self.broker_futures_contract_price_data.cancel_market_data_for_order(order)

def cancel_market_data_for_contract(self, contract: futuresContract):
self.broker_futures_contract_price_data.cancel_market_data_for_contract(
contract
)

def get_broker_account(self) -> str:
return self.broker_static_data.get_broker_account()

Expand Down

0 comments on commit 88a5a84

Please sign in to comment.