Design for Base Deep class #26

Open · wants to merge 4 commits into main
51 changes: 51 additions & 0 deletions steps/19_base_dl_class/Solution_1/basedeepclass.py
@@ -0,0 +1,51 @@
import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.utils import check_random_state

from sktime.classification.base import BaseClassifier


class BaseDeepClassifier(BaseClassifier):

def __init__(self, batch_size=40, random_state=None):
super(BaseDeepClassifier, self).__init__()

self.batch_size = batch_size
self.random_state = random_state
self.model_ = None

def summary(self):
return self.history.history

def _predict(self, X, **kwargs):
probs = self._predict_proba(X, **kwargs)
rng = check_random_state(self.random_state)
return np.array(
[
self.classes_[int(rng.choice(np.flatnonzero(prob == prob.max())))]
for prob in probs
]
)

def _predict_proba(self, X, **kwargs):
# Transpose to work correctly with keras
X = X.transpose((0, 2, 1))
probs = self.model_.predict(X, self.batch_size, **kwargs)

# check if binary classification
if probs.shape[1] == 1:
# first column is probability of class 0 and second is of class 1
probs = np.hstack([1 - probs, probs])
probs = probs / probs.sum(axis=1, keepdims=1)
return probs

def convert_y_to_keras(self, y):
self.label_encoder = LabelEncoder()
y = self.label_encoder.fit_transform(y)
self.classes_ = self.label_encoder.classes_
self.n_classes_ = len(self.classes_)
y = y.reshape(len(y), 1)
self.onehot_encoder = OneHotEncoder(sparse=False, categories="auto")
# categories='auto' to get rid of FutureWarning
y = self.onehot_encoder.fit_transform(y)
return y
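
For context, the label handling in convert_y_to_keras can be exercised on its own. The following standalone sketch uses plain scikit-learn (the label values are made up for illustration) and mirrors what the method does: string labels to integer codes, then to one-hot rows.

import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# mirrors convert_y_to_keras: string labels -> integer codes -> one-hot rows
y = np.array(["cat", "dog", "cat", "bird"])
label_encoder = LabelEncoder()
y_int = label_encoder.fit_transform(y)                # [1 2 1 0], classes sorted alphabetically
onehot = OneHotEncoder(sparse=False, categories="auto")
y_onehot = onehot.fit_transform(y_int.reshape(-1, 1))
print(label_encoder.classes_)                          # ['bird' 'cat' 'dog']
print(y_onehot.shape)                                  # (4, 3), one column per class
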
38 changes: 38 additions & 0 deletions steps/19_base_dl_class/Solution_1/basedeepnetwork.py
@@ -0,0 +1,38 @@
from abc import ABC, abstractmethod
from sktime.base import BaseObject

class BaseDeepNetwork(BaseObject, ABC):

@abstractmethod
def build_network(self, input_shape, **kwargs):
...

def build_model(self, input_shape, n_classes, **kwargs):
import tensorflow as tf
from tensorflow import keras

tf.random.set_seed(self.random_state)

if self.metrics is None:
metrics = ["accuracy"]
else:
metrics = self.metrics
input_layer, output_layer = self.build_network(input_shape, **kwargs)

output_layer = keras.layers.Dense(
units=n_classes, activation=self.activation, use_bias=self.use_bias
)(output_layer)

self.optimizer_ = (
keras.optimizers.Adam(learning_rate=0.01)
if self.optimizer is None
else self.optimizer
)

model = keras.models.Model(inputs=input_layer, outputs=output_layer)
model.compile(
loss=self.loss,
optimizer=self.optimizer_,
metrics=metrics,
)
return model
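
To make the contract of the abstract build_network hook concrete, here is a minimal hypothetical subclass (the MinimalMLPNetwork name and its single hidden Dense layer are illustrative only, not part of this diff). It shows that build_network only has to return the input layer and the penultimate layer; the classification head, optimizer and compile step are supplied by build_model, which reads loss, metrics, activation, use_bias, optimizer and random_state from the estimator side of the mixin, stubbed here as class attributes.

from tensorflow import keras

from basedeepnetwork import BaseDeepNetwork


class MinimalMLPNetwork(BaseDeepNetwork):
    # class-level stand-ins for the attributes build_model expects to find
    # on the estimator side of the mixin
    random_state = 0
    metrics = None
    activation = "softmax"
    use_bias = True
    optimizer = None
    loss = "categorical_crossentropy"

    def build_network(self, input_shape, **kwargs):
        input_layer = keras.layers.Input(input_shape)
        hidden = keras.layers.Flatten()(input_layer)
        hidden = keras.layers.Dense(32, activation="relu")(hidden)
        return input_layer, hidden


model = MinimalMLPNetwork().build_model(input_shape=(100, 1), n_classes=3)
model.summary()  # Input -> Flatten -> Dense(32) -> Dense(3, softmax)
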
30 changes: 30 additions & 0 deletions steps/19_base_dl_class/Solution_1/basedeepreg.py
@@ -0,0 +1,30 @@
from abc import ABC

import numpy as np

from sktime.regression.base import BaseRegressor


class BaseDeepRegressor(BaseRegressor, ABC):

def __init__(self, batch_size=40):
super(BaseDeepRegressor, self).__init__()

self.batch_size = batch_size
self.model_ = None

def _predict(self, X, **kwargs):
"""
Find regression estimate for all cases in X.

Parameters
----------
X : an np.ndarray of shape = (n_instances, n_dimensions, series_length)
The training input samples.

Returns
-------
predictions : 1d numpy array
array of predictions of each instance
"""
X = X.transpose((0, 2, 1))
y_pred = self.model_.predict(X, self.batch_size, **kwargs)
y_pred = np.squeeze(y_pred, axis=-1)
return y_pred
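
The squeeze at the end of _predict is easy to see in isolation: Keras returns regression outputs with a trailing singleton dimension, which _predict drops. A standalone numpy illustration (the values are made up):

import numpy as np

y_pred = np.array([[0.3], [1.7], [2.2]])   # shape (n_instances, 1), as returned by model_.predict
y_pred = np.squeeze(y_pred, axis=-1)       # shape (n_instances,)
print(y_pred)                              # [0.3 1.7 2.2]
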
75 changes: 75 additions & 0 deletions steps/19_base_dl_class/Solution_1/cnnclass.py
@@ -0,0 +1,75 @@

from sklearn.utils import check_random_state
from cnnnetwork import CNNNetwork
from basedeepclass import BaseDeepClassifier
from sktime.utils.validation._dependencies import _check_dl_dependencies

_check_dl_dependencies(severity="warning")


class CNNClassifier(BaseDeepClassifier, CNNNetwork):

def __init__(
self,
n_epochs=2000,
batch_size=16,
kernel_size=7,
avg_pool_size=3,
n_conv_layers=2,
callbacks=None,
verbose=False,
loss="mean_squared_error",
metrics=None,
random_state=None,
activation="sigmoid",
use_bias=True,
optimizer=None,
):
_check_dl_dependencies(severity="error")
super(CNNClassifier, self).__init__()
self.n_conv_layers = n_conv_layers
self.avg_pool_size = avg_pool_size
self.kernel_size = kernel_size
self.callbacks = callbacks
self.n_epochs = n_epochs
self.batch_size = batch_size
self.verbose = verbose
self.loss = loss
self.metrics = metrics
self.random_state = random_state
self.activation = activation
self.use_bias = use_bias
self.optimizer = optimizer
self.history = None

def _fit(self, X, y):
        if self.callbacks is None:
            self._callbacks = []
        else:
            self._callbacks = self.callbacks

y_onehot = self.convert_y_to_keras(y)
# Transpose to conform to Keras input style.
X = X.transpose(0, 2, 1)

check_random_state(self.random_state)
self.input_shape = X.shape[1:]
self.model_ = self.build_model(self.input_shape, self.n_classes_)
if self.verbose:
self.model_.summary()
self.history = self.model_.fit(
X,
y_onehot,
batch_size=self.batch_size,
epochs=self.n_epochs,
verbose=self.verbose,
callbacks=self._callbacks,
)
return self

if __name__ == "__main__":
cnn = CNNClassifier()
from sktime.datasets import load_unit_test
X_train, y_train = load_unit_test(split='train', return_X_y=True)
cnn.fit(X_train, y_train)
X_test, y_test = load_unit_test(split='test', return_X_y=True)
print(cnn.predict(X_test))
print(y_test)
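
As a hedged extension of the smoke test above, class probabilities and an accuracy figure can be read off as follows; accuracy_score is plain scikit-learn and the reduced n_epochs is only there to keep the check quick, neither is part of this diff.

from sklearn.metrics import accuracy_score
from sktime.datasets import load_unit_test

cnn = CNNClassifier(n_epochs=50)                        # fewer epochs for a quick check
X_train, y_train = load_unit_test(split="train", return_X_y=True)
cnn.fit(X_train, y_train)

X_test, y_test = load_unit_test(split="test", return_X_y=True)
probs = cnn.predict_proba(X_test)                       # shape (n_instances, n_classes)
print(probs[:3])
print("accuracy:", accuracy_score(y_test, cnn.predict(X_test)))
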
60 changes: 60 additions & 0 deletions steps/19_base_dl_class/Solution_1/cnnnetwork.py
@@ -0,0 +1,60 @@
from basedeepnetwork import BaseDeepNetwork
from sktime.utils.validation._dependencies import _check_dl_dependencies

_check_dl_dependencies(severity="warning")


class CNNNetwork(BaseDeepNetwork):
def __init__(
self,
kernel_size=7,
avg_pool_size=3,
n_conv_layers=2,
activation="sigmoid",
random_state=0,
):
_check_dl_dependencies(severity="error")
self.random_state = random_state
self.kernel_size = kernel_size
self.avg_pool_size = avg_pool_size
self.n_conv_layers = n_conv_layers
self.filter_sizes = [6, 12]
self.activation = activation

def build_network(self, input_shape, **kwargs):
        from tensorflow import keras

        # short series (< 60 time points) use "same" padding so the convolutions
        # do not shrink them further; longer series use "valid" padding
        padding = "valid"
        input_layer = keras.layers.Input(input_shape)
        if input_shape[0] < 60:
            padding = "same"

        # truncate or pad filter_sizes so there is exactly one filter count per conv layer
if len(self.filter_sizes) > self.n_conv_layers:
self.filter_sizes = self.filter_sizes[: self.n_conv_layers]
elif len(self.filter_sizes) < self.n_conv_layers:
self.filter_sizes = self.filter_sizes + [self.filter_sizes[-1]] * (
self.n_conv_layers - len(self.filter_sizes)
)
conv = keras.layers.Conv1D(
filters=self.filter_sizes[0],
kernel_size=self.kernel_size,
padding=padding,
activation=self.activation,
)(input_layer)
conv = keras.layers.AveragePooling1D(pool_size=self.avg_pool_size)(conv)

for i in range(1, self.n_conv_layers):
conv = keras.layers.Conv1D(
filters=self.filter_sizes[i],
kernel_size=self.kernel_size,
padding=padding,
activation=self.activation,
)(conv)
conv = keras.layers.AveragePooling1D(pool_size=self.avg_pool_size)(conv)

flatten_layer = keras.layers.Flatten()(conv)

return input_layer, flatten_layer
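
The network can also be built stand-alone to inspect the layer stack that build_network returns, before any classifier or regressor head is attached; a short sketch with an arbitrary input shape of (24, 1):

from tensorflow import keras

from cnnnetwork import CNNNetwork

network = CNNNetwork(kernel_size=7, avg_pool_size=3, n_conv_layers=2)
input_layer, output_layer = network.build_network(input_shape=(24, 1))

# wrap the two layers in a Model only to print the shapes of the
# Conv1D/AveragePooling1D blocks and the final Flatten layer
model = keras.models.Model(inputs=input_layer, outputs=output_layer)
model.summary()
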
72 changes: 72 additions & 0 deletions steps/19_base_dl_class/Solution_1/cnnreg.py
@@ -0,0 +1,72 @@
from cnnnetwork import CNNNetwork
from basedeepreg import BaseDeepRegressor
from sktime.utils.validation._dependencies import _check_dl_dependencies

_check_dl_dependencies(severity="warning")


class CNNRegressor(BaseDeepRegressor, CNNNetwork):

def __init__(
self,
n_epochs=2000,
batch_size=16,
kernel_size=7,
avg_pool_size=3,
n_conv_layers=2,
callbacks=None,
verbose=False,
loss="mean_squared_error",
metrics=None,
random_seed=0,
):
_check_dl_dependencies(severity="error")
super(CNNRegressor, self).__init__(
batch_size=batch_size,
)
self.n_conv_layers = n_conv_layers
self.avg_pool_size = avg_pool_size
self.kernel_size = kernel_size
self.callbacks = callbacks
self.n_epochs = n_epochs
self.batch_size = batch_size
self.verbose = verbose
self.loss = loss
self.metrics = metrics
self.random_seed = random_seed


def _fit(self, X, y):
"""Fit the classifier on the training set (X, y).

Parameters
----------
X : np.ndarray of shape = (n_instances (n), n_dimensions (d), series_length (m))
The training input samples.
        y : np.ndarray of shape n
            The training target values.

Returns
-------
self : object
"""
        if self.callbacks is None:
            self._callbacks = []
        else:
            self._callbacks = self.callbacks

# Transpose to conform to Keras input style.
X = X.transpose(0, 2, 1)

self.input_shape = X.shape[1:]
self.model_ = self.build_model(self.input_shape)
if self.verbose:
            self.model_.summary()

self.history = self.model_.fit(
X,
y,
batch_size=self.batch_size,
epochs=self.n_epochs,
verbose=self.verbose,
callbacks=self._callbacks,
)
return self