diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py
index 4c5aadedf..160f3fc98 100644
--- a/piker/data/_sampling.py
+++ b/piker/data/_sampling.py
@@ -246,7 +246,7 @@ async def sample_and_broadcast(
                 if tick_throttle:
                     # this is a send mem chan that likely
                     # pushes to the ``uniform_rate_send()`` below.
-                    await stream.send(quote)
+                    await stream.send((sym, quote))

                 else:
                     await stream.send({sym: quote})
@@ -285,10 +285,14 @@ async def uniform_rate_send(
     sleep_period = 1/rate - 0.000616
     last_send = time.time()
+    aname = stream._ctx.chan.uid[0]
+    fsp = False
+    if 'fsp' in aname:
+        fsp = True

     while True:

-        first_quote = await quote_stream.receive()
+        sym, first_quote = await quote_stream.receive()
         start = time.time()

         # append quotes since last iteration into the last quote's
@@ -301,7 +305,7 @@
         # while True:
         try:
-            next_quote = quote_stream.receive_nowait()
+            sym, next_quote = quote_stream.receive_nowait()
             ticks = next_quote.get('ticks')

             if ticks:
@@ -312,12 +316,12 @@
             rate = 1 / (now - last_send)
             last_send = now

-            # print(f'{rate} Hz sending quotes')  # \n{first_quote}')
+            # log.info(f'{rate} Hz sending quotes')  # \n{first_quote}')

             # TODO: now if only we could sync this to the display
             # rate timing exactly lul
             try:
-                await stream.send({first_quote['symbol']: first_quote})
+                await stream.send({sym: first_quote})
                 break
             except trio.ClosedResourceError:
                 # if the feed consumer goes down then drop
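# The `(sym, quote)` tuple protocol above threads the symbol key through
# the throttler so the send side no longer digs it out of the quote body.
# A minimal sketch of the same batch-at-a-fixed-rate pattern, assuming
# plain ``trio`` memory channels in place of piker's stream types (names
# here are illustrative, not piker's actual API):

import time
import trio

async def rate_limited_send(
    recv: trio.MemoryReceiveChannel,  # delivers (sym, quote) tuples
    send: trio.MemorySendChannel,
    rate: float = 60,  # Hz
) -> None:
    sleep_period = 1 / rate
    while True:
        # block for the first quote of this send period..
        sym, quote = await recv.receive()
        start = time.time()

        # ..then drain anything else that already arrived, merging
        # tick lists so events are batched rather than dropped.
        while True:
            try:
                sym, next_quote = recv.receive_nowait()
                quote.setdefault('ticks', []).extend(
                    next_quote.get('ticks', [])
                )
            except trio.WouldBlock:
                break

        rest = sleep_period - (time.time() - start)
        if rest > 0:
            await trio.sleep(rest)

        await send.send({sym: quote})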
diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py
index f7f9f9047..2169e262c 100644
--- a/piker/data/_sharedmem.py
+++ b/piker/data/_sharedmem.py
@@ -31,7 +31,7 @@
 import numpy as np

 from ..log import get_logger
-from ._source import base_ohlc_dtype, base_iohlc_dtype
+from ._source import base_iohlc_dtype

 log = get_logger(__name__)
@@ -168,6 +168,7 @@ def __init__(
         self._len = len(shmarr)
         self._shm = shm
+        self._post_init: bool = False

         # pushing data does not write the index (aka primary key)
         self._write_fields = list(shmarr.dtype.fields.keys())[1:]
@@ -196,7 +197,24 @@ def index(self) -> int:

     @property
     def array(self) -> np.ndarray:
-        return self._array[self._first.value:self._last.value]
+        '''Return an up-to-date ``np.ndarray`` view of the
+        so-far-written data to the underlying shm buffer.
+
+        '''
+        a = self._array[self._first.value:self._last.value]
+
+        # first, last = self._first.value, self._last.value
+        # a = self._array[first:last]
+
+        # TODO: eventually comment this once we've not seen it in the
+        # wild in a long time..
+        # XXX: race where first/last indexes cause a reader
+        # to load an empty array..
+        if len(a) == 0 and self._post_init:
+            raise RuntimeError('Empty array race condition hit!?')
+            # breakpoint()
+
+        return a

     def last(
         self,
@@ -209,6 +227,7 @@ def push(

         data: np.ndarray,
         prepend: bool = False,
+        start: Optional[int] = None,

     ) -> int:
         '''Ring buffer like "push" to append data

         NB: no actual ring logic yet to give a "loop around" on
         overflow condition, lel.
         '''
+        self._post_init = True
         length = len(data)
+        index = start or self._last.value

         if prepend:
             index = self._first.value - length
-
-        else:
-            index = self._last.value
+
+        if index < 0:
+            raise ValueError(
+                f'Array size of {self._len} was overrun during prepend.\n'
+                f'You have passed {abs(index)} too many datums.'
+            )

         end = index + length

@@ -230,11 +255,22 @@
         try:
             self._array[fields][index:end] = data[fields][:]
+
+            # NOTE: there was a race here between updating
+            # the first and last indices and when the next reader
+            # tries to access ``.array`` (which due to the index
+            # overlap will be empty). Pretty sure we've fixed it now
+            # but leaving this here as a reminder.
             if prepend:
+                assert index < self._first.value
+
+            if index < self._first.value:
                 self._first.value = index
             else:
                 self._last.value = end
+
+            return end
+
         except ValueError as err:
             # should raise if diff detected
             self.diff_err_fields(data)
@@ -290,20 +326,25 @@ def flush(self) -> None:


 # how much is probably dependent on lifestyle
-_secs_in_day = int(60 * 60 * 12)
-_default_size = 2 * _secs_in_day
+_secs_in_day = int(60 * 60 * 24)
+# we try for 3 times but only on a run-every-other-day kinda week.
+_default_size = 3 * _secs_in_day
+

 def open_shm_array(
+
     key: Optional[str] = None,
     size: int = _default_size,
     dtype: Optional[np.dtype] = None,
     readonly: bool = False,
+
 ) -> ShmArray:
-    """Open a memory shared ``numpy`` using the standard library.
+    '''Open a shared memory ``numpy`` array using the standard library.

     This call unlinks (aka permanently destroys) the buffer on
     teardown and thus should be used from the parent-most accessor
-    (process).
-    """
+    (process).
+
+    '''
     # create new shared mem segment for which we
     # have write permission
     a = np.zeros(size, dtype=dtype)
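# The prepend/overrun arithmetic in ``ShmArray.push()`` above reduces to
# simple index math over a flat buffer; a toy model with hypothetical
# names (no actual shared memory involved):

import numpy as np

buf = np.zeros(10)
first = last = 5  # start mid-buffer so history can be prepended

def push(data: np.ndarray, prepend: bool = False) -> int:
    global first, last
    index = first - len(data) if prepend else last
    if index < 0:
        # ran out of pre-allocated headroom: the new ValueError
        # overrun case above
        raise ValueError(f'overrun by {abs(index)} datums')
    end = index + len(data)
    buf[index:end] = data
    if index < first:
        first = index  # prepend grows the readable view backwards
    else:
        last = end     # append grows it forwards
    return end

push(np.ones(3))                  # append: last moves 5 -> 8
push(np.ones(4), prepend=True)    # prepend: first moves 5 -> 1
# push(np.ones(2), prepend=True)  # would need index -1 -> ValueError

# the reader-facing view, like ``.array`` above:
view = buf[first:last]            # 7 readable datums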
diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py
index f3f2d5dee..5e88ed696 100644
--- a/piker/fsp/__init__.py
+++ b/piker/fsp/__init__.py
@@ -1,5 +1,5 @@
 # piker: trading gear for hackers
-# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
+# Copyright (C) Tyler Goodlet (in stewardship of piker0)

 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -14,33 +14,17 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.

-"""
-Financial signal processing for the peeps.
-"""
-from functools import partial
-from typing import AsyncIterator, Callable, Tuple, Optional
+'''
+Fin-sig-proc for the peeps!
+

-import trio
-from trio_typing import TaskStatus
-import tractor
-import numpy as np
-
-from ..log import get_logger, get_console_log
-from .. import data
-from ._momo import _rsi, _wma
-from ._volume import _tina_vwap
-from ..data import attach_shm_array
-from ..data.feed import Feed
-from ..data._sharedmem import ShmArray
+'''
+from typing import AsyncIterator

-log = get_logger(__name__)
+import numpy as np

+from ._engine import cascade

-_fsps = {
-    'rsi': _rsi,
-    'wma': _wma,
-    'vwap': _tina_vwap,
-}
+__all__ = ['cascade']


 async def latency(
@@ -63,183 +47,3 @@ async def latency(
     # stack tracing.
     value = quote['brokerd_ts'] - quote['broker_ts']
     yield value
-
-
-async def fsp_compute(
-    ctx: tractor.Context,
-    symbol: str,
-    feed: Feed,
-    stream: trio.abc.ReceiveChannel,
-
-    src: ShmArray,
-    dst: ShmArray,
-
-    fsp_func_name: str,
-    func: Callable,
-
-    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
-
-    # TODO: load appropriate fsp with input args
-
-    async def filter_by_sym(
-        sym: str,
-        stream,
-    ):
-
-        # TODO: make this the actualy first quote from feed
-        # XXX: this allows for a single iteration to run for history
-        # processing without waiting on the real-time feed for a new quote
-        yield {}
-
-        # task cancellation won't kill the channel
-        # since we shielded at the `open_feed()` call
-        async for quotes in stream:
-            for symbol, quotes in quotes.items():
-                if symbol == sym:
-                    yield quotes
-
-    out_stream = func(
-        filter_by_sym(symbol, stream),
-        feed.shm,
-    )
-
-    # TODO: XXX:
-    # THERE'S A BIG BUG HERE WITH THE `index` field since we're
-    # prepending a copy of the first value a few times to make
-    # sub-curves align with the parent bar chart.
-    # This likely needs to be fixed either by,
-    # - manually assigning the index and historical data
-    #   seperately to the shm array (i.e. not using .push())
-    # - developing some system on top of the shared mem array that
-    #   is `index` aware such that historical data can be indexed
-    #   relative to the true first datum? Not sure if this is sane
-    #   for incremental compuations.
-    dst._first.value = src._first.value
-    dst._last.value = src._first.value
-
-    # Conduct a single iteration of fsp with historical bars input
-    # and get historical output
-    history_output = await out_stream.__anext__()
-
-    # build a struct array which includes an 'index' field to push
-    # as history
-    history = np.array(
-        np.arange(len(history_output)),
-        dtype=dst.array.dtype
-    )
-    history[fsp_func_name] = history_output
-
-    # check for data length mis-allignment and fill missing values
-    diff = len(src.array) - len(history)
-    if diff >= 0:
-        log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}")
-        for _ in range(diff):
-            dst.push(history[:1])
-
-    # compare with source signal and time align
-    index = dst.push(history)
-
-    await ctx.send_yield(index)
-
-    # setup a respawn handle
-    with trio.CancelScope() as cs:
-        task_status.started(cs)
-
-        # rt stream
-        async for processed in out_stream:
-
-            # period = time.time() - last
-            # hz = 1/period if period else float('nan')
-            # if hz > 60:
-            #     log.info(f'FSP quote too fast: {hz}')
-
-            log.debug(f"{fsp_func_name}: {processed}")
-            index = src.index
-            dst.array[-1][fsp_func_name] = processed
-
-            # stream latest shm array index entry
-            await ctx.send_yield(index)
-
-
-@tractor.stream
-async def cascade(
-    ctx: tractor.Context,
-    brokername: str,
-    src_shm_token: dict,
-    dst_shm_token: Tuple[str, np.dtype],
-    symbol: str,
-    fsp_func_name: str,
-    loglevel: Optional[str] = None,
-
-) -> None:
-    '''Chain streaming signal processors and deliver output to
-    destination mem buf.
-
-    '''
-    if loglevel:
-        get_console_log(loglevel)
-
-    src = attach_shm_array(token=src_shm_token)
-    dst = attach_shm_array(readonly=False, token=dst_shm_token)
-
-    func: Callable = _fsps[fsp_func_name]
-
-    # open a data feed stream with requested broker
-    async with data.feed.maybe_open_feed(
-        brokername,
-        [symbol],
-
-        # TODO:
-        # tick_throttle=60,
-
-    ) as (feed, stream):
-
-        assert src.token == feed.shm.token
-
-        last_len = new_len = len(src.array)
-
-        fsp_target = partial(
-            fsp_compute,
-            ctx=ctx,
-            symbol=symbol,
-            feed=feed,
-            stream=stream,
-
-            src=src,
-            dst=dst,
-
-            fsp_func_name=fsp_func_name,
-            func=func
-        )
-
-        async with trio.open_nursery() as n:
-
-            cs = await n.start(fsp_target)
-
-            # Increment the underlying shared memory buffer on every
-            # "increment" msg received from the underlying data feed.
-
-            async with feed.index_stream() as stream:
-                async for msg in stream:
-
-                    new_len = len(src.array)
-
-                    if new_len > last_len + 1:
-                        # respawn the signal compute task if the source
-                        # signal has been updated
-                        cs.cancel()
-                        cs = await n.start(fsp_target)
-
-                    # TODO: adopt an incremental update engine/approach
-                    # where possible here eventually!
-
-                    # read out last shm row
-                    array = dst.array
-                    last = array[-1:].copy()
-
-                    # write new row to the shm buffer
-                    dst.push(last)
-
-                    last_len = new_len
diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py
new file mode 100644
index 000000000..883f58530
--- /dev/null
+++ b/piker/fsp/_engine.py
@@ -0,0 +1,342 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship of piker0)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+'''
+core task logic for processing chains
+
+'''
+from dataclasses import dataclass
+from functools import partial
+from typing import AsyncIterator, Callable, Optional
+
+import numpy as np
+import pyqtgraph as pg
+import trio
+from trio_typing import TaskStatus
+import tractor
+
+from ..log import get_logger, get_console_log
+from .. import data
+from ..data import attach_shm_array
+from ..data.feed import Feed
+from ..data._sharedmem import ShmArray
+from ._momo import _rsi, _wma
+from ._volume import _tina_vwap
+
+log = get_logger(__name__)
+
+_fsp_builtins = {
+    'rsi': _rsi,
+    'wma': _wma,
+    'vwap': _tina_vwap,
+}
+
+# TODO: things to figure the heck out:
+# - how to handle non-plottable values (pyqtgraph has facility for this
+#   now in `arrayToQPath()`)
+# - composition of fsps / implicit chaining syntax (we need an issue)
+
+
+@dataclass
+class TaskTracker:
+    complete: trio.Event
+    cs: trio.CancelScope
+
+
+async def filter_quotes_by_sym(
+
+    sym: str,
+    quote_stream: tractor.MsgStream,
+
+) -> AsyncIterator[dict]:
+    '''
+    Filter quote stream by target symbol.
+
+    '''
+    # TODO: make this the actual first quote from feed
+    # XXX: this allows for a single iteration to run for history
+    # processing without waiting on the real-time feed for a new quote
+    yield {}
+
+    async for quotes in quote_stream:
+        quote = quotes.get(sym)
+        if quote:
+            yield quote
+
+
+async def fsp_compute(
+
+    stream: tractor.MsgStream,
+    symbol: str,
+    feed: Feed,
+    quote_stream: trio.abc.ReceiveChannel,
+
+    src: ShmArray,
+    dst: ShmArray,
+
+    func_name: str,
+    func: Callable,
+
+    attach_stream: bool = False,
+    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+
+    profiler = pg.debug.Profiler(
+        delayed=False,
+        disabled=True
+    )
+
+    out_stream = func(
+
+        # TODO: do we even need this if we do the feed api right?
+        # shouldn't a local stream do this before we get a handle
+        # to the async iterable? it's that or we do some kinda
+        # async itertools style?
+        filter_quotes_by_sym(symbol, quote_stream),
+        feed.shm,
+    )
+
+    # Conduct a single iteration of fsp with historical bars input
+    # and get historical output
+    history_output = await out_stream.__anext__()
+
+    profiler(f'{func_name} generated history')
+
+    # build a struct array which includes an 'index' field to push
+    # as history
+    history = np.array(
+        np.arange(len(history_output)),
+        dtype=dst.array.dtype
+    )
+    history[func_name] = history_output
+
+    # TODO: XXX:
+    # THERE'S A BIG BUG HERE WITH THE `index` field since we're
+    # prepending a copy of the first value a few times to make
+    # sub-curves align with the parent bar chart.
+    # This likely needs to be fixed either by,
+    # - manually assigning the index and historical data
+    #   separately to the shm array (i.e. not using .push())
+    # - developing some system on top of the shared mem array that
+    #   is `index` aware such that historical data can be indexed
+    #   relative to the true first datum? Not sure if this is sane
+    #   for incremental computations.
+    first = dst._first.value = src._first.value
+
+    # TODO: can we use this `start` flag instead of the manual
+    # setting above?
+    index = dst.push(history, start=first)
+
+    profiler(f'{func_name} pushed history')
+    profiler.finish()
+
+    # setup a respawn handle
+    with trio.CancelScope() as cs:
+        tracker = TaskTracker(trio.Event(), cs)
+        task_status.started((tracker, index))
+        profiler(f'{func_name} yield last index')
+
+        # import time
+        # last = time.time()
+
+        try:
+            # rt stream
+            async for processed in out_stream:
+
+                log.debug(f"{func_name}: {processed}")
+                index = src.index
+                dst.array[-1][func_name] = processed
+
+                # NOTE: for now we aren't streaming this to the consumer
+                # stream latest array index entry which basically just acts
+                # as trigger msg to tell the consumer to read from shm
+                if attach_stream:
+                    await stream.send(index)
+
+                # period = time.time() - last
+                # hz = 1/period if period else float('nan')
+                # if hz > 60:
+                #     log.info(f'FSP quote too fast: {hz}')
+                # last = time.time()
+        finally:
+            tracker.complete.set()
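# ``fsp_compute()`` above assumes every ``func`` follows the same shape
# as the builtins (``_rsi``, ``_wma``, ``_tina_vwap``): an async
# generator taking the filtered quote stream plus the source ShmArray,
# which yields its full history output first and per-tick scalar values
# after. A skeletal example (``price_squared`` is purely illustrative,
# not a real piker fsp):

from typing import AsyncIterator
import numpy as np

async def price_squared(
    source,  # filtered quote stream, an async iterator of quote dicts
    ohlcv,   # source ShmArray holding the OHLC history
) -> AsyncIterator[np.ndarray]:

    # "first yield": the history output the engine uses to (pre)fill
    # the destination buffer via ``dst.push(history, start=first)``
    yield ohlcv.array['close'] ** 2

    # then stream scalar updates, one per processed tick
    async for quote in source:
        for tick in quote.get('ticks', ()):
            yield tick['price'] ** 2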
+
+
+@tractor.context
+async def cascade(
+
+    ctx: tractor.Context,
+    brokername: str,
+
+    src_shm_token: dict,
+    dst_shm_token: tuple[str, np.dtype],
+
+    symbol: str,
+    func_name: str,
+    zero_on_step: bool = False,
+
+    loglevel: Optional[str] = None,
+
+) -> None:
+    '''
+    Chain streaming signal processors and deliver output to
+    destination shm array buffer.
+
+    '''
+    profiler = pg.debug.Profiler(delayed=False, disabled=False)
+
+    if loglevel:
+        get_console_log(loglevel)
+
+    src = attach_shm_array(token=src_shm_token)
+    dst = attach_shm_array(readonly=False, token=dst_shm_token)
+
+    func: Callable = _fsp_builtins.get(func_name)
+    if not func:
+        # TODO: assume it's a func target path
+        raise ValueError(f'Unknown fsp target: {func_name}')
+
+    # open a data feed stream with requested broker
+    async with data.feed.maybe_open_feed(
+        brokername,
+        [symbol],
+
+        # TODO throttle tick outputs from *this* daemon since
+        # it'll emit tons of ticks due to the throttle only
+        # limits quote arrival periods, so the consumer of *this*
+        # needs to get throttled the ticks we generate.
+        # tick_throttle=60,
+
+    ) as (feed, quote_stream):
+
+        profiler(f'{func_name}: feed up')
+
+        assert src.token == feed.shm.token
+        # last_len = new_len = len(src.array)
+
+        async with (
+            ctx.open_stream() as stream,
+            trio.open_nursery() as n,
+        ):
+
+            fsp_target = partial(
+
+                fsp_compute,
+                stream=stream,
+                symbol=symbol,
+                feed=feed,
+                quote_stream=quote_stream,
+
+                # shm
+                src=src,
+                dst=dst,
+
+                func_name=func_name,
+                func=func
+            )
+
+            tracker, index = await n.start(fsp_target)
+
+            if zero_on_step:
+                last = dst.array[-1:]
+                zeroed = np.zeros(last.shape, dtype=last.dtype)
+
+            await ctx.started(index)
+            profiler(f'{func_name}: fsp up')
+
+            async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
+                # TODO: adopt an incremental update engine/approach
+                # where possible here eventually!
+                log.warning(f're-syncing fsp {func_name} to source')
+                tracker.cs.cancel()
+                await tracker.complete.wait()
+                return await n.start(fsp_target)
+
+            def is_synced(
+                src: ShmArray,
+                dst: ShmArray
+            ) -> tuple[bool, int, int]:
+                '''Predicate to determine if a destination FSP
+                output array is aligned to its source array.
+
+                '''
+                step_diff = src.index - dst.index
+                len_diff = abs(len(src.array) - len(dst.array))
+                return not (
+                    # the source is likely backfilling and we must
+                    # sync history calculations
+                    len_diff > 2 or
+
+                    # we aren't step synced to the source and may be
+                    # leading/lagging by a step
+                    step_diff > 1 or
+                    step_diff < 0
+                ), step_diff, len_diff
+
+            async def poll_and_sync_to_step(
+
+                tracker: TaskTracker,
+                src: ShmArray,
+                dst: ShmArray,
+
+            ) -> tuple[TaskTracker, int]:
+
+                synced, step_diff, _ = is_synced(src, dst)
+                while not synced:
+                    tracker, index = await resync(tracker)
+                    synced, step_diff, _ = is_synced(src, dst)
+
+                return tracker, step_diff
+
+            s, step, ld = is_synced(src, dst)
+
+            # Increment the underlying shared memory buffer on every
+            # "increment" msg received from the underlying data feed.
+            async with feed.index_stream() as stream:
+
+                profiler(f'{func_name}: sample stream up')
+                profiler.finish()
+
+                async for msg in stream:
+
+                    # respawn the compute task if the source
+                    # array has been updated such that we compute
+                    # new history from the (prepended) source.
+                    synced, step_diff, _ = is_synced(src, dst)
+                    if not synced:
+                        tracker, step_diff = await poll_and_sync_to_step(
+                            tracker,
+                            src,
+                            dst,
+                        )
+
+                        # skip adding a last bar since we should already
+                        # be step aligned
+                        if step_diff == 0:
+                            continue
+
+                    # read out last shm row, copy and write new row
+                    array = dst.array
+
+                    # some metrics like vlm should be reset
+                    # to zero every step.
+                    if zero_on_step:
+                        last = zeroed
+                    else:
+                        last = array[-1:].copy()
+
+                    dst.push(last)
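# The resync logic above boils down to two integer distances; a tiny
# runnable rendition of the ``is_synced()`` predicate with toy indices
# standing in for real shm arrays:

def is_synced_toy(
    src_index: int,
    dst_index: int,
    src_len: int,
    dst_len: int,
) -> bool:
    step_diff = src_index - dst_index
    len_diff = abs(src_len - dst_len)
    return not (
        len_diff > 2       # source is (back)filling history
        or step_diff > 1   # dst lagging the source by > 1 step
        or step_diff < 0   # dst somehow leading the source
    )

assert is_synced_toy(1005, 1004, 500, 499)      # in step, lengths close
assert not is_synced_toy(1005, 1004, 800, 499)  # backfill prepended history
assert not is_synced_toy(1005, 1002, 500, 499)  # lagging 3 steps -> resync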
diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py
index f8811afa5..78461d8a1 100644
--- a/piker/fsp/_momo.py
+++ b/piker/fsp/_momo.py
@@ -16,6 +16,7 @@
 """
 Momentum bby.
+
 """
 from typing import AsyncIterator, Optional

@@ -23,12 +24,9 @@
 from numba import jit, float64, optional, int64

 from ..data._normalize import iterticks
+from ..data._sharedmem import ShmArray

-# TODO: things to figure the fuck out:
-# - how to handle non-plottable values
-# - composition of fsps / implicit chaining
-
 @jit(
     float64[:](
         float64[:],
@@ -39,11 +37,14 @@
     nogil=True
 )
 def ema(
+
     y: 'np.ndarray[float64]',
     alpha: optional(float64) = None,
     ylast: optional(float64) = None,
+
 ) -> 'np.ndarray[float64]':
-    r"""Exponential weighted moving average owka 'Exponential smoothing'.
+    r'''
+    Exponential weighted moving average aka 'Exponential smoothing'.

     - https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
     - https://en.wikipedia.org/wiki/Exponential_smoothing
@@ -68,7 +69,8 @@ def ema(
     More discussion here:
     https://stackoverflow.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm

-    """
+
+    '''
     n = y.shape[0]

     if alpha is None:
@@ -105,14 +107,21 @@
 #     nogil=True
 # )
 def rsi(
+
+    # TODO: use https://github.com/ramonhagenaars/nptyping
     signal: 'np.ndarray[float64]',
     period: int64 = 14,
     up_ema_last: float64 = None,
     down_ema_last: float64 = None,
+
 ) -> 'np.ndarray[float64]':
+    '''
+    relative strengggth.
+
+    '''
     alpha = 1/period

-    df = np.diff(signal)
+    df = np.diff(signal, prepend=0)

     up = np.where(df > 0, df, 0)
     up_ema = ema(up, alpha, up_ema_last)

@@ -120,11 +129,12 @@
     down = np.where(df < 0, -df, 0)
     down_ema = ema(down, alpha, down_ema_last)

-    # avoid dbz errors
+    # avoid dbz errors, this leaves the first
+    # index == 0 right?
     rs = np.divide(
         up_ema,
         down_ema,
-        out=np.zeros_like(up_ema),
+        out=np.zeros_like(signal),
         where=down_ema != 0
     )
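# Sanity check on the ``prepend=0`` / ``out=np.zeros_like(signal)`` pair
# above: prepending a zero makes the diff vector the same length as the
# input, so the EMA outputs and the ``out=`` buffer shapes now agree. A
# quick pure-numpy run of the same gain/loss decomposition (no numba):

import numpy as np

sig = np.array([44.0, 44.5, 44.2, 45.1])
df = np.diff(sig, prepend=0)     # shape (4,); note df[0] == sig[0] - 0
up = np.where(df > 0, df, 0)     # gains
down = np.where(df < 0, -df, 0)  # losses as positive magnitudes
assert df.shape == sig.shape     # why zeros_like(signal) now fits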
 def wma(
+
     signal: np.ndarray,
     length: int,
     weights: Optional[np.ndarray] = None,
+
 ) -> np.ndarray:
+    '''
+    Compute a windowed moving average of ``signal`` with window
+    ``length`` and optional ``weights`` (must be same size as
+    ``signal``).
+
+    '''
     if weights is None:
         # default is a standard arithmetic mean
         seq = np.full((length,), 1)
@@ -151,18 +169,22 @@
     return np.convolve(signal, weights, 'valid')


-# @piker.fsp.signal(
+# @piker.fsp.emit(
 #   timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
 # )
 async def _rsi(
+
     source: 'QuoteStream[Dict[str, Any]]',  # noqa
-    ohlcv: "ShmArray[T<'close'>]",
+    ohlcv: ShmArray,
     period: int = 14,
+
 ) -> AsyncIterator[np.ndarray]:
-    """Multi-timeframe streaming RSI.
+    '''
+    Multi-timeframe streaming RSI.

     https://en.wikipedia.org/wiki/Relative_strength_index
-    """
+
+    '''
     sig = ohlcv.array['close']

     # wilder says to seed the RSI EMAs with the SMA for the "period"
     # TODO: the emas here should be seeded with a period SMA as per
     # wilder's original formula..
-    rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed)
+    rsi_h, last_up_ema_close, last_down_ema_close = rsi(
+        sig, period, seed, seed)
     up_ema_last = last_up_ema_close
     down_ema_last = last_down_ema_close

@@ -178,7 +201,6 @@ async def _rsi(
     yield rsi_h

     index = ohlcv.index
-
     async for quote in source:
         # tick based updates
         for tick in iterticks(quote):
@@ -206,16 +228,20 @@ async def _rsi(


 async def _wma(
+
     source,  #: AsyncStream[np.ndarray],
     length: int,
     ohlcv: np.ndarray,  # price time-frame "aware"
+
 ) -> AsyncIterator[np.ndarray]:  # maybe something like FspStream?
-    """Streaming weighted moving average.
+    '''
+    Streaming weighted moving average.

     ``weights`` is a sequence of already scaled values. As an example
     for the WMA often found in "technical analysis":
     ``weights = np.arange(1, N) * N*(N-1)/2``.
-    """
+
+    '''
     # deliver historical output as "first yield"
     yield wma(ohlcv.array['close'], length)
diff --git a/piker/ui/_display.py b/piker/ui/_display.py
index 483db8a8f..ab85e761f 100644
--- a/piker/ui/_display.py
+++ b/piker/ui/_display.py
@@ -323,7 +323,7 @@ async def fan_out_spawn_fsp_daemons(
             conf['shm'] = shm

             portal = await n.start_actor(
-                enable_modules=['piker.fsp'],
+                enable_modules=['piker.fsp._engine'],
                 name='fsp.' + display_name,
             )
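# With ``cascade`` now exposed from ``piker.fsp._engine`` (and re-exported
# via ``__all__``), the consumer side reaches it through a tractor context
# roughly as below. This is a hedged sketch of the call pattern only:
# kwarg values and the surrounding nursery setup are illustrative, not
# verbatim piker UI code.

from piker import fsp

async def start_rsi_daemon(n, src_token, dst_token):  # n: actor nursery
    portal = await n.start_actor(
        'fsp.rsi',
        enable_modules=['piker.fsp._engine'],
    )
    async with (
        portal.open_context(
            fsp.cascade,
            brokername='kraken',
            src_shm_token=src_token,
            dst_shm_token=dst_token,
            symbol='xbtusd',
            func_name='rsi',
        ) as (ctx, first_index),  # the ``ctx.started(index)`` value
        ctx.open_stream() as stream,
    ):
        # index updates are not currently streamed
        # (``attach_stream=False`` above) so this mostly idles
        async for index in stream:
            ...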