This post, the ninth in our series on performance profiling and optimization in PyTorch, highlights the key role of performance analysis and optimization in machine learning development. Throughout the series, we have reviewed a variety of practical tools and techniques for analyzing and improving the runtime performance of PyTorch-based AI/ML models. Our goals have been twofold:
- To emphasize the importance of routinely evaluating and optimizing AI/ML workloads.
- To demonstrate the accessibility of a variety of tools and techniques for analyzing and optimizing AI/ML runtime performance. You do not need to be a CUDA expert to meaningfully improve model performance and reduce computational costs.
In this post, we explore CUDA streams, a powerful feature of NVIDIA's CUDA programming model that offers a sophisticated way of overlapping GPU operations. We usually think of the training workload of an AI/ML model as a single monolithic (i.e., "unbreakable") computation graph G. In some scenarios, however, the graph can be broken into two subgraphs G1 and G2 such that G = G2∘G1. In such cases, CUDA streams allow you to "pipeline" the computation graph, i.e., to program the training step so that G1 runs on input batch n+1 in parallel with G2 running on the nth output of G1. This technique is especially beneficial when:
- Neither subgraph fully utilizes the GPU when run on its own, and
- The two subgraphs have similar computational costs (i.e., neither one dominates the runtime).
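To make the decomposition concrete, here is a minimal sketch with hypothetical subgraphs g1 and g2 (the layer shapes are illustrative): splitting a model into two subgraphs whose back-to-back execution reproduces the original computation is what makes pipelining them across streams possible.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Hypothetical split: g1 plays the role of a frozen backbone,
# g2 the role of a trainable head, so that G = g2 o g1
g1 = nn.Sequential(nn.Linear(8, 16), nn.ReLU())
g2 = nn.Linear(16, 4)
g = nn.Sequential(g1, g2)  # the original monolithic graph

x = torch.randn(2, 8)
# Running the subgraphs in sequence matches the full graph exactly
assert torch.allclose(g(x), g2(g1(x)))
```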
We investigate two common scenarios where such pipelining can be applied:
- Training or fine-tuning a partial model:
It is common to freeze a pre-trained backbone (e.g., a feature extractor or encoder) and train only the model's head (e.g., a decoder). Since the frozen backbone does not depend on gradients from the head, the two can be run concurrently.
- Offloading data preprocessing to the GPU:
Offloading preprocessing to the GPU is a common way of addressing bottlenecks in the data input pipeline (also known as GPU starvation). Prepending the preprocessing operations to the model graph already improves performance, but an additional gain can be achieved by running the preprocessing on a separate CUDA stream, in parallel with model execution, provided the preprocessing is non-trivial compared to the model computation.
To facilitate the discussion, we define two toy training scripts and measure their training performance under various scenarios. The experiments were run on an Amazon EC2 g5.2xlarge instance (with an NVIDIA A10G GPU and 8 vCPUs) running a PyTorch (2.6) Deep Learning AMI (DLAMI).
Note: The code snippets we share are for demonstration purposes only. Please do not rely on their correctness or optimality. The impact of using CUDA streams will depend on the model architecture and system configuration. We recommend running your own profiling and experimentation before integrating CUDA streams (or any other tool or technique mentioned here) into your workflow.
Part 1: Pipelining an Encoder-Decoder Model
The first use case we explore involves a CNN-based image segmentation model consisting of a fixed (pre-trained) encoder and a trainable decoder. Since the encoder weights are frozen and unaffected by backpropagation, the encoder can be run independently of the decoder training. In this section, we assess the impact of using CUDA streams to pipeline the training process.
Toy Image Segmentation Training Experiments
We begin by defining a simple CNN-based image encoder and a corresponding decoder.
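The original code listing for the encoder and decoder appears to be missing here. The following is a plausible stand-in sketch, assuming a fully convolutional encoder that downsamples the input and a decoder that upsamples back to per-pixel class logits; the layer shapes are illustrative, not the author's original definitions, but they are consistent with how `encoder`, `decoder`, `img_size`, and `num_classes` are used later in the script.

```python
import torch
import torch.nn as nn

img_size = 256
num_classes = 10

# Illustrative encoder: downsample 256x256 -> 32x32 while widening channels
encoder = nn.Sequential(
    nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1),    # 128x128
    nn.ReLU(inplace=True),
    nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),   # 64x64
    nn.ReLU(inplace=True),
    nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),  # 32x32
    nn.ReLU(inplace=True),
)

# Illustrative decoder: upsample 32x32 back to full-resolution logits
decoder = nn.Sequential(
    nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2),    # 64x64
    nn.ReLU(inplace=True),
    nn.ConvTranspose2d(64, 32, kernel_size=2, stride=2),     # 128x128
    nn.ReLU(inplace=True),
    nn.ConvTranspose2d(32, num_classes, kernel_size=2, stride=2),  # 256x256
)
```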
Next, we create a composite dataset of random images and segmentation maps.
from torch.utils.data import DataLoader
from torchvision.datasets.vision import VisionDataset
# A dataset with random images and per-pixel labels
class FakeDataset(VisionDataset):
    def __init__(self):
        super().__init__(root=None)
        self.size = 1000000

    def __getitem__(self, index):
        # create a random image
        img = torch.randint(0, 256, (3, img_size, img_size),
                            dtype=torch.uint8)
        # create a random label map
        target = torch.randint(0, num_classes, (img_size, img_size))
        return img, target

    def __len__(self):
        return self.size

train_set = FakeDataset()
train_loader = DataLoader(
    dataset=train_set,
    batch_size=8,
    num_workers=8
)
Finally, we define the loss function, optimizer, and training loop. Note that we freeze the encoder weights and train only the decoder.
import time
device = torch.device("cuda")
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(decoder.parameters())
# Freeze the encoder weights
encoder.requires_grad_(False)
encoder.eval().to(device)
decoder.train().to(device)
warmup = 10
active_batches = 100
total_iters = warmup + active_batches
for idx, data in enumerate(train_loader):
    inputs = data[0].to(device=device, non_blocking=True).float()
    labels = data[1].to(device=device, non_blocking=True)
    optimizer.zero_grad()
    with torch.no_grad():
        features = encoder(inputs)
    output = decoder(features)
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()
    if idx == warmup:
        # sync the GPU and start the timer
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

# wait for the GPU to finish and then stop the timer
torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
The baseline training script achieves an average throughput of 83 steps per second, with an average GPU utilization of 85%.
Pipelining model execution using CUDA streams
The revised version of the training loop, shown below, introduces two CUDA streams: one for running the encoder and one for training the decoder. Each iteration performs two operations concurrently:
- Train the decoder on the image features and labels of batch n.
- Run the encoder on input batch n+1 to generate its image features.
encoder_stream = torch.cuda.Stream()
decoder_stream = torch.cuda.Stream()

# initialize the features to None
features = None

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device, non_blocking=True).float()
    labels_next = data[1].to(device, non_blocking=True)
    if features is not None:
        with torch.cuda.stream(decoder_stream):
            decoder_stream.wait_stream(encoder_stream)
            optimizer.zero_grad()
            output = decoder(features)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
    with torch.cuda.stream(encoder_stream):
        with torch.no_grad():
            features = encoder(inputs)
        # record that features was produced on encoder_stream
        features.record_stream(encoder_stream)
    labels = labels_next
    if idx == warmup:
        # sync the GPU and start the timer
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

# wait for the GPU to finish and then stop the timer
torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
This change delivers a 9.6% speedup, with an average throughput of 91 steps per second. This is a significant improvement, especially considering that the baseline already had high (85%) GPU utilization.
Pipeline sensitivity to workload properties
The effectiveness of pipelining with CUDA streams depends heavily on the details of the training workload and runtime environment. If the encoder is significantly larger than the decoder (or vice versa), pipelining may yield little benefit or even hurt performance. Conversely, when the GPU is underutilized, pipelining tends to provide greater gains.
To illustrate this dependency, we repeated the experiment with various batch sizes. The results are summarized below.

The benefit of pipelining diminishes as the batch size grows. This is likely because larger batch sizes naturally lead to higher (more efficient) GPU utilization, leaving less room for improvement through concurrent execution.
Part 2: Offloading Augmentations to the GPU
In this section, we apply CUDA streams to accelerating data augmentation. Previous blog posts (here, here, etc.) have studied the problem of bottlenecks in the data input pipeline from various perspectives and reviewed several techniques for diagnosing and addressing them. A common cause of such bottlenecks is CPU resource exhaustion: the CPU cannot keep up with the computational demands of the preprocessing pipeline. The result is GPU starvation, a scenario in which the expensive GPU sits idle, waiting for data to arrive.
One effective solution is to offload heavy data preprocessing to the GPU. We take this technique one step further by running the augmentations on a dedicated CUDA stream, enabling concurrent execution of model training and data augmentation.
Toy Image Classification Training Experiments
We begin by defining a simple CNN-based image classification model.
import torch
import torch.nn as nn

img_size = 256
num_classes = 10

model = nn.Sequential(
    # Start with a 256x256 image
    nn.Conv2d(3, 16, kernel_size=1),
    nn.ReLU(inplace=True),
    nn.Conv2d(16, 32, kernel_size=2, stride=2),     # 2x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(32, 64, kernel_size=2, stride=2),     # 4x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(64, 128, kernel_size=2, stride=2),    # 8x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(128, 256, kernel_size=2, stride=2),   # 16x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(256, 512, kernel_size=2, stride=2),   # 32x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(512, 1024, kernel_size=2, stride=2),  # 64x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(1024, 2048, kernel_size=2, stride=2), # 128x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(2048, 4096, kernel_size=2, stride=2), # 256x downsample
    nn.Flatten(),
    nn.Linear(4096, num_classes)
)
Next, we create a synthetic dataset with an augmentation pipeline that is intentionally designed to cause severe performance bottlenecks.
import random
from torch.utils.data import DataLoader
import torchvision.transforms.v2 as T
from torchvision.datasets.vision import VisionDataset
import torchvision.transforms.v2.functional as F
import torchvision.ops as ops
# A dataset with random images and labels
class FakeDataset(VisionDataset):
    def __init__(self, transform=None):
        super().__init__(root=None, transform=transform)
        self.size = 1000000

    def __getitem__(self, index):
        # create a random image
        img = torch.randint(0, 256, (3, img_size, img_size),
                            dtype=torch.uint8)
        # create a random label
        target = torch.randint(0, num_classes, (1, ))
        if self.transform:
            # apply transformations
            img = self.transform(img)
        return img, target

    def __len__(self):
        return self.size
augmentations = T.Compose([
    T.ToDtype(torch.float32),
    T.RandomCrop(img_size//2),
    T.Resize(img_size),
    T.RandomRotation(degrees=45.0),
    T.GaussianBlur(kernel_size=7),
    T.Normalize(mean=[0, 0, 0], std=[1, 1, 1])
])

train_set = FakeDataset(transform=augmentations)
train_loader = DataLoader(
    dataset=train_set,
    batch_size=32,
    num_workers=8
)
Finally, we define the loss function, optimizer, and training loop.
import time
device = torch.device("cuda")
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters())
model.train().to(device)
warmup = 10
active_batches = 100
total_iters = warmup + active_batches
for idx, data in enumerate(train_loader):
    inputs = data[0].to(device=device, non_blocking=True)
    labels = data[1].to(device=device, non_blocking=True).squeeze()
    optimizer.zero_grad()
    output = model(inputs)
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()
    if idx == warmup:
        # sync the GPU and start the timer
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

# wait for the GPU to finish and then stop the timer
torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
Running this baseline script results in an average throughput of 20.41 steps per second, with a GPU utilization of just 42%. The heavy data augmentation is choking the CPU, leading to GPU starvation. For more information on detecting bottlenecks in the data input pipeline, see my previous posts.
Offloading data augmentation to the GPU
To address the performance bottleneck in the data input pipeline, we move the augmentations to the GPU.
The first step is to define custom data transforms that apply random rotations and crops per sample within a batch. This is important because the built-in TorchVision transforms apply the same augmentation across the whole batch, losing the per-sample randomness we had on the CPU.
First, we implement a BatchRandomCrop transform using the roi_align operator.
class BatchRandomCrop(T.Transform):
    def __init__(self, output_size):
        super().__init__()
        self.output_size = output_size

    def transform(self, img: torch.Tensor, params: dict):
        batch_size, _, original_height, original_width = img.shape
        device = img.device
        max_top = original_height - self.output_size
        max_left = original_width - self.output_size
        # Generate random top and left coords for each image in the batch
        random_top = torch.randint(0, max_top + 1, (batch_size,),
                                   device=device, dtype=torch.float32)
        random_left = torch.randint(0, max_left + 1, (batch_size,),
                                    device=device, dtype=torch.float32)
        image_indices = torch.arange(batch_size, device=device,
                                     dtype=torch.float32)
        boxes = torch.stack([
            image_indices,
            random_left,
            random_top,
            random_left + self.output_size,
            random_top + self.output_size
        ], dim=1)
        cropped_batch = ops.roi_align(
            img,
            boxes,
            output_size=self.output_size
        )
        return cropped_batch
Next, we implement a BatchRandomRotation transform by iterating over the images in the batch and applying a random rotation to each one. Note that this version is not vectorized; a fully vectorized implementation would require more effort.
class BatchRandomRotation(T.Transform):
    def __init__(self, degrees):
        super().__init__()
        self.degrees = degrees

    def transform(self, inpt: torch.Tensor, params: dict):
        # split the batch into a list of individual images
        images = list(torch.unbind(inpt, dim=0))
        augmented_images = []
        for img_tensor in images:
            # generate a random angle
            angle = random.uniform(-self.degrees, self.degrees)
            # apply the rotation to the single image
            transformed_img = F.rotate(
                img_tensor,
                angle=angle
            )
            augmented_images.append(transformed_img)
        # stack the transformed images
        return torch.stack(augmented_images, dim=0)
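For reference, one possible way to vectorize the per-sample rotation is sketched below using affine_grid and grid_sample. This is an approximation rather than a drop-in replacement: the interpolation and padding behavior differs slightly from F.rotate, and the function name is our own.

```python
import math
import torch
import torch.nn.functional as Fnn

def batch_random_rotation(imgs: torch.Tensor, degrees: float) -> torch.Tensor:
    # imgs: (B, C, H, W); sample an independent angle per image
    b = imgs.shape[0]
    angles = torch.empty(b, device=imgs.device).uniform_(
        -math.radians(degrees), math.radians(degrees))
    cos, sin = torch.cos(angles), torch.sin(angles)
    # build a (B, 2, 3) batch of rotation matrices about the image center
    theta = torch.zeros(b, 2, 3, device=imgs.device, dtype=imgs.dtype)
    theta[:, 0, 0] = cos
    theta[:, 0, 1] = -sin
    theta[:, 1, 0] = sin
    theta[:, 1, 1] = cos
    # one sampling grid per image, applied to the whole batch at once
    grid = Fnn.affine_grid(theta, list(imgs.shape), align_corners=False)
    return Fnn.grid_sample(imgs, grid, align_corners=False)
```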
Next, we define a batch_transform that mimics the CPU-based augmentation pipeline defined above.
batch_transform = T.Compose([
    T.ToDtype(torch.float32),
    BatchRandomCrop(img_size//2),
    T.Resize(img_size),
    BatchRandomRotation(degrees=45.0),
    T.GaussianBlur(kernel_size=7),
    T.Normalize(mean=[0, 0, 0], std=[1, 1, 1])
])
Finally, we reset the dataset and update the training loop to apply the new batch_transform:
train_set = FakeDataset(transform=None)
train_loader = DataLoader(
    dataset=train_set,
    batch_size=32,
    num_workers=8
)

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device=device, non_blocking=True)
    labels = data[1].to(device=device, non_blocking=True).squeeze()
    # apply augmentations
    inputs = batch_transform(inputs)
    optimizer.zero_grad()
    output = model(inputs)
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()
    if idx == warmup:
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
This updated training script improves throughput to 35.22 steps per second, a 72.57% speedup over the baseline result.
Pipelining the augmentations using CUDA streams
Next, we pipeline the augmentation and training steps using two separate CUDA streams: one for the data transforms and one for model execution. Each iteration of the loop performs two concurrent operations:
- Train the model on augmented batch n.
- Perform GPU-based data augmentation on batch n+1.
transform_stream = torch.cuda.Stream()
model_stream = torch.cuda.Stream()

# initialize the transformed batch to None
transformed = None

for idx, data in enumerate(train_loader):
    inputs = data[0]
    labels_next = data[1]
    if transformed is not None:
        with torch.cuda.stream(model_stream):
            labels = labels.to(device, non_blocking=True).squeeze()
            model_stream.wait_stream(transform_stream)
            optimizer.zero_grad()
            output = model(transformed)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
    with torch.cuda.stream(transform_stream):
        inputs = inputs.to(device, non_blocking=True)
        transformed = batch_transform(inputs)
        # record that the tensor was produced on transform_stream
        transformed.record_stream(transform_stream)
    labels = labels_next
    if idx == warmup:
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')
This further improves throughput to 38.82 steps per second, a 10.2% improvement over the serialized solution and a 90.2% improvement over the original baseline.
Pipeline sensitivity to workload properties
As we saw in Part 1, the benefit of pipelining with CUDA streams varies with the details of the workload. The table below captures the results for several different batch sizes.

Larger batch sizes make the GPU offloading more effective, significantly improving performance. At the same time, the gains from pipelining decrease. This is likely because larger batch sizes increase GPU efficiency, reducing the opportunity for overlap.
Summary
When running AI/ML workloads, every millisecond counts. In this post we explored the impact of pipelining AI/ML training steps using CUDA streams in two common scenarios: partial-model training and offloading data augmentation to the GPU. In both cases, the pipelined solution outperformed the serialized implementation, though the degree of improvement varied considerably with the batch size.
As highlighted throughout the post, the expected impact of using CUDA streams can vary widely based on the AI/ML workload. For example, if the GPU is already being utilized efficiently, the overhead of using CUDA streams could actually degrade runtime performance. We strongly recommend testing this technique on your own workload before adopting it.
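As a starting point for such testing, here is a minimal sketch of how one might inspect a candidate workload with torch.profiler. It is shown on CPU for simplicity, with a hypothetical toy model; on a GPU you would add ProfilerActivity.CUDA to the activities list and call torch.cuda.synchronize() before leaving the profiling context.

```python
import torch
from torch.profiler import profile, ProfilerActivity

model = torch.nn.Linear(64, 64)
x = torch.randn(8, 64)

# Collect op-level timings for a few iterations of the candidate workload
with profile(activities=[ProfilerActivity.CPU]) as prof:
    for _ in range(5):
        model(x)

# Inspect where the time goes before and after applying CUDA streams
print(prof.key_averages().table(sort_by="cpu_time_total", row_limit=5))
```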
I hope the techniques described in this post prove useful. Check out the other posts in this series for more tips, tricks, and techniques for profiling and optimizing AI/ML workflows.
