Commit: feature sequencing

add sequence-to-sequence input and output features
mrconway committed Oct 20, 2017
1 parent d83b117 commit 4074585
Showing 10 changed files with 213 additions and 54 deletions.
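At its core, this commit turns flat feature rows into sequences: lagged copies of each input feature plus a forward-shifted target. A minimal pandas sketch of that idea, separate from AlphaPy's own `sequence_frame` (the `close[1]`-style lag naming is an assumption, inferred from the `LOFF` delimiter used later in this diff):

```python
import pandas as pd

def sequence_sketch(df, target, lag_period, forecast_period):
    """Illustrative lag/lead sequencing; not AlphaPy's sequence_frame."""
    out = df.copy()
    # Lagged input features: close[1] is the prior period's close, etc.
    for col in df.columns:
        for lag in range(1, lag_period + 1):
            out['%s[%d]' % (col, lag)] = df[col].shift(lag)
    # Forward-shifted target: the value forecast_period ahead.
    out[target] = df[target].shift(-forecast_period)
    return out

df = pd.DataFrame({'close':  [10.0, 11.0, 12.0, 13.0],
                   'volume': [100, 110, 90, 120]})
print(sequence_sketch(df, 'close', lag_period=2, forecast_period=1))
```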
40 changes: 25 additions & 15 deletions alphapy/__main__.py
@@ -39,6 +39,7 @@
from alphapy.features import remove_lv_features
from alphapy.features import save_features
from alphapy.features import select_features
from alphapy.frame import write_frame
from alphapy.globals import CSEP, PSEP, SSEP, USEP
from alphapy.globals import ModelType
from alphapy.globals import Partition, datasets
@@ -59,6 +60,7 @@
from alphapy.optimize import rfe_search
from alphapy.optimize import rfecv_search
from alphapy.plots import generate_plots
from alphapy.utilities import get_datestamp
from alphapy.utilities import np_store_data

import argparse
@@ -106,14 +108,17 @@ def training_pipeline(model):
# Unpack the model specifications

calibration = model.specs['calibration']
directory = model.specs['directory']
drop = model.specs['drop']
extension = model.specs['extension']
feature_selection = model.specs['feature_selection']
grid_search = model.specs['grid_search']
model_type = model.specs['model_type']
predict_mode = model.specs['predict_mode']
rfe = model.specs['rfe']
sampling = model.specs['sampling']
scorer = model.specs['scorer']
separator = model.specs['separator']
target = model.specs['target']

# Get train and test data
@@ -128,13 +133,6 @@
model.test_labels = True
model = save_features(model, X_train, X_test, y_train, y_test)

# Drop features

logger.info("Dropping Features: %s", drop)
X_train = drop_features(X_train, drop)
X_test = drop_features(X_test, drop)
model = save_features(model, X_train, X_test)

# Log feature statistics

logger.info("Original Feature Statistics")
@@ -161,10 +159,24 @@
(X_train.shape[1], X_test.shape[1]))

# Apply treatments to the feature matrix

all_features = apply_treatments(model, X)
X_train, X_test = np.array_split(all_features, [split_point])
model = save_features(model, X_train, X_test)

# Drop features
all_features = drop_features(all_features, drop)

# Save the train and test files with extracted and dropped features

datestamp = get_datestamp()
data_dir = SSEP.join([directory, 'input'])
df_train = all_features.iloc[:split_point, :]
df_train = pd.concat([df_train, pd.DataFrame(y_train, columns=[target])], axis=1)
output_file = USEP.join([model.train_file, datestamp])
write_frame(df_train, data_dir, output_file, extension, separator)
df_test = all_features.iloc[split_point:, :]
if y_test.any():
df_test = pd.concat([df_test, pd.DataFrame(y_test, columns=[target])], axis=1)
output_file = USEP.join([model.test_file, datestamp])
write_frame(df_test, data_dir, output_file, extension, separator)

# Create crosstabs for any categorical features

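The new write-out step saves the post-treatment train and test frames under datestamped names. A sketch of the naming convention, assuming `USEP` is an underscore, `SSEP` a slash, and `PSEP` a period (consistent with their use as join separators here), with a stand-in for `get_datestamp`:

```python
import datetime

USEP, SSEP, PSEP = '_', '/', '.'   # assumed separator values

def get_datestamp():
    # Stand-in: a compact date tag for output filenames.
    return datetime.date.today().strftime('%Y%m%d')

directory, train_file, extension = 'project', 'train', 'csv'
data_dir = SSEP.join([directory, 'input'])               # project/input
output_file = USEP.join([train_file, get_datestamp()])   # train_YYYYMMDD
path = SSEP.join([data_dir, PSEP.join([output_file, extension])])
print(path)   # e.g. project/input/train_20171020.csv
```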
@@ -315,11 +327,6 @@ def prediction_pipeline(model):
# Load feature_map
model = load_feature_map(model, directory)

# Drop features

logger.info("Dropping Features: %s", drop)
X_predict = drop_features(X_predict, drop)

# Log feature statistics

logger.info("Feature Statistics")
Expand All @@ -329,6 +336,9 @@ def prediction_pipeline(model):
# Apply treatments to the feature matrix
all_features = apply_treatments(model, X_predict)

# Drop features
all_features = drop_features(all_features, drop)

# Create initial features
all_features = create_features(model, all_features)

43 changes: 32 additions & 11 deletions alphapy/analysis.py
@@ -28,8 +28,9 @@

from alphapy.__main__ import main_pipeline
from alphapy.frame import load_frames
from alphapy.frame import sequence_frame
from alphapy.frame import write_frame
from alphapy.globals import SSEP, USEP
from alphapy.globals import SSEP, TAG_ID, USEP
from alphapy.utilities import subtract_days

from datetime import timedelta
Expand Down Expand Up @@ -133,7 +134,7 @@ def __str__(self):
# Function run_analysis
#

def run_analysis(analysis, forecast_period, leaders,
def run_analysis(analysis, lag_period, forecast_period, leaders,
predict_history, splits=True):
r"""Run an analysis for a given model and group.
@@ -147,10 +148,14 @@ def run_analysis(analysis, forecast_period, leaders,
----------
analysis : alphapy.Analysis
The analysis to run.
lag_period : int
The number of lagged features for the analysis.
forecast_period : int
The period for forecasting the target of the analysis.
leaders : list
The features that are contemporaneous with the target.
predict_history : int
The number of periods required for lookback calculations.
splits : bool, optional
If ``True``, then the data for each member of the analysis
group are in separate files.
@@ -185,7 +190,11 @@
train_date = model.specs['train_date']

# Calculate split date
logger.info("Analysis Dates")
split_date = subtract_days(predict_date, predict_history)
logger.info("Train Date: %s", train_date)
logger.info("Split Date: %s", split_date)
logger.info("Test Date: %s", predict_date)

# Load the data frames
data_frames = load_frames(group, directory, extension, separator, splits)
@@ -203,20 +212,24 @@
# Subset each individual frame and add to the master frame

for df in data_frames:
try:
tag = df[TAG_ID].unique()[0]
except:
tag = 'Unknown'
first_date = df.index[0]
last_date = df.index[-1]
# shift the target for the forecast period
if forecast_period > 0:
df[target] = df[target].shift(-forecast_period)
# shift any leading features if necessary
if leaders:
df[leaders] = df[leaders].shift(-1)
logger.info("Analyzing %s from %s to %s", tag, first_date, last_date)
# sequence leaders, laggards, and target(s)
df = sequence_frame(df, target, leaders, lag_period, forecast_period,
exclude_cols=[TAG_ID])
# get frame subsets
if predict_mode:
new_predict = df.loc[(df.index >= split_date) & (df.index <= last_date)]
if len(new_predict) > 0:
predict_frame = predict_frame.append(new_predict)
else:
logger.info("A prediction frame has zero rows. Check prediction date.")
logger.info("Prediction frame %s has zero rows. Check prediction date.",
tag)
else:
# split data into train and test
new_train = df.loc[(df.index >= train_date) & (df.index < split_date)]
@@ -225,12 +238,20 @@
train_frame = train_frame.append(new_train)
new_test = df.loc[(df.index >= split_date) & (df.index <= last_date)]
if len(new_test) > 0:
# check if target column has NaN values
nan_count = df[target].isnull().sum()
forecast_check = forecast_period - 1
if nan_count != forecast_check:
logger.info("%s has %d records with NaN targets", tag, nan_count)
# drop records with NaN values in target column
new_test = new_test.dropna(subset=[target])
# append selected records to the test frame
test_frame = test_frame.append(new_test)
else:
logger.info("A testing frame has zero rows. Check prediction date.")
logger.info("Testing frame %s has zero rows. Check prediction date.",
tag)
else:
logger.warning("A training frame has zero rows. Check data source.")
logger.info("Training frame %s has zero rows. Check data source.", tag)

# Write out the frames for input into the AlphaPy pipeline

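`sequence_frame` is imported from `alphapy.frame`, but its body is not among the hunks shown here. A speculative reconstruction, consistent with the call site above: lag the ordinary features, leave leaders (features contemporaneous with the target) and `exclude_cols` untouched, and shift the target so a test frame ends with `forecast_period - 1` NaN targets, matching the NaN-count check above. The exact off-by-one semantics are an assumption.

```python
import pandas as pd

def sequence_frame_sketch(df, target, leaders, lag_period, forecast_period,
                          exclude_cols=None):
    """Speculative stand-in for alphapy.frame.sequence_frame."""
    exclude_cols = exclude_cols or []
    out = df.copy()
    # Lag every feature that is neither a leader nor excluded.
    lag_cols = [c for c in df.columns
                if c not in leaders and c not in exclude_cols and c != target]
    for col in lag_cols:
        for lag in range(1, lag_period + 1):
            out['%s[%d]' % (col, lag)] = df[col].shift(lag)
    # Shift the target forward; forecast_period - 1 trailing rows become
    # NaN, which the NaN-count check above treats as the expected case.
    if forecast_period > 1:
        out[target] = df[target].shift(-(forecast_period - 1))
    return out

df = pd.DataFrame({'close':  [1.0, 2.0, 3.0, 4.0],
                   'open':   [1.0, 2.0, 3.0, 4.0],
                   'volume': [10, 20, 30, 40]})
print(sequence_frame_sketch(df, 'close', leaders=['open'],
                            lag_period=2, forecast_period=2))
```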
15 changes: 10 additions & 5 deletions alphapy/data.py
@@ -420,12 +420,12 @@ def get_feed_data(group, lookback_period):
group : alphapy.Group
The group of symbols.
lookback_period : int
The number of days of data to retrieve.
The number of periods of data to retrieve.
Returns
-------
daily_data : bool
``True`` if daily data
n_periods : int
The maximum number of periods actually retrieved.
"""

@@ -442,6 +442,7 @@
logger.info("Getting Intraday Data (Google 50-day limit)")
daily_data = False
# Get the data from the relevant feed
n_periods = 0
for item in group.members:
logger.info("Getting %s data for last %d days", item, lookback_period)
if daily_data:
@@ -453,7 +454,11 @@
newf = Frame(item.lower(), gspace, df)
if newf is None:
logger.error("Could not allocate Frame for: %s", item)
# calculate maximum number of periods
df_len = len(df)
if df_len > n_periods:
n_periods = df_len
else:
logger.info("No DataFrame for %s", item)
# Indicate whether or not data is daily
return daily_data
# The number of periods actually retrieved
return n_periods
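`get_feed_data` now reports the largest row count actually retrieved across the group instead of a daily-data flag, so callers can verify there is enough history for lookback calculations. The max-tracking pattern in isolation, with hypothetical per-symbol counts:

```python
# Hypothetical per-symbol row counts after a feed pull.
rows_retrieved = {'spy': 252, 'qqq': 250, 'iwm': 0}

n_periods = 0
for symbol, df_len in rows_retrieved.items():
    if df_len > n_periods:
        n_periods = df_len
print(n_periods)   # 252
```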
53 changes: 42 additions & 11 deletions alphapy/features.py
@@ -26,18 +26,21 @@
# Imports
#

from alphapy.globals import BSEP, NULLTEXT, PSEP, SSEP, USEP
from alphapy.globals import BSEP, LOFF, NULLTEXT
from alphapy.globals import PSEP, SSEP, USEP
from alphapy.globals import Encoders
from alphapy.globals import ModelType
from alphapy.globals import Scalers
from alphapy.market_variables import Variable
from alphapy.market_variables import vparse

import category_encoders as ce
from importlib import import_module
from itertools import groupby
import logging
import math
import numpy as np
import os
import pandas as pd
import re
from scipy import sparse
Expand All @@ -61,6 +64,7 @@
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import PolynomialFeatures
from sklearn.preprocessing import StandardScaler
import sys


#
@@ -424,6 +428,8 @@ def apply_treatment(fname, df, fparams):
module = fparams[0]
func_name = fparams[1]
plist = fparams[2:]
# Append to system path
sys.path.append(os.getcwd())
# Import the external treatment function
ext_module = import_module(module)
func = getattr(ext_module, func_name)
@@ -476,17 +482,34 @@ def apply_treatments(model, X):
logger.info("Applying Treatments")
all_features = X

for fname in X:
if treatments and fname in treatments:
features = apply_treatment(fname, X, treatments[fname])
if features is not None:
if features.shape[0] == X.shape[0]:
all_features = pd.concat([all_features, features], axis=1)
if treatments:
for fname in treatments:
# find feature series
fcols = []
for col in X.columns:
if col.split(LOFF)[0] == fname:
fcols.append(col)
# get lag values
lag_values = []
for item in fcols:
_, _, _, lag = vparse(item)
lag_values.append(lag)
# apply treatment to the most recent value
if lag_values:
f_latest = fcols[lag_values.index(min(lag_values))]
features = apply_treatment(f_latest, X, treatments[fname])
if features is not None:
if features.shape[0] == X.shape[0]:
all_features = pd.concat([all_features, features], axis=1)
else:
raise IndexError("The number of treatment rows [%d] must match X [%d]" %
(features.shape[0], X.shape[0]))
else:
raise IndexError("The number of treatment rows [%d] must match X [%d]" %
(features.shape[0], X.shape[0]))
logger.info("Could not apply treatment for feature %s", fname)
else:
logger.info("Could not apply treatment for feature %s", fname)
logger.info("Feature %s is missing for treatment", fname)
else:
logger.info("No Treatments Specified")

logger.info("New Feature Count : %d", all_features.shape[1])

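After sequencing, a treated feature like `close` appears only as lag-suffixed columns (`close[1]`, `close[2]`, ...), so the loop above matches columns by base name and applies the treatment to the most recent lag. A sketch of that selection, assuming `LOFF` is the `'['` opening the lag suffix and that `vparse` returns the lag as its fourth element:

```python
LOFF = '['   # assumed lag-offset delimiter, e.g. 'close[2]'

def vparse_sketch(col):
    """Stand-in for alphapy.market_variables.vparse: pull out the lag."""
    name, _, rest = col.partition(LOFF)
    lag = int(rest.rstrip(']')) if rest else 0
    return name, None, None, lag

columns = ['close[1]', 'close[2]', 'close[3]', 'volume[1]']
fname = 'close'
fcols = [c for c in columns if c.split(LOFF)[0] == fname]
lag_values = [vparse_sketch(c)[3] for c in fcols]
f_latest = fcols[lag_values.index(min(lag_values))]
print(f_latest)   # close[1], the most recent lagged value
```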
@@ -1575,7 +1598,15 @@ def drop_features(X, drop):
The dataframe without the dropped features.
"""
X.drop(drop, axis=1, inplace=True, errors='ignore')
drop_cols = []
for d in drop:
for col in X.columns:
if col.split(LOFF)[0] == d:
drop_cols.append(col)
logger.info("Dropping Features: %s", drop_cols)
logger.info("Original Feature Count : %d", X.shape[1])
X.drop(drop_cols, axis=1, inplace=True, errors='ignore')
logger.info("Reduced Feature Count : %d", X.shape[1])
return X


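`drop_features` now expands each requested name to all of its lagged variants before dropping, rather than dropping the bare name. A usage sketch under the same `LOFF` assumption:

```python
import pandas as pd

LOFF = '['   # assumed delimiter for lag-suffixed columns

X = pd.DataFrame(columns=['date[1]', 'date[2]', 'close[1]', 'volume[1]'])
drop = ['date']

drop_cols = [col for col in X.columns if col.split(LOFF)[0] in drop]
X = X.drop(drop_cols, axis=1, errors='ignore')
print(list(X.columns))   # ['close[1]', 'volume[1]']
```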

