import copy
import os
from functools import partial
from typing import Optional, Union

import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision
from scipy.spatial.distance import pdist
from scipy.stats import gaussian_kde
from torch import nn
from torch.utils.data import Dataset
from tqdm.notebook import tqdm

from frouros.callbacks import PermutationTestDistanceBased
from frouros.detectors.data_drift import MMD
from frouros.utils.kernels import rbf_kernel

plt.rcParams.update({"font.size": 12})

MNIST dataset#

A more advanced example is shown below, where MMD [1] is applied as a drift detector to the well-known MNIST image dataset.

The goal is to apply GaussianBlur and ElasticTransform transformations to some of the images in order to simulate feature-level drift (data drift) that would likely degrade the performance of a hypothetical machine learning model.

Set seed for reproducibility#

To ensure reproducibility, a set_seed function is used.

def set_seed(seed=31):
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.manual_seed(seed)
    torch.set_num_threads(1)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)


seed = 31
set_seed(seed=seed)

Set device for PyTorch#

The device is set to GPU if available; otherwise, it is set to CPU.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Data split#

To avoid any bias from the original MNIST train/test split, the train and test datasets are merged, shuffled, and then split again into train and test sets. The train dataset is further split into a model dataset, an autoencoder dataset, and a reference dataset: the model dataset is used to train a CNN model, the autoencoder dataset is used to train an autoencoder, and the reference dataset serves as the reference for the MMD detector. The test dataset is used to test both the CNN model and the MMD detector.

Therefore, the data is split as follows among the different components:

  • Model: These images are exclusively used to train a CNN model. The model dataset consists of 40,000 samples.

  • Autoencoder: These images are exclusively used to train an autoencoder, reducing the dimensionality from 784 (28×28×1) to 5 dimensions through the encoder function \(\phi : \mathbb{R}^{28 \times 28 \times 1} \rightarrow \mathbb{R}^{5}\). The autoencoder dataset consists of 19,000 samples.

  • Reference: These images represent samples from the training distribution and serve as a reference for the data drift detector. There are 1,000 samples in this set.

  • Test: These images may belong to the original distribution of the data or undergo some of the transformations mentioned above. This set consists of 10,000 samples.

transform = torchvision.transforms.Compose(
    [
        # Convert images to the range [0.0, 1.0] (normalize)
        torchvision.transforms.ToTensor(),
    ]
)

train_original_dataset = torchvision.datasets.MNIST(
    root="/tmp/mnist/train/",
    train=True,
    download=True,
    transform=transform,
)

test_original_dataset = torchvision.datasets.MNIST(
    root="/tmp/mnist/test/",
    train=False,
    download=True,
    transform=transform,
)

# Merge train and test datasets to avoid bias and split them again
dataset = torch.utils.data.ConcatDataset(
    datasets=[
        train_original_dataset,
        test_original_dataset,
    ]
)
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset=dataset,
    lengths=[
        len(train_original_dataset),
        len(test_original_dataset),
    ],
)

model_dataset_size = 40000
autoencoder_dataset_size = 19000
reference_dataset_size = (
    len(train_dataset) - model_dataset_size - autoencoder_dataset_size
)

model_dataset, autoencoder_dataset, reference_dataset = torch.utils.data.random_split(
    dataset=train_dataset,
    lengths=[
        model_dataset_size,
        autoencoder_dataset_size,
        reference_dataset_size,
    ],
)

# Split model dataset into train and validation
model_dataset_size = len(model_dataset)
model_train_dataset, model_val_dataset = torch.utils.data.random_split(
    dataset=model_dataset,
    lengths=[
        int(model_dataset_size * 0.8),
        int(model_dataset_size * 0.2),
    ],
)

batch_size = 128

model_train_data_loader = torch.utils.data.DataLoader(
    dataset=model_train_dataset, batch_size=batch_size, shuffle=True
)

model_val_data_loader = torch.utils.data.DataLoader(
    dataset=model_val_dataset, batch_size=batch_size, shuffle=False
)

autoencoder_dataset_size = len(autoencoder_dataset)
autoencoder_train_dataset, autoencoder_val_dataset = torch.utils.data.random_split(
    dataset=autoencoder_dataset,
    lengths=[
        int(autoencoder_dataset_size * 0.8),
        int(autoencoder_dataset_size * 0.2),
    ],
)

autoencoder_train_data_loader = torch.utils.data.DataLoader(
    dataset=autoencoder_train_dataset,
    batch_size=batch_size,
    shuffle=True,
)

autoencoder_val_data_loader = torch.utils.data.DataLoader(
    dataset=autoencoder_val_dataset,
    batch_size=batch_size,
    shuffle=False,
)
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to /tmp/mnist/train/MNIST/raw/train-images-idx3-ubyte.gz
Extracting /tmp/mnist/train/MNIST/raw/train-images-idx3-ubyte.gz to /tmp/mnist/train/MNIST/raw

(The remaining MNIST files are downloaded and extracted analogously to /tmp/mnist/train/MNIST/raw and /tmp/mnist/test/MNIST/raw.)

Some images from the model dataset are shown below.

def imshow(img):
    npimg = img.numpy()
    fig = plt.figure(
        figsize=(10, 10),
        dpi=300,
    )
    plt.imshow(np.transpose(npimg, (1, 2, 0)), cmap=plt.cm.gray)
    plt.axis("off")
    fig.tight_layout()
    plt.show()


dataiter = iter(model_train_data_loader)
images, _ = next(dataiter)
imshow(torchvision.utils.make_grid(images[:32]))
[Figure: grid of 32 sample images from the model training dataset]

Model definition#

The CNN model is defined with two convolutional layers, each followed by max-pooling, and two fully connected layers with dropout in between.

# Define the CNN model
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.relu2 = nn.ReLU()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.relu3 = nn.ReLU()
        self.dropout = nn.Dropout(0.2)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.pool(x)
        x = self.conv2(x)
        x = self.relu2(x)
        x = self.pool(x)
        x = x.view(-1, 64 * 7 * 7)
        x = self.fc1(x)
        x = self.relu3(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

Training the model#

The model is trained for up to 20 epochs with early stopping (patience of 3 epochs on the validation loss). The average training loss is printed every 100 batches, along with the validation loss at the end of each epoch, and the model with the best validation loss is kept.

model = CNN().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)

epochs = 20
patience = 3
patience_counter = patience
best_val_loss = np.inf
best_model = None

for epoch in range(epochs):
    running_loss = 0.0
    print(f"Epoch {epoch + 1}:")

    # Training
    model.train()
    for i, (inputs, labels) in enumerate(model_train_data_loader, 0):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        if i % 100 == 99:
            # Average loss over the last 100 batches
            print(f"\tTraining loss: {running_loss / 100:.4f}")
            running_loss = 0.0

    # Validation
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, labels in model_val_data_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item()

    val_loss /= len(model_val_data_loader)
    print(f"\tValidation loss: {val_loss:.4f}")

    # Early stopping and save best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model = copy.deepcopy(model)
        patience_counter = patience
    else:
        patience_counter -= 1
        if patience_counter == 0:
            print(f"Early stopping at epoch {epoch + 1}")
            break

model = best_model
Epoch 1:
	Training loss: 0.5691
	Training loss: 0.1446
	Validation loss: 0.0986
Epoch 2:
	Training loss: 0.0876
	Training loss: 0.0736
	Validation loss: 0.0598
Epoch 3:
	Training loss: 0.0533
	Training loss: 0.0513
	Validation loss: 0.0567
Epoch 4:
	Training loss: 0.0425
	Training loss: 0.0442
	Validation loss: 0.0451
Epoch 5:
	Training loss: 0.0394
	Training loss: 0.0347
	Validation loss: 0.0386
Epoch 6:
	Training loss: 0.0266
	Training loss: 0.0270
	Validation loss: 0.0402
Epoch 7:
	Training loss: 0.0285
	Training loss: 0.0227
	Validation loss: 0.0347
Epoch 8:
	Training loss: 0.0189
	Training loss: 0.0215
	Validation loss: 0.0315
Epoch 9:
	Training loss: 0.0169
	Training loss: 0.0181
	Validation loss: 0.0357
Epoch 10:
	Training loss: 0.0132
	Training loss: 0.0161
	Validation loss: 0.0363
Epoch 11:
	Training loss: 0.0131
	Training loss: 0.0157
	Validation loss: 0.0531
Early stopping at epoch 11

Autoencoder definition#

The autoencoder is composed of the following parts:

First, an encoder is implemented with a straightforward architecture. It consists of a convolutional section, followed by a flatten layer, and finally a pair of fully connected layers that map the output of the convolutional section to the desired embedding dimension.

The decoder mirrors the encoder in reverse. It consists of a pair of fully connected layers, followed by an unflatten layer, and finally a transposed-convolutional section that maps the output of the fully connected layers back to the original dimensionality of the input.

class Encoder(nn.Module):
    def __init__(self, embedding_dim: int):
        super().__init__()

        self.encoder_conv = nn.Sequential(
            nn.Conv2d(1, 8, 3, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(8, 16, 3, stride=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(True),
            nn.Conv2d(16, 32, 3, stride=2, padding=0),
            nn.ReLU(True),
        )

        self.flatten = nn.Flatten(start_dim=1)

        self.encoder_lin = nn.Sequential(
            nn.Linear(3 * 3 * 32, 128), nn.ReLU(True), nn.Linear(128, embedding_dim)
        )

    def forward(self, x):
        x = self.encoder_conv(x)
        x = self.flatten(x)
        x = self.encoder_lin(x)
        return x

class Decoder(nn.Module):
    def __init__(self, embedding_dim: int):
        super().__init__()

        self.decoder_lin = nn.Sequential(
            nn.Linear(embedding_dim, 128),
            nn.ReLU(True),
            nn.Linear(128, 3 * 3 * 32),
            nn.ReLU(True),
        )

        self.unflatten = nn.Unflatten(dim=1, unflattened_size=(32, 3, 3))

        self.decoder_conv = nn.Sequential(
            nn.ConvTranspose2d(32, 16, 3, stride=2, output_padding=0),
            nn.BatchNorm2d(16),
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 8, 3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(True),
            nn.ConvTranspose2d(8, 1, 3, stride=2, padding=1, output_padding=1),
        )

    def forward(self, x):
        x = self.decoder_lin(x)
        x = self.unflatten(x)
        x = self.decoder_conv(x)
        x = torch.sigmoid(x)
        return x

class Autoencoder(nn.Module):
    def __init__(self, embedding_dim: int) -> None:
        super().__init__()
        self.encoder = Encoder(
            embedding_dim=embedding_dim,
        )
        self.decoder = Decoder(
            embedding_dim=embedding_dim,
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded, decoded

Training the autoencoder#

To prevent the autoencoder from simply learning to copy the input to the output (acting as an identity function), a contractive loss is used as a regularizer so that a meaningful representation of the input is learned in the embedding dimensions [3]. For optimization, AdamW (a variant of Adam [2] with decoupled weight decay) is used.
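
In equation form, the loss used below is a sketch of the contractive objective of [3]:

\[\mathcal{L}(x) = \mathrm{MSE}(x, \hat{x}) + \lambda \left\lVert \nabla_x \sum_j \phi_j(x) \right\rVert_2,\]

where \(\hat{x}\) is the reconstruction, \(\phi_j(x)\) the \(j\)-th embedding dimension, and \(\lambda = 10^{-4}\). Note that [3] penalizes the squared Frobenius norm of the full encoder Jacobian, \(\lVert J_\phi(x) \rVert_F^2\); the implementation below instead uses the norm of the input gradient of the summed embedding as a cheaper proxy.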

def contractive_loss(outputs_e, outputs, inputs, lambda_=1e-4):
    assert (
        outputs.shape == inputs.shape
    ), f"outputs.shape : {outputs.shape} != inputs.shape : {inputs.shape}"
    criterion = nn.MSELoss()
    loss1 = criterion(outputs, inputs)

    # Gradient of the summed embedding w.r.t. the inputs; its norm is used as a
    # proxy for the Frobenius norm of the encoder's Jacobian
    outputs_e.backward(torch.ones_like(outputs_e), retain_graph=True)
    loss2 = torch.sqrt(torch.sum(torch.pow(inputs.grad, 2)))
    inputs.grad.data.zero_()

    loss = loss1 + (lambda_ * loss2)
    return loss

The embedding dimension is set to 5, and the autoencoder is trained for up to 20 epochs with early stopping. The average training loss is printed every 100 batches, along with the validation loss at the end of each epoch.

embedding_dim = 5
autoencoder = Autoencoder(
    embedding_dim=embedding_dim,
).to(device)

optimizer = torch.optim.AdamW(autoencoder.parameters(), lr=1e-3)

epochs = 20
patience = 3
patience_counter = patience
best_val_loss = np.inf
best_autoencoder = None

for epoch in range(epochs):
    print(f"Epoch {epoch + 1}:")
    running_loss = 0.0

    # Training
    autoencoder.train()
    for i, (inputs, _) in enumerate(autoencoder_train_data_loader, 0):
        inputs = inputs.to(device)
        inputs.requires_grad = True
        inputs.retain_grad()

        outputs_e, outputs = autoencoder(inputs)
        loss = contractive_loss(outputs_e, outputs, inputs)

        inputs.requires_grad = False

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        if i % 100 == 99:
            # Average loss over the last 100 batches
            print(f"\tTraining loss: {running_loss / 100:.4f}")
            running_loss = 0.0

    # Validation
    autoencoder.eval()
    val_loss = 0.0
    # with torch.no_grad() is not used here
    # to allow for the calculation of the gradient of the inputs
    for inputs, _ in autoencoder_val_data_loader:
        inputs = inputs.to(device)
        inputs.requires_grad = True
        inputs.retain_grad()

        outputs_e, outputs = autoencoder(inputs)
        loss = contractive_loss(outputs_e, outputs, inputs)

        inputs.requires_grad = False

        val_loss += loss.item()

    val_loss /= len(autoencoder_val_data_loader)
    print(f"\tValidation loss: {val_loss:.4f}")

    # Early stopping and save best model
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_autoencoder = copy.deepcopy(autoencoder)
        patience_counter = patience
    else:
        patience_counter -= 1
        if patience_counter == 0:
            print(f"Early stopping at epoch {epoch + 1}")
            break

autoencoder = best_autoencoder
Epoch 1:
	Training loss: 0.1349
	Validation loss: 0.1108
Epoch 2:
	Training loss: 0.0712
	Validation loss: 0.0701
Epoch 3:
	Training loss: 0.0474
	Validation loss: 0.0495
Epoch 4:
	Training loss: 0.0352
	Validation loss: 0.0403
Epoch 5:
	Training loss: 0.0296
	Validation loss: 0.0356
Epoch 6:
	Training loss: 0.0268
	Validation loss: 0.0330
Epoch 7:
	Training loss: 0.0253
	Validation loss: 0.0318
Epoch 8:
	Training loss: 0.0241
	Validation loss: 0.0308
Epoch 9:
	Training loss: 0.0235
	Validation loss: 0.0303
Epoch 10:
	Training loss: 0.0230
	Validation loss: 0.0294
Epoch 11:
	Training loss: 0.0225
	Validation loss: 0.0292
Epoch 12:
	Training loss: 0.0222
	Validation loss: 0.0287
Epoch 13:
	Training loss: 0.0219
	Validation loss: 0.0284
Epoch 14:
	Training loss: 0.0217
	Validation loss: 0.0282
Epoch 15:
	Training loss: 0.0213
	Validation loss: 0.0278
Epoch 16:
	Training loss: 0.0212
	Validation loss: 0.0278
Epoch 17:
	Training loss: 0.0210
	Validation loss: 0.0275
Epoch 18:
	Training loss: 0.0209
	Validation loss: 0.0273
Epoch 19:
	Training loss: 0.0206
	Validation loss: 0.0273
Epoch 20:
	Training loss: 0.0204
	Validation loss: 0.0273

The autoencoder's reconstruction from the 5 embedding dimensions is shown below for a sample not seen during training.

sample = reference_dataset[1][0]  # Sample not seen by the autoencoder during training
_, X_reconstructed = autoencoder(sample.unsqueeze(0).to(device))
X_reconstructed = torch.squeeze(X_reconstructed.detach().cpu(), dim=0)

fig, axs = plt.subplots(
    nrows=1,
    ncols=2,
    figsize=(10, 10),
    dpi=300,
)

axs[0].set_title("Input")
axs[0].imshow(np.transpose(sample, (1, 2, 0)), cmap=plt.cm.gray)
axs[0].axis("off")

axs[1].set_title("Output (reconstruction)")
axs[1].imshow(np.transpose(X_reconstructed.detach(), (1, 2, 0)), cmap=plt.cm.gray)
axs[1].axis("off")

fig.tight_layout()
plt.show()
[Figure: input digit and its autoencoder reconstruction, side by side]

Custom dataset definition#

To apply each transformation to the images, a custom dataset class is utilized.

class CustomMNIST(Dataset):
    def __init__(
        self,
        subset,
        transform: Optional[
            Union[torch.nn.Module, torchvision.transforms.Compose]
        ] = None,
    ) -> None:
        self.subset = subset
        self.transform = transform

    def __getitem__(self, index):
        x, y = self.subset[index]
        if self.transform:
            x = self.transform(x)
        return x, y

    def __len__(self) -> int:
        return len(self.subset)

Apply GaussianBlur and ElasticTransform transformations to the test dataset.

gaussian_blur_dataset = CustomMNIST(
    subset=test_dataset,
    transform=torchvision.transforms.Compose(
        [
            torchvision.transforms.ToPILImage(),
            torchvision.transforms.GaussianBlur(
                kernel_size=(5, 9),
                sigma=1.5,
            ),
            torchvision.transforms.ToTensor(),
        ],
    ),
)

elastic_transform_dataset = CustomMNIST(
    subset=test_dataset,
    transform=torchvision.transforms.Compose(
        [
            torchvision.transforms.ToPILImage(),
            torchvision.transforms.ElasticTransform(
                alpha=100.0,
                sigma=5.0,
            ),
            torchvision.transforms.ToTensor(),
        ],
    ),
)

Some images from the original, GaussianBlur, and ElasticTransform datasets are shown below to see the effect of the transformations.

fig, axs = plt.subplots(
    nrows=1,
    ncols=3,
    figsize=(10, 10),
    dpi=300,
)

for ax, img, title in zip(
    axs.flat,
    [test_dataset[0][0], gaussian_blur_dataset[0][0], elastic_transform_dataset[0][0]],
    [
        "Original",
        "Gaussian Blur",
        "Elastic Transform",
    ],
):
    ax.imshow(np.transpose(img, (1, 2, 0)), cmap=plt.cm.gray)
    ax.set_title(title)
    ax.axis("off")

fig.tight_layout()
plt.show()
[Figure: the same digit shown as Original, Gaussian Blur, and Elastic Transform]

Test the model#

The accuracy of the model is evaluated on the original, GaussianBlur, and ElasticTransform datasets. The goal is to verify whether the model is affected by the transformations.

def test_model_accuracy(model, data_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data in data_loader:
            images, labels = data
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    accuracy = 100 * correct / total
    return accuracy

# Test model with test dataset
model_test_original_data_loader = torch.utils.data.DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
)

model_test_gaussian_blur_data_loader = torch.utils.data.DataLoader(
    dataset=gaussian_blur_dataset,
    batch_size=batch_size,
    shuffle=False,
)

model_test_elastic_transform_data_loader = torch.utils.data.DataLoader(
    dataset=elastic_transform_dataset,
    batch_size=batch_size,
    shuffle=False,
)

for data_loader, type_ in zip(
    [
        model_test_original_data_loader,
        model_test_gaussian_blur_data_loader,
        model_test_elastic_transform_data_loader,
    ],
    [
        "Original",
        "Gaussian Blur",
        "Elastic Transform",
    ],
):
    accuracy = test_model_accuracy(
        model=model,
        data_loader=data_loader,
    )
    print(f"{type_} accuracy: {accuracy:.2f}%")
Original accuracy: 99.06%
Gaussian Blur accuracy: 97.07%
Elastic Transform accuracy: 82.88%

As we can see, the model's accuracy is affected by the transformations: both the GaussianBlur and ElasticTransform datasets show a decrease in accuracy with respect to the original (unmodified) dataset, with ElasticTransform causing by far the largest drop.

Data drift detection#

Finally, the data drift detector is applied to the original, GaussianBlur, and ElasticTransform datasets. The goal is to verify whether the detector can detect the drift caused by the transformations.

Each dataset is converted to a numpy array to be used as input for the MMD detector.

X_ref_sample = np.array(
    [X_sample.tolist() for X_sample, _ in tqdm(reference_dataset)]
).astype(np.float32)

idx_test_sample = np.random.choice(
    np.arange(0, len(test_dataset)), size=reference_dataset_size, replace=False
)
X_test_original_sample = np.array(
    [
        X_sample.tolist()
        for X_sample, _ in tqdm(torch.utils.data.Subset(test_dataset, idx_test_sample))
    ]
).astype(np.float32)
X_test_gaussian_blur_sample = np.array(
    [
        X_sample.tolist()
        for X_sample, _ in tqdm(
            torch.utils.data.Subset(gaussian_blur_dataset, idx_test_sample)
        )
    ]
).astype(np.float32)
X_test_elastic_transform_sample = np.array(
    [
        X_sample.tolist()
        for X_sample, _ in tqdm(
            torch.utils.data.Subset(elastic_transform_dataset, idx_test_sample)
        )
    ]
).astype(np.float32)

The reference dataset and the test datasets are encoded using the autoencoder. Therefore, the original 784 dimensions are reduced to 5 dimensions.

autoencoder.eval()  # Ensure BatchNorm layers run in inference mode
with torch.no_grad():
    # Reference dataset
    X_ref_encoded = (
        autoencoder.encoder(torch.Tensor(X_ref_sample).to(device)).cpu().numpy()
    )
    # Test datasets
    X_test_original_encoded = (
        autoencoder.encoder(torch.Tensor(X_test_original_sample).to(device))
        .cpu()
        .numpy()
    )
    X_test_gaussian_blur_encoded = (
        autoencoder.encoder(torch.Tensor(X_test_gaussian_blur_sample).to(device))
        .cpu()
        .numpy()
    )
    X_test_elastic_transform_encoded = (
        autoencoder.encoder(torch.Tensor(X_test_elastic_transform_sample).to(device))
        .cpu()
        .numpy()
    )

The distribution of each embedding dimension is plotted for each test dataset and the reference. The goal is to visually inspect the differences between the distributions of the reference and each test dataset.

x_values = 100
fig, axs = plt.subplots(
    nrows=embedding_dim,
    ncols=1,
    figsize=(8, 10),
    sharex=True,
    dpi=300,
)

for i, ax in enumerate(axs.flat):
    for sample, type_ in zip(
        [
            X_ref_encoded[:, i],
            X_test_original_encoded[:, i],
            X_test_gaussian_blur_encoded[:, i],
            X_test_elastic_transform_encoded[:, i],
        ],
        [
            "Reference",
            "Original",
            "Gaussian Blur",
            "Elastic Transform",
        ],
    ):
        xs = np.linspace(np.min(sample), np.max(sample), num=x_values)
        density = gaussian_kde(sample).evaluate(xs)
        ax.plot(
            xs,
            density,
            alpha=0.5,
            label=type_,
            linestyle="--" if type_ != "Reference" else None,
        )
    ax.set_title(f"Dim. {i+1}")
    ax.legend(fontsize=8)

fig.suptitle("Embedding distributions")
fig.supxlabel("Value")
fig.supylabel("Density")
fig.tight_layout()

plt.show()
[Figure: KDE of each of the 5 embedding dimensions for the reference and the three test datasets]

We can see that the distributions of the GaussianBlur and ElasticTransform datasets are different from the reference dataset. The original dataset, however, has a distribution that is similar to the reference dataset for each embedding dimension.

A significance level of \(\alpha = 0.01\) is set for the hypothesis test.

alpha = 0.01

Maximum Mean Discrepancy (MMD) [1], imported from Frouros, is used with a radial basis function (RBF) kernel, the default kernel in MMD. In addition to computing the MMD statistic, the p-value is estimated using a permutation test with 5,000 permutations.
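
For reference, for samples \(X = \{x_1, \dots, x_m\}\) and \(Y = \{y_1, \dots, y_n\}\) and kernel \(k\), the unbiased squared MMD estimator from [1] is

\[\widehat{\mathrm{MMD}}^2(X, Y) = \frac{1}{m(m-1)} \sum_{i \neq j} k(x_i, x_j) + \frac{1}{n(n-1)} \sum_{i \neq j} k(y_i, y_j) - \frac{2}{mn} \sum_{i=1}^{m} \sum_{j=1}^{n} k(x_i, y_j).\]

Unlike the biased statistic, this estimator can take slightly negative values when both samples come from the same distribution, which is consistent with the statistic reported below for the unmodified test sample.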

Note: Since an RBF kernel is used, the median of the pairwise Euclidean distances within the reference dataset is used as the bandwidth parameter \(\sigma\) (the median heuristic).
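
Assuming the usual parameterization of the RBF kernel (a common convention; the exact rbf_kernel definition in Frouros may differ by a constant factor), this corresponds to

\[k(x, x') = \exp\left(-\frac{\lVert x - x' \rVert^2}{2\sigma^2}\right), \qquad \sigma = \operatorname{median}\left\{\lVert x_i - x_j \rVert : x_i, x_j \in X_{\mathrm{ref}},\ i < j\right\}.\]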

sigma = np.median(
    pdist(
        X=X_ref_encoded,
        metric="euclidean",
    ),
)
sigma
2.277419975242692

num_permutations = 5000

detector = MMD(
    kernel=partial(
        rbf_kernel,
        sigma=sigma,
    ),
    callbacks=[
        PermutationTestDistanceBased(
            num_permutations=num_permutations,
            random_state=seed,
            num_jobs=-1,
            method="exact",
            name="permutation_test",
            verbose=False,
        ),
    ],
)
_ = detector.fit(X=X_ref_encoded)

An example image from each test dataset is shown below.

idx = 10

fig, axs = plt.subplots(
    nrows=1,
    ncols=3,
    figsize=(10, 10),
    dpi=300,
)

axs[0].set_title("Original")
axs[0].imshow(np.transpose(X_test_original_sample[idx], (1, 2, 0)), cmap=plt.cm.gray)
axs[0].axis("off")

axs[1].set_title("Gaussian Blur")
axs[1].imshow(
    np.transpose(X_test_gaussian_blur_sample[idx], (1, 2, 0)), cmap=plt.cm.gray
)
axs[1].axis("off")

axs[2].set_title("Elastic Transform")
axs[2].imshow(
    np.transpose(X_test_elastic_transform_sample[idx], (1, 2, 0)), cmap=plt.cm.gray
)
axs[2].axis("off")

fig.tight_layout()
plt.show()
[Figure: an example test image under the Original, Gaussian Blur, and Elastic Transform conditions]

The MMD statistic and p-value are calculated for each test dataset. The p-value is used to determine if the test dataset is significantly different from the reference dataset. If the p-value is less than or equal to the significance level \(\alpha\), the test dataset is considered to exhibit drift.
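
Conceptually, the p-value produced by the callback amounts to the following (a minimal, illustrative sketch; permutation_p_value and statistic_fn are hypothetical names, not the Frouros API, and the actual implementation may differ, for instance in how the "exact" method adjusts the estimate):

import numpy as np

def permutation_p_value(X_ref, X_test, statistic_fn, num_permutations=5000, seed=31):
    # Estimate a permutation-test p-value for a two-sample distance statistic
    rng = np.random.default_rng(seed)
    observed = statistic_fn(X_ref, X_test)
    pooled = np.concatenate([X_ref, X_test])
    m = len(X_ref)
    permuted = np.empty(num_permutations)
    for i in range(num_permutations):
        # Randomly reassign the pooled samples to two groups of the original sizes
        idx = rng.permutation(len(pooled))
        permuted[i] = statistic_fn(pooled[idx[:m]], pooled[idx[m:]])
    # Fraction of permuted statistics at least as large as the observed one
    return float(np.mean(permuted >= observed))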

permutation_test_logs = {}

for sample, type_ in zip(
    [
        X_test_original_encoded,
        X_test_gaussian_blur_encoded,
        X_test_elastic_transform_encoded,
    ],
    [
        "Original (unmodified)",
        "Gaussian Blur",
        "Elastic Transform",
    ],
):
    mmd, callbacks_logs = detector.compare(X=sample)
    permutation_test_logs[type_] = copy.copy(callbacks_logs["permutation_test"])
    mmd, p_value = mmd.distance, callbacks_logs["permutation_test"]["p_value"]
    drift = p_value <= alpha
    print(
        f"{type_}:\n\tMMD statistic={round(mmd, 8)},"
        f" p-value={round(p_value, 8)},"
        f" drift={drift}"
    )
Original (unmodified):
	MMD statistic=-0.00023613, p-value=0.67466457, drift=False
Gaussian Blur:
	MMD statistic=0.00809993, p-value=0.00019946, drift=True
Elastic Transform:
	MMD statistic=0.00679139, p-value=0.00019946, drift=True

As expected, the GaussianBlur and ElasticTransform datasets exhibit a statistically significant difference (drift) from the reference dataset. Conversely, for the unmodified samples the null hypothesis cannot be rejected, so there is no evidence of drift.

Finally, the MMD statistic and permutation test results are plotted for each test dataset.

num_bins = 50
x_values = 100
num_percentile = 100 - alpha * 100

fig, axs = plt.subplots(
    nrows=3,
    ncols=1,
    figsize=(10, 12),
    sharex=True,
    sharey=True,
    dpi=300,
)

for ax, (type_, permutation_test) in zip(
    axs.flat,
    permutation_test_logs.items(),
):
    permutation_tests = permutation_test["permuted_statistics"]
    observed_statistic = permutation_test["observed_statistic"]
    p_value = permutation_test["p_value"]

    ax.hist(permutation_tests, bins=num_bins, density=True)
    xs = np.linspace(min(permutation_tests), max(permutation_tests), num=x_values)
    permutation_tests_density = gaussian_kde(permutation_tests).evaluate(xs)
    ax.plot(xs, permutation_tests_density, label="KDE")
    ax.axvline(
        observed_statistic, color="green", linestyle="--", label="Observed distance"
    )
    ax.set_title(type_)
    drift = p_value <= alpha
    drift_str = (
        f"Drift\np-value$\\leq${alpha}"
        if drift
        else f"No drift\np-value={round(p_value, 4)}"
    )
    ax.text(
        0.7,
        0.875,
        drift_str,
        transform=ax.transAxes,
        fontsize=8,
        bbox={
            "boxstyle": "round",
            "facecolor": "red" if drift else "green",
            "alpha": 0.5,
        },
    )
    percentile = np.percentile(permutation_tests, q=num_percentile)
    ax.axvline(percentile, color="red", linestyle="--", label="Significance threshold")
    ax.legend(fontsize=8, loc="upper right")

fig.supxlabel("MMD²")
fig.supylabel("Density")
fig.tight_layout()
plt.show()
[Figure: permutation distributions of MMD² with the observed distance and significance threshold for each test dataset]
[1] Arthur Gretton, Karsten M. Borgwardt, Malte J. Rasch, Bernhard Schölkopf, and Alexander Smola. A kernel two-sample test. Journal of Machine Learning Research, 13(25):723–773, 2012. URL: http://jmlr.org/papers/v13/gretton12a.html.

[2] Diederik P. Kingma and Jimmy Ba. Adam: a method for stochastic optimization. arXiv preprint arXiv:1412.6980, 2014.

[3] Salah Rifai, Pascal Vincent, Xavier Muller, Xavier Glorot, and Yoshua Bengio. Contractive auto-encoders: explicit invariance during feature extraction. In Proceedings of the 28th International Conference on Machine Learning, 833–840. 2011.