Deep Learning with PyTorch: A Practical Introduction
Learn to build and train neural networks using PyTorch with hands-on examples.
PyTorch has become the go-to framework for deep learning research and is increasingly adopted for production systems. Its dynamic computation graph, Pythonic interface, and strong community support make it an excellent choice for both beginners and experts. This comprehensive guide will take you from PyTorch basics to building production-ready deep learning models.
Table of Contents
- Getting Started with PyTorch
- Tensors and Operations
- Automatic Differentiation
- Building Neural Networks
- Training Deep Learning Models
- Convolutional Neural Networks
- Recurrent Neural Networks
- Transfer Learning
- Advanced Training Techniques
- Model Deployment
Getting Started with PyTorch
Installation
# CPU only
pip install torch torchvision torchaudio
# With CUDA support (check your CUDA version)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# Verify installation
python -c "import torch; print(torch.__version__); print(torch.cuda.is_available())"
Why PyTorch?
Key Advantages:
- Dynamic Computation Graphs: Define-by-run approach for flexibility (see the short sketch after this list)
- Pythonic: Natural Python integration, easy debugging
- Research-Friendly: Quick prototyping and experimentation
- Production-Ready: TorchScript, ONNX export, mobile support
- Strong Ecosystem: Hugging Face, PyTorch Lightning, timm, etc.
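Because the graph is defined by running the code, ordinary Python control flow (loops, branches on tensor values) is recorded as it executes. A minimal sketch of this define-by-run behavior:
import torch
x = torch.tensor(3.0, requires_grad=True)
y = x
# The graph is rebuilt on every run, so data-dependent control flow just works
while y.norm() < 100:
    y = y * 2
out = y * x if y > 0 else -y
out.backward()
print(x.grad)  # gradient reflects the branches that were actually taken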
Basic Setup
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, TensorDataset
import torchvision
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)
# Check device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# Check GPU info if available
if torch.cuda.is_available():
print(f"GPU: {torch.cuda.get_device_name(0)}")
print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f} GB")
Tensors and Operations
Tensors are the fundamental data structure in PyTorch, similar to NumPy arrays but with GPU support and automatic differentiation.
Creating Tensors
import torch
# From Python lists
x = torch.tensor([1, 2, 3, 4, 5])
y = torch.tensor([[1, 2, 3], [4, 5, 6]])
# Specific data types
x_float = torch.tensor([1.0, 2.0, 3.0], dtype=torch.float32)
x_int = torch.tensor([1, 2, 3], dtype=torch.int64)
# From NumPy
import numpy as np
np_array = np.array([1, 2, 3])
tensor_from_numpy = torch.from_numpy(np_array)
back_to_numpy = tensor_from_numpy.numpy()
# Special tensors
zeros = torch.zeros(3, 4) # All zeros
ones = torch.ones(3, 4) # All ones
rand = torch.rand(3, 4) # Uniform [0, 1)
randn = torch.randn(3, 4) # Normal distribution
arange = torch.arange(0, 10, 2) # [0, 2, 4, 6, 8]
linspace = torch.linspace(0, 1, 5) # 5 evenly spaced
eye = torch.eye(3) # Identity matrix
empty = torch.empty(3, 4) # Uninitialized
full = torch.full((3, 4), fill_value=7) # All 7s
# Like existing tensor
x = torch.randn(3, 4)
x_zeros = torch.zeros_like(x)
x_ones = torch.ones_like(x)
x_rand = torch.rand_like(x)
print(f"Shape: {x.shape}")
print(f"Data type: {x.dtype}")
print(f"Device: {x.device}")
Tensor Operations
# Basic operations
a = torch.tensor([1.0, 2.0, 3.0])
b = torch.tensor([4.0, 5.0, 6.0])
# Element-wise operations
add = a + b # or torch.add(a, b)
sub = a - b # or torch.sub(a, b)
mul = a * b # or torch.mul(a, b)
div = a / b # or torch.div(a, b)
pow = a ** 2 # or torch.pow(a, 2)
# In-place operations (modify tensor directly)
a.add_(b) # a = a + b
a.mul_(2) # a = a * 2
# Matrix operations
A = torch.randn(3, 4)
B = torch.randn(4, 5)
# Matrix multiplication
C = torch.mm(A, B) # 3x4 @ 4x5 = 3x5
C = A @ B # Same as torch.mm
C = torch.matmul(A, B) # More general, handles batches
# Batch matrix multiplication
batch_A = torch.randn(10, 3, 4)
batch_B = torch.randn(10, 4, 5)
batch_C = torch.bmm(batch_A, batch_B) # 10x3x5
# Dot product
dot = torch.dot(a, b)
# Transpose
At = A.T
At = A.transpose(0, 1)
At = torch.transpose(A, 0, 1)
# Aggregation
x = torch.randn(3, 4)
x.sum() # Sum of all elements
x.sum(dim=0) # Sum along dimension 0
x.sum(dim=1) # Sum along dimension 1
x.mean() # Mean
x.std() # Standard deviation
x.max() # Maximum value
x.min() # Minimum value
x.argmax() # Index of maximum
x.argmax(dim=1) # Index of max along dim 1
Reshaping Tensors
x = torch.randn(12)
# Reshape
x_reshaped = x.reshape(3, 4)
x_reshaped = x.view(3, 4) # Must be contiguous
x_reshaped = x.reshape(3, -1) # Infer dimension
# Add/remove dimensions
x = torch.randn(3, 4)
x_unsqueeze = x.unsqueeze(0) # Add dim at position 0: 1x3x4
x_unsqueeze = x.unsqueeze(-1) # Add dim at end: 3x4x1
x_squeeze = x_unsqueeze.squeeze() # Remove dims of size 1
# Expand dimensions
x = torch.randn(1, 3)
x_expanded = x.expand(5, 3) # Broadcast to 5x3 without copying data (use .repeat() for a real copy)
# Concatenate
a = torch.randn(3, 4)
b = torch.randn(3, 4)
cat_dim0 = torch.cat([a, b], dim=0) # 6x4
cat_dim1 = torch.cat([a, b], dim=1) # 3x8
# Stack (creates new dimension)
stacked = torch.stack([a, b], dim=0) # 2x3x4
# Flatten
x = torch.randn(2, 3, 4)
x_flat = x.flatten() # 24
x_flat = x.flatten(1) # 2x12 (flatten from dim 1)
GPU Operations
# Move to GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
x = torch.randn(3, 4)
x_gpu = x.to(device)
x_gpu = x.cuda() # Explicit CUDA
x_cpu = x_gpu.cpu() # Move back to CPU
# Create directly on GPU
x_gpu = torch.randn(3, 4, device=device)
# Check device
print(x_gpu.device)
print(x_gpu.is_cuda)
# Multi-GPU
if torch.cuda.device_count() > 1:
x = x.to('cuda:0') # First GPU
y = y.to('cuda:1') # Second GPU
# Memory management
torch.cuda.empty_cache() # Free unused memory
print(torch.cuda.memory_allocated())
print(torch.cuda.memory_reserved())
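Host-to-device copies can also be made asynchronous when the source tensor lives in pinned (page-locked) memory, which pairs with pin_memory=True in DataLoader (shown later). A minimal sketch, assuming a CUDA-enabled machine:
x_cpu = torch.randn(64, 3, 224, 224).pin_memory()
x_gpu = x_cpu.to(device, non_blocking=True)  # copy can overlap with other GPU work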
Automatic Differentiation
PyTorch’s autograd system automatically computes gradients, essential for training neural networks.
Basic Autograd
# Create tensor with gradient tracking
x = torch.tensor([2.0, 3.0], requires_grad=True)
# Forward computation
y = x ** 2
z = y.sum()
# Backward pass
z.backward()
# Gradients
print(x.grad) # tensor([4., 6.]) - dz/dx = 2x
# Clear gradients
x.grad.zero_()
# Multiple backward passes
x = torch.tensor([2.0, 3.0], requires_grad=True)
for i in range(3):
y = x ** 2
z = y.sum()
z.backward()
print(f"Iteration {i}: {x.grad}")
x.grad.zero_() # Must clear for next iteration
Gradient Control
# Disable gradient computation
x = torch.randn(3, requires_grad=True)
with torch.no_grad():
y = x * 2 # No gradient tracked
# Or use decorator
@torch.no_grad()
def inference(model, x):
return model(x)
# Detach from computation graph
x = torch.randn(3, requires_grad=True)
y = x * 2
z = y.detach() # z has no gradient history
# Enable/disable requires_grad
x = torch.randn(3)
x.requires_grad_(True) # Enable
x.requires_grad_(False) # Disable
# Gradient for specific parameters only
model = nn.Linear(10, 5)
for param in model.parameters():
param.requires_grad = False # Freeze all
model.weight.requires_grad = True # Unfreeze weight only
Custom Autograd Functions
class MyReLU(torch.autograd.Function):
@staticmethod
def forward(ctx, input):
ctx.save_for_backward(input)
return input.clamp(min=0)
@staticmethod
def backward(ctx, grad_output):
input, = ctx.saved_tensors
grad_input = grad_output.clone()
grad_input[input < 0] = 0
return grad_input
# Usage
x = torch.randn(5, requires_grad=True)
y = MyReLU.apply(x)
y.sum().backward()
print(x.grad)
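To sanity-check a custom backward pass against numerical finite differences, torch.autograd.gradcheck can be used; it expects double-precision inputs. A small sketch (ReLU is not differentiable at exactly 0, so inputs very close to zero can occasionally trip the check):
x = torch.randn(5, dtype=torch.float64, requires_grad=True)
print(torch.autograd.gradcheck(MyReLU.apply, (x,)))  # True if analytic and numeric gradients agree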
Building Neural Networks
The nn.Module Class
import torch.nn as nn
class SimpleNet(nn.Module):
def __init__(self, input_size, hidden_size, num_classes):
super().__init__()
# Define layers
self.fc1 = nn.Linear(input_size, hidden_size)
self.bn1 = nn.BatchNorm1d(hidden_size)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(0.5)
self.fc2 = nn.Linear(hidden_size, hidden_size)
self.bn2 = nn.BatchNorm1d(hidden_size)
self.fc3 = nn.Linear(hidden_size, num_classes)
def forward(self, x):
# Define forward pass
x = self.fc1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.dropout(x)
x = self.fc2(x)
x = self.bn2(x)
x = self.relu(x)
x = self.dropout(x)
x = self.fc3(x)
return x
# Create model
model = SimpleNet(784, 256, 10)
print(model)
# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
Using nn.Sequential
# Simple sequential model
model = nn.Sequential(
nn.Linear(784, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(256, 128),
nn.BatchNorm1d(128),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(128, 10)
)
# Named sequential (requires an OrderedDict of (name, module) pairs)
from collections import OrderedDict
model = nn.Sequential(OrderedDict([
    ('flatten', nn.Flatten()),
    ('fc1', nn.Linear(784, 256)),
    ('relu1', nn.ReLU()),
    ('fc2', nn.Linear(256, 10))
]))
# Access layers
print(model[0]) # First layer
print(model.fc1) # By name
Common Layer Types
# Linear (Dense) layers
linear = nn.Linear(in_features=100, out_features=50, bias=True)
# Convolutional layers
conv1d = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)
conv2d = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1)
conv3d = nn.Conv3d(in_channels=1, out_channels=32, kernel_size=3)
# Pooling layers
maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
avgpool = nn.AvgPool2d(kernel_size=2)
adaptive_pool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
# Normalization layers
batchnorm1d = nn.BatchNorm1d(num_features=256)
batchnorm2d = nn.BatchNorm2d(num_features=64)
layernorm = nn.LayerNorm(normalized_shape=256)
groupnorm = nn.GroupNorm(num_groups=8, num_channels=64)
# Dropout
dropout = nn.Dropout(p=0.5)
dropout2d = nn.Dropout2d(p=0.2)
# Recurrent layers
rnn = nn.RNN(input_size=100, hidden_size=256, num_layers=2, batch_first=True)
lstm = nn.LSTM(input_size=100, hidden_size=256, num_layers=2, batch_first=True, bidirectional=True)
gru = nn.GRU(input_size=100, hidden_size=256, num_layers=2, batch_first=True)
# Embedding
embedding = nn.Embedding(num_embeddings=10000, embedding_dim=128)
# Transformer layers
transformer = nn.Transformer(d_model=512, nhead=8, num_encoder_layers=6)
encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=8)
Activation Functions
# As modules
relu = nn.ReLU()
leaky_relu = nn.LeakyReLU(negative_slope=0.01)
prelu = nn.PReLU()
elu = nn.ELU()
gelu = nn.GELU()
selu = nn.SELU()
sigmoid = nn.Sigmoid()
tanh = nn.Tanh()
softmax = nn.Softmax(dim=1)
log_softmax = nn.LogSoftmax(dim=1)
# As functions
import torch.nn.functional as F
x = torch.randn(10)
y = F.relu(x)
y = F.leaky_relu(x, negative_slope=0.01)
y = F.gelu(x)
y = torch.sigmoid(x) # F.sigmoid is deprecated; use torch.sigmoid
y = F.softmax(x, dim=0)
Weight Initialization
def init_weights(m):
if isinstance(m, nn.Linear):
# Xavier/Glorot initialization
nn.init.xavier_uniform_(m.weight)
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Conv2d):
# Kaiming/He initialization
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
# Apply to model
model.apply(init_weights)
# Other initialization methods (applied in place to an existing tensor)
w = torch.empty(3, 5)
nn.init.normal_(w, mean=0.0, std=1.0)
nn.init.uniform_(w, a=0.0, b=1.0)
nn.init.constant_(w, val=0.5)
nn.init.orthogonal_(w)
nn.init.sparse_(w, sparsity=0.1)
Training Deep Learning Models
Complete Training Pipeline
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
from tqdm import tqdm
# Hyperparameters
batch_size = 64
learning_rate = 0.001
num_epochs = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Data transforms
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])
# Load MNIST dataset
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
# Model
class MLP(nn.Module):
def __init__(self):
super().__init__()
self.flatten = nn.Flatten()
self.layers = nn.Sequential(
nn.Linear(784, 512),
nn.BatchNorm1d(512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(256, 10)
)
def forward(self, x):
x = self.flatten(x)
return self.layers(x)
model = MLP().to(device)
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Learning rate scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=2, factor=0.5)
# Training function
def train_epoch(model, dataloader, criterion, optimizer, device):
model.train()
total_loss = 0
correct = 0
total = 0
for batch_idx, (data, target) in enumerate(tqdm(dataloader, desc='Training')):
data, target = data.to(device), target.to(device)
# Forward pass
output = model(data)
loss = criterion(output, target)
# Backward pass
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Statistics
total_loss += loss.item()
_, predicted = output.max(1)
total += target.size(0)
correct += predicted.eq(target).sum().item()
return total_loss / len(dataloader), 100. * correct / total
# Validation function
def validate(model, dataloader, criterion, device):
model.eval()
total_loss = 0
correct = 0
total = 0
with torch.no_grad():
for data, target in tqdm(dataloader, desc='Validating'):
data, target = data.to(device), target.to(device)
output = model(data)
loss = criterion(output, target)
total_loss += loss.item()
_, predicted = output.max(1)
total += target.size(0)
correct += predicted.eq(target).sum().item()
return total_loss / len(dataloader), 100. * correct / total
# Training loop
train_losses, val_losses = [], []
train_accs, val_accs = [], []
for epoch in range(num_epochs):
print(f"\nEpoch {epoch+1}/{num_epochs}")
print("-" * 40)
train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
val_loss, val_acc = validate(model, test_loader, criterion, device)
# Scheduler step
scheduler.step(val_loss)
# Record metrics
train_losses.append(train_loss)
val_losses.append(val_loss)
train_accs.append(train_acc)
val_accs.append(val_acc)
print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
print(f"LR: {optimizer.param_groups[0]['lr']:.6f}")
# Plot training history
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
axes[0].plot(train_losses, label='Train')
axes[0].plot(val_losses, label='Val')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].legend()
axes[0].set_title('Loss')
axes[1].plot(train_accs, label='Train')
axes[1].plot(val_accs, label='Val')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Accuracy (%)')
axes[1].legend()
axes[1].set_title('Accuracy')
plt.tight_layout()
plt.show()
Loss Functions
# Classification
cross_entropy = nn.CrossEntropyLoss() # Multi-class
nll_loss = nn.NLLLoss() # With log_softmax
bce = nn.BCELoss() # Binary (requires sigmoid output)
bce_logits = nn.BCEWithLogitsLoss() # Binary (raw logits)
# Regression
mse = nn.MSELoss() # Mean Squared Error
mae = nn.L1Loss() # Mean Absolute Error
smooth_l1 = nn.SmoothL1Loss() # Huber loss
huber = nn.HuberLoss(delta=1.0)
# Other
kl_div = nn.KLDivLoss() # KL Divergence
cosine = nn.CosineEmbeddingLoss() # Cosine similarity
triplet = nn.TripletMarginLoss() # Triplet loss
# Class weights for imbalanced data
class_weights = torch.tensor([1.0, 5.0, 2.0]) # Weight each class
criterion = nn.CrossEntropyLoss(weight=class_weights)
# Label smoothing
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
Optimizers
import torch.optim as optim
# Basic optimizers
sgd = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-5)
adam = optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), weight_decay=1e-5)
adamw = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01) # Decoupled weight decay
rmsprop = optim.RMSprop(model.parameters(), lr=0.01, alpha=0.99)
# Per-parameter options
optimizer = optim.Adam([
{'params': model.base.parameters(), 'lr': 1e-4},
{'params': model.classifier.parameters(), 'lr': 1e-3}
], lr=1e-3)
# Gradient clipping
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
torch.nn.utils.clip_grad_value_(model.parameters(), clip_value=1.0)
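Clipping operates on the gradients that backward() has already populated, so it belongs between loss.backward() and optimizer.step(). A sketch of a single training step:
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # clip after backward, before step
optimizer.step()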
Learning Rate Schedulers
# Step decay
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
# Multi-step decay
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[30, 60, 90], gamma=0.1)
# Exponential decay
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
# Reduce on plateau
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='min', factor=0.5, patience=5, verbose=True
)
# Cosine annealing
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)
# Cosine annealing with warm restarts
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
# One cycle policy
scheduler = optim.lr_scheduler.OneCycleLR(
optimizer, max_lr=0.01, total_steps=1000,
pct_start=0.3, anneal_strategy='cos'
)
# Linear warmup (ramps the LR up to its base value over warmup_epochs)
def warmup_scheduler(optimizer, warmup_epochs):
    def lr_lambda(epoch):
        if epoch < warmup_epochs:
            return (epoch + 1) / warmup_epochs  # avoid a zero LR in the first epoch
        return 1.0
    return optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
# Usage in training loop
for epoch in range(num_epochs):
train_epoch(...)
val_loss = validate(...)
# For most schedulers
scheduler.step()
# For ReduceLROnPlateau
scheduler.step(val_loss)
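An alternative to the hand-written LambdaLR warmup above is to chain built-in schedulers with SequentialLR (available in recent PyTorch versions). A sketch that assumes num_epochs from the training setup above; the 5-epoch warmup length is an arbitrary choice:
warmup = optim.lr_scheduler.LinearLR(optimizer, start_factor=0.1, total_iters=5)
cosine = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs - 5)
scheduler = optim.lr_scheduler.SequentialLR(optimizer, schedulers=[warmup, cosine], milestones=[5])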
Custom Dataset
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
class CustomImageDataset(Dataset):
def __init__(self, root_dir, transform=None, target_transform=None):
self.root_dir = root_dir
self.transform = transform
self.target_transform = target_transform
# Load image paths and labels
self.image_paths = []
self.labels = []
self.classes = sorted(os.listdir(root_dir))
self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
for class_name in self.classes:
class_dir = os.path.join(root_dir, class_name)
for img_name in os.listdir(class_dir):
self.image_paths.append(os.path.join(class_dir, img_name))
self.labels.append(self.class_to_idx[class_name])
def __len__(self):
return len(self.image_paths)
def __getitem__(self, idx):
img_path = self.image_paths[idx]
image = Image.open(img_path).convert('RGB')
label = self.labels[idx]
if self.transform:
image = self.transform(image)
if self.target_transform:
label = self.target_transform(label)
return image, label
# Data augmentation
train_transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.RandomRotation(15),
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
val_transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Create dataloaders
train_dataset = CustomImageDataset('data/train', transform=train_transform)
val_dataset = CustomImageDataset('data/val', transform=val_transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4, pin_memory=True)
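For imbalanced datasets, an alternative (or complement) to class-weighted losses is to oversample rare classes with WeightedRandomSampler. A sketch that reuses the labels collected by CustomImageDataset; note that passing a sampler replaces shuffle=True:
from collections import Counter
from torch.utils.data import WeightedRandomSampler
label_counts = Counter(train_dataset.labels)
sample_weights = [1.0 / label_counts[label] for label in train_dataset.labels]
sampler = WeightedRandomSampler(sample_weights, num_samples=len(sample_weights), replacement=True)
train_loader = DataLoader(train_dataset, batch_size=32, sampler=sampler, num_workers=4, pin_memory=True)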
Convolutional Neural Networks
Basic CNN Architecture
class CNN(nn.Module):
def __init__(self, num_classes=10):
super().__init__()
# Convolutional layers
self.features = nn.Sequential(
# Block 1: 28x28 -> 14x14
nn.Conv2d(1, 32, kernel_size=3, padding=1),
nn.BatchNorm2d(32),
nn.ReLU(inplace=True),
nn.Conv2d(32, 32, kernel_size=3, padding=1),
nn.BatchNorm2d(32),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Dropout2d(0.25),
# Block 2: 14x14 -> 7x7
nn.Conv2d(32, 64, kernel_size=3, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
nn.Conv2d(64, 64, kernel_size=3, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Dropout2d(0.25),
# Block 3: 7x7 -> 3x3
nn.Conv2d(64, 128, kernel_size=3, padding=1),
nn.BatchNorm2d(128),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2, stride=2),
nn.Dropout2d(0.25),
)
# Classifier
self.classifier = nn.Sequential(
nn.Flatten(),
nn.Linear(128 * 3 * 3, 256),
nn.BatchNorm1d(256),
nn.ReLU(inplace=True),
nn.Dropout(0.5),
nn.Linear(256, num_classes)
)
def forward(self, x):
x = self.features(x)
x = self.classifier(x)
return x
model = CNN(num_classes=10)
VGG-style Network
def make_vgg_block(in_channels, out_channels, num_convs):
layers = []
for _ in range(num_convs):
layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
layers.append(nn.BatchNorm2d(out_channels))
layers.append(nn.ReLU(inplace=True))
in_channels = out_channels
layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
return nn.Sequential(*layers)
class VGGNet(nn.Module):
def __init__(self, num_classes=1000):
super().__init__()
self.features = nn.Sequential(
make_vgg_block(3, 64, 2), # 224 -> 112
make_vgg_block(64, 128, 2), # 112 -> 56
make_vgg_block(128, 256, 3), # 56 -> 28
make_vgg_block(256, 512, 3), # 28 -> 14
make_vgg_block(512, 512, 3), # 14 -> 7
)
self.classifier = nn.Sequential(
nn.AdaptiveAvgPool2d((7, 7)),
nn.Flatten(),
nn.Linear(512 * 7 * 7, 4096),
nn.ReLU(inplace=True),
nn.Dropout(0.5),
nn.Linear(4096, 4096),
nn.ReLU(inplace=True),
nn.Dropout(0.5),
nn.Linear(4096, num_classes)
)
def forward(self, x):
x = self.features(x)
x = self.classifier(x)
return x
ResNet-style Network with Skip Connections
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, in_channels, out_channels, stride=1, downsample=None):
super().__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)
self.downsample = downsample
def forward(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity # Skip connection
out = self.relu(out)
return out
class ResNet(nn.Module):
def __init__(self, block, layers, num_classes=1000):
super().__init__()
self.in_channels = 64
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
self.bn1 = nn.BatchNorm2d(64)
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(block, 64, layers[0])
self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(512 * block.expansion, num_classes)
def _make_layer(self, block, out_channels, blocks, stride=1):
downsample = None
if stride != 1 or self.in_channels != out_channels * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(self.in_channels, out_channels * block.expansion,
kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(out_channels * block.expansion),
)
layers = []
layers.append(block(self.in_channels, out_channels, stride, downsample))
self.in_channels = out_channels * block.expansion
for _ in range(1, blocks):
layers.append(block(self.in_channels, out_channels))
return nn.Sequential(*layers)
def forward(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
# Create ResNet-18
def resnet18(num_classes=1000):
return ResNet(BasicBlock, [2, 2, 2, 2], num_classes)
Recurrent Neural Networks
Basic RNN
class RNNClassifier(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.rnn = nn.RNN(
embedding_dim, hidden_dim,
num_layers=n_layers,
batch_first=True,
dropout=dropout if n_layers > 1 else 0
)
self.fc = nn.Linear(hidden_dim, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, text, text_lengths=None):
# text: [batch_size, seq_len]
embedded = self.dropout(self.embedding(text))
# embedded: [batch_size, seq_len, embedding_dim]
output, hidden = self.rnn(embedded)
# output: [batch_size, seq_len, hidden_dim]
# hidden: [n_layers, batch_size, hidden_dim]
# Use last hidden state
hidden = hidden[-1] # [batch_size, hidden_dim]
return self.fc(self.dropout(hidden))
LSTM Network
class LSTMClassifier(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
n_layers, bidirectional, dropout, pad_idx):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
self.lstm = nn.LSTM(
embedding_dim,
hidden_dim,
num_layers=n_layers,
bidirectional=bidirectional,
dropout=dropout if n_layers > 1 else 0,
batch_first=True
)
self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, text, text_lengths):
# text: [batch_size, seq_len]
embedded = self.dropout(self.embedding(text))
# Pack sequence for efficiency with variable length
packed = nn.utils.rnn.pack_padded_sequence(
embedded, text_lengths.cpu(),
batch_first=True, enforce_sorted=False
)
packed_output, (hidden, cell) = self.lstm(packed)
# Unpack
output, output_lengths = nn.utils.rnn.pad_packed_sequence(
packed_output, batch_first=True
)
# Concatenate forward and backward hidden states
if self.lstm.bidirectional:
hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)
else:
hidden = hidden[-1,:,:]
return self.fc(self.dropout(hidden))
# Usage
model = LSTMClassifier(
vocab_size=10000,
embedding_dim=300,
hidden_dim=256,
output_dim=2,
n_layers=2,
bidirectional=True,
dropout=0.5,
pad_idx=0
)
GRU for Sequence Generation
class TextGenerator(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, n_layers):
super().__init__()
self.hidden_dim = hidden_dim
self.n_layers = n_layers
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.gru = nn.GRU(
embedding_dim, hidden_dim,
num_layers=n_layers,
batch_first=True
)
self.fc = nn.Linear(hidden_dim, vocab_size)
def forward(self, x, hidden=None):
# x: [batch_size, seq_len]
embedded = self.embedding(x)
if hidden is None:
hidden = self.init_hidden(x.size(0), x.device)
output, hidden = self.gru(embedded, hidden)
output = self.fc(output)
return output, hidden
def init_hidden(self, batch_size, device):
return torch.zeros(self.n_layers, batch_size, self.hidden_dim, device=device)
def generate(self, start_tokens, max_length, temperature=1.0):
self.eval()
generated = start_tokens.clone()
hidden = None
with torch.no_grad():
for _ in range(max_length):
output, hidden = self(generated[:, -1:], hidden)
logits = output[:, -1, :] / temperature
probs = F.softmax(logits, dim=-1)
next_token = torch.multinomial(probs, num_samples=1)
generated = torch.cat([generated, next_token], dim=1)
return generated
Sequence-to-Sequence with Attention
class Encoder(nn.Module):
def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
super().__init__()
self.embedding = nn.Embedding(input_dim, embedding_dim)
self.rnn = nn.GRU(embedding_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)
self.dropout = nn.Dropout(dropout)
def forward(self, src):
embedded = self.dropout(self.embedding(src))
outputs, hidden = self.rnn(embedded)
return outputs, hidden
class Attention(nn.Module):
def __init__(self, hidden_dim):
super().__init__()
self.attn = nn.Linear(hidden_dim * 2, hidden_dim)
self.v = nn.Linear(hidden_dim, 1, bias=False)
def forward(self, hidden, encoder_outputs):
# hidden: [batch_size, hidden_dim]
# encoder_outputs: [batch_size, src_len, hidden_dim]
batch_size = encoder_outputs.shape[0]
src_len = encoder_outputs.shape[1]
hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
attention = self.v(energy).squeeze(2)
return F.softmax(attention, dim=1)
class Decoder(nn.Module):
def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
super().__init__()
self.output_dim = output_dim
self.attention = Attention(hidden_dim)
self.embedding = nn.Embedding(output_dim, embedding_dim)
self.rnn = nn.GRU(embedding_dim + hidden_dim, hidden_dim, n_layers, dropout=dropout, batch_first=True)
self.fc = nn.Linear(hidden_dim * 2 + embedding_dim, output_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, input, hidden, encoder_outputs):
# input: [batch_size, 1]
embedded = self.dropout(self.embedding(input))
a = self.attention(hidden[-1], encoder_outputs)
a = a.unsqueeze(1) # [batch_size, 1, src_len]
context = torch.bmm(a, encoder_outputs) # [batch_size, 1, hidden_dim]
rnn_input = torch.cat((embedded, context), dim=2)
output, hidden = self.rnn(rnn_input, hidden)
output = output.squeeze(1)
context = context.squeeze(1)
embedded = embedded.squeeze(1)
prediction = self.fc(torch.cat((output, context, embedded), dim=1))
return prediction, hidden
import random  # for teacher forcing
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder, device):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.device = device
def forward(self, src, trg, teacher_forcing_ratio=0.5):
batch_size = src.shape[0]
trg_len = trg.shape[1]
trg_vocab_size = self.decoder.output_dim
outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)
encoder_outputs, hidden = self.encoder(src)
input = trg[:, 0:1]
for t in range(1, trg_len):
output, hidden = self.decoder(input, hidden, encoder_outputs)
outputs[:, t] = output
teacher_force = random.random() < teacher_forcing_ratio
top1 = output.argmax(1, keepdim=True)
input = trg[:, t:t+1] if teacher_force else top1
return outputs
Transfer Learning
Using Pretrained Models
import torchvision.models as models
# Load pretrained models
# Note: recent torchvision versions replace pretrained=True with the weights= API,
# e.g. models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
resnet = models.resnet50(pretrained=True)
vgg = models.vgg16(pretrained=True)
efficientnet = models.efficientnet_b0(pretrained=True)
vit = models.vit_b_16(pretrained=True)
# Freeze all layers
for param in resnet.parameters():
param.requires_grad = False
# Replace classifier for new task
num_classes = 10
resnet.fc = nn.Linear(resnet.fc.in_features, num_classes)
# Only train the new classifier
optimizer = optim.Adam(resnet.fc.parameters(), lr=0.001)
Fine-tuning with Gradual Unfreezing
class TransferModel(nn.Module):
def __init__(self, num_classes, pretrained=True):
super().__init__()
# Load pretrained backbone
self.backbone = models.resnet50(pretrained=pretrained)
# Freeze backbone
for param in self.backbone.parameters():
param.requires_grad = False
# Replace classifier
in_features = self.backbone.fc.in_features
self.backbone.fc = nn.Sequential(
nn.Linear(in_features, 512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, num_classes)
)
def forward(self, x):
return self.backbone(x)
def unfreeze_layers(self, num_layers):
"""Unfreeze last n layers"""
# Get all layers
layers = list(self.backbone.children())
# Unfreeze last n layers
for layer in layers[-num_layers:]:
for param in layer.parameters():
param.requires_grad = True
def get_parameter_groups(self, base_lr):
    """Different learning rates for different layers"""
    # Classifier (new layers) - higher learning rate
    classifier_params = list(self.backbone.fc.parameters())
    classifier_ids = {id(p) for p in classifier_params}
    # Backbone (pretrained) - lower learning rate
    # (compare by id: `p in list_of_tensors` would trigger element-wise tensor comparison)
    backbone_params = [p for p in self.backbone.parameters()
                       if p.requires_grad and id(p) not in classifier_ids]
    return [
        {'params': backbone_params, 'lr': base_lr * 0.1},
        {'params': classifier_params, 'lr': base_lr}
    ]
# Training with gradual unfreezing
model = TransferModel(num_classes=10)
# Phase 1: Train only the classifier
# (train_for_epochs is a placeholder for your own training loop, e.g. the train_epoch/validate helpers above)
optimizer = optim.Adam(model.backbone.fc.parameters(), lr=0.001)
train_for_epochs(model, optimizer, epochs=5)
# Phase 2: Unfreeze last few layers
model.unfreeze_layers(3)
param_groups = model.get_parameter_groups(0.0001)
optimizer = optim.Adam(param_groups)
train_for_epochs(model, optimizer, epochs=10)
# Phase 3: Unfreeze more layers
model.unfreeze_layers(6)
param_groups = model.get_parameter_groups(0.00001)
optimizer = optim.Adam(param_groups)
train_for_epochs(model, optimizer, epochs=10)
Using timm (PyTorch Image Models)
import timm
# List available models
print(timm.list_models('*efficientnet*'))
# Load model
model = timm.create_model('efficientnet_b3', pretrained=True, num_classes=10)
# Get model configuration
data_config = timm.data.resolve_model_data_config(model)
transforms = timm.data.create_transform(**data_config, is_training=True)
# Feature extraction
model = timm.create_model('resnet50', pretrained=True, num_classes=0) # No classifier
features = model(images) # Global pooled features
# Fine-tuning with different head
model = timm.create_model(
'efficientnet_b0',
pretrained=True,
num_classes=10,
drop_rate=0.3,
drop_path_rate=0.2
)
Advanced Training Techniques
Mixed Precision Training
from torch.cuda.amp import autocast, GradScaler
scaler = GradScaler()
for epoch in range(num_epochs):
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
# Forward pass with autocast
with autocast():
output = model(data)
loss = criterion(output, target)
# Backward pass with scaler
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
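If mixed precision is combined with gradient clipping, unscale the gradients first so the clipping threshold applies to their true magnitudes. A sketch of the modified update:
scaler.scale(loss).backward()
scaler.unscale_(optimizer)  # gradients are now in their true (unscaled) range
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
scaler.step(optimizer)
scaler.update()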
Gradient Accumulation
accumulation_steps = 4
for epoch in range(num_epochs):
optimizer.zero_grad()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
# Forward pass
output = model(data)
loss = criterion(output, target) / accumulation_steps
# Backward pass
loss.backward()
# Update weights every accumulation_steps
if (batch_idx + 1) % accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
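If the number of batches is not a multiple of accumulation_steps, the last few gradients never trigger an update; one simple fix is to flush them at the end of the epoch:
# After the inner loop: flush any remaining accumulated gradients
if (batch_idx + 1) % accumulation_steps != 0:
    optimizer.step()
    optimizer.zero_grad()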
TensorBoard Logging
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter('runs/experiment_1')
for epoch in range(num_epochs):
train_loss, train_acc = train_epoch(...)
val_loss, val_acc = validate(...)
# Log scalars
writer.add_scalar('Loss/train', train_loss, epoch)
writer.add_scalar('Loss/val', val_loss, epoch)
writer.add_scalar('Accuracy/train', train_acc, epoch)
writer.add_scalar('Accuracy/val', val_acc, epoch)
writer.add_scalar('Learning_rate', optimizer.param_groups[0]['lr'], epoch)
# Log histograms
for name, param in model.named_parameters():
writer.add_histogram(f'Parameters/{name}', param, epoch)
if param.grad is not None:
writer.add_histogram(f'Gradients/{name}', param.grad, epoch)
# Log images
writer.add_images('Samples', sample_images, epoch)
# Log model graph
if epoch == 0:
writer.add_graph(model, sample_input)
writer.close()
# View: tensorboard --logdir=runs
Early Stopping
class EarlyStopping:
def __init__(self, patience=7, min_delta=0, restore_best_weights=True):
self.patience = patience
self.min_delta = min_delta
self.restore_best_weights = restore_best_weights
self.best_loss = None
self.counter = 0
self.best_weights = None
def __call__(self, val_loss, model):
if self.best_loss is None:
self.best_loss = val_loss
self.save_checkpoint(model)
elif val_loss > self.best_loss - self.min_delta:
self.counter += 1
print(f'EarlyStopping counter: {self.counter}/{self.patience}')
if self.counter >= self.patience:
print('Early stopping triggered')
if self.restore_best_weights:
model.load_state_dict(self.best_weights)
return True
else:
self.best_loss = val_loss
self.save_checkpoint(model)
self.counter = 0
return False
def save_checkpoint(self, model):
self.best_weights = model.state_dict().copy()
# Usage
early_stopping = EarlyStopping(patience=5)
for epoch in range(num_epochs):
train_epoch(...)
val_loss = validate(...)
if early_stopping(val_loss, model):
break
Model Checkpointing
import shutil
def save_checkpoint(state, is_best, filename='checkpoint.pth'):
torch.save(state, filename)
if is_best:
shutil.copyfile(filename, 'model_best.pth')
best_val_loss = float('inf')
for epoch in range(num_epochs):
train_epoch(...)
val_loss = validate(...)
is_best = val_loss < best_val_loss
best_val_loss = min(val_loss, best_val_loss)
save_checkpoint({
'epoch': epoch + 1,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict(),
'best_val_loss': best_val_loss,
}, is_best)
# Resume training
checkpoint = torch.load('checkpoint.pth')
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
start_epoch = checkpoint['epoch']
best_val_loss = checkpoint['best_val_loss']
Model Deployment
Saving and Loading
# Save entire model
torch.save(model, 'model.pth')
model = torch.load('model.pth') # recent PyTorch versions may require weights_only=False for full-model loads
# Save state dict only (recommended)
torch.save(model.state_dict(), 'model_state.pth')
model = MyModel()
model.load_state_dict(torch.load('model_state.pth'))
# Save for inference
model.eval()
torch.save(model.state_dict(), 'model_inference.pth')
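When a checkpoint was saved on a GPU but is loaded on a CPU-only machine (or a different device), pass map_location so the tensors land where you want them. A sketch:
state_dict = torch.load('model_state.pth', map_location=device)  # e.g. torch.device('cpu')
model.load_state_dict(state_dict)
model.to(device)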
TorchScript for Production
# Tracing
model.eval()
example_input = torch.randn(1, 3, 224, 224)
traced_model = torch.jit.trace(model, example_input)
traced_model.save('model_traced.pt')
# Scripting (handles control flow)
scripted_model = torch.jit.script(model)
scripted_model.save('model_scripted.pt')
# Load and run
loaded_model = torch.jit.load('model_traced.pt')
output = loaded_model(input_tensor)
# Optimize for inference
optimized_model = torch.jit.optimize_for_inference(loaded_model)
ONNX Export
import torch.onnx
model.eval()
dummy_input = torch.randn(1, 3, 224, 224)
torch.onnx.export(
model,
dummy_input,
'model.onnx',
export_params=True,
opset_version=11,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={
'input': {0: 'batch_size'},
'output': {0: 'batch_size'}
}
)
# Run with ONNX Runtime
import onnxruntime as ort
import numpy as np
session = ort.InferenceSession('model.onnx')
input_name = session.get_inputs()[0].name
output = session.run(None, {input_name: input_data.numpy()})
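Before shipping the exported graph, it is worth checking that ONNX Runtime reproduces the PyTorch outputs on the same input; a sketch using the dummy input from above (the tolerances are a reasonable starting point, adjust for your model):
with torch.no_grad():
    torch_out = model(dummy_input)
ort_out = session.run(None, {input_name: dummy_input.numpy()})[0]
np.testing.assert_allclose(torch_out.numpy(), ort_out, rtol=1e-3, atol=1e-5)
print("ONNX Runtime output matches PyTorch")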
Mobile Deployment with PyTorch Mobile
import torch
from torch.utils.mobile_optimizer import optimize_for_mobile
model.eval()
scripted_model = torch.jit.script(model)
# Optimize for mobile
optimized_model = optimize_for_mobile(scripted_model)
optimized_model._save_for_lite_interpreter('model_mobile.ptl')
# Quantize for smaller size
quantized_model = torch.quantization.quantize_dynamic(
model, {torch.nn.Linear}, dtype=torch.qint8
)
Summary
PyTorch Best Practices
- Use model.train() and model.eval() - Essential for dropout and batch norm
- Move data to device - Ensure tensors are on the same device as the model
- Zero gradients - Call optimizer.zero_grad() before each backward pass
- Use with torch.no_grad() - For inference to save memory
- Pin memory - Use pin_memory=True in DataLoader for faster GPU transfer
- Use mixed precision - Speeds up training with minimal accuracy loss
- Save checkpoints regularly - Don’t lose training progress
- Monitor with TensorBoard - Track metrics and debug training
Quick Reference
| Task | Code |
|---|---|
| Create model | model = nn.Sequential(...) |
| Move to GPU | model.to('cuda') |
| Train mode | model.train() |
| Eval mode | model.eval() |
| Forward pass | output = model(input) |
| Compute loss | loss = criterion(output, target) |
| Backward pass | loss.backward() |
| Update weights | optimizer.step() |
| Clear gradients | optimizer.zero_grad() |
| Save model | torch.save(model.state_dict(), 'model.pth') |
| Load model | model.load_state_dict(torch.load('model.pth')) |
Further Reading
- PyTorch Documentation - pytorch.org/docs
- PyTorch Tutorials - pytorch.org/tutorials
- “Deep Learning with PyTorch” - Manning Publications
- Hugging Face Transformers - huggingface.co/transformers
- PyTorch Lightning - lightning.ai
- timm - github.com/huggingface/pytorch-image-models