Skip to content

Commit

Permalink
Merge pull request EESSI#130 from casparvl/pytorch
Browse files Browse the repository at this point in the history
PyTorch test that uses torchvision
  • Loading branch information
smoors authored Jul 30, 2024
2 parents 5cabf95 + 4c5c3e7 commit 6e05428
Show file tree
Hide file tree
Showing 5 changed files with 446 additions and 10 deletions.
2 changes: 2 additions & 0 deletions eessi/testsuite/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
HWTHREAD = 'HWTHREAD'
CPU = 'CPU'
CPU_SOCKET = 'CPU_SOCKET'
NUMA_NODE = 'NUMA_NODE'
GPU = 'GPU'
GPU_VENDOR = 'GPU_VENDOR'
INTEL = 'INTEL'
Expand All @@ -23,6 +24,7 @@
HWTHREAD: 'hwthread',
CPU: 'cpu',
CPU_SOCKET: 'cpu_socket',
NUMA_NODE: 'numa_node',
GPU: 'gpu',
NODE: 'node',
}
Expand Down
71 changes: 61 additions & 10 deletions eessi/testsuite/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def _assign_default_num_gpus_per_node(test: rfm.RegressionTest):

def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, num_per: int = 1):
"""
Assign one task per compute unit (COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET] or COMPUTE_UNIT[GPU]).
Assign one task per compute unit. More than 1 task per compute unit can be assigned with
num_per for compute units that support it.
Automatically sets num_tasks, num_tasks_per_node, num_cpus_per_task, and num_gpus_per_node,
based on the current scale and the current partition’s num_cpus, max_avail_gpus_per_node and num_nodes.
For GPU tests, one task per GPU is set, and num_cpus_per_task is based on the ratio of CPU-cores/GPUs.
Expand All @@ -80,7 +81,7 @@ def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, n
- assign_tasks_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 64 threads per task
"""
if num_per != 1 and compute_unit in [COMPUTE_UNIT[GPU], COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET]]:
if num_per != 1 and compute_unit not in [COMPUTE_UNIT[NODE]]:
raise NotImplementedError(
f'Non-default num_per {num_per} is not implemented for compute_unit {compute_unit}.')

Expand Down Expand Up @@ -123,6 +124,8 @@ def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, n
_assign_one_task_per_cpu(test)
elif compute_unit == COMPUTE_UNIT[CPU_SOCKET]:
_assign_one_task_per_cpu_socket(test)
elif compute_unit == COMPUTE_UNIT[NUMA_NODE]:
_assign_one_task_per_numa_node(test)
elif compute_unit == COMPUTE_UNIT[NODE]:
_assign_num_tasks_per_node(test, num_per)
else:
Expand Down Expand Up @@ -198,22 +201,70 @@ def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.
Default resources requested:
- num_tasks_per_node = default_num_cpus_per_node
- num_tasks_per_node = default_num_cpus_per_node / num_cpus_per_socket
- num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
"""
# neither num_tasks_per_node nor num_cpus_per_task are set
if not test.num_tasks_per_node and not test.num_cpus_per_task:
check_proc_attribute_defined(test, 'num_cpus')
check_proc_attribute_defined(test, 'num_sockets')
num_cpus_per_socket = test.current_partition.processor.num_cpus / test.current_partition.processor.num_sockets
test.num_tasks_per_node = math.ceil(test.default_num_cpus_per_node / num_cpus_per_socket)
check_proc_attribute_defined(test, 'num_cores_per_socket')
test.num_tasks_per_node = math.ceil(
test.default_num_cpus_per_node / test.current_partition.processor.num_cores_per_socket
)
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

# num_tasks_per_node is not set, but num_cpus_per_task is
elif not test.num_tasks_per_node:
test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)

# num_cpus_per_task is not set, but num_tasks_per_node is
elif not test.num_cpus_per_task:
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

else:
pass # both num_tasks_per_node and num_cpus_per_node are already set

test.num_tasks = test.num_nodes * test.num_tasks_per_node
log(f'Number of tasks per node set to: {test.num_tasks_per_node}')
log(f'Number of cpus per task set to {test.num_cpus_per_task}')
log(f'num_tasks set to {test.num_tasks}')


def _assign_one_task_per_numa_node(test: rfm.RegressionTest):
"""
Determines the number of tasks per node by dividing the default_num_cpus_per_node by
the number of cpus available per numa node, and rounding up. The result is that for full-node jobs the default
will spawn one task per numa node, with a number of cpus per task equal to the number of cpus per numa node.
Other examples:
- half a node (i.e. node_part=2) on a system with 4 numa nodes would result in 2 tasks per node,
with number of cpus per task equal to the number of cpus per numa node.
- a quarter node (i.e. node_part=4) on a system with 2 numa nodes would result in 1 task per node,
with number of cpus equal to half a numa node.
- 2 cores (i.e. default_num_cpus_per_node=2) on a system with 4 cores per numa node would result in
1 task per node, with 2 cpus per task
- 8 cores (i.e. default_num_cpus_per_node=4) on a system with 4 cores per numa node would result in
2 tasks per node, with 4 cpus per task
This default is set unless the test is run with:
--setvar num_tasks_per_node=<x> and/or
--setvar num_cpus_per_task=<y>.
In those cases, those take precedence, and the remaining variable (num_cpus_per task or
num_tasks_per_node respectively) is calculated based on the equality
test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.
Default resources requested:
- num_tasks_per_node = default_num_cpus_per_node / num_cores_per_numa_node
- num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
"""
# neither num_tasks_per_node nor num_cpus_per_task are set
if not test.num_tasks_per_node and not test.num_cpus_per_task:
check_proc_attribute_defined(test, 'num_cores_per_numa_node')
test.num_tasks_per_node = math.ceil(
test.default_num_cpus_per_node / test.current_partition.processor.num_cores_per_numa_node
)
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

# num_tasks_per_node is not set, but num_cpus_per_task is
elif not test.num_tasks_per_node:
check_proc_attribute_defined(test, 'num_cpus')
check_proc_attribute_defined(test, 'num_sockets')
num_cpus_per_socket = test.current_partition.processor.num_cpus / test.current_partition.processor.num_sockets
test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)

# num_cpus_per_task is not set, but num_tasks_per_node is
Expand Down
134 changes: 134 additions & 0 deletions eessi/testsuite/tests/apps/PyTorch/PyTorch_torchvision.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from itertools import chain

import reframe as rfm
import reframe.utility.sanity as sn
# Added only to make the linter happy
from reframe.core.builtins import parameter, variable, run_after, sanity_function, performance_function

from eessi.testsuite import hooks
from eessi.testsuite.constants import SCALES, TAGS, DEVICE_TYPES, COMPUTE_UNIT, CPU, NUMA_NODE, GPU
from eessi.testsuite.utils import find_modules


class EESSI_PyTorch_torchvision(rfm.RunOnlyRegressionTest):
nn_model = parameter(['vgg16', 'resnet50', 'resnet152', 'densenet121', 'mobilenet_v3_large'])
scale = parameter(SCALES.keys())
parallel_strategy = parameter([None, 'ddp'])
compute_device = variable(str)
# Both torchvision and PyTorch-bundle modules have everything needed to run this test
module_name = parameter(chain(find_modules('torchvision'), find_modules('PyTorch-bundle')))

descr = 'Benchmark that runs a selected torchvision model on synthetic data'

executable = 'python'

valid_prog_environs = ['default']
valid_systems = ['*']

time_limit = '30m'

@run_after('init')
def prepare_test(self):

# Set nn_model as executable option
self.executable_opts = ['pytorch_synthetic_benchmark.py --model %s' % self.nn_model]

# If not a GPU run, disable CUDA
if self.compute_device != DEVICE_TYPES[GPU]:
self.executable_opts += ['--no-cuda']

@run_after('init')
def apply_init_hooks(self):
# Filter on which scales are supported by the partitions defined in the ReFrame configuration
hooks.filter_supported_scales(self)

# Make sure that GPU tests run in partitions that support running on a GPU,
# and that CPU-only tests run in partitions that support running CPU-only.
# Also support setting valid_systems on the cmd line.
hooks.filter_valid_systems_by_device_type(self, required_device_type=self.compute_device)

# Support selecting modules on the cmd line.
hooks.set_modules(self)

# Support selecting scales on the cmd line via tags.
hooks.set_tag_scale(self)

@run_after('init')
def set_tag_ci(self):
if self.nn_model == 'resnet50':
self.tags.add(TAGS['CI'])

@run_after('setup')
def apply_setup_hooks(self):
if self.compute_device == DEVICE_TYPES[GPU]:
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[GPU])
else:
# Hybrid code, for which launching one task per NUMA_NODE is typically the most efficient
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[NUMA_NODE])

# This is a hybrid test, binding is important for performance
hooks.set_compact_process_binding(self)

# Set OMP_NUM_THREADS based on the number of cores per task
self.env_vars["OMP_NUM_THREADS"] = self.num_cpus_per_task

@run_after('setup')
def set_ddp_options(self):
# Set environment variables for PyTorch DDP
if self.parallel_strategy == 'ddp':
# Set additional options required by DDP
self.executable_opts += ["--master-port $(python get_free_socket.py)"]
self.executable_opts += ["--master-address $(hostname --fqdn)"]
self.executable_opts += ["--world-size %s" % self.num_tasks]

@run_after('setup')
def filter_invalid_parameter_combinations(self):
# We cannot detect this situation before the setup phase, because it requires self.num_tasks.
# Thus, the core count of the node needs to be known, which is only the case after the setup phase.
msg = "Skipping test: parallel strategy is 'None',"
msg += f" but requested process count is larger than one ({self.num_tasks})."
self.skip_if(self.num_tasks > 1 and self.parallel_strategy is None, msg)
msg = f"Skipping test: parallel strategy is {self.parallel_strategy},"
msg += " but only one process is requested."
self.skip_if(self.num_tasks == 1 and self.parallel_strategy is not None, msg)

@run_after('setup')
def pass_parallel_strategy(self):
# Set parallelization strategy when using more than one process
if self.num_tasks != 1:
self.executable_opts += ['--use-%s' % self.parallel_strategy]

@sanity_function
def assert_num_ranks(self):
'''Assert that the number of reported CPUs/GPUs used is correct'''
return sn.assert_found(r'Total img/sec on %s .PU\(s\):.*' % self.num_tasks, self.stdout)

@performance_function('img/sec')
def total_throughput(self):
'''Total training throughput, aggregated over all CPUs/GPUs'''
return sn.extractsingle(r'Total img/sec on [0-9]+ .PU\(s\):\s+(?P<perf>\S+)', self.stdout, 'perf', float)

@performance_function('img/sec')
def througput_per_CPU(self):
'''Training througput per CPU'''
if self.compute_device == DEVICE_TYPES[CPU]:
return sn.extractsingle(r'Img/sec per CPU:\s+(?P<perf_per_cpu>\S+)', self.stdout, 'perf_per_cpu', float)
else:
return sn.extractsingle(r'Img/sec per GPU:\s+(?P<perf_per_gpu>\S+)', self.stdout, 'perf_per_gpu', float)


@rfm.simple_test
class EESSI_PyTorch_torchvision_CPU(EESSI_PyTorch_torchvision):
compute_device = DEVICE_TYPES[CPU]


@rfm.simple_test
class EESSI_PyTorch_torchvision_GPU(EESSI_PyTorch_torchvision):
compute_device = DEVICE_TYPES[GPU]
precision = parameter(['default', 'mixed'])

@run_after('init')
def prepare_gpu_test(self):
# Set precision
if self.precision == 'mixed':
self.executable_opts += ['--use-amp']
8 changes: 8 additions & 0 deletions eessi/testsuite/tests/apps/PyTorch/src/get_free_socket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Based on https://unix.stackexchange.com/a/132524
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 0))
addr = s.getsockname()
print(addr[1])
s.close()
Loading

0 comments on commit 6e05428

Please sign in to comment.