# code for loading the format for the notebook
import os
# path : store the current path to convert back to it later
path = os.getcwd()
os.chdir(os.path.join('..', '..', 'notebook_format'))
from formats import load_style
load_style(css_style='custom2.css', plot_style=False)
os.chdir(path)
# 1. magic for inline plot
# 2. magic to print version
# 3. magic so that the notebook will reload external python modules
# 4. magic to enable retina (high resolution) plots
# https://gist.github.com/minrk/3301035
%matplotlib inline
%load_ext watermark
%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_format='retina'
import os
import time
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import matplotlib.pyplot as plt
import sklearn.metrics as metrics
from math import ceil
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
TrainingArguments,
Trainer,
DataCollatorWithPadding
)
from tqdm import trange
from contextlib import contextmanager
from torch.utils.data import DataLoader
from onnxruntime import InferenceSession, SessionOptions
from datasets import load_dataset, DatasetDict, Dataset, disable_progress_bar
# prevent progress bars from flooding the notebook console
disable_progress_bar()
os.environ["DISABLE_MLFLOW_INTEGRATION"] = "TRUE"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
%watermark -a 'Ethen' -u -d -t -v -p datasets,transformers,torch,tokenizers,numpy,pandas,onnxruntime
Using a serialized format like ONNX (Open Neural Network Exchange) [4] for inference instead of vanilla PyTorch has several benefits, especially when deploying machine learning models to production or sharing them across different platforms. A couple of reasons include:

- Interoperability: a single serialized model can be loaded from many different languages, frameworks, and hardware platforms, decoupling the training environment from the serving environment.
- Performance: dedicated runtimes such as ONNX Runtime apply graph-level optimizations that often make inference faster than eager-mode PyTorch.
In this article, we'll be going over the process of converting a model into ONNX format, running inference with it, and performing benchmarks to see how it compares to vanilla PyTorch models.
dataset_dict = load_dataset("quora")
dataset_dict
dataset_dict['train'][0]
test_size = 0.1
val_size = 0.1
dataset_dict_test = dataset_dict['train'].train_test_split(test_size=test_size)
dataset_dict_train_val = dataset_dict_test['train'].train_test_split(test_size=val_size)
dataset_dict = DatasetDict({
"train": dataset_dict_train_val["train"],
"val": dataset_dict_train_val["test"],
"test": dataset_dict_test["test"]
})
dataset_dict
We won't be going over the details of the pre-trained tokenizer or model; we'll simply load one that's available from the Hugging Face model repository.
# https://huggingface.co/transformers/model_doc/distilbert.html
pretrained_model_name_or_path = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path)
tokenizer
We can feed a pair of sentences directly to our tokenizer.
encoded_input = tokenizer(
"What is the step by step guide to invest in share market in india?",
"What is the step by step guide to invest in share market?"
)
encoded_input
Decoding the tokenized inputs, we can see that this model's tokenizer adds special tokens such as [SEP], which is used to indicate which tokens belong to which segment/pair.
tokenizer.decode(encoded_input["input_ids"])
The preprocessing step is task specific; if we happen to be using another dataset, this function needs to be modified accordingly.
def tokenize_fn(examples):
labels = [int(label) for label in examples["is_duplicate"]]
texts = [question["text"] for question in examples["questions"]]
tokenized_examples = tokenizer(texts)
tokenized_examples["labels"] = labels
return tokenized_examples
dataset_dict_tokenized = dataset_dict.map(
tokenize_fn,
batched=True,
num_proc=8,
remove_columns=["is_duplicate", "questions"]
)
dataset_dict_tokenized
dataset_dict_tokenized["train"][0]
Having preprocessed our raw dataset, for our text classification task, we use the AutoModelForSequenceClassification class to load the pre-trained model. The only other argument we need to specify is the number of classes/labels our text classification task has. Upon instantiating the model for the first time, we'll see some warnings telling us we should fine-tune this model on our downstream task before using it.
We'll also report standard binary classification evaluation metrics, including log loss, ROC-AUC, and average precision (PR-AUC). As the focus here is on model inference, we picked mostly default settings for the training configuration.
def compute_metrics(eval_preds, round_digits: int = 3):
y_pred, y_true = eval_preds
    # the raw predictions are logits; convert them to probabilities,
    # which metrics such as log loss expect
    exp = np.exp(y_pred - y_pred.max(axis=1, keepdims=True))
    y_prob = exp / exp.sum(axis=1, keepdims=True)
    y_score = y_prob[:, 1]
log_loss = round(metrics.log_loss(y_true, y_score), round_digits)
roc_auc = round(metrics.roc_auc_score(y_true, y_score), round_digits)
pr_auc = round(metrics.average_precision_score(y_true, y_score), round_digits)
return {
'roc_auc': roc_auc,
'pr_auc': pr_auc,
'log_loss': log_loss
}
model = AutoModelForSequenceClassification.from_pretrained(pretrained_model_name_or_path, num_labels=2)
print('# of parameters: ', model.num_parameters())
data_collator = DataCollatorWithPadding(tokenizer, padding=True)
args = TrainingArguments(
output_dir="quora_model_checkpoint",
    learning_rate=0.0001,
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
gradient_accumulation_steps=2,
num_train_epochs=1,
weight_decay=0.01,
fp16=True,
lr_scheduler_type="constant",
evaluation_strategy="steps",
eval_steps=500,
save_strategy="steps",
save_steps=500,
save_total_limit=2,
load_best_model_at_end=True
)
trainer = Trainer(
model,
args=args,
data_collator=data_collator,
train_dataset=dataset_dict_tokenized["train"],
eval_dataset=dataset_dict_tokenized["val"],
compute_metrics=compute_metrics
)
trainer.train()
# load from the best checkpoint
model = AutoModelForSequenceClassification.from_pretrained(trainer.state.best_model_checkpoint)
print('# of parameters: ', model.num_parameters())
This section walks through the process of serializing our PyTorch model into ONNX format [4] and using ONNX Runtime [5] for model inference.
Exporting the model can be done via the torch.onnx.export function, which requires a model, a sample input, as well as some model specific configurations. Crucial parameters include:

- dynamic_axes: marks which axes of our inputs and outputs (here, batch size and sequence length) are dynamic. By doing so, at inference time, ONNX won't be limited to the sample input size we've provided during this serialization process.

samples = dataset_dict['test'][:1]
texts = [question["text"] for question in samples["questions"]]
tokenized_texts = tokenizer(texts, padding=True)
input_ids = tokenized_texts["input_ids"]
attention_mask = tokenized_texts["attention_mask"]
input_ids_tensor = torch.LongTensor(input_ids).to(model.device)
attention_mask_tensor = torch.LongTensor(attention_mask).to(model.device)
opset_version = 15
onnx_model_path = "text_classification.onnx"
torch.onnx.export(
model,
(input_ids_tensor, attention_mask_tensor),
onnx_model_path,
opset_version=opset_version,
input_names=["input_ids", "attention_mask"],
output_names=["output"],
dynamic_axes={
"input_ids": {0: "batch_size", 1: "sequence_len"},
"attention_mask": {0: "batch_size", 1: "sequence_len"},
"output": {0: "batch_size"},
}
)
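Once exported, we can optionally verify that the serialized graph is well formed. A minimal sanity check, assuming the onnx package is available in our environment (it isn't imported elsewhere in this notebook):
import onnx

# load the serialized graph and run onnx's built-in structural validation
onnx_model = onnx.load(onnx_model_path)
onnx.checker.check_model(onnx_model)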
The main entry point for using ONNX Runtime involves initiating the InferenceSession class and passing our inputs via its .run method. Two things to note here:

- The inputs are passed through input_feed, a dictionary whose keys must match the input names we specified during export and whose values are numpy arrays.
- We can configure the execution provider (e.g. CPUExecutionProvider versus CUDAExecutionProvider), which determines the hardware the session runs on.
def create_inference_session(
model_path: str,
intra_op_num_threads: int = 8,
provider: str = 'CPUExecutionProvider'
) -> InferenceSession:
"""
    Create onnx runtime InferenceSession.

    Parameters
    ----------
    model_path : str
        Path to the onnx model file.
    intra_op_num_threads : int
        Number of threads used to parallelize computation within nodes.
        Remember to tune this parameter.
    provider : str
        Execution provider, e.g. CUDAExecutionProvider. The
        get_all_providers function can list all available providers.
"""
options = SessionOptions()
options.intra_op_num_threads = intra_op_num_threads
    # load the model as an onnx graph
session = InferenceSession(model_path, options, providers=[provider])
session.disable_fallback()
return session
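Note that the available execution providers depend on how onnxruntime was installed (e.g. the onnxruntime-gpu package is needed for CUDAExecutionProvider); we can check what our build supports:
import onnxruntime

# providers supported by this installation, in default priority order
print(onnxruntime.get_available_providers())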
# pytorch inferencing
model.eval()
with torch.no_grad():
torch_output = model(input_ids_tensor, attention_mask_tensor).logits.detach().cpu().numpy()
# onnx runtime inferencing
input_feed = {
"input_ids": np.array(input_ids),
"attention_mask": np.array(attention_mask)
}
session = create_inference_session(onnx_model_path, provider="CUDAExecutionProvider")
onnx_output = session.run(output_names=["output"], input_feed=input_feed)[0]
if np.allclose(torch_output, onnx_output, atol=1e-5):
print("Exported model has been tested with ONNXRuntime, and the result looks good!")
print("input_ids: ", input_ids)
print("onnx output: ", onnx_output)
The next few sections run benchmarks on different batch sizes and inference options.
@contextmanager
def track_inference_time(buffer):
start = time.perf_counter()
yield
end = time.perf_counter()
    elapsed_time = end - start
    buffer.append(elapsed_time)
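As a quick illustration of this helper (the sleep is an arbitrary stand-in for an inference call):
buffer = []
with track_inference_time(buffer):
    time.sleep(0.1)

# buffer now holds a single elapsed time of roughly 0.1 seconds
print(buffer)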
class Benchmark:
"""
Class to benchmark pytorch and onnx serialized models.
Each benchmark* method runs the model for n_rounds and
reports the average time it took to run inference.
"""
def __init__(
self,
model_pytorch: nn.Module,
model_onnx: InferenceSession,
        n_rounds: int = 50,
n_warmup_rounds: int = 0
):
self.model_pytorch = model_pytorch
self.model_onnx = model_onnx
self.n_rounds = n_rounds
self.n_warmup_rounds = n_warmup_rounds
def benchmark_pytorch(self, input_ids, attention_mask):
"""Expects the input_ids to be padded to the same sequence length."""
device = self.model_pytorch.device
input_ids_tensor = torch.LongTensor(input_ids).to(device)
attention_mask_tensor = torch.LongTensor(attention_mask).to(device)
        # disable gradient tracking to avoid autograd overhead while benchmarking
        with torch.no_grad():
            for _ in range(self.n_warmup_rounds):
                self.model_pytorch(input_ids_tensor, attention_mask_tensor)

            time_buffer = []
            for _ in range(self.n_rounds):
                with track_inference_time(time_buffer):
                    self.model_pytorch(input_ids_tensor, attention_mask_tensor).logits.detach().cpu().numpy()

        avg_elapsed_time = np.mean(time_buffer)
        return avg_elapsed_time
def benchmark_onnx(self, input_ids, attention_mask):
"""Expects the input_ids to be padded to the same sequence length."""
        # onnx runtime expects numpy arrays as input values
        input_feed = {
            "input_ids": np.array(input_ids),
            "attention_mask": np.array(attention_mask)
        }
for _ in range(self.n_warmup_rounds):
self.model_onnx.run(["output"], input_feed)[0]
time_buffer = []
for _ in range(self.n_rounds):
with track_inference_time(time_buffer):
onnx_output = self.model_onnx.run(["output"], input_feed)[0]
avg_elapsed_time = np.mean(time_buffer)
return avg_elapsed_time
def benchmark_dynamic_onnx(self, input_ids, attention_mask):
"""
Expects the input ids to be of dynamic size (i.e. non-padded).
"""
        for _ in range(self.n_warmup_rounds):
            for input_id, mask in zip(input_ids, attention_mask):
                input_feed = {
                    "input_ids": np.array([input_id]),
                    "attention_mask": np.array([mask])
                }
                self.model_onnx.run(["output"], input_feed)[0]

        time_buffer = []
        for _ in range(self.n_rounds):
            with track_inference_time(time_buffer):
                batch_onnx_output = []
                # run inference one example at a time, each with its own sequence length
                for input_id, mask in zip(input_ids, attention_mask):
                    input_feed = {
                        "input_ids": np.array([input_id]),
                        "attention_mask": np.array([mask])
                    }
                    onnx_output = self.model_onnx.run(["output"], input_feed)[0]
                    batch_onnx_output.append(onnx_output)

                outputs = np.concatenate(batch_onnx_output)
avg_elapsed_time = np.mean(time_buffer)
return avg_elapsed_time
# benchmark on GPU
model.eval()
model.to("cuda")
session = create_inference_session(onnx_model_path, provider="CUDAExecutionProvider")
benchmark = Benchmark(model, session, n_rounds=50)
batch_sizes = [1, 4, 8, 16, 32, 64]
# {model_option: [throughput number for each batch size]}
# e.g. {pytorch_gpu: [10.0, 12.0]}
batch_throughput_results = {}
for batch_size in batch_sizes:
# tokenize & pad the raw text
samples = dataset_dict["test"][:batch_size]
texts = [question["text"] for question in samples["questions"]]
tokenized_texts = tokenizer(texts, padding=True)
input_ids = tokenized_texts["input_ids"]
attention_mask = tokenized_texts["attention_mask"]
pytorch_gpu_avg_elapsed_time = benchmark.benchmark_pytorch(input_ids, attention_mask)
onnx_gpu_avg_elapsed_time = benchmark.benchmark_onnx(input_ids, attention_mask)
throughput_results = {
"pytorch_gpu": round(batch_size / pytorch_gpu_avg_elapsed_time, 2),
"onnx_gpu": round(batch_size / onnx_gpu_avg_elapsed_time, 2)
}
for model_option, throughput in throughput_results.items():
if model_option in batch_throughput_results:
batch_throughput_results[model_option].append(throughput)
else:
batch_throughput_results[model_option] = [throughput]
batch_throughput_results
def plot_throughput(batch_throughput_results, batch_sizes):
fig = plt.figure(figsize=(10, 5))
bar_width = 0.3
br_list = np.arange(len(batch_sizes))
for i, (model_name, throughput_list) in enumerate(batch_throughput_results.items()):
br = br_list + (i * bar_width)
plt.bar(br, throughput_list, width=bar_width, label=model_name)
    # center the x tick labels within each group of bars
    offset = ceil(len(batch_throughput_results) / 2)
    ticks = br_list + offset * bar_width / 2
plt.xticks(ticks, batch_sizes)
plt.xlabel("Batch Size")
plt.ylabel("Throughput (samples/s)")
plt.title("Model Throughput Comparison")
plt.legend(fontsize=8, loc="upper left")
plt.show()
plot_throughput(batch_throughput_results, batch_sizes)
# benchmark on CPU
model.eval()
model.to("cpu")
session = create_inference_session(
onnx_model_path,
intra_op_num_threads=8,
provider="CPUExecutionProvider"
)
benchmark = Benchmark(model, session, n_rounds=50)
batch_sizes = [1, 4, 8, 16]
# {model_option: [throughput number for each batch size]}
# e.g. {pytorch_cpu: [10.0, 12.0]}
batch_throughput_results = {}
for batch_size in batch_sizes:
# tokenize & pad the raw text
samples = dataset_dict['test'][:batch_size]
texts = [question["text"] for question in samples["questions"]]
tokenized_texts = tokenizer(texts, padding=True)
input_ids = tokenized_texts["input_ids"]
attention_mask = tokenized_texts["attention_mask"]
pytorch_cpu_avg_elapsed_time = benchmark.benchmark_pytorch(input_ids, attention_mask)
onnx_cpu_avg_elapsed_time = benchmark.benchmark_onnx(input_ids, attention_mask)
tokenized_texts = tokenizer(texts)
input_ids = tokenized_texts["input_ids"]
attention_mask = tokenized_texts["attention_mask"]
dynamic_onnx_cpu_avg_elapsed_time = benchmark.benchmark_dynamic_onnx(input_ids, attention_mask)
throughput_results = {
"pytorch_cpu": round(batch_size / pytorch_cpu_avg_elapsed_time, 2),
"onnx_cpu": round(batch_size / onnx_cpu_avg_elapsed_time, 2),
"dynamic_onnx_cpu": round(batch_size / dynamic_onnx_cpu_avg_elapsed_time, 2)
}
for model_option, throughput in throughput_results.items():
if model_option in batch_throughput_results:
batch_throughput_results[model_option].append(throughput)
else:
batch_throughput_results[model_option] = [throughput]
batch_throughput_results
plot_throughput(batch_throughput_results, batch_sizes)
A couple of observations can be drawn from the benchmark table and graph above.
We've only scratched the surface of what's possible in terms of speeding up our models for inference. There are additional techniques, such as quantization [3] [7] and compression, that we haven't covered. In terms of model serialization, there are also various tools out there, with ONNX + ONNX Runtime being just one of them. But hopefully, this introduction helps build an understanding that for model deployment, there's an additional model serialization step commonly used to optimize both inference efficiency and development experience.
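As a pointer for further exploration, ONNX Runtime ships with a dynamic quantization utility. A minimal sketch, where the quantized output filename is our own choice and accuracy should be re-validated after quantizing:
from onnxruntime.quantization import quantize_dynamic, QuantType

# convert fp32 weights to int8, typically shrinking the model file
# and speeding up CPU inference at some potential cost in accuracy
quantize_dynamic(
    model_input=onnx_model_path,
    model_output="text_classification_quantized.onnx",
    weight_type=QuantType.QInt8
)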