Add cuda to multigpu (xpu) bench (#8386)
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Damian Szwichtenberg <[email protected]>
3 people authored Nov 16, 2023
1 parent 3af88bd commit aff3a99
Showing 5 changed files with 129 additions and 50 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -13,7 +13,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Added support for `torch.compile` in `MultiAggregation` ([#8345](https://github.com/pyg-team/pytorch_geometric/pull/8345))
- Added support for `torch.compile` in `HeteroConv` ([#8344](https://github.com/pyg-team/pytorch_geometric/pull/8344))
- Added support for weighted `sparse_cross_entropy` ([#8340](https://github.com/pyg-team/pytorch_geometric/pull/8340))
- Added a multi-GPU training benchmark for the XPU device ([#8288](https://github.com/pyg-team/pytorch_geometric/pull/8288))
- Added multi-GPU training benchmarks for CUDA and XPU devices ([#8288](https://github.com/pyg-team/pytorch_geometric/pull/8288), [#8386](https://github.com/pyg-team/pytorch_geometric/pull/8386))
- Support MRR computation in `KGEModel.test()` ([#8298](https://github.com/pyg-team/pytorch_geometric/pull/8298))
- Added an example for model parallelism (`examples/multi_gpu/model_parallel.py`) ([#8309](https://github.com/pyg-team/pytorch_geometric/pull/8309))
- Added a tutorial for multi-node multi-GPU training with pure PyTorch ([#8071](https://github.com/pyg-team/pytorch_geometric/pull/8071))
17 changes: 11 additions & 6 deletions benchmark/multi_gpu/training/README.md
@@ -1,16 +1,21 @@
# Training Benchmark

## Environment setup
## Running benchmark on CUDA GPU

Optional, XPU only:
Run the benchmark, e.g. assuming you have `n` NVIDIA GPUs:
```
python training_benchmark_cuda.py --dataset ogbn-products --model edge_cnn --num-epochs 3 --n-gpus <n>
```
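
Before picking `<n>`, you can check how many CUDA devices PyTorch sees (a quick sanity check, not part of the benchmark itself):
```
python -c "import torch; print(torch.cuda.device_count())"
```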

## Running benchmark on Intel GPU

## Environment setup
```
install intel_extension_for_pytorch
install oneccl_bindings_for_pytorch
```

## Running benchmark

Run benchmark, e.g. assuming you have 2 GPUs:
Run the benchmark, e.g. assuming you have `n` XPUs:
```
mpirun -np 2 python training_benchmark.py --dataset ogbn-products --model edge_cnn --num-epochs 3
mpirun -np <n> python training_benchmark_xpu.py --dataset ogbn-products --model edge_cnn --num-epochs 3
```
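
Similarly, the number of visible XPU devices can be checked before choosing `-np <n>` (assuming `intel_extension_for_pytorch` is installed and exposes the `torch.xpu` namespace used by the benchmark; shown only as a convenience):
```
python -c "import torch, intel_extension_for_pytorch; print(torch.xpu.device_count())"
```
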
benchmark/multi_gpu/training/common.py
@@ -1,17 +1,15 @@
import argparse
import ast
import os
from time import perf_counter
from typing import Any, Tuple, Union
from typing import Any, Callable, Tuple, Union

import intel_extension_for_pytorch as ipex
import oneccl_bindings_for_pytorch # noqa
import torch
import torch.distributed as dist
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel as DDP

from benchmark.utils import get_dataset, get_model, get_split_masks, test
from benchmark.utils import get_model, get_split_masks, test
from torch_geometric.data import Data, HeteroData
from torch_geometric.loader import NeighborLoader
from torch_geometric.nn import PNAConv

@@ -24,6 +22,7 @@

device_conditions = {
    'xpu': (lambda: torch.xpu.is_available()),
    'cuda': (lambda: torch.cuda.is_available()),
}


@@ -63,6 +62,8 @@ def train_hetero(model: Any, loader: NeighborLoader,
def maybe_synchronize(device: str):
    if device == 'xpu' and torch.xpu.is_available():
        torch.xpu.synchronize()
    if device == 'cuda' and torch.cuda.is_available():
        torch.cuda.synchronize()


def create_mask_per_rank(
@@ -83,7 +84,9 @@ def create_mask_per_rank(
    return mask_per_rank


def run(rank: int, world_size: int, args: argparse.ArgumentParser):
def run(rank: int, world_size: int, args: argparse.ArgumentParser,
        num_classes: int, data: Union[Data, HeteroData],
        custom_optimizer: Callable[[Any, Any], Tuple[Any, Any]] = None):
    if not device_conditions[args.device]():
        raise RuntimeError(f'{args.device.upper()} is not available')

@@ -92,13 +95,8 @@ def run(rank: int, world_size: int, args: argparse.ArgumentParser):
    if rank == 0:
        print('BENCHMARK STARTS')
        print(f'Running on {args.device.upper()}')

    assert args.dataset in supported_sets.keys(
    ), f"Dataset {args.dataset} isn't supported."
    if rank == 0:
        print(f'Dataset: {args.dataset}')

    data, num_classes = get_dataset(args.dataset, args.root)
    hetero = True if args.dataset == 'ogbn-mag' else False
    mask, val_mask, test_mask = get_split_masks(data, args.dataset)
    mask = create_mask_per_rank(mask, rank, world_size, hetero)
@@ -192,8 +190,8 @@ def run(rank: int, world_size: int, args: argparse.ArgumentParser):

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    if args.device == 'xpu':
        model, optimizer = ipex.optimize(model, optimizer=optimizer)
    if custom_optimizer:
        model, optimizer = custom_optimizer(model, optimizer)

    train = train_hetero if hetero else train_homo

@@ -248,37 +246,11 @@ def run(rank: int, world_size: int, args: argparse.ArgumentParser):
    dist.destroy_process_group()


def get_dist_params() -> Tuple[int, int, str]:
    master_addr = "127.0.0.1"
    master_port = "29500"
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = master_port

    mpi_rank = int(os.environ.get("PMI_RANK", -1))
    mpi_world_size = int(os.environ.get("PMI_SIZE", -1))
    rank = mpi_rank if mpi_world_size > 0 else os.environ.get("RANK", 0)
    world_size = (mpi_world_size if mpi_world_size > 0 else os.environ.get(
        "WORLD_SIZE", 1))

    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)

    init_method = f"tcp://{master_addr}:{master_port}"

    return rank, world_size, init_method


if __name__ == '__main__':
    rank, world_size, init_method = get_dist_params()
    dist.init_process_group(backend="ccl", init_method=init_method,
                            world_size=world_size, rank=rank)

def get_predefined_args() -> argparse.ArgumentParser:
    argparser = argparse.ArgumentParser(
        'GNN distributed (DDP) training benchmark')
    add = argparser.add_argument

    add('--device', choices=['xpu'], default='xpu',
        help='Device to run benchmark on')
    add('--dataset', choices=['ogbn-mag', 'ogbn-products', 'Reddit'],
        default='Reddit', type=str)
    add('--model',
@@ -297,6 +269,4 @@ def get_dist_params() -> Tuple[int, int, str]:
    add('--num-epochs', default=1, type=int)
    add('--evaluate', action='store_true')

    args = argparser.parse_args()

    run(rank, world_size, args)
    return argparser
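
A note on the refactoring above: `run()` no longer parses arguments or loads the dataset itself; the caller supplies `num_classes`, `data`, and, optionally, a `custom_optimizer` callable that takes the model and optimizer and returns the (possibly wrapped) pair. A minimal, hypothetical sketch of that contract (for illustration only; the real XPU entry point uses `ipex.optimize`, see `training_benchmark_xpu.py` below):
```
# Hypothetical hook matching the `custom_optimizer` contract; not part of this commit.
def passthrough_optimizer(model, optimizer):
    # A real hook may wrap or reconfigure both objects,
    # e.g. ipex.optimize(model, optimizer=optimizer) on XPU.
    return model, optimizer

# run(rank, world_size, args, num_classes, data, passthrough_optimizer)
```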
51 changes: 51 additions & 0 deletions benchmark/multi_gpu/training/training_benchmark_cuda.py
@@ -0,0 +1,51 @@
import argparse
import os
from typing import Union

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

from benchmark.multi_gpu.training.common import (
    get_predefined_args,
    run,
    supported_sets,
)
from benchmark.utils import get_dataset
from torch_geometric.data import Data, HeteroData


def run_cuda(rank: int, world_size: int, args: argparse.ArgumentParser,
             num_classes: int, data: Union[Data, HeteroData]):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    run(rank, world_size, args, num_classes, data)


if __name__ == '__main__':
    argparser = get_predefined_args()
    argparser.add_argument('--n-gpus', default=1, type=int)
    args = argparser.parse_args()
    setattr(args, 'device', 'cuda')

    assert args.dataset in supported_sets.keys(), \
        f"Dataset {args.dataset} isn't supported."
    data, num_classes = get_dataset(args.dataset, args.root)

    max_world_size = torch.cuda.device_count()
    chosen_world_size = args.n_gpus
    if chosen_world_size <= max_world_size:
        world_size = chosen_world_size
    else:
        print(f'User selected {chosen_world_size} GPUs '
              f'but only {max_world_size} GPUs are available')
        world_size = max_world_size
    print(f'Let\'s use {world_size} GPUs!')

    mp.spawn(
        run_cuda,
        args=(world_size, args, num_classes, data),
        nprocs=world_size,
        join=True,
    )
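
A note on the spawn call above: `torch.multiprocessing.spawn` passes the process index as the first positional argument to the target function, so each worker receives its `rank` implicitly while `args=(...)` supplies the remaining parameters. A minimal standalone illustration (hypothetical names, not part of the commit):
```
import torch.multiprocessing as mp


def worker(rank: int, world_size: int):
    # `rank` is injected by mp.spawn as the process index: 0 .. nprocs - 1.
    print(f'worker {rank} of {world_size}')


if __name__ == '__main__':
    mp.spawn(worker, args=(2,), nprocs=2, join=True)
```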
53 changes: 53 additions & 0 deletions benchmark/multi_gpu/training/training_benchmark_xpu.py
@@ -0,0 +1,53 @@
import os
from typing import Any, Tuple

import intel_extension_for_pytorch as ipex
import oneccl_bindings_for_pytorch  # noqa
import torch.distributed as dist

from benchmark.multi_gpu.training.common import (
    get_predefined_args,
    run,
    supported_sets,
)
from benchmark.utils import get_dataset


def get_dist_params() -> Tuple[int, int, str]:
    master_addr = "127.0.0.1"
    master_port = "29500"
    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = master_port

    mpi_rank = int(os.environ.get("PMI_RANK", -1))
    mpi_world_size = int(os.environ.get("PMI_SIZE", -1))
    rank = mpi_rank if mpi_world_size > 0 else os.environ.get("RANK", 0)
    world_size = (mpi_world_size if mpi_world_size > 0 else os.environ.get(
        "WORLD_SIZE", 1))

    os.environ["RANK"] = str(rank)
    os.environ["WORLD_SIZE"] = str(world_size)

    init_method = f"tcp://{master_addr}:{master_port}"

    return rank, world_size, init_method


def custom_optimizer(model: Any, optimizer: Any) -> Tuple[Any, Any]:
    return ipex.optimize(model, optimizer=optimizer)


if __name__ == '__main__':
    rank, world_size, init_method = get_dist_params()
    dist.init_process_group(backend="ccl", init_method=init_method,
                            world_size=world_size, rank=rank)

    argparser = get_predefined_args()
    args = argparser.parse_args()
    setattr(args, 'device', 'xpu')

    assert args.dataset in supported_sets.keys(), \
        f"Dataset {args.dataset} isn't supported."
    data, num_classes = get_dataset(args.dataset, args.root)

    run(rank, world_size, args, num_classes, data, custom_optimizer)
