Subscribe to our newsletter
📬 Receive new lessons straight to your inbox (once a month) and join 40K+ developers in learning how to responsibly deliver value with ML.
While one-hot encoding allows us to preserve the structural information, it does poses two major disadvantages.
In this notebook, we're going to motivate the need for embeddings and how they address all the shortcomings of one-hot encoding. The main idea of embeddings is to have fixed length representations for the tokens in a text regardless of the number of tokens in the vocabulary. With one-hot encoding, each token is represented by an array of size vocab_size, but with embeddings, each token now has the shape embed_dim. The values in the representation will are not fixed binary values but rather, changing floating points allowing for fine-grained learned representations.
We can learn embeddings by creating our models in PyTorch but first, we're going to use a library that specializes in embeddings and topic modeling called Gensim.
1
2
3
4
5 | import nltk
nltk.download("punkt");
import numpy as np
import re
import urllib
|
1 | SEED = 1234
|
1
2 | # Set seed for reproducibility
np.random.seed(SEED)
|
1
2
3
4
5 | # Split text into sentences
tokenizer = nltk.data.load("tokenizers/punkt/english.pickle")
book = urllib.request.urlopen(url="https://raw.githubusercontent.com/GokuMohandas/Made-With-ML/main/datasets/harrypotter.txt")
sentences = tokenizer.tokenize(str(book.read()))
print (f"{len(sentences)} sentences")
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | def preprocess(text):
"""Conditional preprocessing on our text."""
# Lower
text = text.lower()
# Spacing and filters
text = re.sub(r"([-;;.,!?<=>])", r" \1 ", text)
text = re.sub("[^A-Za-z0-9]+", " ", text) # remove non alphanumeric chars
text = re.sub(" +", " ", text) # remove multiple spaces
text = text.strip()
# Separate into word tokens
text = text.split(" ")
return text
|
1
2
3
4 | # Preprocess sentences
print (sentences[11])
sentences = [preprocess(sentence) for sentence in sentences]
print (sentences[11])
|
But how do we learn the embeddings the first place? The intuition behind embeddings is that the definition of a token doesn't depend on the token itself but on its context. There are several different ways of doing this:
All of these approaches involve create data to train our model on. Every word in a sentence becomes the target word and the context words are determines by a window. In the image below (skip-gram), the window size is 2 (2 words to the left and right of the target word). We repeat this for every sentence in our corpus and this results in our training data for the unsupervised task. This in an unsupervised learning technique since we don't have official labels for contexts. The idea is that similar target words will appear with similar contexts and we can learn this relationship by repeatedly training our mode with (context, target) pairs.
We can learn embeddings using any of these approaches above and some work better than others. You can inspect the learned embeddings but the best way to choose an approach is to empirically validate the performance on a supervised task.
When we have large vocabularies to learn embeddings for, things can get complex very quickly. Recall that the backpropagation with softmax updates both the correct and incorrect class weights. This becomes a massive computation for every backwards pass we do so a workaround is to use negative sampling which only updates the correct class and a few arbitrary incorrect classes (NEGATIVE_SAMPLING=20). We're able to do this because of the large amount of training data where we'll see the same word as the target class multiple times.
1
2
3 | import gensim
from gensim.models import KeyedVectors
from gensim.models import Word2Vec
|
1
2
3
4
5 | EMBEDDING_DIM = 100
WINDOW = 5
MIN_COUNT = 3 # Ignores all words with total frequency lower than this
SKIP_GRAM = 1 # 0 = CBOW
NEGATIVE_SAMPLING = 20
|
1
2
3
4
5
6 | # Super fast because of optimized C code under the hood
w2v = Word2Vec(
sentences=sentences, size=EMBEDDING_DIM,
window=WINDOW, min_count=MIN_COUNT,
sg=SKIP_GRAM, negative=NEGATIVE_SAMPLING)
print (w2v)
|
1
2 | # Vector for each word
w2v.wv.get_vector("potter")
|
1
2 | # Get nearest neighbors (excluding itself)
w2v.wv.most_similar(positive="scar", topn=5)
|
1
2
3 | # Saving and loading
w2v.wv.save_word2vec_format("model.bin", binary=True)
w2v = KeyedVectors.load_word2vec_format("model.bin", binary=True)
|
What happens when a word doesn't exist in our vocabulary? We could assign an UNK token which is used for all OOV (out of vocabulary) words or we could use FastText, which uses character-level n-grams to embed a word. This helps embed rare words, misspelled words, and also words that don't exist in our corpus but are similar to words in our corpus.
1 | from gensim.models import FastText
|
1
2
3
4
5 | # Super fast because of optimized C code under the hood
ft = FastText(sentences=sentences, size=EMBEDDING_DIM,
window=WINDOW, min_count=MIN_COUNT,
sg=SKIP_GRAM, negative=NEGATIVE_SAMPLING)
print (ft)
|
1
2 | # This word doesn't exist so the word2vec model will error out
w2v.wv.most_similar(positive="scarring", topn=5)
|
1
2 | # FastText will use n-grams to embed an OOV word
ft.wv.most_similar(positive="scarring", topn=5)
|
1
2
3 | # Save and loading
ft.wv.save("model.bin")
ft = KeyedVectors.load("model.bin")
|
We can learn embeddings from scratch using one of the approaches above but we can also leverage pretrained embeddings that have been trained on millions of documents. Popular ones include Word2Vec (skip-gram) or GloVe (global word-word co-occurrence). We can validate that these embeddings captured meaningful semantic relationships by confirming them.
1
2
3
4
5
6 | from gensim.scripts.glove2word2vec import glove2word2vec
from io import BytesIO
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from urllib.request import urlopen
from zipfile import ZipFile
|
1
2 | # Arguments
EMBEDDING_DIM = 100
|
1
2
3
4
5
6 | def plot_embeddings(words, embeddings, pca_results):
for word in words:
index = embeddings.index2word.index(word)
plt.scatter(pca_results[index, 0], pca_results[index, 1])
plt.annotate(word, xy=(pca_results[index, 0], pca_results[index, 1]))
plt.show()
|
1
2
3
4 | # Unzip the file (may take ~3-5 minutes)
resp = urlopen("http://nlp.stanford.edu/data/glove.6B.zip")
zipfile = ZipFile(BytesIO(resp.read()))
zipfile.namelist()
|
1
2
3 | # Write embeddings to file
embeddings_file = "glove.6B.{0}d.txt".format(EMBEDDING_DIM)
zipfile.extract(embeddings_file)
|
1
2
3
4
5
6
7
8
9 | # Preview of the GloVe embeddings file
with open(embeddings_file, "r") as fp:
line = next(fp)
values = line.split()
word = values[0]
embedding = np.asarray(values[1:], dtype='float32')
print (f"word: {word}")
print (f"embedding:\n{embedding}")
print (f"embedding dim: {len(embedding)}")
|
1
2
3 | # Save GloVe embeddings to local directory in word2vec format
word2vec_output_file = "{0}.word2vec".format(embeddings_file)
glove2word2vec(embeddings_file, word2vec_output_file)
|
1
2 | # Load embeddings (may take a minute)
glove = KeyedVectors.load_word2vec_format(word2vec_output_file, binary=False)
|
1
2
3 | # (king - man) + woman = ?
# king - man = ? - woman
glove.most_similar(positive=["woman", "king"], negative=["man"], topn=5)
|
1
2 | # Get nearest neighbors (excluding itself)
glove.wv.most_similar(positive="goku", topn=5)
|
1
2
3
4 | # Reduce dimensionality for plotting
X = glove[glove.wv.vocab]
pca = PCA(n_components=2)
pca_results = pca.fit_transform(X)
|
1
2
3
4 | # Visualize
plot_embeddings(
words=["king", "queen", "man", "woman"], embeddings=glove,
pca_results=pca_results)
|
1
2 | # Bias in embeddings
glove.most_similar(positive=["woman", "doctor"], negative=["man"], topn=5)
|
Let's set our seed and device for our main task.
1
2
3
4
5 | import numpy as np
import pandas as pd
import random
import torch
import torch.nn as nn
|
1 | SEED = 1234
|
1
2
3
4
5
6
7 | def set_seeds(seed=1234):
"""Set seeds for reproducibility."""
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed) # multi-GPU
|
1
2 | # Set seeds for reproducibility
set_seeds(seed=SEED)
|
1
2
3
4
5
6
7
8 | # Set device
cuda = True
device = torch.device("cuda" if (
torch.cuda.is_available() and cuda) else "cpu")
torch.set_default_tensor_type("torch.FloatTensor")
if device.type == "cuda":
torch.set_default_tensor_type("torch.cuda.FloatTensor")
print (device)
|
We will download the AG News dataset, which consists of 120K text samples from 4 unique classes (Business, Sci/Tech, Sports, World)
1
2
3
4
5 | # Load data
url = "https://raw.githubusercontent.com/GokuMohandas/Made-With-ML/main/datasets/news.csv"
df = pd.read_csv(url, header=0) # load
df = df.sample(frac=1).reset_index(drop=True) # shuffle
df.head()
|
We're going to clean up our input data first by doing operations such as lower text, removing stop (filler) words, filters using regular expressions, etc.
1
2
3
4 | import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import re
|
1
2
3
4 | nltk.download("stopwords")
STOPWORDS = stopwords.words("english")
print (STOPWORDS[:5])
porter = PorterStemmer()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | def preprocess(text, stopwords=STOPWORDS):
"""Conditional preprocessing on our text unique to our task."""
# Lower
text = text.lower()
# Remove stopwords
pattern = re.compile(r"\b(" + r"|".join(stopwords) + r")\b\s*")
text = pattern.sub("", text)
# Remove words in parenthesis
text = re.sub(r"\([^)]*\)", "", text)
# Spacing and filters
text = re.sub(r"([-;;.,!?<=>])", r" \1 ", text)
text = re.sub("[^A-Za-z0-9]+", " ", text) # remove non alphanumeric chars
text = re.sub(" +", " ", text) # remove multiple spaces
text = text.strip()
return text
|
1
2
3 | # Sample
text = "Great week for the NYSE!"
preprocess(text=text)
|
1
2
3
4 | # Apply to dataframe
preprocessed_df = df.copy()
preprocessed_df.title = preprocessed_df.title.apply(preprocess)
print (f"{df.title.values[0]}\n\n{preprocessed_df.title.values[0]}")
|
Warning
If you have preprocessing steps like standardization, etc. that are calculated, you need to separate the training and test set first before applying those operations. This is because we cannot apply any knowledge gained from the test set accidentally (data leak) during preprocessing/training. However for global preprocessing steps like the function above where we aren't learning anything from the data itself, we can perform before splitting the data.
1
2 | import collections
from sklearn.model_selection import train_test_split
|
1
2
3 | TRAIN_SIZE = 0.7
VAL_SIZE = 0.15
TEST_SIZE = 0.15
|
1
2
3
4
5 | def train_val_test_split(X, y, train_size):
"""Split dataset into data splits."""
X_train, X_, y_train, y_ = train_test_split(X, y, train_size=TRAIN_SIZE, stratify=y)
X_val, X_test, y_val, y_test = train_test_split(X_, y_, train_size=0.5, stratify=y_)
return X_train, X_val, X_test, y_train, y_val, y_test
|
1
2
3 | # Data
X = preprocessed_df["title"].values
y = preprocessed_df["category"].values
|
1
2
3
4
5
6
7 | # Create data splits
X_train, X_val, X_test, y_train, y_val, y_test = train_val_test_split(
X=X, y=y, train_size=TRAIN_SIZE)
print (f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print (f"X_val: {X_val.shape}, y_val: {y_val.shape}")
print (f"X_test: {X_test.shape}, y_test: {y_test.shape}")
print (f"Sample point: {X_train[0]} → {y_train[0]}")
|
Next we'll define a LabelEncoder to encode our text labels into unique indices
1 | import itertools
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 | class LabelEncoder(object):
"""Label encoder for tag labels."""
def __init__(self, class_to_index={}):
self.class_to_index = class_to_index or {} # mutable defaults ;)
self.index_to_class = {v: k for k, v in self.class_to_index.items()}
self.classes = list(self.class_to_index.keys())
def __len__(self):
return len(self.class_to_index)
def __str__(self):
return f"<LabelEncoder(num_classes={len(self)})>"
def fit(self, y):
classes = np.unique(y)
for i, class_ in enumerate(classes):
self.class_to_index[class_] = i
self.index_to_class = {v: k for k, v in self.class_to_index.items()}
self.classes = list(self.class_to_index.keys())
return self
def encode(self, y):
encoded = np.zeros((len(y)), dtype=int)
for i, item in enumerate(y):
encoded[i] = self.class_to_index[item]
return encoded
def decode(self, y):
classes = []
for i, item in enumerate(y):
classes.append(self.index_to_class[item])
return classes
def save(self, fp):
with open(fp, "w") as fp:
contents = {'class_to_index': self.class_to_index}
json.dump(contents, fp, indent=4, sort_keys=False)
@classmethod
def load(cls, fp):
with open(fp, "r") as fp:
kwargs = json.load(fp=fp)
return cls(**kwargs)
|
1
2
3
4
5 | # Encode
label_encoder = LabelEncoder()
label_encoder.fit(y_train)
NUM_CLASSES = len(label_encoder)
label_encoder.class_to_index
|
1
2
3
4
5
6 | # Convert labels to tokens
print (f"y_train[0]: {y_train[0]}")
y_train = label_encoder.encode(y_train)
y_val = label_encoder.encode(y_val)
y_test = label_encoder.encode(y_test)
print (f"y_train[0]: {y_train[0]}")
|
1
2
3
4 | # Class weights
counts = np.bincount(y_train)
class_weights = {i: 1.0/count for i, count in enumerate(counts)}
print (f"counts: {counts}\nweights: {class_weights}")
|
We'll define a Tokenizer to convert our text input data into token indices.
1
2
3 | import json
from collections import Counter
from more_itertools import take
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 | class Tokenizer(object):
def __init__(self, char_level, num_tokens=None,
pad_token="<PAD>", oov_token="<UNK>",
token_to_index=None):
self.char_level = char_level
self.separator = "" if self.char_level else " "
if num_tokens: num_tokens -= 2 # pad + unk tokens
self.num_tokens = num_tokens
self.pad_token = pad_token
self.oov_token = oov_token
if not token_to_index:
token_to_index = {pad_token: 0, oov_token: 1}
self.token_to_index = token_to_index
self.index_to_token = {v: k for k, v in self.token_to_index.items()}
def __len__(self):
return len(self.token_to_index)
def __str__(self):
return f"<Tokenizer(num_tokens={len(self)})>"
def fit_on_texts(self, texts):
if not self.char_level:
texts = [text.split(" ") for text in texts]
all_tokens = [token for text in texts for token in text]
counts = Counter(all_tokens).most_common(self.num_tokens)
self.min_token_freq = counts[-1][1]
for token, count in counts:
index = len(self)
self.token_to_index[token] = index
self.index_to_token[index] = token
return self
def texts_to_sequences(self, texts):
sequences = []
for text in texts:
if not self.char_level:
text = text.split(" ")
sequence = []
for token in text:
sequence.append(self.token_to_index.get(
token, self.token_to_index[self.oov_token]))
sequences.append(np.asarray(sequence))
return sequences
def sequences_to_texts(self, sequences):
texts = []
for sequence in sequences:
text = []
for index in sequence:
text.append(self.index_to_token.get(index, self.oov_token))
texts.append(self.separator.join([token for token in text]))
return texts
def save(self, fp):
with open(fp, "w") as fp:
contents = {
"char_level": self.char_level,
"oov_token": self.oov_token,
"token_to_index": self.token_to_index
}
json.dump(contents, fp, indent=4, sort_keys=False)
@classmethod
def load(cls, fp):
with open(fp, "r") as fp:
kwargs = json.load(fp=fp)
return cls(**kwargs)
|
Warning
It's important that we only fit using our train data split because during inference, our model will not always know every token so it's important to replicate that scenario with our validation and test splits as well.
1
2
3
4
5 | # Tokenize
tokenizer = Tokenizer(char_level=False, num_tokens=5000)
tokenizer.fit_on_texts(texts=X_train)
VOCAB_SIZE = len(tokenizer)
print (tokenizer)
|
1
2
3 | # Sample of tokens
print (take(5, tokenizer.token_to_index.items()))
print (f"least freq token's freq: {tokenizer.min_token_freq}") # use this to adjust num_tokens
|
1
2
3
4
5
6
7
8 | # Convert texts to sequences of indices
X_train = tokenizer.texts_to_sequences(X_train)
X_val = tokenizer.texts_to_sequences(X_val)
X_test = tokenizer.texts_to_sequences(X_test)
preprocessed_text = tokenizer.sequences_to_texts([X_train[0]])[0]
print ("Text to indices:\n"
f" (preprocessed) → {preprocessed_text}\n"
f" (tokenized) → {X_train[0]}")
|
We can embed our inputs using PyTorch's embedding layer.
1
2
3
4
5 | # Input
vocab_size = 10
x = torch.randint(high=vocab_size, size=(1,5))
print (x)
print (x.shape)
|
1
2
3 | # Embedding layer
embeddings = nn.Embedding(embedding_dim=100, num_embeddings=vocab_size)
print (embeddings.weight.shape)
|
1
2 | # Embed the input
embeddings(x).shape
|
Each token in the input is represented via embeddings (all out-of-vocabulary (OOV) tokens are given the embedding for UNK token.) In the model below, we'll see how to set these embeddings to be pretrained GloVe embeddings and how to choose whether to freeze (fixed embedding weights) those embeddings or not during training.
Our inputs are all of varying length but we need each batch to be uniformly shaped. Therefore, we will use padding to make all the inputs in the batch the same length. Our padding index will be 0 (note that this is consistent with the <PAD> token defined in our Tokenizer).
While embedding our input tokens will create a batch of shape (N, max_seq_len, embed_dim) we only need to provide a 2D matrix (N, max_seq_len) for using embeddings with PyTorch.
1
2
3
4
5
6
7 | def pad_sequences(sequences, max_seq_len=0):
"""Pad sequences to max length in sequence."""
max_seq_len = max(max_seq_len, max(len(sequence) for sequence in sequences))
padded_sequences = np.zeros((len(sequences), max_seq_len))
for i, sequence in enumerate(sequences):
padded_sequences[i][:len(sequence)] = sequence
return padded_sequences
|
1
2
3
4 | # 2D sequences
padded = pad_sequences(X_train[0:3])
print (padded.shape)
print (padded)
|
We're going to create Datasets and DataLoaders to be able to efficiently create batches with our data splits.
1 | FILTER_SIZES = list(range(1, 4)) # uni, bi and tri grams
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 | class Dataset(torch.utils.data.Dataset):
def __init__(self, X, y, max_filter_size):
self.X = X
self.y = y
self.max_filter_size = max_filter_size
def __len__(self):
return len(self.y)
def __str__(self):
return f"<Dataset(N={len(self)})>"
def __getitem__(self, index):
X = self.X[index]
y = self.y[index]
return [X, y]
def collate_fn(self, batch):
"""Processing on a batch."""
# Get inputs
batch = np.array(batch)
X = batch[:, 0]
y = batch[:, 1]
# Pad sequences
X = pad_sequences(X)
# Cast
X = torch.LongTensor(X.astype(np.int32))
y = torch.LongTensor(y.astype(np.int32))
return X, y
def create_dataloader(self, batch_size, shuffle=False, drop_last=False):
return torch.utils.data.DataLoader(
dataset=self, batch_size=batch_size, collate_fn=self.collate_fn,
shuffle=shuffle, drop_last=drop_last, pin_memory=True)
|
1
2
3
4
5
6
7
8
9
10
11
12 | # Create datasets
max_filter_size = max(FILTER_SIZES)
train_dataset = Dataset(X=X_train, y=y_train, max_filter_size=max_filter_size)
val_dataset = Dataset(X=X_val, y=y_val, max_filter_size=max_filter_size)
test_dataset = Dataset(X=X_test, y=y_test, max_filter_size=max_filter_size)
print ("Datasets:\n"
f" Train dataset:{train_dataset.__str__()}\n"
f" Val dataset: {val_dataset.__str__()}\n"
f" Test dataset: {test_dataset.__str__()}\n"
"Sample point:\n"
f" X: {train_dataset[0][0]}\n"
f" y: {train_dataset[0][1]}")
|
1
2
3
4
5
6
7
8
9
10
11
12 | # Create dataloaders
batch_size = 64
train_dataloader = train_dataset.create_dataloader(batch_size=batch_size)
val_dataloader = val_dataset.create_dataloader(batch_size=batch_size)
test_dataloader = test_dataset.create_dataloader(batch_size=batch_size)
batch_X, batch_y = next(iter(train_dataloader))
print ("Sample batch:\n"
f" X: {list(batch_X.size())}\n"
f" y: {list(batch_y.size())}\n"
"Sample point:\n"
f" X: {batch_X[0]}\n"
f" y: {batch_y[0]}")
|
We'll be using a convolutional neural network on top of our embedded tokens to extract meaningful spatial signal. This time, we'll be using many filter widths to act as n-gram feature extractors.
Let's visualize the model's forward pass.
1
2 | import math
import torch.nn.functional as F
|
1
2
3 | EMBEDDING_DIM = 100
HIDDEN_DIM = 100
DROPOUT_P = 0.1
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 | class CNN(nn.Module):
def __init__(self, embedding_dim, vocab_size, num_filters,
filter_sizes, hidden_dim, dropout_p, num_classes,
pretrained_embeddings=None, freeze_embeddings=False,
padding_idx=0):
super(CNN, self).__init__()
# Filter sizes
self.filter_sizes = filter_sizes
# Initialize embeddings
if pretrained_embeddings is None:
self.embeddings = nn.Embedding(
embedding_dim=embedding_dim, num_embeddings=vocab_size,
padding_idx=padding_idx)
else:
pretrained_embeddings = torch.from_numpy(pretrained_embeddings).float()
self.embeddings = nn.Embedding(
embedding_dim=embedding_dim, num_embeddings=vocab_size,
padding_idx=padding_idx, _weight=pretrained_embeddings)
# Freeze embeddings or not
if freeze_embeddings:
self.embeddings.weight.requires_grad = False
# Conv weights
self.conv = nn.ModuleList(
[nn.Conv1d(in_channels=embedding_dim,
out_channels=num_filters,
kernel_size=f) for f in filter_sizes])
# FC weights
self.dropout = nn.Dropout(dropout_p)
self.fc1 = nn.Linear(num_filters*len(filter_sizes), hidden_dim)
self.fc2 = nn.Linear(hidden_dim, num_classes)
def forward(self, inputs, channel_first=False):
# Embed
x_in, = inputs
x_in = self.embeddings(x_in)
# Rearrange input so num_channels is in dim 1 (N, C, L)
if not channel_first:
x_in = x_in.transpose(1, 2)
# Conv outputs
z = []
max_seq_len = x_in.shape[2]
for i, f in enumerate(self.filter_sizes):
# `SAME` padding
padding_left = int((self.conv[i].stride[0]*(max_seq_len-1) - max_seq_len + self.filter_sizes[i])/2)
padding_right = int(math.ceil((self.conv[i].stride[0]*(max_seq_len-1) - max_seq_len + self.filter_sizes[i])/2))
# Conv + pool
_z = self.conv[i](F.pad(x_in, (padding_left, padding_right)))
_z = F.max_pool1d(_z, _z.size(2)).squeeze(2)
z.append(_z)
# Concat conv outputs
z = torch.cat(z, 1)
# FC layers
z = self.fc1(z)
z = self.dropout(z)
z = self.fc2(z)
return z
|
We're going create some utility functions to be able to load the pretrained GloVe embeddings into our Embeddings layer.
1
2
3
4
5
6
7
8
9
10 | def load_glove_embeddings(embeddings_file):
"""Load embeddings from a file."""
embeddings = {}
with open(embeddings_file, "r") as fp:
for index, line in enumerate(fp):
values = line.split()
word = values[0]
embedding = np.asarray(values[1:], dtype='float32')
embeddings[word] = embedding
return embeddings
|
1
2
3
4
5
6
7
8 | def make_embeddings_matrix(embeddings, word_index, embedding_dim):
"""Create embeddings matrix to use in Embedding layer."""
embedding_matrix = np.zeros((len(word_index), embedding_dim))
for word, i in word_index.items():
embedding_vector = embeddings.get(word)
if embedding_vector is not None:
embedding_matrix[i] = embedding_vector
return embedding_matrix
|
1
2
3
4
5
6
7 | # Create embeddings
embeddings_file = 'glove.6B.{0}d.txt'.format(EMBEDDING_DIM)
glove_embeddings = load_glove_embeddings(embeddings_file=embeddings_file)
embedding_matrix = make_embeddings_matrix(
embeddings=glove_embeddings, word_index=tokenizer.token_to_index,
embedding_dim=EMBEDDING_DIM)
print (f"<Embeddings(words={embedding_matrix.shape[0]}, dim={embedding_matrix.shape[1]})>")
|
We have first have to decide whether to use pretrained embeddings randomly initialized ones. Then, we can choose to freeze our embeddings or continue to train them using the supervised data (this could lead to overfitting). Here are the three experiments we're going to conduct:
1
2
3 | import json
from sklearn.metrics import precision_recall_fscore_support
from torch.optim import Adam
|
1
2
3
4 | NUM_FILTERS = 50
LEARNING_RATE = 1e-3
PATIENCE = 5
NUM_EPOCHS = 10
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108 | class Trainer(object):
def __init__(self, model, device, loss_fn=None, optimizer=None, scheduler=None):
# Set params
self.model = model
self.device = device
self.loss_fn = loss_fn
self.optimizer = optimizer
self.scheduler = scheduler
def train_step(self, dataloader):
"""Train step."""
# Set model to train mode
self.model.train()
loss = 0.0
# Iterate over train batches
for i, batch in enumerate(dataloader):
# Step
batch = [item.to(self.device) for item in batch] # Set device
inputs, targets = batch[:-1], batch[-1]
self.optimizer.zero_grad() # Reset gradients
z = self.model(inputs) # Forward pass
J = self.loss_fn(z, targets) # Define loss
J.backward() # Backward pass
self.optimizer.step() # Update weights
# Cumulative Metrics
loss += (J.detach().item() - loss) / (i + 1)
return loss
def eval_step(self, dataloader):
"""Validation or test step."""
# Set model to eval mode
self.model.eval()
loss = 0.0
y_trues, y_probs = [], []
# Iterate over val batches
with torch.inference_mode():
for i, batch in enumerate(dataloader):
# Step
batch = [item.to(self.device) for item in batch] # Set device
inputs, y_true = batch[:-1], batch[-1]
z = self.model(inputs) # Forward pass
J = self.loss_fn(z, y_true).item()
# Cumulative Metrics
loss += (J - loss) / (i + 1)
# Store outputs
y_prob = F.softmax(z).cpu().numpy()
y_probs.extend(y_prob)
y_trues.extend(y_true.cpu().numpy())
return loss, np.vstack(y_trues), np.vstack(y_probs)
def predict_step(self, dataloader):
"""Prediction step."""
# Set model to eval mode
self.model.eval()
y_probs = []
# Iterate over val batches
with torch.inference_mode():
for i, batch in enumerate(dataloader):
# Forward pass w/ inputs
inputs, targets = batch[:-1], batch[-1]
z = self.model(inputs)
# Store outputs
y_prob = F.softmax(z).cpu().numpy()
y_probs.extend(y_prob)
return np.vstack(y_probs)
def train(self, num_epochs, patience, train_dataloader, val_dataloader):
best_val_loss = np.inf
for epoch in range(num_epochs):
# Steps
train_loss = self.train_step(dataloader=train_dataloader)
val_loss, _, _ = self.eval_step(dataloader=val_dataloader)
self.scheduler.step(val_loss)
# Early stopping
if val_loss < best_val_loss:
best_val_loss = val_loss
best_model = self.model
_patience = patience # reset _patience
else:
_patience -= 1
if not _patience: # 0
print("Stopping early!")
break
# Logging
print(
f"Epoch: {epoch+1} | "
f"train_loss: {train_loss:.5f}, "
f"val_loss: {val_loss:.5f}, "
f"lr: {self.optimizer.param_groups[0]['lr']:.2E}, "
f"_patience: {_patience}"
)
return best_model
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 | def get_metrics(y_true, y_pred, classes):
"""Per-class performance metrics."""
# Performance
performance = {"overall": {}, "class": {}}
# Overall performance
metrics = precision_recall_fscore_support(y_true, y_pred, average="weighted")
performance["overall"]["precision"] = metrics[0]
performance["overall"]["recall"] = metrics[1]
performance["overall"]["f1"] = metrics[2]
performance["overall"]["num_samples"] = np.float64(len(y_true))
# Per-class performance
metrics = precision_recall_fscore_support(y_true, y_pred, average=None)
for i in range(len(classes)):
performance["class"][classes[i]] = {
"precision": metrics[0][i],
"recall": metrics[1][i],
"f1": metrics[2][i],
"num_samples": np.float64(metrics[3][i]),
}
return performance
|
1
2 | PRETRAINED_EMBEDDINGS = None
FREEZE_EMBEDDINGS = False
|
1
2
3
4
5
6
7
8 | # Initialize model
model = CNN(
embedding_dim=EMBEDDING_DIM, vocab_size=VOCAB_SIZE,
num_filters=NUM_FILTERS, filter_sizes=FILTER_SIZES,
hidden_dim=HIDDEN_DIM, dropout_p=DROPOUT_P, num_classes=NUM_CLASSES,
pretrained_embeddings=PRETRAINED_EMBEDDINGS, freeze_embeddings=FREEZE_EMBEDDINGS)
model = model.to(device) # set device
print (model.named_parameters)
|
1
2
3 | # Define Loss
class_weights_tensor = torch.Tensor(list(class_weights.values())).to(device)
loss_fn = nn.CrossEntropyLoss(weight=class_weights_tensor)
|
1
2
3
4 | # Define optimizer & scheduler
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode="min", factor=0.1, patience=3)
|
1
2
3
4 | # Trainer module
trainer = Trainer(
model=model, device=device, loss_fn=loss_fn,
optimizer=optimizer, scheduler=scheduler)
|
1
2
3 | # Train
best_model = trainer.train(
NUM_EPOCHS, PATIENCE, train_dataloader, val_dataloader)
|
1
2
3 | # Get predictions
test_loss, y_true, y_prob = trainer.eval_step(dataloader=test_dataloader)
y_pred = np.argmax(y_prob, axis=1)
|
1
2
3
4 | # Determine performance
performance = get_metrics(
y_true=y_test, y_pred=y_pred, classes=label_encoder.classes)
print (json.dumps(performance["overall"], indent=2))
|
1
2 | PRETRAINED_EMBEDDINGS = embedding_matrix
FREEZE_EMBEDDINGS = True
|
1
2
3
4
5
6
7
8 | # Initialize model
model = CNN(
embedding_dim=EMBEDDING_DIM, vocab_size=VOCAB_SIZE,
num_filters=NUM_FILTERS, filter_sizes=FILTER_SIZES,
hidden_dim=HIDDEN_DIM, dropout_p=DROPOUT_P, num_classes=NUM_CLASSES,
pretrained_embeddings=PRETRAINED_EMBEDDINGS, freeze_embeddings=FREEZE_EMBEDDINGS)
model = model.to(device) # set device
print (model.named_parameters)
|
1
2
3 | # Define Loss
class_weights_tensor = torch.Tensor(list(class_weights.values())).to(device)
loss_fn = nn.CrossEntropyLoss(weight=class_weights_tensor)
|
1
2
3
4 | # Define optimizer & scheduler
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode="min", factor=0.1, patience=3)
|
1
2
3
4 | # Trainer module
trainer = Trainer(
model=model, device=device, loss_fn=loss_fn,
optimizer=optimizer, scheduler=scheduler)
|
1
2
3 | # Train
best_model = trainer.train(
NUM_EPOCHS, PATIENCE, train_dataloader, val_dataloader)
|
1
2
3 | # Get predictions
test_loss, y_true, y_prob = trainer.eval_step(dataloader=test_dataloader)
y_pred = np.argmax(y_prob, axis=1)
|
1
2
3
4 | # Determine performance
performance = get_metrics(
y_true=y_test, y_pred=y_pred, classes=label_encoder.classes)
print (json.dumps(performance["overall"], indent=2))
|
1
2 | PRETRAINED_EMBEDDINGS = embedding_matrix
FREEZE_EMBEDDINGS = False
|
1
2
3
4
5
6
7
8 | # Initialize model
model = CNN(
embedding_dim=EMBEDDING_DIM, vocab_size=VOCAB_SIZE,
num_filters=NUM_FILTERS, filter_sizes=FILTER_SIZES,
hidden_dim=HIDDEN_DIM, dropout_p=DROPOUT_P, num_classes=NUM_CLASSES,
pretrained_embeddings=PRETRAINED_EMBEDDINGS, freeze_embeddings=FREEZE_EMBEDDINGS)
model = model.to(device) # set device
print (model.named_parameters)
|
1
2
3 | # Define Loss
class_weights_tensor = torch.Tensor(list(class_weights.values())).to(device)
loss_fn = nn.CrossEntropyLoss(weight=class_weights_tensor)
|
1
2
3
4 | # Define optimizer & scheduler
optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode="min", factor=0.1, patience=3)
|
1
2
3
4 | # Trainer module
trainer = Trainer(
model=model, device=device, loss_fn=loss_fn,
optimizer=optimizer, scheduler=scheduler)
|
1
2
3 | # Train
best_model = trainer.train(
NUM_EPOCHS, PATIENCE, train_dataloader, val_dataloader)
|
1
2
3 | # Get predictions
test_loss, y_true, y_prob = trainer.eval_step(dataloader=test_dataloader)
y_pred = np.argmax(y_prob, axis=1)
|
1
2
3
4 | # Determine performance
performance = get_metrics(
y_true=y_test, y_pred=y_pred, classes=label_encoder.classes)
print (json.dumps(performance["overall"], indent=2))
|
1
2
3
4
5
6
7
8
9 | # Save artifacts
from pathlib import Path
dir = Path("cnn")
dir.mkdir(parents=True, exist_ok=True)
label_encoder.save(fp=Path(dir, "label_encoder.json"))
tokenizer.save(fp=Path(dir, "tokenizer.json"))
torch.save(best_model.state_dict(), Path(dir, "model.pt"))
with open(Path(dir, "performance.json"), "w") as fp:
json.dump(performance, indent=2, sort_keys=False, fp=fp)
|
1
2
3
4
5
6
7
8 | def get_probability_distribution(y_prob, classes):
"""Create a dict of class probabilities from an array."""
results = {}
for i, class_ in enumerate(classes):
results[class_] = np.float64(y_prob[i])
sorted_results = {k: v for k, v in sorted(
results.items(), key=lambda item: item[1], reverse=True)}
return sorted_results
|
1
2
3
4
5
6
7
8
9
10
11 | # Load artifacts
device = torch.device("cpu")
label_encoder = LabelEncoder.load(fp=Path(dir, "label_encoder.json"))
tokenizer = Tokenizer.load(fp=Path(dir, "tokenizer.json"))
model = CNN(
embedding_dim=EMBEDDING_DIM, vocab_size=VOCAB_SIZE,
num_filters=NUM_FILTERS, filter_sizes=FILTER_SIZES,
hidden_dim=HIDDEN_DIM, dropout_p=DROPOUT_P, num_classes=NUM_CLASSES,
pretrained_embeddings=PRETRAINED_EMBEDDINGS, freeze_embeddings=FREEZE_EMBEDDINGS)
model.load_state_dict(torch.load(Path(dir, "model.pt"), map_location=device))
model.to(device)
|
1
2 | # Initialize trainer
trainer = Trainer(model=model, device=device)
|
1
2
3
4
5
6
7 | # Dataloader
text = "The final tennis tournament starts next week."
X = tokenizer.texts_to_sequences([preprocess(text)])
print (tokenizer.sequences_to_texts(X))
y_filler = label_encoder.encode([label_encoder.classes[0]]*len(X))
dataset = Dataset(X=X, y=y_filler, max_filter_size=max_filter_size)
dataloader = dataset.create_dataloader(batch_size=batch_size)
|
1
2
3
4 | # Inference
y_prob = trainer.predict_step(dataloader)
y_pred = np.argmax(y_prob, axis=1)
label_encoder.decode(y_pred)
|
1
2
3 | # Class distributions
prob_dist = get_probability_distribution(y_prob=y_prob[0], classes=label_encoder.classes)
print (json.dumps(prob_dist, indent=2))
|
We went through all the trouble of padding our inputs before convolution to result is outputs of the same shape as our inputs so we can try to get some interpretability. Since every token is mapped to a convolutional output on which we apply max pooling, we can see which token's output was most influential towards the prediction. We first need to get the conv outputs from our model:
1
2 | import collections
import seaborn as sns
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 | class InterpretableCNN(nn.Module):
def __init__(self, embedding_dim, vocab_size, num_filters,
filter_sizes, hidden_dim, dropout_p, num_classes,
pretrained_embeddings=None, freeze_embeddings=False,
padding_idx=0):
super(InterpretableCNN, self).__init__()
# Filter sizes
self.filter_sizes = filter_sizes
# Initialize embeddings
if pretrained_embeddings is None:
self.embeddings = nn.Embedding(
embedding_dim=embedding_dim, num_embeddings=vocab_size,
padding_idx=padding_idx)
else:
pretrained_embeddings = torch.from_numpy(pretrained_embeddings).float()
self.embeddings = nn.Embedding(
embedding_dim=embedding_dim, num_embeddings=vocab_size,
padding_idx=padding_idx, _weight=pretrained_embeddings)
# Freeze embeddings or not
if freeze_embeddings:
self.embeddings.weight.requires_grad = False
# Conv weights
self.conv = nn.ModuleList(
[nn.Conv1d(in_channels=embedding_dim,
out_channels=num_filters,
kernel_size=f) for f in filter_sizes])
# FC weights
self.dropout = nn.Dropout(dropout_p)
self.fc1 = nn.Linear(num_filters*len(filter_sizes), hidden_dim)
self.fc2 = nn.Linear(hidden_dim, num_classes)
def forward(self, inputs, channel_first=False):
# Embed
x_in, = inputs
x_in = self.embeddings(x_in)
# Rearrange input so num_channels is in dim 1 (N, C, L)
if not channel_first:
x_in = x_in.transpose(1, 2)
# Conv outputs
z = []
max_seq_len = x_in.shape[2]
for i, f in enumerate(self.filter_sizes):
# `SAME` padding
padding_left = int((self.conv[i].stride[0]*(max_seq_len-1) - max_seq_len + self.filter_sizes[i])/2)
padding_right = int(math.ceil((self.conv[i].stride[0]*(max_seq_len-1) - max_seq_len + self.filter_sizes[i])/2))
# Conv + pool
_z = self.conv[i](F.pad(x_in, (padding_left, padding_right)))
z.append(_z.cpu().numpy())
return z
|
1
2 | PRETRAINED_EMBEDDINGS = embedding_matrix
FREEZE_EMBEDDINGS = False
|
1
2
3
4
5
6
7
8 | # Initialize model
interpretable_model = InterpretableCNN(
embedding_dim=EMBEDDING_DIM, vocab_size=VOCAB_SIZE,
num_filters=NUM_FILTERS, filter_sizes=FILTER_SIZES,
hidden_dim=HIDDEN_DIM, dropout_p=DROPOUT_P, num_classes=NUM_CLASSES,
pretrained_embeddings=PRETRAINED_EMBEDDINGS, freeze_embeddings=FREEZE_EMBEDDINGS)
interpretable_model.load_state_dict(torch.load(Path(dir, "model.pt"), map_location=device))
interpretable_model.to(device)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | # Get conv outputs
interpretable_model.eval()
conv_outputs = []
with torch.inference_mode():
for i, batch in enumerate(dataloader):
# Forward pass w/ inputs
inputs, targets = batch[:-1], batch[-1]
z = interpretable_model(inputs)
# Store conv outputs
conv_outputs.extend(z)
conv_outputs = np.vstack(conv_outputs)
print (conv_outputs.shape) # (len(filter_sizes), num_filters, max_seq_len)
|
1
2
3
4 | # Visualize a bi-gram filter's outputs
tokens = tokenizer.sequences_to_texts(X)[0].split(" ")
filter_size = 2
sns.heatmap(conv_outputs[filter_size-1][:, len(tokens)], xticklabels=tokens)
|
1D global max-pooling would extract the highest value from each of our num_filters for each filter_size. We could also follow this same approach to figure out which n-gram is most relevant but notice in the heatmap above that many filters don't have much variance. To mitigate this, this paper uses threshold values to determine which filters to use for interpretability. But to keep things simple, let's extract which tokens' filter outputs were extracted via max-pooling the most frequently.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | sample_index = 0
print (f"Original text:\n{text}")
print (f"\nPreprocessed text:\n{tokenizer.sequences_to_texts(X)[0]}")
print ("\nMost important n-grams:")
# Process conv outputs for each unique filter size
for i, filter_size in enumerate(FILTER_SIZES):
# Identify most important n-gram (excluding last token)
popular_indices = collections.Counter([np.argmax(conv_output) \
for conv_output in conv_outputs[i]])
# Get corresponding text
start = popular_indices.most_common(1)[-1][0]
n_gram = " ".join([token for token in tokens[start:start+filter_size]])
print (f"[{filter_size}-gram]: {n_gram}")
|
To cite this content, please use:
1
2
3
4
5
6 | @article{madewithml,
author = {Goku Mohandas},
title = { Embeddings - Made With ML },
howpublished = {\url{https://madewithml.com/}},
year = {2023}
}
|