Skip to content

Commit

Permalink
Drop marketstore mod import from CLIs loader
Browse files Browse the repository at this point in the history
Means commenting out the `data.cli.ingest()` as it will be deleted in
the up coming #486 anyway.
  • Loading branch information
goodboy committed Aug 30, 2023
1 parent 7d84b5a commit 19a6f5c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 51 deletions.
2 changes: 0 additions & 2 deletions piker/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,6 @@ async def list_services():


def _load_clis() -> None:
from ..service import marketstore # noqa
from ..service import elastic
from ..data import cli # noqa
from ..brokers import cli # noqa
from ..ui import cli # noqa
Expand Down
98 changes: 49 additions & 49 deletions piker/data/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import tractor
import click

from ..service.marketstore import (
# get_client,
# stream_quotes,
ingest_quote_stream,
# _url,
# _tick_tbk_ids,
# mk_tbk,
)
# from ..service.marketstore import (
# # get_client,
# # stream_quotes,
# ingest_quote_stream,
# # _url,
# # _tick_tbk_ids,
# # mk_tbk,
# )
from ..cli import cli
from .. import watchlists as wl
from ._util import (
Expand Down Expand Up @@ -212,44 +212,44 @@ async def main():
trio.run(main)


@cli.command()
@click.option('--test-file', '-t', help='Test quote stream file')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl):
'''
Ingest real-time broker quotes and ticks to a marketstore instance.
'''
# global opts
loglevel = config['loglevel']
tractorloglevel = config['tractorloglevel']
# log = config['log']

watchlist_from_file = wl.ensure_watchlists(config['wl_path'])
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
symbols = watchlists[name]

grouped_syms = {}
for sym in symbols:
symbol, _, provider = sym.rpartition('.')
if provider not in grouped_syms:
grouped_syms[provider] = []

grouped_syms[provider].append(symbol)

async def entry_point():
async with tractor.open_nursery() as n:
for provider, symbols in grouped_syms.items():
await n.run_in_actor(
ingest_quote_stream,
name='ingest_marketstore',
symbols=symbols,
brokername=provider,
tries=1,
actorloglevel=loglevel,
loglevel=tractorloglevel
)

tractor.run(entry_point)
# @cli.command()
# @click.option('--test-file', '-t', help='Test quote stream file')
# @click.option('--tl', is_flag=True, help='Enable tractor logging')
# @click.argument('name', nargs=1, required=True)
# @click.pass_obj
# def ingest(config, name, test_file, tl):
# '''
# Ingest real-time broker quotes and ticks to a marketstore instance.

# '''
# # global opts
# loglevel = config['loglevel']
# tractorloglevel = config['tractorloglevel']
# # log = config['log']

# watchlist_from_file = wl.ensure_watchlists(config['wl_path'])
# watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
# symbols = watchlists[name]

# grouped_syms = {}
# for sym in symbols:
# symbol, _, provider = sym.rpartition('.')
# if provider not in grouped_syms:
# grouped_syms[provider] = []

# grouped_syms[provider].append(symbol)

# async def entry_point():
# async with tractor.open_nursery() as n:
# for provider, symbols in grouped_syms.items():
# await n.run_in_actor(
# ingest_quote_stream,
# name='ingest_marketstore',
# symbols=symbols,
# brokername=provider,
# tries=1,
# actorloglevel=loglevel,
# loglevel=tractorloglevel
# )

# tractor.run(entry_point)

0 comments on commit 19a6f5c

Please sign in to comment.