chore: add tests
Signed-off-by: Avik Basu <[email protected]>
ab93 committed Aug 4, 2023
1 parent c6fa463 commit b1f05f4
Showing 8 changed files with 151 additions and 49 deletions.
28 changes: 0 additions & 28 deletions numalogic/models/vae/layer.py
@@ -32,34 +32,6 @@ def forward(self, x: Tensor) -> Tensor:
         return super().forward(F.pad(x, (self.__padding, 0)))


-class LazyCausalConv1d(nn.LazyConv1d):
-    """Temporal lazy convolutional layer with causal padding."""
-
-    def __init__(
-        self,
-        out_channels: int,
-        kernel_size: int,
-        stride: int = 1,
-        dilation: int = 1,
-        groups: int = 1,
-        bias: bool = True,
-    ):
-        super().__init__(
-            out_channels,
-            kernel_size=kernel_size,
-            stride=stride,
-            padding=0,
-            dilation=dilation,
-            groups=groups,
-            bias=bias,
-        )
-
-        self.__padding = (kernel_size - 1) * dilation
-
-    def forward(self, x: Tensor) -> Tensor:
-        return super().forward(F.pad(x, (self.__padding, 0)))


 class CausalConvBlock(nn.Module):
     """Basic convolutional block consisting of:
     - causal 1D convolutional layer
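The retained CausalConv1d above left-pads the input by (kernel_size - 1) * dilation before convolving, so each output step depends only on current and past timesteps. A minimal standalone sketch of that padding pattern in plain torch (names and shapes here are illustrative, not from the diff):

import torch
import torch.nn.functional as F
from torch import nn

kernel_size, dilation = 3, 2
causal_pad = (kernel_size - 1) * dilation   # same formula as self.__padding above
conv = nn.Conv1d(2, 4, kernel_size, dilation=dilation, padding=0)

x = torch.randn(8, 2, 12)                   # (batch, channels, seq_len)
y = conv(F.pad(x, (causal_pad, 0)))         # pad only the left (past) side
assert y.shape[-1] == x.shape[-1]           # causal padding preserves sequence length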
20 changes: 10 additions & 10 deletions numalogic/models/vae/trainer.py
@@ -11,29 +11,29 @@


 class VAETrainer(Trainer):
-    r"""A PyTorch Lightning Trainer for Autoencoder models.
+    """A PyTorch Lightning Trainer for VAE models.

     Args:
     ----
         max_epochs: The maximum number of epochs to train for. (default: 100)
-        logger: The logger to use. (default: False)
+        logger: Whether to use a console logger to log metrics. (default: True)
         log_freq: The number of epochs between logging. (default: 5)
         check_val_every_n_epoch: The number of epochs between validation checks. (default: 5)
         enable_checkpointing: Whether to enable checkpointing. (default: False)
         enable_progress_bar: Whether to enable the progress bar. (default: False)
         enable_model_summary: Whether to enable the model summary. (default: False)
         callbacks: A list of callbacks to use. (default: None)
         **trainer_kw: Additional keyword arguments to pass to the Lightning Trainer.
     """

     def __init__(
         self,
-        max_epochs=100,
-        logger=True,
-        log_freq=5,
-        check_val_every_n_epoch=5,
-        enable_checkpointing=False,
-        enable_progress_bar=False,
-        enable_model_summary=False,
+        max_epochs: int = 100,
+        logger: bool = True,
+        log_freq: int = 5,
+        check_val_every_n_epoch: int = 5,
+        enable_checkpointing: bool = False,
+        enable_progress_bar: bool = False,
+        enable_model_summary: bool = False,
         **trainer_kw
     ):
         if not sys.warnoptions:
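Given the typed signature above, a minimal construction sketch; the accelerator keyword is not named in the signature but reaches the underlying Lightning Trainer via **trainer_kw, as the tests below also do. Values are illustrative:

from numalogic.models.vae.trainer import VAETrainer

# logger=True enables the console logger; log_freq controls how often
# (in epochs) metrics are written.
trainer = VAETrainer(max_epochs=50, log_freq=1, accelerator="cpu")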
12 changes: 1 addition & 11 deletions numalogic/models/vae/variants/conv.py
@@ -1,4 +1,4 @@
-from collections.abc import Sequence, Callable
+from collections.abc import Sequence
 from typing import Final

 import torch
@@ -138,16 +138,6 @@ def forward(self, z: Tensor) -> Tensor:
         return self.td_linear(out)


-def _init_criterion(loss_fn: str) -> Callable:
-    if loss_fn == "huber":
-        return F.huber_loss
-    if loss_fn == "l1":
-        return F.l1_loss
-    if loss_fn == "mse":
-        return F.mse_loss
-    raise ValueError(f"Unsupported loss function provided: {loss_fn}")


 class Conv1dVAE(BaseVAE):
     """
     Convolutional Variational Autoencoder for time series data.
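The removed _init_criterion mapped a loss_fn string to a torch.nn.functional loss, and the tests added below confirm that the same strings still validate at construction time. A minimal sketch (shapes are illustrative):

from numalogic.models.vae.variants.conv import Conv1dVAE

# "huber", "l1" and "mse" are the supported values; an unknown string
# raises ValueError (see test_err below).
model = Conv1dVAE(seq_len=12, n_features=2, latent_dim=1, loss_fn="huber")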
4 changes: 4 additions & 0 deletions numalogic/tools/callbacks.py
@@ -84,6 +84,10 @@ def version(self) -> Optional[Union[int, str]]:
     def name(self) -> Optional[str]:
         return self._name

+    @property
+    def experiment(self) -> Optional[str]:
+        return self._name
+
     def log_hyperparams(self, params, *args, **kwargs):
         raise NotImplementedError("ConsoleLogger does not log hyperparameters")
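Lightning code paths sometimes access logger.experiment, so exposing it on ConsoleLogger (returning the stored name, same as .name) avoids attribute errors. A sketch under the assumption that ConsoleLogger can be default-constructed, which this diff does not show:

from numalogic.tools.callbacks import ConsoleLogger

logger = ConsoleLogger()  # assumed: default construction; args not shown in this diff
print(logger.name, logger.experiment)  # both return the stored logger name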
4 changes: 4 additions & 0 deletions numalogic/tools/data.py
@@ -125,6 +125,10 @@ def as_array(self) -> npt.NDArray[float]:
         """Returns the full data in a sequence of shape (batch, seq_len, num_features)."""
         return self[:]

+    def as_tensor(self) -> Tensor:
+        """Returns the full data in a sequence of shape (batch, seq_len, num_features)."""
+        return torch.from_numpy(self[:]).contiguous()
+
     def create_seq(self, input_: npt.NDArray[float]) -> Generator[npt.NDArray[float], None, None]:
         r"""Yields sequences of specified length from the input data.
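as_tensor mirrors as_array but returns a contiguous torch Tensor, which the new tests below use to feed a whole dataset through the model in one call. A usage sketch with synthetic data (shapes illustrative):

import numpy as np
from numalogic.tools.data import StreamingDataset

x = np.random.randn(240, 2)        # (timesteps, features)
ds = StreamingDataset(x, seq_len=12)
batch = ds.as_tensor()             # Tensor of shape (batch, seq_len, num_features)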
Empty file added tests/models/vae/__init__.py
122 changes: 122 additions & 0 deletions tests/models/vae/test_conv.py
@@ -0,0 +1,122 @@
import logging
import os
import unittest

import pandas as pd
import torch
from sklearn.preprocessing import StandardScaler
from torch import nn, Tensor
from torch.utils.data import DataLoader

from numalogic._constants import TESTS_DIR
from numalogic.models.vae.trainer import VAETrainer
from numalogic.models.vae.variants.conv import Conv1dVAE
from numalogic.tools.data import TimeseriesDataModule, StreamingDataset
from numalogic.tools.exceptions import ModelInitializationError

ROOT_DIR = os.path.join(TESTS_DIR, "resources", "data")
DATA_FILE = os.path.join(ROOT_DIR, "interactionstatus.csv")
EPOCHS = 2
BATCH_SIZE = 32
SEQ_LEN = 12
LR = 0.001
ACCELERATOR = "cuda" if torch.cuda.is_available() else "cpu"
torch.manual_seed(42)


logging.basicConfig(level=logging.INFO)


class TestConv1dVAE(unittest.TestCase):
    x_train = None
    x_val = None

    @classmethod
    def setUpClass(cls) -> None:
        df = pd.read_csv(DATA_FILE)
        df = df[["success", "failure"]]
        scaler = StandardScaler()
        cls.x_train = scaler.fit_transform(df[:-240])
        cls.x_val = scaler.transform(df[-240:])

    def test_model_01(self):
        model = Conv1dVAE(seq_len=SEQ_LEN, n_features=2, latent_dim=1, loss_fn="l1")
        datamodule = TimeseriesDataModule(SEQ_LEN, self.x_train, batch_size=BATCH_SIZE)
        trainer = VAETrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, fast_dev_run=True)
        trainer.fit(model, datamodule=datamodule)

        streamloader = DataLoader(StreamingDataset(self.x_val, SEQ_LEN), batch_size=BATCH_SIZE)
        stream_trainer = VAETrainer(accelerator=ACCELERATOR)
        test_reconerr = stream_trainer.predict(model, dataloaders=streamloader)
        test_reconerr_w_seq = stream_trainer.predict(model, dataloaders=streamloader, unbatch=False)

        self.assertTupleEqual(self.x_val.shape, test_reconerr.shape)
        self.assertTupleEqual(streamloader.dataset.as_tensor().shape, test_reconerr_w_seq.shape)

    def test_model_02(self):
        model = Conv1dVAE(seq_len=SEQ_LEN, n_features=2, latent_dim=1, conv_channels=(8, 4))
        trainer = VAETrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, log_freq=1)
        trainer.fit(
            model,
            train_dataloaders=DataLoader(
                StreamingDataset(self.x_train, SEQ_LEN), batch_size=BATCH_SIZE
            ),
        )

        test_ds = StreamingDataset(self.x_val, SEQ_LEN)

        model.eval()
        with torch.no_grad():
            _, recon = model(test_ds.as_tensor())

        self.assertTupleEqual(test_ds.as_tensor().size(), recon.shape)
        self.assertEqual(recon.dim(), 3)

    def test_native_train(self):
        model = Conv1dVAE(
            seq_len=SEQ_LEN,
            n_features=2,
            latent_dim=1,
            loss_fn="huber",
        )
        optimizer = torch.optim.Adam(model.parameters(), lr=LR)
        criterion = nn.HuberLoss(delta=0.5)

        train_loader = DataLoader(
            StreamingDataset(self.x_train, seq_len=SEQ_LEN), batch_size=BATCH_SIZE
        )

        model.train()
        loss = Tensor([0.0])
        for epoch in range(1, EPOCHS + 1):
            for _X_batch in train_loader:
                optimizer.zero_grad()
                encoded, decoded = model(_X_batch)
                decoded = decoded.view(-1, SEQ_LEN, self.x_train.shape[1])

                loss = criterion(decoded, _X_batch)
                loss.backward()
                optimizer.step()

            if epoch % 5 == 0:
                print(f"epoch : {epoch}, loss_mean : {loss.item():.7f}")

    def test_err(self):
        with self.assertRaises(ValueError):
            Conv1dVAE(
                seq_len=SEQ_LEN,
                n_features=2,
                latent_dim=1,
                loss_fn="random",
            )
        with self.assertRaises(ModelInitializationError):
            Conv1dVAE(
                seq_len=SEQ_LEN,
                n_features=2,
                latent_dim=1,
                conv_channels=(8, 4, 2, 1),
            )


if __name__ == "__main__":
    unittest.main()
10 changes: 10 additions & 0 deletions tests/models/vae/test_trainer.py
@@ -0,0 +1,10 @@
import unittest


class MyTestCase(unittest.TestCase):
    def test_something(self):
        self.assertTrue(True)  # placeholder assertion until real trainer tests are added


if __name__ == "__main__":
    unittest.main()
