Multi-GPU-Training mit Pytorch

Moderne Software-Anwendungen mit trainierten Deep-Learning-Modellen erfordern eine hohe Anzahl an parallelen Berechnungen. Da GPUs mehr Prozessor-Kerne (>10000) bieten, als klassische CPUs (<=64), ist die Verwendung von GPUs für die meisten Deep-Learning-Anwendungen obligatorisch.

Table of contents

Um die Rechenleistung weiter zu steigern, lassen sich AIME Server mit bis zu acht GPUs ausrüsten und kürzestmögliche Durchlaufzeiten erreichen. Um die volle Leistung von AIME-Maschinen zu nutzen, ist es wichtig sicherzustellen, dass alle installierten GPUs am Training effektiv teilnehmen.

Der folgende Artikel erklärt, wie man ein Modell mittels PyTorch effektiv mit mehreren GPUs trainiert. Der erste Teil befasst sich mit dem einfacheren DataParallel-Ansatz, der wenig effektiv nur einen einzigen Prozess nutzt. Der zweite Teil erklärt den effektiveren Lösungsweg, der unter Verwendung von DistributedDataParallel mittels Nutzung mehrerer paralleler Prozesse eine bessere Leistung bietet.

Multi-GPU-Training mittels DataParallel in einem einzigen Prozess

Der einfachste Weg, alle installierten GPUs mit PyTorch zu nutzen, ist die Verwendung der Funktion DataParallel aus dem PyTorch-Modul torch.nn.parallel. Dies kann fast auf die gleiche Weise wie bei einem Ein-GPU-Training erfolgen. Nachdem das Modell initialisiert wurde, passen Sie es wie in der folgenden Zeile dargestellt an:

model = torch.nn.parallel.DataParallel(model, device_ids=list(range(<num_gpus>)), dim=0)

wobei <num_gpus> die Anzahl der zu nutzenden GPUs darstellt.

Beachten Sie, dass die im Dataloader verwendete Batchgröße der globalen Batchgröße aller GPUs entspricht. Wenn Sie also die lokale Batchgröße jeder GPU verwenden möchten, müssen Sie sie mit der Anzahl der GPUs multiplizieren.

Im Folgenden wird ein voll funktionsfähiges Beispiel für ein Multi-GPU-Training mit einem resnet50-Modell aus der Torchvision-Bibliothek unter Verwendung von DataParallel gelistet:

#!/usr/bin/env python3

from pathlib import Path
import torch
import torchvision


def load_data(num_gpus):
    transforms = torchvision.transforms.Compose([
    torchvision.transforms.Resize(256),
    torchvision.transforms.CenterCrop(224),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
                                                 ])
    dataset = torchvision.datasets.ImageFolder(root=, transform=transforms)

    dataloader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=64,
        shuffle=False,
        num_workers=4*num_gpus
                                                )
    return dataloader

def save_model(epoch, model, optimizer):
    """Saves model checkpoint on given epoch with given data name.
    """
    checkpoint_folder = Path.cwd() / 'model_checkpoints'
    if not checkpoint_folder.is_dir():
        checkpoint_folder.mkdir()
    file = checkpoint_folder / f'epoch_{epoch}.pt'
    if not file.is_file():
        file.touch()
    torch.save(
        {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
        },
        file
                )
    return True

def load_model(epoch, model, optimizer):
    """Loads model state from file.
    """
    file = Path.cwd() / 'model_checkpoints' / f'epoch_{epoch}.pt'
    checkpoint = torch.load(file)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    return model, optimizer

def run_training(num_gpus):

    model = torchvision.models.resnet50(pretrained=False)
    model = model.cuda()
    model = torch.nn.parallel.DataParallel(model, device_ids=list(range(num_gpus)), dim=0)

    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
    criterion = torch.nn.CrossEntropyLoss()
    criterion.cuda()
    model.train()
    num_epochs = 30
    dataloader = load_data(num_gpus)
    total_steps = len(dataloader)
    for epoch in range(1, num_epochs):
        print(f'\nEpoch {epoch}\n')
        if epoch > 1:
            model, optimizer = load_model(epoch-1, model, optimizer)
        for step, (images, labels) in enumerate(dataloader, 1):
            images, labels = images.cuda(), labels.cuda()
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            if step % 10 == 0:
                print(f'Epoch [{epoch} / {num_epochs}], Step [{step} / {total_steps}], Loss: {loss.item():.4f}')
        save_model(epoch, model, optimizer)

if __name__ == "__main__":
    num_gpus = torch.cuda.device_count()
    print('num_gpus: ', num_gpus)
    run_training(num_gpus)

Die Nutzung von DataParallel ist zwar recht unkompliziert ist und die hierdurch erreichte Leistungssteigerung im Vergleich zu einer einzelnen GPU bereits sichtbar, jedoch lässt sich die Performance noch weiter verbessern, da alle Berechnungen im selben Prozess ablaufen und die verfügbaren GPUs nicht voll ausgelastet werden. Für eine höhere Leistungsausbeute benötigt man mehrere parallele Prozesse, idealerweise jeweils einen eigenen Prozess pro GPU. Dies lässt sich unter Nutzung der Funktion DistributedDataParallel erreichen, die im folgenden Abschnitt detailliert erklärt wird. Die folgende Tabelle zeigt einen Vergleich der Trainings-Performance beider Methoden gemessen mit und unserem Benchmark-Tool https://github.com/aime-team/pytorch-benchmarks. Hier konnte die Trainings-Performance von 'DistributedDataParallel' im Vergleich zu 'DataParallel' um bis zu 17% gesteigert werden.

Anzahl der GPUs Bilder pro Sekunde mit Data Parallel Bilder pro Sekunde mit Distributed Data Parallel
1x NVIDIA RTX 3090 473 -
2x NVIDIA RTX 3090 883 944
4x NVIDIA RTX 3090 1526 1788

Multi-GPU-Training mit mehreren Prozessen (DistributedDataParallel)

Die in PyTorch verfügbare Funktion DistributedDataParallel aus dem Modul torch.nn.parallel ist in der Lage, das Training auf alle GPUs zu verteilen, wobei ein Subprozess pro GPU jeweils ihre volle Kapazität ausnutzt. Im Vergleich zu DataParallel sind jedoch recht viele zusätzliche Arbeitsschritte erforderlich. Als erstes müssen die Umgebungsvariablen master address und master port mit den folgenden Zeilen gesetzt werden:

os.environ['MASTER_ADDR'] = 'localhost'  
os.environ['MASTER_PORT'] = '12355'

Der Master-Port kann auf eine beliebige Nummer geändert werden.

Dann wird mit der spawn-Methode aus dem Modul torch.multiprocessing für jede GPU ein eigener Prozess erzeugt:

torch.multiprocessing.spawn(  
    run_training_process_on_given_gpu, 
    args=(args, ), 
    nprocs=<num_gpus>, 
    join=True)

Die Funktion run_training_process_on_given_gpu muss den gesamten Trainingscode für jede GPU mit ihren Argumenten args (ohne Rang) als Tupel enthalten. nprocs ist die Anzahl der Prozesse, die gespawnt werden sollen (z.B. die Anzahl der GPUs). Bei der Implementierung der Funktion run_training_process_on_given_gpu muss das erste Positionsargument dem Rang des Prozesses entsprechen. Die Spawn-Methode initialisiert nun nprocs-Prozesse. Das Positionsargument rank wird von den nprocs-Prozessen automatisch ab Rang 0 aufsteigend gesetzt. In jedem Prozess ist nun eine Prozessgruppe mit der Methode init_process_group aus dem Modul torch.distributed zu initialisieren:

torch.distributed.init_process_group(backend=<backend>, rank=rank, world_size=<num_gpus>, init_method='env://')

Übliche Werte für <backend> sind 'gloo' und 'nccl', wobei 'nccl' für Multi-GPU-Training empfohlen wird. Weitere Details zu Backends für verteiltes Training finden Sie unter https://pytorch.org/docs/stable/distributed.html. Die world_size entspricht hier der Anzahl der GPUs. Die Initialisierungsmethode 'env://' zieht alle benötigten Informationen aus der Umgebung. Der Rang meint hier den Rang der GPU, der durch die spawn-Methode weiter oben für jede GPU festgelegt wurde.

Jetzt muss das Modell mit der folgenden Zeile für das verteilte Training vorbereitet werden. Die folgenden Schritte müssen in jedem Prozess mit dem angegebenen Rang durchgeführt werden:

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

Anschließend wird Ihr Modell und Ihre LossFunction (criterion) in den Speicher jeder einzelnen GPU geladen:

torch.cuda.set_device(rank)  
model.cuda(rank)  
criterion = torch.nn.CrossEntropyLoss()  
criterion.cuda(rank)

Der nächste Schritt besteht darin, den Dataloader für das verteilte Training vorzubereiten. Stellen Sie zunächst die Parameter für die Transformation der Daten in das Modell ein:

transforms = torchvision.transforms.Compose([  
    torchvision.transforms.Resize(256),  
    torchvision.transforms.CenterCrop(224),  
    torchvision.transforms.ToTensor(),  
    torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225]),])

Initialisieren Sie dann den Datensatz, z.B. mit der Methode ImageFolder() aus dem Modul torchvision.datasets:

dataset = torchvision.datasets.ImageFolder(root=<image_destination>, transform=transforms)

Dann müssen wir den DistributedSampler aus dem Modul torch.utils.data.distributed initialisieren.

sampler = torch.utils.data.distributed.DistributedSampler(dataset)

Nun können wir den Dataloader aus dem Modul torch.utils.data mit dem angegebenen Datensatz und Sampler initialisieren:

dataloader = torch.utils.data.DataLoader(dataset=dataset, batch_size=<batch_size>, shuffle=False, num_workers=4*<num_gpus>, pin_memory=True, sampler=sampler)

Um die höchste Leistung zu erzielen, wird empfohlen, die Anzahl der Worker als vierfachen Wert der GPU-Anzahl festzulegen. Eine weitere Leistungssteigerung kann erreicht werden, indem im Dataloader pin_memory=True in Kombination mit non_blocking=True gesetzt wird, während die Daten mit dem cuda()-Aufruf in den GPU-Speicher verschoben werden (siehe unten). Im Gegensatz zu einem Einzel-GPU- oder DataParallel-Training muss das shuffle-Argument auf False gesetzt werden, da der Sampler bereits das Mischen der Daten übernimmt. Hier entspricht die batch_size der lokalen Batchgröße jeder GPU und nicht der globalen Batchgröße, wie in einer DataParallel-Anwendung.

Jetzt sind das Modell und der Dataloader bereit für das verteilte Training.

Die letzte Änderung für das Multi-GPU-Training erfolgt in der Trainingsschleife. Die Daten müssen nach jedem Schritt mit dem Befehl cuda() in den Speicher der GPUs verschoben werden:

for step, (data, label) in enumerate(dataloader):  
    data, label = data.cuda(rank, non_blocking=True), label.cuda(rank, non_blocking=True)

Wenn das Argument non_blocking  auf True gesetzt wird, wartet der Dataloader mit dem nächsten Befehl nicht bis die Daten in den Speicher der GPU geladen wurden. pin_memory=Truebedeutet der Datentransport geschieht in einem fest definierten Bereich, der für andere Aufgaben blockiert wird. Die Kombination aus beiden sorgt für einen weiteren Leistungsschub.

Modell speichern

Das trainierte Modell speichert man mit folgendem Befehl auf dem Festspeicher:

torch.save(
        {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
        },
        file)

mit file als String oder PosixPath, der den Dateispeicherort der Checkpoint-Datei enthält. Es reicht aus, den Checkpoint nur von einer GPU (hier mit Rang 0) zu speichern, da die Informationen aller anderen GPUs bereits enthalten sind. Die Checkpoints der restlichen GPUs ebenfalls zu speichern und dabei aufeinander warten zu lassen ist also überflüssig.

Modell laden

Um einen gespeicherten Checkpoint Ihres Modells in alle beteiligten Prozesse zu laden, verwenden Sie die folgende Befehle:

torch.distributed.barrier()  
map_location = {'cuda:0': f'cuda:{rank}'}  
checkpoint = torch.load(file, map_location=map_location)  
model.load_state_dict(checkpoint['model_state_dict'])  
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

mit file als String oder PosixPath, der den Dateispeicherort der Checkpoint-Datei enthält. Der Befehl torch.distributed.barrier() sorgt dafür, dass die Prozesse synchronisiert werden und map_location kümmert sich um die Verteilung über alle Prozesse.

Code-Zusammenfassung

Als Zusammenfassung wird im Folgenden ein Beispiel für ein voll funktionsfähiges Multi-GPU-Training eines resnet50-Modells aus der Torchvision-Bibliothek mittels der DistributedDataParallel-Methode gelistet:

#!/usr/bin/env python3

import sys
import os
from pathlib import Path
import torch
import torchvision


def save_model(epoch, model, optimizer):
    """Saves model checkpoint on given epoch with given data name.
    """
    checkpoint_folder = Path.cwd() / 'model_checkpoints'
    if not checkpoint_folder.is_dir():
        checkpoint_folder.mkdir()
    file = checkpoint_folder / f'epoch_{epoch}.pt'
    if not file.is_file():
        file.touch()
    torch.save(
        {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
        },
        file
                )
    return True

def load_model(rank, epoch, model, optimizer):
    """Loads model state from file to the GPU with given rank.
    """
    torch.distributed.barrier()
    map_location = {'cuda:0': f'cuda:{rank}'}
    file = Path.cwd() / 'model_checkpoints' / f'epoch_{epoch}.pt'
    checkpoint = torch.load(file, map_location=map_location)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    return model, optimizer

def load_data(num_gpus):
    transforms = torchvision.transforms.Compose([
    torchvision.transforms.Resize(256),
    torchvision.transforms.CenterCrop(224),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
                                                ])
    dataset = torchvision.datasets.ImageFolder(root=, transform=transforms)
    sampler = torch.utils.data.distributed.DistributedSampler(dataset)

    dataloader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=64,
        shuffle=False,
        num_workers=4*num_gpus,
        pin_memory=True,
        sampler=sampler
                                                )
    return dataloader


def run_training_process_on_given_gpu(rank, num_gpus):
    torch.cuda.set_device(rank)
    torch.distributed.init_process_group(backend='nccl', rank=rank,
                    world_size=num_gpus, init_method='env://')
    model = torchvision.models.resnet50(pretrained=False)
    model = model.cuda(rank)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=1e-4)
    criterion = torch.nn.CrossEntropyLoss()
    criterion.cuda(rank)
    model.train()
    num_epochs = 30
    dataloader = load_data(num_gpus)
    total_steps = len(dataloader)
    for epoch in range(1, num_epochs):
        if rank == 0:
            print(f'\nEpoch {epoch}\n')
        if epoch > 1:
            model, optimizer = load_model(rank, epoch-1, model, optimizer)

        for step, (images, labels) in enumerate(dataloader, 1):
            images, labels = images.cuda(rank, non_blocking=True), labels.cuda(rank, non_blocking=True)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            if step % 10 == 0:
                if rank == 0:
                    print(f'Epoch [{epoch} / {num_epochs}], Step [{step} / {total_steps}], Loss: {loss.item():.4f}')
        if rank == 0:
            save_model(epoch, model, optimizer)
    torch.distributed.destroy_process_group()


if __name__ == "__main__":
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    num_gpus = torch.cuda.device_count()
    print('num_gpus: ', num_gpus)
    torch.multiprocessing.spawn(run_training_process_on_given_gpu, args=(num_gpus, ), nprocs=num_gpus, join=True)

Um die volle Kapazität aller beteiligten GPUs zu nutzen, muss also, wie gezeigt, das Modul DistributedDataParallel verwendet werden. Mit unserem Bechmark-Tool https://github.com/aime-team/pytorch-benchmarks, der den Code beider Methoden enthält, lassen sich die Performance-Unterschiede von DistributedDataParallel und DataParallel vergleichen.

Spread the word

Weiter lesen...