In this article, I'm going to introduce some of my most-used Python code snippets for deep learning in PyTorch. During my college journey into machine learning, both in class and in my industrial practice, they have been very handy for quickly setting up a model training experiment and getting my job done. Most code snippets mainly target use cases in Jupyter Notebook/Lab, but theoretically they should work in CLI mode all the same.
Common Imports
Experiment with Ignite library
import math
import random
from argparse import Namespace
from pathlib import Path

import numpy as np
import pandas as pd
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset

from fastprogress import progress_bar
from ignite.engine import Events, create_supervised_trainer, create_supervised_evaluator
from ignite.metrics import Accuracy, MeanSquaredError
from sklearn.preprocessing import normalize
Experiment with fast.ai library
import pandas as pd
from fastprogress import progress_bar
from argparse import Namespace
from pathlib import Path
import torch
from torch import nn
import random
from sklearn.preprocessing import normalize
from fastai import * # using fastai
from fastai.basic_data import *
from fastai.text import *
from fastai.tabular import *
Initialization
We can use Namespace to scope our project-wide arguments. The set_seeds
function here sets the Numpy and PyTorch random seeds for us. By using the same seed, we make sure that our results can be reproduced on the same hardware every time we rerun the notebook.
Toggling cuda
argument here helps us easily turn GPU acceleration on and off.
# Set Python, NumPy and PyTorch seeds
def set_seeds(seed, cuda):
    """Seed every RNG used in the notebook for reproducibility.

    The surrounding article promises "Numpy and PyTorch" seeding, but the
    original only seeded torch; this also seeds Python's `random` and NumPy.

    Args:
        seed: integer seed applied to all generators.
        cuda: when True, also seed every visible CUDA device.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)
# Project-wide configuration collected in a single Namespace.
args = Namespace(
    seed=1234,
    cuda=True,  # request GPU; downgraded later if CUDA is unavailable
    # NOTE(review): `google_drive_path` must be defined earlier in the
    # notebook (e.g. after mounting Google Drive) — confirm.
    norm_main_df_path=google_drive_path + 'data/main.csv',
    sample_length=120,
    batch_size=256,
    num_workers=4
)
# Check CUDA first so the seeding decision reflects actual availability.
# (The original called set_seeds(cuda=True) before discovering whether a
# GPU exists, so the CUDA flag passed to the seeder could be wrong.)
if not torch.cuda.is_available():
    print("CUDA not available")
    args.cuda = False
args.device = torch.device("cuda" if args.cuda else "cpu")
print("Using CUDA: {}".format(args.cuda))
# Set seeds — CUDA seeding only happens when a GPU is really in use
set_seeds(seed=args.seed, cuda=args.cuda)
Data Preprocessing
Add new columns to a DataFrame
# Build a throw-away DataFrame from the new columns, align its index with
# the original (join aligns on index), then `join` to get a widened copy.
origin_df = pd.DataFrame({...});  # placeholder: your existing DataFrame
new_columns = {"col_a": [...], "col_b": [...]}  # column name -> values
df_to_merge = pd.DataFrame(new_columns)
df_to_merge.index = origin_df.index  # make the indexes match before joining
dest_df = origin_df.join(df_to_merge)
Functional version
def add_column_to_df(df: pd.DataFrame, new_columns: dict) -> pd.DataFrame:
    """Return a copy of `df` widened with the columns in `new_columns`.

    Args:
        df: source DataFrame (not modified).
        new_columns: mapping of column name -> values, same length as `df`.
    """
    extra = pd.DataFrame(new_columns)
    # `join` aligns on index, so give the new frame the same index first.
    extra.index = df.index
    return df.join(extra)
Split Dataset
We can split a DataFrame into training set and validation set using the following functions.
Randomly split DataFrame:
def rand_split_df(df, valid_pct: float = 0.2):
    """Randomly partition `df` into (train, valid).

    Each row lands in the validation set independently with probability
    `valid_pct`, so the actual split ratio is only approximate.
    """
    in_valid = np.random.rand(len(df)) < valid_pct
    return df[~in_valid], df[in_valid]
Sequentially split DataFrame:
def seq_split_df(df, valid_pct: float = 0.2):
    """Sequential split: first (1 - valid_pct) of rows -> train, rest -> valid.

    Fixes two defects in the original: it hard-coded 0.2 instead of using
    `valid_pct`, and `df[:-0]` returned an EMPTY train set whenever the
    validation size rounded down to zero.
    """
    valid_size = int(len(df) * valid_pct)
    split_at = len(df) - valid_size
    return df[:split_at], df[split_at:]
Randomly split list:
# Draw one uniform number per sample; ~20% of rows go to validation.
# Pairing each item with its mask entry via zip keeps the four lists
# consistent (same rows selected for inputs and ground truths).
msk = np.random.rand(len(input_matrices)) < 0.2
train_input_matrices = [x for x, in_valid in zip(input_matrices, msk) if not in_valid]
valid_input_matrices = [x for x, in_valid in zip(input_matrices, msk) if in_valid]
train_truths = [y for y, in_valid in zip(ground_truths, msk) if not in_valid]
valid_truths = [y for y, in_valid in zip(ground_truths, msk) if in_valid]
Randomly split ItemList while using fast.ai:
# fast.ai: random 20% validation split, then wrap into a DataBunch
databunch = main_itemlist.split_by_rand_pct(0.2).databunch(bs=args.batch_size, collate_fn=data_batch_collate)
Sequentially split ItemList while using fast.ai:
# fast.ai: keep the first 80% of items (by position) for training; the
# index list handed to `split_by_idx` becomes the validation set.
main_itemlist_size = len(position_predictions)
train_itemlist_size = int(main_itemlist_size * 0.8)
databunch = main_itemlist.split_by_idx(list(range(train_itemlist_size, main_itemlist_size))).label_from_df().databunch(bs=args.batch_size, collate_fn=data_batch_collate)
Custom Dataset
torch.utils.data
A simple custom Dataset (torch.utils.data.Dataset
)
class PriceCurveDataset(Dataset):
    """Minimal map-style dataset pairing each input matrix with its price.

    `matrices` and `prices` are parallel sequences; item i is the tuple
    (matrices[i], prices[i]).
    """

    def __init__(self, matrices, prices):
        self.matrices = matrices
        self.prices = prices

    def __len__(self):
        # Length is driven by the targets; inputs are assumed parallel.
        return len(self.prices)

    def __getitem__(self, index):
        return (self.matrices[index], self.prices[index])
A more complex and complete version of a custom time-series Dataset with oversampling, matrix forming, normalization, etc.
def split_df(df, valid_pct: float = 0.2):
    """Sequential train/valid split: the last `valid_pct` of rows go to valid.

    Fixes the original's hard-coded 0.2 (the `valid_pct` argument was
    silently ignored) and the `df[:-0]` edge case that produced an empty
    training set when the validation size rounded down to zero.
    """
    valid_size = int(len(df) * valid_pct)
    split_at = len(df) - valid_size
    return df[:split_at], df[split_at:]
def random_select(a: float, b: float, prob_a: float):
    """Return `a` with probability `prob_a`, otherwise `b`."""
    chosen, = random.choices([a, b], weights=[prob_a, 1 - prob_a])
    return chosen
def random_oversmpl_times(n_times: float) -> int:
    """Stochastically round `n_times` to an int whose expectation equals it.

    E.g. 2.3 -> 2 with probability 0.7 and 3 with probability 0.3, so the
    average oversampling factor matches the requested ratio exactly.
    Relies on the file-level `import math` and the `random_select` helper.
    """
    lo = math.floor(n_times)
    hi = math.ceil(n_times)
    prob_lo = hi - n_times  # distance to the ceiling = weight of the floor
    return int(random_select(lo, hi, prob_lo))
def over_sample_df_with_dict(df, label_col: str, oversmpl_multi_dict: dict):
    """Build an oversampled index map for `df`.

    For each label with an entry in `oversmpl_multi_dict`, every matching
    row's index label is repeated ~ratio times (non-integer ratios are
    stochastically rounded via `random_oversmpl_times`).

    Returns:
        list of `df.index` labels, with repetitions.
    """
    index_map = []
    for label, ratio in oversmpl_multi_dict.items():
        matching = df.index[df[label_col] == label].tolist()
        for idx in matching:
            index_map.extend([idx] * random_oversmpl_times(ratio))
    return index_map
class TimeSerialClasDataset(Dataset):
    """Sliding-window time-series classification dataset.

    Item i is (x, y): `x` is the window of `time_len` feature rows ending
    at row i, zero-padded to full length when fewer rows precede i, and
    `y` is the label at row i.

    NOTE(review): padding rows are appended AFTER the data rows, not
    before — confirm the downstream model expects trailing padding.
    NOTE(review): the oversampling map stores `df.index` labels while
    `__getitem__` indexes with `.iloc` (positional); these only agree when
    the DataFrame has a default 0..n-1 RangeIndex — verify callers.

    Args:
        df: source DataFrame, one row per time step.
        label_col: name of the label column.
        drop_cols: extra feature columns to exclude. (Fixed: the original
            used a mutable default `[]`.)
        time_len: window length in rows.
        oversmpl_multi_dict: optional label -> oversampling ratio; when
            given, `over_sample_df_with_dict` builds an index map that
            replaces the natural row order.
        normalize_per_item: when True, L2-normalize each window column-wise
            (uses sklearn's `normalize`).
    """

    def __init__(self, df, label_col, drop_cols=None, time_len=120,
                 oversmpl_multi_dict: dict = None, normalize_per_item: bool = False):
        drop_cols = [] if drop_cols is None else drop_cols
        self.label_df = df[label_col]
        self.df = df.drop(drop_cols + [label_col], axis=1)
        self.df_len = len(df)
        self.row_width = len(self.df.iloc[0])  # features per time step
        self.padding_row = [0 for _ in range(self.row_width)]
        self.time_len = time_len
        self.valid_len = self.df_len
        self.oversmpl_idx_map = None
        if oversmpl_multi_dict is not None:
            self.oversmpl_idx_map = over_sample_df_with_dict(df, label_col, oversmpl_multi_dict)
            self.valid_len = len(self.oversmpl_idx_map)
        self.normalize_per_item = normalize_per_item

    def __len__(self):
        return self.valid_len

    def __getitem__(self, i):
        i = self.get_real_index(i)
        begin_i = max(i - self.time_len + 1, 0)
        end_i = i + 1
        x = self.df.iloc[begin_i:end_i].values.tolist()
        if i < self.time_len - 1:
            # Window starts before row 0: pad up to exactly `time_len` rows.
            pad_len = self.time_len - i - 1
            x += [self.padding_row for _ in range(pad_len)]
        y = self.label_df.iloc[i]
        if self.normalize_per_item:
            x = normalize(x, axis=0).tolist()
        return x, y

    def get_real_index(self, i):
        """Map a dataset index through the oversampling map (identity if none)."""
        if self.oversmpl_idx_map is None:
            return i
        return self.oversmpl_idx_map[i]
Getting DataLoader or DataBunch(if you are using fast.ai):
def get_seq_clas_databunch(df: DataFrame, label_col: str, label_index_map: dict, path: PathOrStr = '.', drop_cols=[],
                           time_len: int = 120, valid_pct: float = 0.2, train_bs: int = 64,
                           valid_bs: int = 64, num_workers: int = 2, oversmpl_multi_dict: dict = None,
                           normalize_per_item: bool = False):
    """Build a fast.ai DataBunch of TimeSerialClasDataset train/valid splits.

    Args:
        label_index_map: label value -> class index, used by the collate fn
            to produce a LongTensor of targets.
        oversmpl_multi_dict: forwarded to the TRAINING dataset only, so the
            validation set keeps the natural class distribution.
    """
    # Collate closure: stacks windows into a FloatTensor and maps string
    # labels to class indices via `label_index_map`.
    def data_batch_collate(batch):
        x_list = []
        y_list = []
        for item in batch:
            x_list.append(item[0])
            y_list.append(label_index_map[item[1]])
        batch_x = torch.FloatTensor(x_list)
        batch_y = torch.LongTensor(y_list)
        return batch_x, batch_y
    train_df, valid_df = split_df(df, valid_pct)
    train_dataset = TimeSerialClasDataset(train_df, label_col, drop_cols, time_len, oversmpl_multi_dict, normalize_per_item)
    valid_dataset = TimeSerialClasDataset(valid_df, label_col, drop_cols, time_len, normalize_per_item=normalize_per_item)
    train_dataloader = DataLoader(train_dataset, batch_size=train_bs, shuffle=True, num_workers=num_workers)
    # NOTE(review): shuffle=True on the validation loader is unusual — confirm intended.
    valid_dataloader = DataLoader(valid_dataset, batch_size=valid_bs, shuffle=True, num_workers=num_workers)
    # NOTE(review): the DataLoaders above are created WITHOUT data_batch_collate;
    # only the DataBunch receives it — verify fast.ai applies it as expected.
    data = DataBunch(train_dataloader, valid_dataloader, collate_fn=data_batch_collate, path=path)
    return data
Design Models
Count parameters of a model
def count_parameters(model):
    """Count the trainable (requires_grad) parameters of `model`."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total
# `model` must already be instantiated at this point in the notebook
print(f'The model has {count_parameters(model):,} trainable parameters')
Sample Module Class
class ACRNN(nn.Module):
    """LSTM regressor with last-step + max-pool + avg-pool feature fusion.

    The LSTM output over all time steps is summarized three ways — the last
    time step, a max-pool over time, and an average-pool over time — then
    concatenated and fed through a ReLU MLP ending in one regression output.

    Fixes vs. the original: `init_hidden` no longer reads the notebook-global
    `args.device` (it now follows the input's device, so the module works
    standalone and on any device); the mutable default `linears=[100, 20]`
    is replaced with a None sentinel.

    Args:
        input_dim: feature count per time step.
        hidden_dim: LSTM hidden size.
        n_layers: number of stacked LSTM layers.
        linears: hidden sizes of the fully connected head (default [100, 20]).
        bidirectional: use a bidirectional LSTM.
    """

    def __init__(self, input_dim=4, hidden_dim=60, n_layers=1, linears: list = None, bidirectional=False):
        super(ACRNN, self).__init__()
        linears = [100, 20] if linears is None else linears
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.bidirectional = bidirectional
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers=n_layers, batch_first=True, bidirectional=bidirectional)
        # 3 summaries (last step, max-pool, avg-pool), each of width
        # hidden_dim * num_directions.
        last_in_features = hidden_dim * self.get_rnn_layers(False) * 3
        linear_layers = []
        for linear_num in linears:
            linear_layers.append(nn.Linear(last_in_features, linear_num))
            linear_layers.append(nn.ReLU())
            last_in_features = linear_num
        linear_layers.append(nn.Linear(last_in_features, 1))
        self.linears = nn.Sequential(*linear_layers)

    def get_rnn_layers(self, with_n_layers=True):
        """Hidden-state layer count: (n_layers or 1) * num_directions."""
        return (self.n_layers if with_n_layers else 1) * (2 if self.bidirectional else 1)

    def init_hidden(self, x):
        """Zero-initialize (h, c) on the same device as the input batch."""
        layers = self.get_rnn_layers()
        shape = (layers, x.size(0), self.hidden_dim)
        self.h_t = torch.zeros(*shape, device=x.device)
        self.c_t = torch.zeros(*shape, device=x.device)

    def forward(self, x):
        """x: (batch, seq_len, input_dim) -> (batch,) regression outputs."""
        self.init_hidden(x)
        x, (self.h_t, self.c_t) = self.rnn(x, (self.h_t, self.c_t))
        # Pool over the full time dimension: (batch, seq, feat) -> (batch, feat)
        x_max_pooled = nn.functional.max_pool2d(x, (x.size(1), 1)).flatten(1)
        x_avg_pooled = nn.functional.avg_pool2d(x, (x.size(1), 1)).flatten(1)
        x = x[:, -1, :]  # last time step
        x = torch.cat((x, x_max_pooled, x_avg_pooled), 1)
        x = self.linears(x)
        return x.flatten(0)
Train The Model
Vanilla PyTorch style:
# Optimizer / LR schedule / loss for the vanilla training loop below.
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.8)
# Multiply the LR by 0.1 every 7 scheduler steps
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
criterion = nn.MSELoss()
criterion.to(args.device)
from copy import deepcopy
import time
# Phase name -> loader / dataset size; consumed by train_model to average
# the per-sample loss for each phase.
dataloaders = {'train': train_dataloader, 'valid': valid_dataloader}
dataset_sizes = {'train': len(train_dataset), 'valid': len(valid_dataset)}
def train_model(model, criterion, optimizer, scheduler, num_epochs=5):
    """Classic two-phase (train/valid) loop.

    Tracks the weights with the best validation loss and loads them back
    into `model` before returning it. Relies on the module-level
    `dataloaders`, `dataset_sizes` and `args.device` defined above.
    """
    since = time.time()
    best_model_wts = deepcopy(model.state_dict())
    best_loss = float("inf")
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        for phase in ['train', 'valid']:
            if phase == 'train':
                model.train()
            else:
                model.eval()
            running_loss = 0.0
            for data in dataloaders[phase]:
                inputs = data[0].to(args.device)
                truths = data[1].to(args.device)
                optimizer.zero_grad()
                # Gradients only tracked during the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    preds = model(inputs)
                    # Alter the MSE to RMSE by adding the sqrt computation
                    loss = torch.sqrt(criterion(preds.flatten(), truths))
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                # Weight by batch size so the epoch average is per-sample
                running_loss += loss.item() * inputs.size(0)
            epoch_loss = running_loss / dataset_sizes[phase]
            print('{} Loss: {:.4f}'.format(phase, epoch_loss))
            if phase == 'valid':
                # Advance the LR schedule once per epoch (after validation)
                scheduler.step()
            # deep copy the model
            if phase == 'valid' and epoch_loss < best_loss:
                best_loss = epoch_loss
                best_model_wts = deepcopy(model.state_dict())
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Loss: {:4f}'.format(best_loss))
    # load best model weights
    model.load_state_dict(best_model_wts)
    return model
# Run the loop; `model` ends up holding the best-validation-loss weights
model = train_model(model, criterion, optimizer, lr_scheduler,
                    num_epochs=10)
Using Ignite:
# Ignite: the trainer runs the optimization loop; the evaluator recomputes
# metrics over a full loader each time it is `.run(...)`.
trainer = create_supervised_trainer(model, optimizer, criterion, device=args.device)
evaluator = create_supervised_evaluator(model,
                                        metrics={
                                            'mse': MeanSquaredError()
                                        },
                                        device=args.device)
# Uncomment to log the loss after every single iteration:
# @trainer.on(Events.ITERATION_COMPLETED)
# def log_training_loss(trainer):
#     print("Epoch[{}] Loss: {:.2f}".format(trainer.state.epoch, trainer.state.output))

@trainer.on(Events.EPOCH_COMPLETED)
def log_training_results(trainer):
    """Evaluate on the training set at the end of each epoch."""
    evaluator.run(train_dataloader)
    metrics = evaluator.state.metrics
    print("Training Results - Epoch: {} Avg loss: {:.2f}"
          .format(trainer.state.epoch, metrics['mse']))

@trainer.on(Events.EPOCH_COMPLETED)
def log_validation_results(trainer):
    """Evaluate on the validation set at the end of each epoch."""
    evaluator.run(valid_dataloader)
    metrics = evaluator.state.metrics
    print("Validation Results - Epoch: {} Avg loss: {:.2f}"
          .format(trainer.state.epoch, metrics['mse']))

trainer.run(train_dataloader, 4)  # 4 epochs (the second argument is max_epochs)
The approach utilizing fast.ai will be discussed in later posts.