Skip to content

Commit

Permalink
finish first draft of polygon update data
Browse files Browse the repository at this point in the history
  • Loading branch information
briangu committed Sep 24, 2023
1 parent d60fc0c commit 570662f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 37 deletions.
4 changes: 3 additions & 1 deletion examples/db/dfs.kg
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ a
tbs::.tables(.os.argv@0)

:" TODO: we need this because we don't yet proxy tbs to the client "
:" read::{[d];d::tbs?x;:[:_d;d;.rindex(d)];d} "
read::{tbs?x}
write::{tbs,x,y;1}
write::{:[(#x)>0;tbs,x,y;0];1}

.srv(8888)

.p("dfs server at port 8888")

53 changes: 30 additions & 23 deletions examples/polygon/update_data.kg
Original file line number Diff line number Diff line change
@@ -1,35 +1,42 @@
.pyf("polygon";"RESTClient")
.pyf("requests";"get")
.pyf("numpy";"mean")
.py("json")
.comment("****")

rc::RESTClient()
Get a list of all tickers from Polygon.
For each ticker, get the latest stored time from the dfs and use that as the start time for the next request
map the new ticker data to the columns of the dfs
append the new data to the dfs
write the data back to the dfs

symbols::.pyc([rc "list_tickers"];[];:{["market" "stocks"] ["limit" 1000]})
tickers::{x;.pyc(x,"ticker";[];:{})}'symbols
****

.py("klongpy.db")
.pyf("polygon";"RESTClient")
.pyf("requests";"get")

callpoly::{[r];x;.p(x);r::get(x,"&apiKey=",.os.env?"POLYGON_API_KEY");:[.pyc([r "status_code"];[];:{})=200;.pyc([r "json"];[];:{});0]}
paginate::{[a];a::[];{:[x;{a::a,(x?"results");~:_x?"next_url"}(x);x]}{callpoly(x?"next_url")}:~callpoly(x);a}
getresults::{:[(x?"resultsCount")>0;x?"results";[]]}
paginate::{[a];a::[];{:[x;{a::a,getresults(x);~:_x?"next_url"}(x);x]}{callpoly(x?"next_url")}:~callpoly(x);a}

aggsurl::{"https://api.polygon.io/v2/aggs/ticker/",(x?"symbol"),"/range/1/",(x?"timespan"),"/",(x?"from"),"/",(x?"to"),"?adjusted=true&sort=asc&limit=",(x?"limit")}
u::aggsurl(:{["symbol" "AAPL"] ["timespan" "hour"] ["from" "2022-09-09"] ["to" "2023-09-09"] ["limit" "50000"]})
aapl::paginate(u)

closes::{x?"c"}'aapl

.comment("****")

for each ticker, get the latest stored time from the dfs and use that as the start time for the next request
map the new ticker data to the columns of the dfs
append the new data to the dfs
:" open connection to the dfs "
cols::["t" "c" "h" "l" "n" "o" "v" "vw"]
colsFromNames::{{x,,[]}'x}
newt::{.table(colsFromNames(cols))}

aggs::{[a];a:::{["multiplier" 1] ["from_" "2023-01-09"] ["to" "2023-09-09"] ["raw" 1]};a::a,"ticker",,x;a::a,"timespan",,y}
data::.pyc([rc "list_aggs"];[];aggs("AAPL";"day"))
.p(#data)
:" open connection to the dfs server "
cli::.cli(8888)

closes::{x;.pyc(x,"close";[];:{})}'data
latest::{:[:_x;"2022-09-09";:[(#x)>0;0$((x?"t")@-1);"2022-09-09"]]}
mkd::{[d];d:::{};d,"symbol",,x;d,"timespan",,y;d,"from",,z;d,"to",,"2023-09-09";d,"limit",,"50000";d}
readtbl::{[dfs sym tbl];dfs::x;sym::y;tbl::dfs(:read,sym);:[:_tbl;newt();tbl]}
extract::{[d];d::x;{d?x}'cols}
updtbl::{[tbl];tbl::x;{.insert(tbl;extract(x))}'y}
update::{[dfs sym ts path tbl d];dfs::x;sym::y;ts::z;path::ts,"/",sym;tbl::readtbl(dfs;path);d::paginate(aggsurl(mkd(sym;ts;latest(tbl))));.d("retrieved: ");updtbl(tbl;d);:[(#d)>0;dfs(:write,path,tbl);"skipping"]}

****
:" Get the list of tickers from polygon "
rc::RESTClient()
symbols::.pyc([rc "list_tickers"];[];:{["market" "stocks"] ["limit" 1000]});1
tickers::{x;.pyc(x,"ticker";[];:{})}'symbols;1

.d("avg close prices");.p(mean(closes))
{.d("updating: ");.p(x);update(cli;x;"hour")}'tickers

29 changes: 16 additions & 13 deletions klongpy/db/sys_fn_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,20 @@ def set_index(self, idx_cols):
def reset_index(self):
df = self.get_dataframe()
self._df = df.reset_index()
iic = [f"{ic}_idx" for ic in self.idx_cols]
self._df.drop(columns=iic, inplace=True)
self.idx_cols = None
if self.idx_cols is not None:
iic = [f"{ic}_idx" for ic in self.idx_cols]
self._df.drop(columns=iic, inplace=True)
self.idx_cols = None

def has_index(self):
# TODO: add tests for loading pandas dataframes with indexes but no idx_cols
# return (not isinstance(self._df.index, pd.RangeIndex)) or self.idx_cols is not None
return self.idx_cols is not None

def get_dataframe(self):
self.commit()
return self._df

def insert(self, y):
self.buffer.append(y)

Expand All @@ -100,10 +103,10 @@ def commit(self):
values = np.concatenate([self._df.values] + [y.reshape(1, -1) for y in self.buffer])
self._df = pd.DataFrame(values, columns=self.columns, copy=False)
self.buffer = []

def __len__(self):
return len(self.get_dataframe())

def __str__(self):
full_df = self.get_dataframe()
df = full_df.head(10)
Expand Down Expand Up @@ -171,7 +174,7 @@ def eval_sys_fn_create_table(x):
d::d,,"b",,b
t::.table(d)
t,"c",,c
"""
if np.isarray(x):
return Table({k:v for k,v in x}, columns=[k for k,_ in x])
Expand All @@ -185,7 +188,7 @@ def eval_sys_fn_index(x, y):
.index(x) [Create-Table-Index]
Creates an index on a table as specified by the columns in array "x".
Creates an index on a table as specified by the columns in array "x".
An index may be one or more columns.
t::.table(d)
Expand Down Expand Up @@ -227,7 +230,7 @@ def eval_sys_fn_schema(x):
.schema(x) [Table-Schema]
Returns the schema of either a table or dictionary "x". If "x" is a table,
then the columns are returned. If "x" is a database, then a dict of
then the columns are returned. If "x" is a database, then a dict of
table name to table is returned.
"""
Expand All @@ -243,7 +246,7 @@ def eval_sys_fn_insert_table(x, y):
.insert(x, y) [Table-Insert]
Insert values "y" into a table "x". The values provided by "y" must be in the
Insert values "y" into a table "x". The values provided by "y" must be in the
corresponding column position as specified when the table was created.
If the table is indexed, then the appropriate columns will be used as keys when
inserting values. If the table is unindexed, then the values are appended.
Expand Down Expand Up @@ -272,11 +275,11 @@ def eval_sys_fn_create_db(x):
.db(x) [Create-db]
Create a database from a map of tables "x".
Create a database from a map of tables "x".
The keys are table names and values are the tables.
"""
if not isinstance(x,dict):
return "a db must be created from a dict"
Expand Down

0 comments on commit 570662f

Please sign in to comment.