Skip to content

Commit

Permalink
Add Quandl and Data Schema
Browse files Browse the repository at this point in the history
AlphaPy now supports local data schema and Quandl feeds using the pandas Web data reader package.
  • Loading branch information
mrconway committed Dec 31, 2017
1 parent e1ac840 commit a8b23b1
Show file tree
Hide file tree
Showing 19 changed files with 5,464 additions and 835 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@
.idea/vcs.xml

.idea/workspace.xml
.idea/other.xml
alphapy/examples/Trading System/.ipynb_checkpoints/A Trading System-checkpoint.ipynb
*.pkl
*.png
150 changes: 119 additions & 31 deletions alphapy/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
from alphapy.frame import read_frame
from alphapy.globals import ModelType
from alphapy.globals import Partition, datasets
from alphapy.globals import PSEP, SSEP
from alphapy.globals import PD_INTRADAY_OFFSETS
from alphapy.globals import PD_WEB_DATA_FEEDS
from alphapy.globals import PSEP, SSEP, USEP
from alphapy.globals import SamplingMethod
from alphapy.globals import WILDCARD

Expand Down Expand Up @@ -282,6 +284,53 @@ def sample_data(model):
return model


#
# Function enhance_intraday_data
#

def enhance_intraday_data(df):
    r"""Add bar-numbering and end-of-day columns to an intraday dataframe.

    Parameters
    ----------
    df : pandas.DataFrame
        The intraday dataframe. Must contain string columns ``date`` and
        ``time``, plus ``open``, ``high``, ``low``, ``close``, and
        ``volume`` columns convertible to float.

    Returns
    -------
    df : pandas.DataFrame
        The dataframe indexed by datetime, with ``bar_number`` and
        ``end_of_day`` columns added and the raw ``date``/``time``
        columns removed.

    Notes
    -----
    The input frame is modified in place and also returned.
    """

    # Build a single datetime column and convert price/volume
    # columns to proper numeric types.

    index_column = 'datetime'
    dt_column = df['date'] + ' ' + df['time']
    df[index_column] = pd.to_datetime(dt_column)
    cols_float = ['open', 'high', 'low', 'close', 'volume']
    df[cols_float] = df[cols_float].astype(float)

    # Number the intraday bars within each trading day (0-based).

    date_group = df.groupby('date')
    df['bar_number'] = date_group.cumcount()

    # Mark the last bar of each trading day. This must happen before
    # the index is replaced, because tail(1).index refers to the
    # frame's current positional index.

    df['end_of_day'] = False
    df.loc[date_group.tail(1).index, 'end_of_day'] = True

    # Index by datetime. Passing the column *name* makes drop=True
    # actually remove the redundant 'datetime' column; the previous
    # form passed a DatetimeIndex object, for which drop is a no-op
    # and the duplicate column lingered in the returned frame.
    df.set_index(index_column, drop=True, inplace=True)

    # Drop the raw date/time columns now folded into the index.

    del df['date']
    del df['time']
    return df


#
# Function get_google_data
#
Expand Down Expand Up @@ -345,25 +394,12 @@ def get_google_data(symbol, lookback_period, fractal):
dt = datetime.fromtimestamp(day_item + (interval * offset))
dt = pd.to_datetime(dt)
dt_date = dt.strftime('%Y-%m-%d')
record = (dt, dt_date, open_item, high_item, low_item, close_item, volume_item)
dt_time = dt.strftime('%H:%M:%S')
record = (dt_date, dt_time, open_item, high_item, low_item, close_item, volume_item)
records.append(record)
# create data frame
cols = ['datetime', 'date', 'open', 'high', 'low', 'close', 'volume']
cols = ['date', 'time', 'open', 'high', 'low', 'close', 'volume']
df = pd.DataFrame.from_records(records, columns=cols)
# convert to proper data types
cols_float = ['open', 'high', 'low', 'close']
df[cols_float] = df[cols_float].astype(float)
df['volume'] = df['volume'].astype(int)
# number the intraday bars
date_group = df.groupby('date')
df['bar_number'] = date_group.cumcount()
# mark the end of the trading day
df['end_of_day'] = False
del df['date']
df.loc[date_group.tail(1).index, 'end_of_day'] = True
# set the index to datetime
df.index = df['datetime']
del df['datetime']
# return the dataframe
return df

Expand All @@ -373,7 +409,7 @@ def get_google_data(symbol, lookback_period, fractal):
#

def get_pandas_data(schema, symbol, lookback_period):
r"""Get Yahoo Finance daily data.
r"""Get Pandas Web Reader data.
Parameters
----------
Expand All @@ -391,32 +427,39 @@ def get_pandas_data(schema, symbol, lookback_period):
"""

# Quandl is a special case.

if 'quandl' in schema:
schema, symbol_prefix = schema.split(USEP)
symbol = SSEP.join([symbol_prefix, symbol]).upper()

# Calculate the start and end date for Yahoo.

start = datetime.now() - timedelta(lookback_period)
end = datetime.now()

# Call the Pandas Web data reader.

df = None
try:
df = web.DataReader(symbol, schema, start, end)
df = df.rename(columns = lambda x: x.lower().replace(' ',''))
except:
df = None
logger.info("Could not retrieve data for: %s", symbol)

return df


#
# Function get_feed_data
# Function get_market_data
#

def get_feed_data(group, lookback_period):
def get_market_data(model, group, lookback_period, resample_data):
r"""Get data from an external feed.
Parameters
----------
model : alphapy.Model
The model object describing the data.
group : alphapy.Group
The group of symbols.
lookback_period : int
Expand All @@ -429,27 +472,71 @@ def get_feed_data(group, lookback_period):
"""

# Unpack model specifications

directory = model.specs['directory']
extension = model.specs['extension']
separator = model.specs['separator']

# Unpack group elements

gspace = group.space
schema = gspace.schema
fractal = gspace.fractal

# Determine the feed source
if 'd' in fractal:
# daily data (date only)
logger.info("Getting Daily Data")
daily_data = True
else:

if any(substring in fractal for substring in PD_INTRADAY_OFFSETS):
# intraday data (date and time)
logger.info("Getting Intraday Data (Google 50-day limit)")
daily_data = False
logger.info("Getting Intraday Data [%s] from %s", fractal, schema)
intraday_data = True
index_column = 'datetime'
else:
# daily data or higher (date only)
logger.info("Getting Daily Data [%s] from %s", fractal, schema)
intraday_data = False
index_column = 'date'

# Get the data from the relevant feed

data_dir = SSEP.join([directory, 'data'])
pandas_data = any(substring in schema for substring in PD_WEB_DATA_FEEDS)
n_periods = 0

for item in group.members:
logger.info("Getting %s data for last %d days", item, lookback_period)
if daily_data:
# Locate the data source
if schema == 'data':
fname = frame_name(item.lower(), gspace)
df = read_frame(data_dir, fname, extension, separator)
if not intraday_data:
df.set_index(pd.DatetimeIndex(df[index_column]),
drop=True, inplace=True)
elif schema == 'google' and intraday_data:
df = get_google_data(item, lookback_period, fractal)
elif pandas_data:
df = get_pandas_data(schema, item, lookback_period)
else:
df = get_google_data(item, lookback_period, fractal)
logger.error("Unsupported Data Source: %s", schema)
# Now that we have content, standardize the data
if df is not None and not df.empty:
logger.info("Rows: %d", len(df))
# standardize column names
df = df.rename(columns = lambda x: x.lower().replace(' ',''))
# add intraday columns if necessary
if intraday_data:
df = enhance_intraday_data(df)
# order by increasing date if necessary
df = df.sort_index()
# resample data
if resample_data:
df = df.resample(fractal).agg({'open' : 'first',
'high' : 'max',
'low' : 'min',
'close' : 'last',
'volume' : 'sum'})
logger.info("Rows after Resampling at %s: %d",
fractal, len(df))
# allocate global Frame
newf = Frame(item.lower(), gspace, df)
if newf is None:
Expand All @@ -460,5 +547,6 @@ def get_feed_data(group, lookback_period):
n_periods = df_len
else:
logger.info("No DataFrame for %s", item)

# The number of periods actually retrieved
return n_periods
Loading

0 comments on commit a8b23b1

Please sign in to comment.