Искусственный интеллект на нескольких графических процессорах: накопление градиента и параллелизм данных

Искусственный интеллект на нескольких графических процессорах: накопление градиента и параллелизм данных


Часть серии о распределенном искусственном интеллекте на нескольких графических процессорах:

Введение

Распределенный параллелизм данных (DDP) — это первый метод распараллеливания, который мы рассмотрим. Это основной подход Всегда Используется в условиях распределенного обучения и обычно сочетается с другими методами распараллеливания.

Быстрое обновление нейронной сети

Обучение нейронной сети означает выполнение прямого прохода, вычисление потерь, обратное распространение градиентов каждого веса относительно функции потерь и, наконец, обновление весов (это мы называем этапом оптимизации). В PyTorch это обычно выглядит так:

import torch

def training_loop(
    model: torch.nn.Module,
    dataloader: torch.utils.data.DataLoader,
    optimizer: torch.optim.Optimizer,
    loss_fn: callable,
):
    for i, batch in enumerate(dataloader):
        inputs, targets = batch
        output = model(inputs)  # Forward pass
        loss = loss_fn(output, targets)  # Compute loss
        loss.backward()  # Backward pass (compute gradients)
        optimizer.step()  # Update weights
        optimizer.zero_grad()  # Clear gradients for the next step

Выполнение этапа оптимизации на больших объемах обучающих данных обычно приводит к более точным оценкам градиента, что упрощает обучение и потенциально ускоряет сходимость. Поэтому в идеале мы должны делать каждый шаг после вычисления градиентов на основе всего набора обучающих данных. На практике это вряд ли возможно в сценариях глубокого обучения, поскольку вычисления заняли бы слишком много времени. Вместо этого мы работаем с меньшими частями, такими как мини-партия И микропартия.

  • партия: Относится ко всему обучающему набору, используемому на этапе оптимизации.
  • Мини-партия: Относится к небольшому подмножеству обучающих данных, используемых на этапе оптимизации.
  • Микро-пакет: Мини-партия относится к подмножеству операций, в которых мы объединяем несколько микропартий для этапа оптимизации.

Именно здесь в игру вступают накопление градиента и параллелизм данных. Хотя мы не используем весь набор данных для каждого шага, мы можем использовать эти методы, чтобы существенно увеличить размер мини-пакета.

постепенное накопление

Вот как это работает: выберите большой мини-пакет, который не помещается в памяти графического процессора, а затем разделите его. микропартия Он подходит. Для каждой микропартии бегите вперед и назад, добавляя (накапливая) рассчитанные градиенты. После обработки всех микропартий выполните один шаг оптимизации, используя усреднение градиентов.

Обратите внимание, что накопление градиента не является методом распараллеливания и не требует использования нескольких графических процессоров.

Искусственный интеллект на нескольких графических процессорах: накопление градиента и параллелизм данных
Изображение автора:Gradient Accumulation Animation

Реализовать накопление градиента с нуля очень просто. В простом цикле обучения это выглядит так:

import torch

def training_loop(
    model: torch.nn.Module,
    dataloader: torch.utils.data.DataLoader,
    optimizer: torch.optim.Optimizer,
    loss_fn: callable,
    grad_accum_steps: int,
):
    for i, batch in enumerate(dataloader):
        inputs, targets = batch
        output = model(inputs)
        loss = loss_fn(output, targets)
        loss.backward()  # Gradients get accumulated (summed)

        # Only update weights after `grad_accum_steps` micro-batches
        if (i+1) % grad_accum_steps == 0:  # i+1 to avoid a step in the first iteration when i=0
            optimizer.step()
            optimizer.zero_grad()

Обратите внимание, что мы последовательно Выполнение нескольких проходов вперед и назад перед каждым шагом оптимизации, что требует более длительных периодов обучения. Было бы здорово, если бы мы могли ускорить этот процесс, обрабатывая несколько микропакетов. параллельный…Именно это и делает DDP!

Распределенный параллелизм данных (DDP)

Для довольно небольшого количества графических процессоров (~до 8) DDP масштабируется примерно линейно, что является оптимальным. Это означает, что если вы удвоите количество графических процессоров, вы сможете сократить время обучения почти вдвое (о линейном масштабировании мы уже говорили).

При использовании DDP несколько графических процессоров работают вместе для обработки одного большого эффективного мини-пакета, обрабатывая каждый микропакет параллельно. Рабочий процесс выглядит следующим образом:

  1. Разделите мини-пакет между графическими процессорами.
  2. Каждый графический процессор выполняет свои собственные проходы вперед и назад для вычисления градиента для своего фрагмента данных (микропакета).
  3. используйте один полностью свернуть Операция усреднения градиентов по всем графическим процессорам (об этом мы узнали ранее в разделе «Коллективные операции»).
  4. Каждый графический процессор применяет одинаковые обновления веса, обеспечивая идеальную синхронизацию моделей.

Это позволяет нам тренироваться с гораздо большими эффективными размерами мини-пакетов, что приводит к более стабильному обучению и потенциально более быстрой сходимости.

Изображение автора: Параллельная анимация с распределенными данными

Реализация DDP с нуля в PyTorch

Давайте сделаем это шаг за шагом. В этой первой итерации мы синхронизируем только градиенты.

import torch


class DDPModelWrapper:
    def __init__(self, model: torch.nn.Module):
        self.model = model

    def __call__(self, *args, **kwargs):
        return self.model(*args, **kwargs)

    def sync_gradients(self):
        # Iterate over parameter matrices in the model
        for param in self.model.parameters():  
            # Some parameters might be frozen and don't have gradients
            if param.grad is not None:
                # We sum and then divide since torch.distributed doesn't have an average operation
                torch.distributed.all_reduce(param.grad.data, op=torch.distributed.ReduceOp.SUM)
                # Assuming each GPU received an equally sized mini-batch, we can average
                # the gradients dividing by the number of GPUs (aka world size)
                # By default the loss function already averages over the mini-batch size
                param.grad.data /= torch.distributed.get_world_size()

Прежде чем мы начнем обучение, мы, очевидно, требуем, чтобы наша модель была одинаковой для всех графических процессоров, иначе нам пришлось бы обучать разные модели! Давайте улучшим нашу реализацию, проверив, что все веса равны во время создания экземпляра (если вы не знаете, каковы ранги, см. первую публикацию в блоге из этой серии).

import torch


class DDPModelWrapper:
    def __init__(self, model: torch.nn.Module):
        self.model = model
        for param in self.model.parameters():
            # We create a new tensor so it can receive the broadcast
            rank_0_param = param.data.clone()
            # Initially rank_0_param contains the values for the current rank
            torch.distributed.broadcast(rank_0_param, src=0)
            # After the broadcast rank_0_param variable is overwritten with the parameters from rank_0
            if not torch.equal(param.data, rank_0_param):  # Now we compare rank_x with rank_0
                raise ValueError("Model parameters are not the same across all processes.")

    def __call__(self, *args, **kwargs):
        return self.model(*args, **kwargs)

    def sync_gradients(self):
        for param in self.model.parameters():  
            if param.grad is not None:  
                torch.distributed.all_reduce(param.grad.data, op=torch.distributed.ReduceOp.SUM)
                param.grad.data /= torch.distributed.get_world_size()

Связь DDP с Google Analytics

Вы можете комбинировать DDP с GA, чтобы добиться еще большего эффективного размера пакетов. Это особенно полезно, когда ваша модель настолько велика, что на один графический процессор умещается лишь несколько образцов.

Основным преимуществом является накладные расходы на связь уменьшены: вместо синхронизации градиентов после каждого пакета вы синхронизируете только один раз за пакет. grad_accum_steps партия. Это означает:

  • глобальный эффективный размер партии = num_gpus × micro_batch_size × grad_accum_steps
  • Меньше точек синхронизации = меньше времени, затрачиваемого на связь между графическими процессорами

Цикл обучения с использованием нашего DDPModelWrapper С накоплением градиента это выглядит так:

def training_loop(
    ddp_model: DDPModelWrapper,
    dataloader: torch.utils.data.DataLoader,
    optimizer: torch.optim.Optimizer,
    loss_fn: callable,
    grad_accum_steps: int,
):
    for i, batch in enumerate(dataloader):
        inputs, targets = batch
        output = ddp_model(inputs)
        loss = loss_fn(output, targets)
        loss.backward()

        if (i+1) % grad_accum_steps == 0:
            # Must sync gradients across GPUs *BEFORE* the optimization step
            ddp_model.sync_gradients()
            optimizer.step()
            optimizer.zero_grad()

Профессиональные советы и расширенное использование

  • Используйте предварительную выборку данных. Вы можете ускорить обучение, загрузив следующий пакет данных во время обработки текущего пакета. PyTorch’s DataLoader обеспечивает prefetch_factor Логика, управляющая количеством пакетов предварительной выборки в фоновом режиме. Правильно воспользоваться преимуществами предварительной выборки с помощью CUDA может быть немного сложно, поэтому мы оставим это для следующего поста.
  • Не максимально увеличивайте память графического процессора. Как ни странно, оставление некоторой свободной памяти может привести к увеличению производительности обучения. Если вы оставите свободными хотя бы ~15% памяти графического процессора, графический процессор сможет лучше управлять памятью, избегая фрагментации.
  • PyTorch DDP перекрывает связь с вычислениями. По умолчанию DDP передает градиенты так, как они рассчитываются во время обратного распространения ошибки, а не ждет завершения полного обратного прохода. Сюда:
    • PyTorch объединяет градиенты модели в сегменты. bucket_cap_mb Во время обратного прохода мегабайт PyTorch помечает градиенты как готовые к уменьшению в соответствии с расчетом. Как только все градиенты в сегменте готовы, DDP запускает асинхронный процесс. allreduce Усреднить эти градиенты по всем рангам. loss.backward() Звонок возвращается только после всего allreduceОперация завершена, поэтому я немедленно звоню opt.step() Безопасно.
    • bucket_cap_mb Параметр идет на компромисс: меньшие значения срабатывают чаще. allreduce операций, но каждый запуск коммуникационного ядра влечет за собой некоторые накладные расходы, которые могут снизить производительность. Большие значения уменьшают частоту связи, но также уменьшают перекрытие; В крайнем случае, если сегменты очень большие, вы ждете завершения всего обратного прохода, прежде чем обмениваться данными. Оптимальное значение зависит от архитектуры и оборудования вашей модели, поэтому создавайте профили с разными значениями, чтобы найти тот, который работает лучше всего.
Источник: Учебное пособие по PyTorch.
  • Вот полная реализация DDP в PyTorch:
"""
Launch with:
  torchrun --nproc_per_node=NUM_GPUS ddp.py
"""
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
from torch import optim


class ToyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(1024, 1024), nn.ReLU(),
            nn.Linear(1024, 1024), nn.ReLU(),
            nn.Linear(1024, 256),
        )

    def forward(self, x):
        return self.net(x)


def train():
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    torch.cuda.set_device(rank)
    device = torch.device(f"cuda:{rank}")

    # Create dummy dataset
    x_data = torch.randn(1000, 1024)
    y_data = torch.randn(1000, 256)
    dataset = TensorDataset(x_data, y_data)

    # DistributedSampler ensures each rank gets different data
    sampler = DistributedSampler(dataset, shuffle=True)
    dataloader = DataLoader(dataset, batch_size=64, sampler=sampler)

    model = ToyModel().to(device)

    # gradient_as_bucket_view: avoids an extra grad tensor copy per bucket.
    ddp_model = DDP(
        model,
        device_ids=[rank],
        bucket_cap_mb=25,
        gradient_as_bucket_view=True,
    )

    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()

    for epoch in range(2):
        sampler.set_epoch(epoch)  # Ensures different shuffling each epoch

        for batch_idx, (x, y) in enumerate(dataloader):
            x, y = x.to(device), y.to(device)

            optimizer.zero_grad()
            output = ddp_model(x)
            loss = loss_fn(output, y)

            # Backward automatically overlaps with allreduce per bucket.
            # By the time this returns, all allreduce ops are done.
            loss.backward()
            optimizer.step()

            if rank == 0 and batch_idx % 5 == 0:
                print(f"epoch {epoch}  batch {batch_idx}  loss={loss.item():.4f}")

    dist.destroy_process_group()


if __name__ == "__main__":
    train()
  • Вот полная реализация PyTorch, сочетающая DDP и GA:
"""
Launch with:
  torchrun --nproc_per_node=NUM_GPUS ddp_ga.py
"""
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
from torch import optim
from contextlib import nullcontext


class ToyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(1024, 1024), nn.ReLU(),
            nn.Linear(1024, 1024), nn.ReLU(),
            nn.Linear(1024, 256),
        )

    def forward(self, x):
        return self.net(x)


def train():
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    torch.cuda.set_device(rank)
    device = torch.device(f"cuda:{rank}")

    # Create dummy dataset
    x_data = torch.randn(1000, 1024)
    y_data = torch.randn(1000, 256)
    dataset = TensorDataset(x_data, y_data)

    # DistributedSampler ensures each rank gets different data
    sampler = DistributedSampler(dataset, shuffle=True)
    dataloader = DataLoader(dataset, batch_size=16, sampler=sampler)

    model = ToyModel().to(device)

    ddp_model = DDP(
        model,
        device_ids=[rank],
        bucket_cap_mb=25,
        gradient_as_bucket_view=True,
    )

    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    loss_fn = nn.MSELoss()

    ACCUM_STEPS = 4

    for epoch in range(2):
        sampler.set_epoch(epoch)  # Ensures different shuffling each epoch

        optimizer.zero_grad()
        for batch_idx, (x, y) in enumerate(dataloader):
            x, y = x.to(device), y.to(device)

            is_last_micro_step = (batch_idx + 1) % ACCUM_STEPS == 0

            # no_sync() suppresses allreduce on accumulation steps.
            # On the last microstep we exit no_sync() so DDP fires
            # the allreduce overlapped with that backward pass.
            ctx = ddp_model.no_sync() if not is_last_micro_step else nullcontext()

            with ctx:
                output = ddp_model(x)
                loss = loss_fn(output, y) / ACCUM_STEPS
                loss.backward()

            if is_last_micro_step:
                optimizer.step()
                optimizer.zero_grad()

                if rank == 0:
                    print(f"epoch {epoch}  batch {batch_idx}  loss={loss.item() * ACCUM_STEPS:.4f}")

    dist.destroy_process_group()


if __name__ == "__main__":
    train()

заключение

Следуйте за мной на X, чтобы увидеть больше бесплатного контента об искусственном интеллекте @l_cesconetto

Поздравляем с доведением до конца! Из этого поста вы узнали о:

  • Важность большого размера партии
  • Как работает накопление градиента и его ограничения
  • Рабочий процесс DDP и его преимущества
  • Как реализовать GA и DDP с нуля в PyTorch
  • Как объединить GA и DDP

В следующей статье мы рассмотрим Zero (Zero Redundancy Optimizer), более продвинутую технологию, основанную на DDP для дальнейшей оптимизации использования памяти VRAM.

Ссылка

Leave a Reply

Your email address will not be published. Required fields are marked *