diff --git a/numalogic/models/vae/layer.py b/numalogic/models/vae/layer.py index b2317904..54364200 100644 --- a/numalogic/models/vae/layer.py +++ b/numalogic/models/vae/layer.py @@ -32,34 +32,6 @@ def forward(self, x: Tensor) -> Tensor: return super().forward(F.pad(x, (self.__padding, 0))) -class LazyCausalConv1d(nn.LazyConv1d): - """Temporal lazy convolutional layer with causal padding.""" - - def __init__( - self, - out_channels: int, - kernel_size: int, - stride: int = 1, - dilation: int = 1, - groups: int = 1, - bias: bool = True, - ): - super().__init__( - out_channels, - kernel_size=kernel_size, - stride=stride, - padding=0, - dilation=dilation, - groups=groups, - bias=bias, - ) - - self.__padding = (kernel_size - 1) * dilation - - def forward(self, x: Tensor) -> Tensor: - return super().forward(F.pad(x, (self.__padding, 0))) - - class CausalConvBlock(nn.Module): """Basic convolutional block consisting of: - causal 1D convolutional layer diff --git a/numalogic/models/vae/trainer.py b/numalogic/models/vae/trainer.py index ec29384f..0800bd36 100644 --- a/numalogic/models/vae/trainer.py +++ b/numalogic/models/vae/trainer.py @@ -11,29 +11,29 @@ class VAETrainer(Trainer): - r"""A PyTorch Lightning Trainer for Autoencoder models. + """A PyTorch Lightning Trainer for VAE models. Args: ---- max_epochs: The maximum number of epochs to train for. (default: 100) - logger: The logger to use. (default: False) + logger: Whether to use a console logger to log metrics. (default: True) + log_freq: The number of epochs between logging. (default: 5) check_val_every_n_epoch: The number of epochs between validation checks. (default: 5) enable_checkpointing: Whether to enable checkpointing. (default: False) enable_progress_bar: Whether to enable the progress bar. (default: False) enable_model_summary: Whether to enable the model summary. (default: False) - callbacks: A list of callbacks to use. (default: None) **trainer_kw: Additional keyword arguments to pass to the Lightning Trainer. """ def __init__( self, - max_epochs=100, - logger=True, - log_freq=5, - check_val_every_n_epoch=5, - enable_checkpointing=False, - enable_progress_bar=False, - enable_model_summary=False, + max_epochs: int = 100, + logger: bool = True, + log_freq: int = 5, + check_val_every_n_epoch: int = 5, + enable_checkpointing: bool = False, + enable_progress_bar: bool = False, + enable_model_summary: bool = False, **trainer_kw ): if not sys.warnoptions: diff --git a/numalogic/models/vae/variants/conv.py b/numalogic/models/vae/variants/conv.py index 649b1aec..303315c0 100644 --- a/numalogic/models/vae/variants/conv.py +++ b/numalogic/models/vae/variants/conv.py @@ -1,4 +1,4 @@ -from collections.abc import Sequence, Callable +from collections.abc import Sequence from typing import Final import torch @@ -138,16 +138,6 @@ def forward(self, z: Tensor) -> Tensor: return self.td_linear(out) -def _init_criterion(loss_fn: str) -> Callable: - if loss_fn == "huber": - return F.huber_loss - if loss_fn == "l1": - return F.l1_loss - if loss_fn == "mse": - return F.mse_loss - raise ValueError(f"Unsupported loss function provided: {loss_fn}") - - class Conv1dVAE(BaseVAE): """ Convolutional Variational Autoencoder for time series data. diff --git a/numalogic/tools/callbacks.py b/numalogic/tools/callbacks.py index 41c20fcb..ac1a2af3 100644 --- a/numalogic/tools/callbacks.py +++ b/numalogic/tools/callbacks.py @@ -84,6 +84,10 @@ def version(self) -> Optional[Union[int, str]]: def name(self) -> Optional[str]: return self._name + @property + def experiment(self) -> Optional[str]: + return self._name + def log_hyperparams(self, params, *args, **kwargs): raise NotImplementedError("ConsoleLogger does not log hyperparameters") diff --git a/numalogic/tools/data.py b/numalogic/tools/data.py index 7a221d3b..08b7e555 100644 --- a/numalogic/tools/data.py +++ b/numalogic/tools/data.py @@ -125,6 +125,10 @@ def as_array(self) -> npt.NDArray[float]: """Returns the full data in a sequence of shape (batch, seq_len, num_features).""" return self[:] + def as_tensor(self) -> Tensor: + """Returns the full data in a sequence of shape (batch, seq_len, num_features).""" + return torch.from_numpy(self[:]).contiguous() + def create_seq(self, input_: npt.NDArray[float]) -> Generator[npt.NDArray[float], None, None]: r"""Yields sequences of specified length from the input data. diff --git a/tests/models/vae/__init__.py b/tests/models/vae/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/models/vae/test_conv.py b/tests/models/vae/test_conv.py new file mode 100644 index 00000000..d9c48064 --- /dev/null +++ b/tests/models/vae/test_conv.py @@ -0,0 +1,122 @@ +import logging +import os +import unittest + +import pandas as pd +import torch +from sklearn.preprocessing import StandardScaler +from torch import nn, Tensor +from torch.utils.data import DataLoader + +from numalogic._constants import TESTS_DIR +from numalogic.models.vae.trainer import VAETrainer +from numalogic.models.vae.variants.conv import Conv1dVAE +from numalogic.tools.data import TimeseriesDataModule, StreamingDataset +from numalogic.tools.exceptions import ModelInitializationError + +ROOT_DIR = os.path.join(TESTS_DIR, "resources", "data") +DATA_FILE = os.path.join(ROOT_DIR, "interactionstatus.csv") +EPOCHS = 2 +BATCH_SIZE = 32 +SEQ_LEN = 12 +LR = 0.001 +ACCELERATOR = "cuda" if torch.cuda.is_available() else "cpu" +torch.manual_seed(42) + + +logging.basicConfig(level=logging.INFO) + + +class TestConv1dVAE(unittest.TestCase): + x_train = None + x_val = None + + @classmethod + def setUpClass(cls) -> None: + df = pd.read_csv(DATA_FILE) + df = df[["success", "failure"]] + scaler = StandardScaler() + cls.x_train = scaler.fit_transform(df[:-240]) + cls.x_val = scaler.transform(df[-240:]) + + def test_model_01(self): + model = Conv1dVAE(seq_len=SEQ_LEN, n_features=2, latent_dim=1, loss_fn="l1") + datamodule = TimeseriesDataModule(SEQ_LEN, self.x_train, batch_size=BATCH_SIZE) + trainer = VAETrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, fast_dev_run=True) + trainer.fit(model, datamodule=datamodule) + + streamloader = DataLoader(StreamingDataset(self.x_val, SEQ_LEN), batch_size=BATCH_SIZE) + stream_trainer = VAETrainer(accelerator=ACCELERATOR) + test_reconerr = stream_trainer.predict(model, dataloaders=streamloader) + test_reconerr_w_seq = stream_trainer.predict(model, dataloaders=streamloader, unbatch=False) + + self.assertTupleEqual(self.x_val.shape, test_reconerr.shape) + self.assertTupleEqual(streamloader.dataset.as_tensor().shape, test_reconerr_w_seq.shape) + + def test_model_02(self): + model = Conv1dVAE(seq_len=SEQ_LEN, n_features=2, latent_dim=1, conv_channels=(8, 4)) + trainer = VAETrainer(accelerator=ACCELERATOR, max_epochs=EPOCHS, log_freq=1) + trainer.fit( + model, + train_dataloaders=DataLoader( + StreamingDataset(self.x_train, SEQ_LEN), batch_size=BATCH_SIZE + ), + ) + + test_ds = StreamingDataset(self.x_val, SEQ_LEN) + + model.eval() + with torch.no_grad(): + _, recon = model(test_ds.as_tensor()) + + self.assertTupleEqual(test_ds.as_tensor().size(), recon.shape) + self.assertEqual(recon.dim(), 3) + + def test_native_train(self): + model = Conv1dVAE( + seq_len=SEQ_LEN, + n_features=2, + latent_dim=1, + loss_fn="huber", + ) + optimizer = torch.optim.Adam(model.parameters(), lr=LR) + criterion = nn.HuberLoss(delta=0.5) + + train_loader = DataLoader( + StreamingDataset(self.x_train, seq_len=SEQ_LEN), batch_size=BATCH_SIZE + ) + + model.train() + loss = Tensor([0.0]) + for epoch in range(1, EPOCHS + 1): + for _X_batch in train_loader: + optimizer.zero_grad() + encoded, decoded = model(_X_batch) + decoded = decoded.view(-1, SEQ_LEN, self.x_train.shape[1]) + + loss = criterion(decoded, _X_batch) + loss.backward() + optimizer.step() + + if epoch % 5 == 0: + print(f"epoch : {epoch}, loss_mean : {loss.item():.7f}") + + def test_err(self): + with self.assertRaises(ValueError): + Conv1dVAE( + seq_len=SEQ_LEN, + n_features=2, + latent_dim=1, + loss_fn="random", + ) + with self.assertRaises(ModelInitializationError): + Conv1dVAE( + seq_len=SEQ_LEN, + n_features=2, + latent_dim=1, + conv_channels=(8, 4, 2, 1), + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/models/vae/test_trainer.py b/tests/models/vae/test_trainer.py new file mode 100644 index 00000000..34821eff --- /dev/null +++ b/tests/models/vae/test_trainer.py @@ -0,0 +1,10 @@ +import unittest + + +class MyTestCase(unittest.TestCase): + def test_something(self): + self.assertEqual(True, False) # add assertion here + + +if __name__ == "__main__": + unittest.main()