My PyTorch Cookbook

My go-to PyTorch training snippets

In this article, I'm going to introduce some of my most-used Python code snippets for deep learning with PyTorch. During my college journey into machine learning, both in class and in my industrial practice, they have been very handy for quickly setting up a model training experiment and getting the job done. Most snippets mainly target use cases in Jupyter Notebook/Lab, but they should work just as well from the CLI.

Common Imports

Experimenting with the Ignite library

import pandas as pd
from fastprogress import progress_bar
from argparse import Namespace
from pathlib import Path
import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
import random
from sklearn.preprocessing import normalize

from ignite.engine import Events, create_supervised_trainer, create_supervised_evaluator
from ignite.metrics import Accuracy, MeanSquaredError

Experimenting with the fast.ai library

import pandas as pd
from fastprogress import progress_bar
from argparse import Namespace
from pathlib import Path
import torch
from torch import nn
import random
from sklearn.preprocessing import normalize

from fastai import * # using fastai
from fastai.basic_data import *
from fastai.text import *
from fastai.tabular import *

Initialization

We can use Namespace to scope our project-wide arguments. The set_seeds function here sets the Python, NumPy and PyTorch random seeds for us. By using the same seed, we make sure that our results can be reproduced on the same hardware every time we rerun the notebook.

Toggling the cuda argument here lets us easily turn GPU acceleration on and off.

# Set Python, NumPy and PyTorch seeds
def set_seeds(seed, cuda):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

args = Namespace(
    seed=1234,
    cuda=True,
    norm_main_df_path=google_drive_path + 'data/main.csv',
    sample_length=120,
    batch_size=256,
    num_workers=4
)

# Set seeds
set_seeds(seed=args.seed, cuda=args.cuda)

# Check CUDA
if not torch.cuda.is_available():
    print("CUDA not available")
    args.cuda = False
args.device = torch.device("cuda" if args.cuda else "cpu")
print("Using CUDA: {}".format(args.cuda))

Data Preprocessing

Add new columns to a DataFrame

origin_df = pd.DataFrame({...})
new_columns = {"col_a": [...], "col_b": [...]}
df_to_merge = pd.DataFrame(new_columns)
df_to_merge.index = origin_df.index
dest_df = origin_df.join(df_to_merge)

Functional version:

def add_column_to_df(df:pd.DataFrame, new_columns:dict)->pd.DataFrame:
    df_to_merge = pd.DataFrame(new_columns)
    df_to_merge.index = df.index
    return df.join(df_to_merge)
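
For illustration, a minimal usage sketch (the column names and values are hypothetical, and origin_df is assumed to have three rows):

new_columns = {"col_a": [1, 2, 3], "col_b": [4, 5, 6]}  # hypothetical data
dest_df = add_column_to_df(origin_df, new_columns)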

Split Dataset

We can split a DataFrame into training set and validation set using the following functions.

Randomly split DataFrame:

def rand_split_df(df, valid_pct:float=0.2):
    msk = np.random.rand(len(df)) < valid_pct
    train_df = df[~msk]
    valid_df = df[msk]
    return train_df, valid_df

Sequentially split DataFrame:

def seq_split_df(df, valid_pct:float=0.2):
  valid_size = int(len(df) * valid_pct)
  return df[:-valid_size], df[-valid_size:]

Randomly split list:

msk = np.random.rand(len(input_matrices)) < 0.2

train_input_matrices = [x for i, x in enumerate(input_matrices) if not msk[i]]
valid_input_matrices = [x for i, x in enumerate(input_matrices) if msk[i]]

train_truths = [y for i, y in enumerate(ground_truths) if not msk[i]]
valid_truths = [y for i, y in enumerate(ground_truths) if msk[i]]

Randomly split an ItemList when using fast.ai:

databunch = main_itemlist.split_by_rand_pct(0.2).databunch(bs=args.batch_size, collate_fn=data_batch_collate)

Sequentially split an ItemList when using fast.ai:

main_itemlist_size = len(main_itemlist)
train_itemlist_size = int(main_itemlist_size * 0.8)
databunch = main_itemlist.split_by_idx(list(range(train_itemlist_size, main_itemlist_size))).label_from_df().databunch(bs=args.batch_size, collate_fn=data_batch_collate)

Custom Dataset

torch.utils.data

A simple custom Dataset (torch.utils.data.Dataset):

class PriceCurveDataset(Dataset):
  def __init__(self, matrices, prices):
    self.matrices = matrices
    self.prices = prices

  def __len__(self):
    return len(self.prices)

  def __getitem__(self, index):
    return self.matrices[index], self.prices[index]
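
A minimal usage sketch, assuming the train_input_matrices and train_truths lists from the split snippet above hold equally-shaped NumPy arrays and their matching target prices:

price_dataset = PriceCurveDataset(train_input_matrices, train_truths)
price_dataloader = DataLoader(price_dataset, batch_size=args.batch_size,
                              shuffle=True, num_workers=args.num_workers)
# The default collate function stacks each batch into (batch, *matrix_shape) tensors
batch_x, batch_y = next(iter(price_dataloader))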

A more complex and complete version of a custom time-series Dataset with oversampling, matrix forming, normalization, etc.:

import math

def split_df(df, valid_pct: float = 0.2):
  valid_size = int(len(df) * valid_pct)
  return df[:-valid_size], df[-valid_size:]

def random_select(a:float, b:float, prob_a:float):
  # Pick a with probability prob_a, otherwise pick b
  return random.choices([a, b], weights=[prob_a, 1-prob_a])[0]

def random_oversmpl_times(n_times: float) -> int:
  # Turn a fractional oversampling ratio into an integer repeat count whose
  # expected value equals n_times, e.g. 2.3 -> 2 with prob 0.7 and 3 with prob 0.3
  floor_val = math.floor(n_times)
  ceil_val = math.ceil(n_times)
  floor_prob = ceil_val - n_times
  return int(random_select(floor_val, ceil_val, floor_prob))

def over_sample_df_with_dict(df, label_col: str, oversmpl_multi_dict: dict):
  # Build an index map where rows of each label in the dict are repeated
  # according to their oversampling ratio; labels not in the dict are left out
  index_map_list = []
  for key in oversmpl_multi_dict:
    oversmpl_ratio = oversmpl_multi_dict[key]
    indexes = df.index[df[label_col] == key].tolist()
    for i in indexes:
      random_neighbor = random_oversmpl_times(oversmpl_ratio)
      index_map_list += [i for n in range(random_neighbor)]
  return index_map_list

class TimeSerialClasDataset(Dataset):
  def __init__(self, df, label_col, drop_cols=[], time_len=120, 
               oversmpl_multi_dict:dict=None, normalize_per_item:bool=False):
    self.label_df = df[label_col]
    self.df = df.drop(drop_cols + [label_col], axis=1)
    self.df_len = len(df)
    self.row_width = len(self.df.iloc[0])
    self.padding_row = [0 for i in range(self.row_width)]
    self.time_len = time_len

    self.valid_len = self.df_len

    self.oversmpl_idx_map = None
    if oversmpl_multi_dict is not None:
      self.oversmpl_idx_map = over_sample_df_with_dict(df, label_col, oversmpl_multi_dict)
      self.valid_len = len(self.oversmpl_idx_map)
    self.normalize_per_item = normalize_per_item

  def __len__(self):
    return self.valid_len

  def __getitem__(self, i):
    i = self.get_real_index(i)
    begin_i = i - self.time_len + 1
    begin_i = begin_i if begin_i >= 0 else 0
    end_i = i + 1
    x = self.df.iloc[begin_i: end_i].values.tolist()
    if i < self.time_len - 1:
      pad_len = self.time_len - i - 1
      x += [self.padding_row for pad_i in range(pad_len)]
    y = self.label_df.iloc[i]
    if self.normalize_per_item:
      x = normalize(x, axis=0).tolist()
    return x, y

  def get_real_index(self, i):
    if self.oversmpl_idx_map is None:
      return i
    return self.oversmpl_idx_map[i]
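
A usage sketch; the label values, the 'date' drop column and the 2.5x ratios below are made up for illustration. oversmpl_multi_dict maps a label value to the average number of times rows with that label are repeated:

train_ts_dataset = TimeSerialClasDataset(train_df, label_col='label',
                                         drop_cols=['date'],
                                         time_len=args.sample_length,
                                         oversmpl_multi_dict={'buy': 2.5, 'sell': 2.5},
                                         normalize_per_item=True)
x, y = train_ts_dataset[0]  # x: time_len rows of features (padded if needed), y: that row's label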

Getting a DataLoader or DataBunch (if you are using fast.ai):

def get_seq_clas_databunch(df: DataFrame, label_col:str, label_index_map: dict, path:PathOrStr = '.', drop_cols = [], 
                           time_len:int = 120, valid_pct: float = 0.2, train_bs: int = 64, 
                           valid_bs: int = 64, num_workers:int = 2, oversmpl_multi_dict:dict=None,
                           normalize_per_item:bool=False):

  def data_batch_collate(batch):
    x_list = []
    y_list = []

    for item in batch:
      x_list.append(item[0])
      y_list.append(label_index_map[item[1]])

    batch_x = torch.FloatTensor(x_list)
    batch_y = torch.LongTensor(y_list)

    return batch_x, batch_y

  train_df, valid_df = split_df(df, valid_pct)
  train_dataset = TimeSerialClasDataset(train_df, label_col, drop_cols, time_len, oversmpl_multi_dict, normalize_per_item)
  valid_dataset = TimeSerialClasDataset(valid_df, label_col, drop_cols, time_len, normalize_per_item=normalize_per_item)
  train_dataloader = DataLoader(train_dataset, batch_size=train_bs, shuffle=True, num_workers=num_workers)
  valid_dataloader = DataLoader(valid_dataset, batch_size=valid_bs, shuffle=True, num_workers=num_workers)
  data = DataBunch(train_dataloader, valid_dataloader, collate_fn=data_batch_collate, path=path)
  return data
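
A call might look like the sketch below; main_df, the 'label' column, the 'date' drop column and the label-to-index mapping are placeholders for whatever your project uses:

label_index_map = {'sell': 0, 'hold': 1, 'buy': 2}  # hypothetical label encoding
data = get_seq_clas_databunch(main_df, label_col='label',
                              label_index_map=label_index_map,
                              drop_cols=['date'], time_len=args.sample_length,
                              train_bs=args.batch_size, valid_bs=args.batch_size,
                              num_workers=args.num_workers)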

Design Models

Count parameters of a model

def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

Sample Module Class

class ACRNN(nn.Module):
  def __init__(self, input_dim = 4, hidden_dim = 60, n_layers = 1, linears: list = [100, 20], bidirectional=False):
    super(ACRNN, self).__init__()
    self.input_dim=input_dim
    self.hidden_dim=hidden_dim
    self.n_layers=n_layers
    self.bidirectional=bidirectional
    self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers=n_layers, batch_first=True, bidirectional=bidirectional)

    last_in_features = hidden_dim * self.get_rnn_layers(False) * 3
    linear_layers = []

    for linear_num in linears:
      linear_layers.append(nn.Linear(last_in_features, linear_num))
      linear_layers.append(nn.ReLU())
      last_in_features = linear_num

    linear_layers.append(nn.Linear(last_in_features, 1))
    self.linears = nn.Sequential(*linear_layers)

  def get_rnn_layers(self, with_n_layers = True):
    return (self.n_layers if with_n_layers else 1) * (2 if self.bidirectional else 1)

  def init_hidden(self, x):
    hidden_state_layers = self.get_rnn_layers()
    self.h_t = torch.zeros(hidden_state_layers, x.size(0), self.hidden_dim).to(args.device)
    self.c_t = torch.zeros(hidden_state_layers, x.size(0), self.hidden_dim).to(args.device)

  def forward(self, x):
    self.init_hidden(x)

    x, (self.h_t, self.c_t) = self.rnn(x, (self.h_t, self.c_t))

    x_max_pooled = nn.functional.max_pool2d(x, (x.size(1), 1)).flatten(1)
    x_avg_pooled = nn.functional.avg_pool2d(x, (x.size(1), 1)).flatten(1)
    x = x[:, -1, :]
    x = torch.cat((x, x_max_pooled, x_avg_pooled), 1)

    x = self.linears(x)
    return x.flatten(0)
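
A quick sanity check of the output shape with a dummy batch (the batch size of 8 and the 4 input features below are placeholders):

model = ACRNN(input_dim=4, hidden_dim=60, n_layers=1, linears=[100, 20]).to(args.device)
dummy_batch = torch.randn(8, args.sample_length, 4).to(args.device)  # (batch, time, features)
with torch.no_grad():
    out = model(dummy_batch)
print(out.shape)  # torch.Size([8]) - one regression output per sample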

Train The Model

Vanilla PyTorch style:

optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.8)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

criterion = nn.MSELoss()
criterion.to(args.device)

from copy import deepcopy
import time

dataloaders = {'train': train_dataloader, 'valid': valid_dataloader}
dataset_sizes = {'train': len(train_dataset), 'valid': len(valid_dataset)}

def train_model(model, criterion, optimizer, scheduler, num_epochs = 5):
    since = time.time()

    best_model_wts = deepcopy(model.state_dict())
    best_loss = float("inf")

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        for phase in ['train', 'valid']:
            if phase == 'train':
                model.train() 
            else:
                model.eval()

            running_loss = 0.0

            for data in dataloaders[phase]:
                inputs = data[0].to(args.device)
                truths = data[1].to(args.device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    preds = model(inputs)

                    # Alter the MSE to RMSE by adding the sqrt computation
                    loss = torch.sqrt(criterion(preds.flatten(), truths))

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                    running_loss += loss.item() * inputs.size(0)

            epoch_loss = running_loss / dataset_sizes[phase]

            print('{} Loss: {:.4f}'.format(phase, epoch_loss))

            if phase == 'valid':
                scheduler.step()

            # deep copy the model
            if phase == 'valid' and epoch_loss < best_loss:
                best_loss = epoch_loss
                best_model_wts = deepcopy(model.state_dict())



    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
            time_elapsed // 60, time_elapsed % 60))
    print('Best val Loss: {:.4f}'.format(best_loss))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model


model = train_model(model, criterion, optimizer, lr_scheduler,
                    num_epochs=10)

Using Ignite:

trainer = create_supervised_trainer(model, optimizer, criterion, device=args.device)
evaluator = create_supervised_evaluator(model,
                                        metrics={
                                            'mse': MeanSquaredError()
                                        },
                                        device=args.device)

# @trainer.on(Events.ITERATION_COMPLETED)
# def log_training_loss(trainer):
#     print("Epoch[{}] Loss: {:.2f}".format(trainer.state.epoch, trainer.state.output))

@trainer.on(Events.EPOCH_COMPLETED)
def log_training_results(trainer):
    evaluator.run(train_dataloader)
    metrics = evaluator.state.metrics
    print("Training Results - Epoch: {} Avg loss: {:.2f}"
          .format(trainer.state.epoch, metrics['mse']))

@trainer.on(Events.EPOCH_COMPLETED)
def log_validation_results(trainer):
    evaluator.run(valid_dataloader)
    metrics = evaluator.state.metrics
    print("Validation Results - Epoch: {} Avg loss: {:.2f}"
          .format(trainer.state.epoch, metrics['mse']))

trainer.run(train_dataloader, max_epochs=4) # 4 epochs

The approach using fast.ai will be discussed in a later post.