Implemented max pain daemon #482

Draft · wants to merge 1 commit into base: master

9 changes: 9 additions & 0 deletions piker/_daemon.py
@@ -327,6 +327,7 @@ async def open_pikerd(
# db init flags
tsdb: bool = False,
es: bool = False,
mpd: bool = False,

) -> Services:
'''
@@ -396,6 +397,14 @@ async def open_pikerd(
f'config: {pformat(config)}'
)

if mpd:
from piker.data._max_pain_daemon import start_max_pain_daemon

start_max_pain_daemon()

log.info('Collecting data from deribit...')


# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
7 changes: 7 additions & 0 deletions piker/cli/__init__.py
@@ -54,6 +54,11 @@
is_flag=True,
help='Enable local ``elasticsearch`` instance'
)
@click.option(
'--mpd',
is_flag=True,
help='Read from deribit and dump data to elastic db'
)
def pikerd(
loglevel: str,
host: str,
@@ -62,6 +67,7 @@ def pikerd(
pdb: bool,
tsdb: bool,
es: bool,
mpd: bool,
):
'''
Spawn the piker broker-daemon.
@@ -92,6 +98,7 @@ async def main():
open_pikerd(
tsdb=tsdb,
es=es,
mpd=mpd,
loglevel=loglevel,
debug_mode=pdb,
registry_addr=reg_addr,
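For reference, a sketch of how the new flag would be used from the shell; pairing it with the existing --es flag is an assumption here, since the daemon below writes to the local elasticsearch instance that flag brings up:

    # spawn the broker daemon with a local elasticsearch instance
    # and the max pain data feed enabled
    pikerd --es --mpd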
132 changes: 132 additions & 0 deletions piker/data/_max_pain_daemon.py
@@ -0,0 +1,132 @@
#!/usr/bin/env python
import time
import json
from datetime import datetime, timedelta
import requests

from cryptofeed import FeedHandler
from cryptofeed.callback import OrderInfoCallback, BalancesCallback, UserFillsCallback
from cryptofeed.exchanges.deribit import Deribit
from cryptofeed.defines import DERIBIT, TRADES, OPEN_INTEREST, OPTION, CALL, PUT
from cryptofeed.symbols import Symbol

from elasticsearch import Elasticsearch

from .elastic import (
ES_HOST,
oi_mapping,
trades_mapping,
es_prefix
)

def start_max_pain_daemon():

config = {
'log':{
'filename': 'feedhandler.log',
'level': 'INFO'
},
# NOTE: hard-coded deribit testnet API credentials; ideally these
# would be read from the user's config rather than committed here.
'deribit': {
'key_id': 'KPJN6bFQ',
'key_secret': 'TMPatQRXMQUQ83OCVoqYomcMvPYpTUPOUqayuJJ3zMA'
}
}

# maybe import this function from piker.brokers.deribit.api
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base

if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise Exception("Couldn't parse option type")

return Symbol(
base,
quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper()
)

def get_instruments(currency, kind):
payload = {'currency': currency, 'kind': kind}
r = requests.get('https://test.deribit.com/api/v2/public/get_instruments', params=payload)
resp = json.loads(r.text)
response_list = []

# For now only get half of the instruments
for i in range(len(resp['result']) // 2):
element = resp['result'][i]
response_list.append(piker_sym_to_cb_sym(element['instrument_name']))

return response_list

# ``trade`` and ``oi`` are user-defined callbacks invoked when trade
# and open interest updates are received.
# NOTE: ``data`` is not a dict but a ``cryptofeed.types.OpenInterest`` object.
async def oi(data: dict, receipt_timestamp):

# Get timestamp and convert it to isoformat
date = (datetime.utcfromtimestamp(data.timestamp)).isoformat()

index = es_prefix(data.symbol, 'oi')
document = {
'timestamp': date,
'open_interest': data.open_interest
}
# save to db
es.index(index=index, document=document)
print('Saving to db...')
print(date)
print(data)

# NOTE: ``data`` is not a dict but a ``cryptofeed.types.Trade`` object.
async def trade(data: dict, receipt_timestamp):
# Get timestamp and convert it to isoformat
date = (datetime.utcfromtimestamp(data.timestamp)).isoformat()

index = es_prefix(data.symbol, 'trades')
document = {
'direction': data.side,
'amount': data.amount,
'price': data.price,
'timestamp': date,
}
# save to db
es.index(index=index, document=document)
print('Saving to db...')
print(date)
print(data)

callbacks = {TRADES: trade, OPEN_INTEREST: oi}

fh = FeedHandler(config=config)

fh.add_feed(
DERIBIT,
channels=[TRADES, OPEN_INTEREST],
symbols=get_instruments('BTC', 'option'),
callbacks=callbacks
)

es = Elasticsearch(ES_HOST)

es.indices.put_template(
name='oi_mapping',
body=oi_mapping)

es.indices.put_template(
name='trades_mapping',
body=trades_mapping)

fh.run()

# if __name__ == '__main__':
# main()
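Note that this module streams and stores raw open interest but never computes the max pain figure itself, even though the max_pain_mapping template added to elastic.py below is shaped for one. A minimal sketch of that calculation, assuming per-strike call/put open interest has already been aggregated into plain dicts (the function and argument names are hypothetical, not part of this PR):

    def compute_max_pain(
        call_oi: dict[float, float],  # strike -> call open interest
        put_oi: dict[float, float],   # strike -> put open interest
    ) -> tuple[float, float]:
        '''
        Return ``(max_pain_strike, dollar_value)``: the candidate
        settlement price (drawn from the traded strikes) which minimizes
        the total intrinsic value paid out to option holders.

        '''
        strikes = sorted(set(call_oi) | set(put_oi))

        def total_payout(settle: float) -> float:
            calls = sum(
                max(settle - strike, 0) * oi
                for strike, oi in call_oi.items()
            )
            puts = sum(
                max(strike - settle, 0) * oi
                for strike, oi in put_oi.items()
            )
            return calls + puts

        payouts = {settle: total_payout(settle) for settle in strikes}
        max_pain = min(payouts, key=payouts.get)
        return max_pain, payouts[max_pain]

The resulting pair maps directly onto the max_pain and dollar_value fields of the '*-max-pain' index template.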
69 changes: 69 additions & 0 deletions piker/data/elastic.py
@@ -37,6 +37,7 @@
)

import asks
import math


log = get_logger(__name__)
@@ -107,3 +108,71 @@ async def stop_matcher(msg: str):
start_matcher,
stop_matcher,
)



ES_HOST = 'http://localhost:19200'

shards = 2
replicas = 0
refresh = '1s'

compression = 'best_compression'

default_idx_settings = {
'index': {
'number_of_shards': shards,
'refresh_interval': refresh,
'number_of_replicas': replicas,
'codec': compression
}
}

oi_mapping = {
'order': 0,
'index_patterns': [
'*-oi'
],
'settings': default_idx_settings,
'mappings': {
'properties': {
'timestamp': {'type': 'date'},
'open_interest': {'type': 'double'}
}
}
}

trades_mapping = {
'order': 0,
'index_patterns': [
'*-trades'
],
'settings': default_idx_settings,
'mappings': {
'properties': {
'direction': {'type': 'keyword'},
'amount': {'type': 'double'},
'price': {'type': 'double'},
'timestamp': {'type': 'date'}
}
}
}

max_pain_mapping = {
'order': 0,
'index_patterns': [
'*-max-pain'
],
'settings': default_idx_settings,
'mappings': {
'properties': {
'max_pain': {'type': 'double'},
'dollar_value': {'type': 'double'},
'timestamp': {'type': 'date'}
}
}
}


def es_prefix(instrument_name, kind):
return f'{instrument_name.lower()}-{kind}'
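As a quick illustration of how the names produced by es_prefix line up with the index_patterns in the templates above (the instrument name is made up):

    >>> es_prefix('BTC-30JUN23-30000-C', 'oi')       # -> matched by '*-oi'
    'btc-30jun23-30000-c-oi'
    >>> es_prefix('BTC-30JUN23-30000-C', 'trades')   # -> matched by '*-trades'
    'btc-30jun23-30000-c-trades'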
3 changes: 2 additions & 1 deletion setup.py
@@ -87,7 +87,8 @@
],
'es': [
'docker',
'elasticsearch'
'elasticsearch',
'cryptofeed',
]
},
tests_require=['pytest'],