Часть серии о распределенном искусственном интеллекте на нескольких графических процессорах:
Введение
Распределенный параллелизм данных (DDP) — это первый метод распараллеливания, который мы рассмотрим. Это основной подход Всегда Используется в условиях распределенного обучения и обычно сочетается с другими методами распараллеливания.
Быстрое обновление нейронной сети
Обучение нейронной сети означает выполнение прямого прохода, вычисление потерь, обратное распространение градиентов каждого веса относительно функции потерь и, наконец, обновление весов (это мы называем этапом оптимизации). В PyTorch это обычно выглядит так:
import torch
def training_loop(
model: torch.nn.Module,
dataloader: torch.utils.data.DataLoader,
optimizer: torch.optim.Optimizer,
loss_fn: callable,
):
for i, batch in enumerate(dataloader):
inputs, targets = batch
output = model(inputs) # Forward pass
loss = loss_fn(output, targets) # Compute loss
loss.backward() # Backward pass (compute gradients)
optimizer.step() # Update weights
optimizer.zero_grad() # Clear gradients for the next step
Выполнение этапа оптимизации на больших объемах обучающих данных обычно приводит к более точным оценкам градиента, что упрощает обучение и потенциально ускоряет сходимость. Поэтому в идеале мы должны делать каждый шаг после вычисления градиентов на основе всего набора обучающих данных. На практике это вряд ли возможно в сценариях глубокого обучения, поскольку вычисления заняли бы слишком много времени. Вместо этого мы работаем с меньшими частями, такими как мини-партия И микропартия.
- партия: Относится ко всему обучающему набору, используемому на этапе оптимизации.
- Мини-партия: Относится к небольшому подмножеству обучающих данных, используемых на этапе оптимизации.
- Микро-пакет: Мини-партия относится к подмножеству операций, в которых мы объединяем несколько микропартий для этапа оптимизации.
Именно здесь в игру вступают накопление градиента и параллелизм данных. Хотя мы не используем весь набор данных для каждого шага, мы можем использовать эти методы, чтобы существенно увеличить размер мини-пакета.
постепенное накопление
Вот как это работает: выберите большой мини-пакет, который не помещается в памяти графического процессора, а затем разделите его. микропартия Он подходит. Для каждой микропартии бегите вперед и назад, добавляя (накапливая) рассчитанные градиенты. После обработки всех микропартий выполните один шаг оптимизации, используя усреднение градиентов.
Обратите внимание, что накопление градиента не является методом распараллеливания и не требует использования нескольких графических процессоров.

Реализовать накопление градиента с нуля очень просто. В простом цикле обучения это выглядит так:
import torch
def training_loop(
model: torch.nn.Module,
dataloader: torch.utils.data.DataLoader,
optimizer: torch.optim.Optimizer,
loss_fn: callable,
grad_accum_steps: int,
):
for i, batch in enumerate(dataloader):
inputs, targets = batch
output = model(inputs)
loss = loss_fn(output, targets)
loss.backward() # Gradients get accumulated (summed)
# Only update weights after `grad_accum_steps` micro-batches
if (i+1) % grad_accum_steps == 0: # i+1 to avoid a step in the first iteration when i=0
optimizer.step()
optimizer.zero_grad()
Обратите внимание, что мы последовательно Выполнение нескольких проходов вперед и назад перед каждым шагом оптимизации, что требует более длительных периодов обучения. Было бы здорово, если бы мы могли ускорить этот процесс, обрабатывая несколько микропакетов. параллельный…Именно это и делает DDP!
Распределенный параллелизм данных (DDP)
Для довольно небольшого количества графических процессоров (~до 8) DDP масштабируется примерно линейно, что является оптимальным. Это означает, что если вы удвоите количество графических процессоров, вы сможете сократить время обучения почти вдвое (о линейном масштабировании мы уже говорили).
При использовании DDP несколько графических процессоров работают вместе для обработки одного большого эффективного мини-пакета, обрабатывая каждый микропакет параллельно. Рабочий процесс выглядит следующим образом:
- Разделите мини-пакет между графическими процессорами.
- Каждый графический процессор выполняет свои собственные проходы вперед и назад для вычисления градиента для своего фрагмента данных (микропакета).
- используйте один полностью свернуть Операция усреднения градиентов по всем графическим процессорам (об этом мы узнали ранее в разделе «Коллективные операции»).
- Каждый графический процессор применяет одинаковые обновления веса, обеспечивая идеальную синхронизацию моделей.
Это позволяет нам тренироваться с гораздо большими эффективными размерами мини-пакетов, что приводит к более стабильному обучению и потенциально более быстрой сходимости.

Реализация DDP с нуля в PyTorch
Давайте сделаем это шаг за шагом. В этой первой итерации мы синхронизируем только градиенты.
import torch
class DDPModelWrapper:
def __init__(self, model: torch.nn.Module):
self.model = model
def __call__(self, *args, **kwargs):
return self.model(*args, **kwargs)
def sync_gradients(self):
# Iterate over parameter matrices in the model
for param in self.model.parameters():
# Some parameters might be frozen and don't have gradients
if param.grad is not None:
# We sum and then divide since torch.distributed doesn't have an average operation
torch.distributed.all_reduce(param.grad.data, op=torch.distributed.ReduceOp.SUM)
# Assuming each GPU received an equally sized mini-batch, we can average
# the gradients dividing by the number of GPUs (aka world size)
# By default the loss function already averages over the mini-batch size
param.grad.data /= torch.distributed.get_world_size()
Прежде чем мы начнем обучение, мы, очевидно, требуем, чтобы наша модель была одинаковой для всех графических процессоров, иначе нам пришлось бы обучать разные модели! Давайте улучшим нашу реализацию, проверив, что все веса равны во время создания экземпляра (если вы не знаете, каковы ранги, см. первую публикацию в блоге из этой серии).
import torch
class DDPModelWrapper:
def __init__(self, model: torch.nn.Module):
self.model = model
for param in self.model.parameters():
# We create a new tensor so it can receive the broadcast
rank_0_param = param.data.clone()
# Initially rank_0_param contains the values for the current rank
torch.distributed.broadcast(rank_0_param, src=0)
# After the broadcast rank_0_param variable is overwritten with the parameters from rank_0
if not torch.equal(param.data, rank_0_param): # Now we compare rank_x with rank_0
raise ValueError("Model parameters are not the same across all processes.")
def __call__(self, *args, **kwargs):
return self.model(*args, **kwargs)
def sync_gradients(self):
for param in self.model.parameters():
if param.grad is not None:
torch.distributed.all_reduce(param.grad.data, op=torch.distributed.ReduceOp.SUM)
param.grad.data /= torch.distributed.get_world_size()
Связь DDP с Google Analytics
Вы можете комбинировать DDP с GA, чтобы добиться еще большего эффективного размера пакетов. Это особенно полезно, когда ваша модель настолько велика, что на один графический процессор умещается лишь несколько образцов.
Основным преимуществом является накладные расходы на связь уменьшены: вместо синхронизации градиентов после каждого пакета вы синхронизируете только один раз за пакет. grad_accum_steps партия. Это означает:
- глобальный эффективный размер партии =
num_gpus × micro_batch_size × grad_accum_steps - Меньше точек синхронизации = меньше времени, затрачиваемого на связь между графическими процессорами
Цикл обучения с использованием нашего DDPModelWrapper С накоплением градиента это выглядит так:
def training_loop(
ddp_model: DDPModelWrapper,
dataloader: torch.utils.data.DataLoader,
optimizer: torch.optim.Optimizer,
loss_fn: callable,
grad_accum_steps: int,
):
for i, batch in enumerate(dataloader):
inputs, targets = batch
output = ddp_model(inputs)
loss = loss_fn(output, targets)
loss.backward()
if (i+1) % grad_accum_steps == 0:
# Must sync gradients across GPUs *BEFORE* the optimization step
ddp_model.sync_gradients()
optimizer.step()
optimizer.zero_grad()
Профессиональные советы и расширенное использование
- Используйте предварительную выборку данных. Вы можете ускорить обучение, загрузив следующий пакет данных во время обработки текущего пакета. PyTorch’s
DataLoaderобеспечиваетprefetch_factorЛогика, управляющая количеством пакетов предварительной выборки в фоновом режиме. Правильно воспользоваться преимуществами предварительной выборки с помощью CUDA может быть немного сложно, поэтому мы оставим это для следующего поста. - Не максимально увеличивайте память графического процессора. Как ни странно, оставление некоторой свободной памяти может привести к увеличению производительности обучения. Если вы оставите свободными хотя бы ~15% памяти графического процессора, графический процессор сможет лучше управлять памятью, избегая фрагментации.
- PyTorch DDP перекрывает связь с вычислениями. По умолчанию DDP передает градиенты так, как они рассчитываются во время обратного распространения ошибки, а не ждет завершения полного обратного прохода. Сюда:
- PyTorch объединяет градиенты модели в сегменты.
bucket_cap_mbВо время обратного прохода мегабайт PyTorch помечает градиенты как готовые к уменьшению в соответствии с расчетом. Как только все градиенты в сегменте готовы, DDP запускает асинхронный процесс.allreduceУсреднить эти градиенты по всем рангам.loss.backward()Звонок возвращается только после всегоallreduceОперация завершена, поэтому я немедленно звонюopt.step()Безопасно. -
bucket_cap_mbПараметр идет на компромисс: меньшие значения срабатывают чаще.allreduceопераций, но каждый запуск коммуникационного ядра влечет за собой некоторые накладные расходы, которые могут снизить производительность. Большие значения уменьшают частоту связи, но также уменьшают перекрытие; В крайнем случае, если сегменты очень большие, вы ждете завершения всего обратного прохода, прежде чем обмениваться данными. Оптимальное значение зависит от архитектуры и оборудования вашей модели, поэтому создавайте профили с разными значениями, чтобы найти тот, который работает лучше всего.
- PyTorch объединяет градиенты модели в сегменты.

- Вот полная реализация DDP в PyTorch:
"""
Launch with:
torchrun --nproc_per_node=NUM_GPUS ddp.py
"""
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
from torch import optim
class ToyModel(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(1024, 1024), nn.ReLU(),
nn.Linear(1024, 1024), nn.ReLU(),
nn.Linear(1024, 256),
)
def forward(self, x):
return self.net(x)
def train():
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
torch.cuda.set_device(rank)
device = torch.device(f"cuda:{rank}")
# Create dummy dataset
x_data = torch.randn(1000, 1024)
y_data = torch.randn(1000, 256)
dataset = TensorDataset(x_data, y_data)
# DistributedSampler ensures each rank gets different data
sampler = DistributedSampler(dataset, shuffle=True)
dataloader = DataLoader(dataset, batch_size=64, sampler=sampler)
model = ToyModel().to(device)
# gradient_as_bucket_view: avoids an extra grad tensor copy per bucket.
ddp_model = DDP(
model,
device_ids=[rank],
bucket_cap_mb=25,
gradient_as_bucket_view=True,
)
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
for epoch in range(2):
sampler.set_epoch(epoch) # Ensures different shuffling each epoch
for batch_idx, (x, y) in enumerate(dataloader):
x, y = x.to(device), y.to(device)
optimizer.zero_grad()
output = ddp_model(x)
loss = loss_fn(output, y)
# Backward automatically overlaps with allreduce per bucket.
# By the time this returns, all allreduce ops are done.
loss.backward()
optimizer.step()
if rank == 0 and batch_idx % 5 == 0:
print(f"epoch {epoch} batch {batch_idx} loss={loss.item():.4f}")
dist.destroy_process_group()
if __name__ == "__main__":
train()
- Вот полная реализация PyTorch, сочетающая DDP и GA:
"""
Launch with:
torchrun --nproc_per_node=NUM_GPUS ddp_ga.py
"""
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
from torch import optim
from contextlib import nullcontext
class ToyModel(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(1024, 1024), nn.ReLU(),
nn.Linear(1024, 1024), nn.ReLU(),
nn.Linear(1024, 256),
)
def forward(self, x):
return self.net(x)
def train():
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
torch.cuda.set_device(rank)
device = torch.device(f"cuda:{rank}")
# Create dummy dataset
x_data = torch.randn(1000, 1024)
y_data = torch.randn(1000, 256)
dataset = TensorDataset(x_data, y_data)
# DistributedSampler ensures each rank gets different data
sampler = DistributedSampler(dataset, shuffle=True)
dataloader = DataLoader(dataset, batch_size=16, sampler=sampler)
model = ToyModel().to(device)
ddp_model = DDP(
model,
device_ids=[rank],
bucket_cap_mb=25,
gradient_as_bucket_view=True,
)
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
ACCUM_STEPS = 4
for epoch in range(2):
sampler.set_epoch(epoch) # Ensures different shuffling each epoch
optimizer.zero_grad()
for batch_idx, (x, y) in enumerate(dataloader):
x, y = x.to(device), y.to(device)
is_last_micro_step = (batch_idx + 1) % ACCUM_STEPS == 0
# no_sync() suppresses allreduce on accumulation steps.
# On the last microstep we exit no_sync() so DDP fires
# the allreduce overlapped with that backward pass.
ctx = ddp_model.no_sync() if not is_last_micro_step else nullcontext()
with ctx:
output = ddp_model(x)
loss = loss_fn(output, y) / ACCUM_STEPS
loss.backward()
if is_last_micro_step:
optimizer.step()
optimizer.zero_grad()
if rank == 0:
print(f"epoch {epoch} batch {batch_idx} loss={loss.item() * ACCUM_STEPS:.4f}")
dist.destroy_process_group()
if __name__ == "__main__":
train()
заключение
Следуйте за мной на X, чтобы увидеть больше бесплатного контента об искусственном интеллекте @l_cesconetto
Поздравляем с доведением до конца! Из этого поста вы узнали о:
- Важность большого размера партии
- Как работает накопление градиента и его ограничения
- Рабочий процесс DDP и его преимущества
- Как реализовать GA и DDP с нуля в PyTorch
- Как объединить GA и DDP
В следующей статье мы рассмотрим Zero (Zero Redundancy Optimizer), более продвинутую технологию, основанную на DDP для дальнейшей оптимизации использования памяти VRAM.