diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7d596b7e4..2464b1f9a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,9 @@
 - Support `assert` statements in kernels ([docs](https://nvidia.github.io/warp/debugging.html#assertions)).
   Assertions can only be triggered in `"debug"` mode ([GH-366](https://github.com/NVIDIA/warp/issues/336)).
 - Add optimization example for soft-body properties ([GH-419](https://github.com/NVIDIA/warp/pull/419)).
+- CUDA IPC support on Linux. Call the `ipc_handle()` method to get an IPC handle for a `wp.Event` or a `wp.array`,
+  and call `wp.from_ipc_handle()` or `wp.event_from_ipc_handle()` in another process to open the handle
+  ([docs](https://nvidia.github.io/warp/modules/runtime.html#interprocess-communication-ipc)).
 - Add per-module option to disable fused floating point operations, use `wp.set_module_options({"fuse_fp": False})`
   ([GH-379](https://github.com/NVIDIA/warp/issues/379)).
 - Add per-module option to add CUDA-C line information for profiling, use `wp.set_module_options({"lineinfo": True})`.
diff --git a/docs/modules/runtime.rst b/docs/modules/runtime.rst
index e5541cf36..be5aecc54 100644
--- a/docs/modules/runtime.rst
+++ b/docs/modules/runtime.rst
@@ -115,7 +115,6 @@ Additionally, data can be copied between arrays in different memory spaces using
     :undoc-members:
     :exclude-members: vars
 
-
 
 Multi-dimensional Arrays
 ########################
@@ -1200,3 +1199,42 @@ See :doc:`../profiling` documentation for more information.
 
 .. autoclass:: warp.ScopedTimer
     :noindex:
+
+Interprocess Communication (IPC)
+--------------------------------
+
+Interprocess communication can be used to share Warp arrays and events across
+processes without creating copies of the underlying data.
+
+Some basic requirements for using IPC include:
+
+* Linux operating system
+* The array must be allocated on a GPU device using the default memory allocator (see :doc:`allocators`)
+
+  The ``wp.ScopedMempool`` context manager is useful for temporarily disabling
+  memory pools for the purpose of allocating arrays that can be shared using IPC.
+
+Support for IPC on a device is indicated by the :attr:`is_ipc_supported <warp.context.Device.is_ipc_supported>`
+attribute of the :class:`Device <warp.context.Device>` object.
+
+To share a Warp array between processes, use :meth:`array.ipc_handle` in the
+originating process to obtain an IPC handle for the array's memory allocation.
+The handle is a ``bytes`` object with a length of 64.
+The IPC handle, along with information about the array (data type, shape, and
+optionally strides), should be shared with the other process, e.g. via shared
+memory or files.
+The other process can then import the original array by calling
+:func:`from_ipc_handle`.
+
+Events can be shared in a similar manner, but they must be constructed with
+``interprocess=True``. Additionally, events cannot be created with both
+``interprocess=True`` and ``enable_timing=True``. Use :meth:`Event.ipc_handle`
+in the originating process to obtain an IPC handle for the event. Another
+process can then import the original event by calling
+:func:`event_from_ipc_handle`.
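+
+The following is a minimal sketch of sharing an array between two processes.
+The helper names and the use of the ``"spawn"`` start method are illustrative
+rather than required by the API::
+
+    import multiprocessing as mp
+
+    import warp as wp
+
+
+    @wp.kernel
+    def scale(a: wp.array(dtype=wp.float32)):
+        i = wp.tid()
+        a[i] = 2.0 * a[i]
+
+
+    def child(handle, dtype, shape):
+        # Import the exporting process's allocation; no copy of the data is made.
+        a = wp.from_ipc_handle(handle, dtype, shape, device="cuda:0")
+        wp.launch(scale, dim=a.shape, inputs=[a], device="cuda:0")
+        wp.synchronize_device("cuda:0")
+
+
+    if __name__ == "__main__":
+        # IPC requires the default allocator, so disable the memory pool temporarily.
+        with wp.ScopedMempool("cuda:0", False):
+            a = wp.full(16, value=1.0, dtype=wp.float32, device="cuda:0")
+            ctx = mp.get_context("spawn")
+            p = ctx.Process(target=child, args=(a.ipc_handle(), a.dtype, a.shape))
+            p.start()
+            p.join()
+            print(a.numpy())  # reflects the child's in-place update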
+
+.. autofunction:: from_ipc_handle
+
+.. autofunction:: event_from_ipc_handle
diff --git a/warp/__init__.py b/warp/__init__.py
index 243d7ae1b..71a9f86f0 100644
--- a/warp/__init__.py
+++ b/warp/__init__.py
@@ -43,6 +43,8 @@
 # numpy interop
 from warp.types import dtype_from_numpy, dtype_to_numpy
 
+from warp.types import from_ipc_handle
+
 from warp.context import init, func, func_grad, func_replay, func_native, kernel, struct, overload
 from warp.context import is_cpu_available, is_cuda_available, is_device_available
 from warp.context import get_devices, get_preferred_device
@@ -65,6 +67,7 @@
     synchronize,
     force_load,
     load_module,
+    event_from_ipc_handle,
 )
 from warp.context import set_module_options, get_module_options, get_module
 from warp.context import capture_begin, capture_end, capture_launch
diff --git a/warp/context.py b/warp/context.py
index a15c5ada7..ae9412610 100644
--- a/warp/context.py
+++ b/warp/context.py
@@ -5,6 +5,8 @@
 # distribution of this software and related documentation without an express
 # license agreement from NVIDIA CORPORATION is strictly prohibited.
 
+from __future__ import annotations
+
 import ast
 import ctypes
 import errno
@@ -2358,6 +2360,7 @@ class Flags:
         DEFAULT = 0x0
         BLOCKING_SYNC = 0x1
         DISABLE_TIMING = 0x2
+        INTERPROCESS = 0x4
 
     def __new__(cls, *args, **kwargs):
         """Creates a new event instance."""
@@ -2365,7 +2368,9 @@ def __new__(cls, *args, **kwargs):
         instance.owner = False
         return instance
 
-    def __init__(self, device: "Devicelike" = None, cuda_event=None, enable_timing: bool = False):
+    def __init__(
+        self, device: "Devicelike" = None, cuda_event=None, enable_timing: bool = False, interprocess: bool = False
+    ):
         """Initializes the event on a CUDA device.
 
         Args:
@@ -2377,6 +2382,12 @@ def __init__(self, device: "Devicelike" = None, cuda_event=None, enable_timing:
                 :func:`~warp.get_event_elapsed_time` can be used to measure the time
                 between two events created with ``enable_timing=True`` and recorded
                 onto streams.
+            interprocess: If ``True``, this event may be used as an interprocess event.
+
+        Raises:
+            RuntimeError: The event could not be created.
+            ValueError: The combination of ``enable_timing=True`` and
+                ``interprocess=True`` is not allowed.
         """
 
         device = get_device(device)
@@ -2391,11 +2402,48 @@
             flags = Event.Flags.DEFAULT
             if not enable_timing:
                 flags |= Event.Flags.DISABLE_TIMING
+            if interprocess:
+                if enable_timing:
+                    raise ValueError("The combination of 'enable_timing=True' and 'interprocess=True' is not allowed.")
+                flags |= Event.Flags.INTERPROCESS
+
             self.cuda_event = runtime.core.cuda_event_create(device.context, flags)
             if not self.cuda_event:
                 raise RuntimeError(f"Failed to create event on device {device}")
 
         self.owner = True
 
+    def ipc_handle(self) -> bytes:
+        """Return a CUDA IPC handle of the event as a 64-byte ``bytes`` object.
+
+        The event must have been created with ``interprocess=True`` in order to
+        obtain a valid interprocess handle.
+
+        IPC is currently only supported on Linux.
+
+        Example:
+            Create an event and get its IPC handle::
+
+                e1 = wp.Event(interprocess=True)
+                event_handle = e1.ipc_handle()
+
+        Raises:
+            RuntimeError: Device does not support IPC.
+ """ + + if self.device.is_ipc_supported: + # Allocate a buffer for the data (64-element char array) + ipc_handle_buffer = (ctypes.c_char * 64)() + + warp.context.runtime.core.cuda_ipc_get_event_handle(self.device.context, self.cuda_event, ipc_handle_buffer) + + if ipc_handle_buffer.raw == bytes(64): + warp.utils.warn("IPC event handle appears to be invalid. Was interprocess=True used?") + + return ipc_handle_buffer.raw + + else: + raise RuntimeError(f"Device {self.device} does not support IPC.") + def __del__(self): if not self.owner: return @@ -2543,23 +2591,22 @@ class Device: """A device to allocate Warp arrays and to launch kernels on. Attributes: - ordinal: A Warp-specific integer label for the device. ``-1`` for CPU devices. - name: A string label for the device. By default, CPU devices will be named according to the processor name, + ordinal (int): A Warp-specific label for the device. ``-1`` for CPU devices. + name (str): A label for the device. By default, CPU devices will be named according to the processor name, or ``"CPU"`` if the processor name cannot be determined. - arch: An integer representing the compute capability version number calculated as - ``10 * major + minor``. ``0`` for CPU devices. - is_uva: A boolean indicating whether the device supports unified addressing. + arch (int): The compute capability version number calculated as ``10 * major + minor``. + ``0`` for CPU devices. + is_uva (bool): Indicates whether the device supports unified addressing. ``False`` for CPU devices. - is_cubin_supported: A boolean indicating whether Warp's version of NVRTC can directly + is_cubin_supported (bool): Indicates whether Warp's version of NVRTC can directly generate CUDA binary files (cubin) for this device's architecture. ``False`` for CPU devices. - is_mempool_supported: A boolean indicating whether the device supports using the - ``cuMemAllocAsync`` and ``cuMemPool`` family of APIs for stream-ordered memory allocations. ``False`` for - CPU devices. - is_primary: A boolean indicating whether this device's CUDA context is also the - device's primary context. - uuid: A string representing the UUID of the CUDA device. The UUID is in the same format used by - ``nvidia-smi -L``. ``None`` for CPU devices. - pci_bus_id: A string identifier for the CUDA device in the format ``[domain]:[bus]:[device]``, in which + is_mempool_supported (bool): Indicates whether the device supports using the ``cuMemAllocAsync`` and + ``cuMemPool`` family of APIs for stream-ordered memory allocations. ``False`` for CPU devices. + is_ipc_supported (bool): Indicates whether the device supports IPC. + is_primary (bool): Indicates whether this device's CUDA context is also the device's primary context. + uuid (str): The UUID of the CUDA device. The UUID is in the same format used by ``nvidia-smi -L``. + ``None`` for CPU devices. + pci_bus_id (str): An identifier for the CUDA device in the format ``[domain]:[bus]:[device]``, in which ``domain``, ``bus``, and ``device`` are all hexadecimal values. ``None`` for CPU devices. 
""" @@ -2592,6 +2639,7 @@ def __init__(self, runtime, alias, ordinal=-1, is_primary=False, context=None): self.is_uva = False self.is_mempool_supported = False self.is_mempool_enabled = False + self.is_ipc_supported = False # TODO: Support IPC for CPU arrays self.is_cubin_supported = False self.uuid = None self.pci_bus_id = None @@ -2607,8 +2655,11 @@ def __init__(self, runtime, alias, ordinal=-1, is_primary=False, context=None): # CUDA device self.name = runtime.core.cuda_device_get_name(ordinal).decode() self.arch = runtime.core.cuda_device_get_arch(ordinal) - self.is_uva = runtime.core.cuda_device_is_uva(ordinal) - self.is_mempool_supported = runtime.core.cuda_device_is_mempool_supported(ordinal) + self.is_uva = runtime.core.cuda_device_is_uva(ordinal) > 0 + self.is_mempool_supported = runtime.core.cuda_device_is_mempool_supported(ordinal) > 0 + self.is_ipc_supported = ( + runtime.core.cuda_device_is_ipc_supported(ordinal) > 0 and platform.system() == "Linux" + ) if warp.config.enable_mempools_at_init: # enable if supported self.is_mempool_enabled = self.is_mempool_supported @@ -3375,6 +3426,8 @@ def __init__(self): self.core.cuda_device_is_uva.restype = ctypes.c_int self.core.cuda_device_is_mempool_supported.argtypes = [ctypes.c_int] self.core.cuda_device_is_mempool_supported.restype = ctypes.c_int + self.core.cuda_device_is_ipc_supported.argtypes = [ctypes.c_int] + self.core.cuda_device_is_ipc_supported.restype = ctypes.c_int self.core.cuda_device_set_mempool_release_threshold.argtypes = [ctypes.c_int, ctypes.c_uint64] self.core.cuda_device_set_mempool_release_threshold.restype = ctypes.c_int self.core.cuda_device_get_mempool_release_threshold.argtypes = [ctypes.c_int] @@ -3428,6 +3481,22 @@ def __init__(self): self.core.cuda_set_mempool_access_enabled.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int] self.core.cuda_set_mempool_access_enabled.restype = ctypes.c_int + # inter-process communication + self.core.cuda_ipc_get_mem_handle.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_char)] + self.core.cuda_ipc_get_mem_handle.restype = None + self.core.cuda_ipc_open_mem_handle.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_char)] + self.core.cuda_ipc_open_mem_handle.restype = ctypes.c_void_p + self.core.cuda_ipc_close_mem_handle.argtypes = [ctypes.c_void_p] + self.core.cuda_ipc_close_mem_handle.restype = None + self.core.cuda_ipc_get_event_handle.argtypes = [ + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.POINTER(ctypes.c_char), + ] + self.core.cuda_ipc_get_event_handle.restype = None + self.core.cuda_ipc_open_event_handle.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_char)] + self.core.cuda_ipc_open_event_handle.restype = ctypes.c_void_p + self.core.cuda_stream_create.argtypes = [ctypes.c_void_p, ctypes.c_int] self.core.cuda_stream_create.restype = ctypes.c_void_p self.core.cuda_stream_destroy.argtypes = [ctypes.c_void_p, ctypes.c_void_p] @@ -4894,6 +4963,40 @@ def from_numpy( ) +def event_from_ipc_handle(handle, device: "Devicelike" = None) -> Event: + """Create an event from an IPC handle. + + Args: + handle: The interprocess event handle for an existing CUDA event. + device (Devicelike): Device to associate with the array. + + Returns: + An event created from the interprocess event handle ``handle``. + + Raises: + RuntimeError: IPC is not supported on ``device``. 
+ """ + + try: + # Performance note: try first, ask questions later + device = warp.context.runtime.get_device(device) + except Exception: + # Fallback to using the public API for retrieving the device, + # which takes take of initializing Warp if needed. + device = warp.context.get_device(device) + + if not device.is_ipc_supported: + raise RuntimeError(f"IPC is not supported on device {device}.") + + event = Event( + device=device, cuda_event=warp.context.runtime.core.cuda_ipc_open_event_handle(device.context, handle) + ) + # Events created from IPC handles must be freed with cuEventDestroy + event.owner = True + + return event + + # given a kernel destination argument type and a value convert # to a c-type that can be passed to a kernel def pack_arg(kernel, arg_type, arg_name, value, device, adjoint=False): diff --git a/warp/native/cuda_util.cpp b/warp/native/cuda_util.cpp index 4a4107449..5b1d113e6 100644 --- a/warp/native/cuda_util.cpp +++ b/warp/native/cuda_util.cpp @@ -102,6 +102,11 @@ static PFN_cuGraphicsGLRegisterBuffer_v3000 pfn_cuGraphicsGLRegisterBuffer; static PFN_cuGraphicsUnregisterResource_v3000 pfn_cuGraphicsUnregisterResource; static PFN_cuModuleGetGlobal_v3020 pfn_cuModuleGetGlobal; static PFN_cuFuncSetAttribute_v9000 pfn_cuFuncSetAttribute; +static PFN_cuIpcGetEventHandle_v4010 pfn_cuIpcGetEventHandle; +static PFN_cuIpcOpenEventHandle_v4010 pfn_cuIpcOpenEventHandle; +static PFN_cuIpcGetMemHandle_v4010 pfn_cuIpcGetMemHandle; +static PFN_cuIpcOpenMemHandle_v11000 pfn_cuIpcOpenMemHandle; +static PFN_cuIpcCloseMemHandle_v4010 pfn_cuIpcCloseMemHandle; static bool cuda_driver_initialized = false; @@ -238,6 +243,11 @@ bool init_cuda_driver() get_driver_entry_point("cuGraphicsUnregisterResource", 3000, &(void*&)pfn_cuGraphicsUnregisterResource); get_driver_entry_point("cuModuleGetGlobal", 3020, &(void*&)pfn_cuModuleGetGlobal); get_driver_entry_point("cuFuncSetAttribute", 9000, &(void*&)pfn_cuFuncSetAttribute); + get_driver_entry_point("cuIpcGetEventHandle", 4010, &(void*&)pfn_cuIpcGetEventHandle); + get_driver_entry_point("cuIpcOpenEventHandle", 4010, &(void*&)pfn_cuIpcOpenEventHandle); + get_driver_entry_point("cuIpcGetMemHandle", 4010, &(void*&)pfn_cuIpcGetMemHandle); + get_driver_entry_point("cuIpcOpenMemHandle", 11000, &(void*&)pfn_cuIpcOpenMemHandle); + get_driver_entry_point("cuIpcCloseMemHandle", 4010, &(void*&)pfn_cuIpcCloseMemHandle); if (pfn_cuInit) cuda_driver_initialized = check_cu(pfn_cuInit(0)); @@ -585,4 +595,29 @@ CUresult cuFuncSetAttribute_f(CUfunction hfunc, CUfunction_attribute attrib, int return pfn_cuFuncSetAttribute ? pfn_cuFuncSetAttribute(hfunc, attrib, value) : DRIVER_ENTRY_POINT_ERROR; } +CUresult cuIpcGetEventHandle_f(CUipcEventHandle *pHandle, CUevent event) +{ + return pfn_cuIpcGetEventHandle ? pfn_cuIpcGetEventHandle(pHandle, event) : DRIVER_ENTRY_POINT_ERROR; +} + +CUresult cuIpcOpenEventHandle_f(CUevent *phEvent, CUipcEventHandle handle) +{ + return pfn_cuIpcOpenEventHandle ? pfn_cuIpcOpenEventHandle(phEvent, handle) : DRIVER_ENTRY_POINT_ERROR; +} + +CUresult cuIpcGetMemHandle_f(CUipcMemHandle *pHandle, CUdeviceptr dptr) +{ + return pfn_cuIpcGetMemHandle ? pfn_cuIpcGetMemHandle(pHandle, dptr) : DRIVER_ENTRY_POINT_ERROR; +} + +CUresult cuIpcOpenMemHandle_f(CUdeviceptr *pdptr, CUipcMemHandle handle, unsigned int flags) +{ + return pfn_cuIpcOpenMemHandle ? pfn_cuIpcOpenMemHandle(pdptr, handle, flags) : DRIVER_ENTRY_POINT_ERROR; +} + +CUresult cuIpcCloseMemHandle_f(CUdeviceptr dptr) +{ + return pfn_cuIpcCloseMemHandle ? 
+
 #endif // WP_ENABLE_CUDA
diff --git a/warp/native/cuda_util.h b/warp/native/cuda_util.h
index 7498af015..22fefe8c4 100644
--- a/warp/native/cuda_util.h
+++ b/warp/native/cuda_util.h
@@ -101,6 +101,11 @@ CUresult cuGraphicsGLRegisterBuffer_f(CUgraphicsResource *pCudaResource, unsigne
 CUresult cuGraphicsUnregisterResource_f(CUgraphicsResource resource);
 CUresult cuModuleGetGlobal_f(CUdeviceptr* dptr, size_t* bytes, CUmodule hmod, const char* name );
 CUresult cuFuncSetAttribute_f(CUfunction hfunc, CUfunction_attribute attrib, int value);
+CUresult cuIpcGetEventHandle_f(CUipcEventHandle *pHandle, CUevent event);
+CUresult cuIpcOpenEventHandle_f(CUevent *phEvent, CUipcEventHandle handle);
+CUresult cuIpcGetMemHandle_f(CUipcMemHandle *pHandle, CUdeviceptr dptr);
+CUresult cuIpcOpenMemHandle_f(CUdeviceptr *pdptr, CUipcMemHandle handle, unsigned int flags);
+CUresult cuIpcCloseMemHandle_f(CUdeviceptr dptr);
 
 bool init_cuda_driver();
 bool is_cuda_driver_initialized();
diff --git a/warp/native/warp.cpp b/warp/native/warp.cpp
index afcae954a..7ff84a01a 100644
--- a/warp/native/warp.cpp
+++ b/warp/native/warp.cpp
@@ -992,6 +992,7 @@ WP_API int cuda_device_get_pci_bus_id(int ordinal) { return -1; }
 WP_API int cuda_device_get_pci_device_id(int ordinal) { return -1; }
 WP_API int cuda_device_is_uva(int ordinal) { return 0; }
 WP_API int cuda_device_is_mempool_supported(int ordinal) { return 0; }
+WP_API int cuda_device_is_ipc_supported(int ordinal) { return 0; }
 WP_API int cuda_device_set_mempool_release_threshold(int ordinal, uint64_t threshold) { return 0; }
 WP_API uint64_t cuda_device_get_mempool_release_threshold(int ordinal) { return 0; }
 WP_API void cuda_device_get_memory_info(int ordinal, size_t* free_mem, size_t* total_mem) {}
@@ -1015,6 +1016,12 @@ WP_API int cuda_set_peer_access_enabled(void* target_context, void* peer_context
 WP_API int cuda_is_mempool_access_enabled(int target_ordinal, int peer_ordinal) { return 0; }
 WP_API int cuda_set_mempool_access_enabled(int target_ordinal, int peer_ordinal, int enable) { return 0; }
 
+WP_API void cuda_ipc_get_mem_handle(void* ptr, char* out_buffer) {}
+WP_API void* cuda_ipc_open_mem_handle(void* context, char* handle) { return NULL; }
+WP_API void cuda_ipc_close_mem_handle(void* ptr) {}
+WP_API void cuda_ipc_get_event_handle(void* context, void* event, char* out_buffer) {}
+WP_API void* cuda_ipc_open_event_handle(void* context, char* handle) { return NULL; }
+
 WP_API void* cuda_stream_create(void* context, int priority) { return NULL; }
 WP_API void cuda_stream_destroy(void* context, void* stream) {}
 WP_API void cuda_stream_register(void* context, void* stream) {}
diff --git a/warp/native/warp.cu b/warp/native/warp.cu
index 903e7324f..3a9b46780 100644
--- a/warp/native/warp.cu
+++ b/warp/native/warp.cu
@@ -146,6 +146,7 @@ struct DeviceInfo
     int arch = 0;
     int is_uva = 0;
     int is_mempool_supported = 0;
+    int is_ipc_supported = 0;
     int max_smem_bytes = 0;
     CUcontext primary_context = NULL;
 };
@@ -270,6 +271,7 @@ int cuda_init()
         check_cu(cuDeviceGetAttribute_f(&g_devices[i].pci_device_id, CU_DEVICE_ATTRIBUTE_PCI_DEVICE_ID, device));
         check_cu(cuDeviceGetAttribute_f(&g_devices[i].is_uva, CU_DEVICE_ATTRIBUTE_UNIFIED_ADDRESSING, device));
         check_cu(cuDeviceGetAttribute_f(&g_devices[i].is_mempool_supported, CU_DEVICE_ATTRIBUTE_MEMORY_POOLS_SUPPORTED, device));
+        check_cu(cuDeviceGetAttribute_f(&g_devices[i].is_ipc_supported, CU_DEVICE_ATTRIBUTE_IPC_EVENT_SUPPORTED, device));
         check_cu(cuDeviceGetAttribute_f(&g_devices[i].max_smem_bytes, CU_DEVICE_ATTRIBUTE_MAX_SHARED_MEMORY_PER_BLOCK_OPTIN, device));
         int major = 0;
         int minor = 0;
@@ -1801,6 +1803,13 @@ int cuda_device_is_mempool_supported(int ordinal)
     return 0;
 }
 
+int cuda_device_is_ipc_supported(int ordinal)
+{
+    if (ordinal >= 0 && ordinal < int(g_devices.size()))
+        return g_devices[ordinal].is_ipc_supported;
+    return 0;
+}
+
 int cuda_device_set_mempool_release_threshold(int ordinal, uint64_t threshold)
 {
     if (ordinal < 0 || ordinal > int(g_devices.size()))
@@ -2268,6 +2277,52 @@ int cuda_set_mempool_access_enabled(int target_ordinal, int peer_ordinal, int en
     return 1; // success
 }
 
+void cuda_ipc_get_mem_handle(void* ptr, char* out_buffer) {
+    CUipcMemHandle memHandle;
+    check_cu(cuIpcGetMemHandle_f(&memHandle, (CUdeviceptr)ptr));
+    memcpy(out_buffer, memHandle.reserved, CU_IPC_HANDLE_SIZE);
+}
+
+void* cuda_ipc_open_mem_handle(void* context, char* handle) {
+    ContextGuard guard(context);
+
+    CUipcMemHandle memHandle;
+    memcpy(memHandle.reserved, handle, CU_IPC_HANDLE_SIZE);
+
+    CUdeviceptr device_ptr;
+
+    // Strangely, the CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS flag is required
+    if (check_cu(cuIpcOpenMemHandle_f(&device_ptr, memHandle, CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS)))
+        return (void*) device_ptr;
+    else
+        return NULL;
+}
+
+void cuda_ipc_close_mem_handle(void* ptr) {
+    check_cu(cuIpcCloseMemHandle_f((CUdeviceptr) ptr));
+}
+
+void cuda_ipc_get_event_handle(void* context, void* event, char* out_buffer) {
+    ContextGuard guard(context);
+
+    CUipcEventHandle eventHandle;
+    check_cu(cuIpcGetEventHandle_f(&eventHandle, static_cast<CUevent>(event)));
+    memcpy(out_buffer, eventHandle.reserved, CU_IPC_HANDLE_SIZE);
+}
+
+void* cuda_ipc_open_event_handle(void* context, char* handle) {
+    ContextGuard guard(context);
+
+    CUipcEventHandle eventHandle;
+    memcpy(eventHandle.reserved, handle, CU_IPC_HANDLE_SIZE);
+
+    CUevent event;
+
+    if (check_cu(cuIpcOpenEventHandle_f(&event, eventHandle)))
+        return event;
+    else
+        return NULL;
+}
+
 void* cuda_stream_create(void* context, int priority)
 {
diff --git a/warp/native/warp.h b/warp/native/warp.h
index 51c94374a..8f86632ce 100644
--- a/warp/native/warp.h
+++ b/warp/native/warp.h
@@ -269,6 +269,7 @@ extern "C"
     WP_API int cuda_device_get_pci_device_id(int ordinal);
     WP_API int cuda_device_is_uva(int ordinal);
     WP_API int cuda_device_is_mempool_supported(int ordinal);
+    WP_API int cuda_device_is_ipc_supported(int ordinal);
     WP_API int cuda_device_set_mempool_release_threshold(int ordinal, uint64_t threshold);
     WP_API uint64_t cuda_device_get_mempool_release_threshold(int ordinal);
    WP_API void cuda_device_get_memory_info(int ordinal, size_t* free_mem, size_t* total_mem);
@@ -297,6 +298,13 @@ extern "C"
     WP_API int cuda_is_mempool_access_enabled(int target_ordinal, int peer_ordinal);
     WP_API int cuda_set_mempool_access_enabled(int target_ordinal, int peer_ordinal, int enable);
 
+    // inter-process communication
+    WP_API void cuda_ipc_get_mem_handle(void* ptr, char* out_buffer);
+    WP_API void* cuda_ipc_open_mem_handle(void* context, char* handle);
+    WP_API void cuda_ipc_close_mem_handle(void* ptr);
+    WP_API void cuda_ipc_get_event_handle(void* context, void* event, char* out_buffer);
+    WP_API void* cuda_ipc_open_event_handle(void* context, char* handle);
+
     WP_API void* cuda_stream_create(void* context, int priority);
     WP_API void cuda_stream_destroy(void* context, void* stream);
     WP_API void cuda_stream_register(void* context, void* stream);
diff --git a/warp/stubs.py b/warp/stubs.py
index 844aea958..c3163ab57 100644
--- a/warp/stubs.py
+++ b/warp/stubs.py
@@ -48,6 +48,8 @@
 from warp.types import dtype_from_numpy, dtype_to_numpy
 
+from warp.types import from_ipc_handle
+
 from warp.context import init, func, func_grad, func_replay, func_native, kernel, struct, overload
 from warp.context import is_cpu_available, is_cuda_available, is_device_available
 from warp.context import get_devices, get_preferred_device
@@ -70,6 +72,7 @@
     synchronize,
     force_load,
     load_module,
+    event_from_ipc_handle,
 )
 from warp.context import set_module_options, get_module_options, get_module
 from warp.context import capture_begin, capture_end, capture_launch
diff --git a/warp/tests/test_ipc.py b/warp/tests/test_ipc.py
new file mode 100644
index 000000000..f3b19cdd0
--- /dev/null
+++ b/warp/tests/test_ipc.py
@@ -0,0 +1,116 @@
+# Copyright (c) 2024 NVIDIA CORPORATION. All rights reserved.
+# NVIDIA CORPORATION and its licensors retain all intellectual property
+# and proprietary rights in and to this software, related documentation
+# and any modifications thereto. Any use, reproduction, disclosure or
+# distribution of this software and related documentation without an express
+# license agreement from NVIDIA CORPORATION is strictly prohibited.
+
+import multiprocessing as mp
+import unittest
+
+import warp as wp
+from warp.tests.unittest_utils import *
+
+
+def test_ipc_get_memory_handle(test, device):
+    if not device.is_ipc_supported:
+        test.skipTest(f"IPC is not supported on {device}")
+
+    with wp.ScopedMempool(device, False):
+        test_array = wp.full(10, value=42.0, dtype=wp.float32, device=device)
+        ipc_handle = test_array.ipc_handle()
+
+    test.assertNotEqual(ipc_handle, bytes(64), "IPC memory handle appears to be invalid")
+
+
+def test_ipc_get_event_handle(test, device):
+    if not device.is_ipc_supported:
+        test.skipTest(f"IPC is not supported on {device}")
+
+    e1 = wp.Event(device, interprocess=True)
+
+    ipc_handle = e1.ipc_handle()
+
+    test.assertNotEqual(ipc_handle, bytes(64), "IPC event handle appears to be invalid")
+
+
+def test_ipc_event_missing_interprocess_flag(test, device):
+    if not device.is_ipc_supported:
+        test.skipTest(f"IPC is not supported on {device}")
+
+    e1 = wp.Event(device, interprocess=False)
+
+    capture = StdOutCapture()
+    capture.begin()
+    try:
+        ipc_handle = e1.ipc_handle()
+    finally:
+        output = capture.end()
+
+    # Older Windows C runtimes have a bug where stdout sometimes does not get properly flushed.
+    if sys.platform != "win32":
+        test.assertRegex(output, r"Warp UserWarning: IPC event handle appears to be invalid.")
+
+
+@wp.kernel
+def multiply_by_two(a: wp.array(dtype=wp.float32)):
+    i = wp.tid()
+    a[i] = 2.0 * a[i]
+
+
+def child_task(array_handle, dtype, shape, device, event_handle):
+    with wp.ScopedDevice(device):
+        ipc_array = wp.from_ipc_handle(array_handle, dtype, shape, device=device)
+        ipc_event = wp.event_from_ipc_handle(event_handle, device=device)
+        stream = wp.get_stream()
+        wp.launch(multiply_by_two, ipc_array.shape, inputs=[ipc_array])
+        stream.record_event(ipc_event)
+        stream.wait_event(ipc_event)
+        wp.synchronize_device()
+
+
+def test_ipc_multiprocess_write(test, device):
+    if not device.is_ipc_supported:
+        test.skipTest(f"IPC is not supported on {device}")
+
+    stream = wp.get_stream(device)
+    e1 = wp.Event(device, interprocess=True)
+
+    with wp.ScopedMempool(device, False):
+        test_array = wp.full(1024, value=42.0, dtype=wp.float32, device=device)
+        ipc_handle = test_array.ipc_handle()
+
+    wp.launch(multiply_by_two, test_array.shape, inputs=[test_array], device=device)
+
+    ctx = mp.get_context("spawn")
+
+    process = ctx.Process(
+        target=child_task, args=(ipc_handle, test_array.dtype, test_array.shape, str(device), e1.ipc_handle())
+    )
+
+    process.start()
+    process.join()
+
+    assert_np_equal(test_array.numpy(), np.full(test_array.shape, 168.0, dtype=np.float32))
+
+
+cuda_devices = get_cuda_test_devices()
+
+
+class TestIpc(unittest.TestCase):
+    pass
+
+
+add_function_test(TestIpc, "test_ipc_get_memory_handle", test_ipc_get_memory_handle, devices=cuda_devices)
+add_function_test(TestIpc, "test_ipc_get_event_handle", test_ipc_get_event_handle, devices=cuda_devices)
+add_function_test(
+    TestIpc, "test_ipc_event_missing_interprocess_flag", test_ipc_event_missing_interprocess_flag, devices=cuda_devices
+)
+add_function_test(
+    TestIpc, "test_ipc_multiprocess_write", test_ipc_multiprocess_write, devices=cuda_devices, check_output=False
+)
+
+
+if __name__ == "__main__":
+    wp.clear_kernel_cache()
+    unittest.main(verbosity=2)
diff --git a/warp/types.py b/warp/types.py
index b67097aab..6cd78a889 100644
--- a/warp/types.py
+++ b/warp/types.py
@@ -1517,7 +1517,7 @@ def strides_from_shape(shape: Tuple, dtype):
 
 
 def check_array_shape(shape: Tuple):
-    """Checks that the size in each dimension is positive and less than 2^32."""
+    """Checks that the size in each dimension is positive and less than 2^31."""
 
     for dim_index, dim_size in enumerate(shape):
         if dim_size < 0:
@@ -2021,6 +2021,7 @@ def _init_new(self, dtype, shape, strides, device, pinned):
         self.pinned = pinned if device.is_cpu else False
         self.is_contiguous = is_contiguous
         self.deleter = allocator.deleter
+        self._allocator = allocator
 
     def _init_annotation(self, dtype, ndim):
         self.dtype = dtype
@@ -2737,6 +2738,52 @@ def transpose(self, axes=None):
         a._ref = self
         return a
 
+    def ipc_handle(self) -> bytes:
+        """Return an IPC handle of the array as a 64-byte ``bytes`` object.
+
+        :func:`from_ipc_handle` can be used with this handle in another process
+        to obtain a :class:`array` that shares the same underlying memory
+        allocation.
+
+        IPC is currently only supported on Linux.
+        Additionally, IPC is only supported for arrays allocated using
+        the default memory allocator.
+
+        :class:`Event` objects created with the ``interprocess=True`` argument
+        may similarly be shared between processes to synchronize GPU work.
+
+        Example:
+            Temporarily using the default memory allocator to allocate an array
+            and get its IPC handle::
+
+                with wp.ScopedMempool("cuda:0", False):
+                    test_array = wp.full(1024, value=42.0, dtype=wp.float32, device="cuda:0")
+                    ipc_handle = test_array.ipc_handle()
+
+        Raises:
+            RuntimeError: The array is not associated with a CUDA device.
+            RuntimeError: The CUDA device does not appear to support IPC.
+            RuntimeError: The array was allocated using the :doc:`mempool memory allocator <allocators>`.
+        """
+
+        if self.device is None or not self.device.is_cuda:
+            raise RuntimeError("IPC requires a CUDA device")
+        elif not self.device.is_ipc_supported:
+            raise RuntimeError("IPC does not appear to be supported on this CUDA device")
+        elif isinstance(self._allocator, warp.context.CudaMempoolAllocator):
+            raise RuntimeError(
+                "Currently, IPC is only supported for arrays using the default memory allocator.\n"
+                "See https://nvidia.github.io/warp/modules/allocators.html for instructions on how to disable\n"
+                f"the mempool allocator on device {self.device}."
+            )
+
+        # Allocate a buffer for the data (64-element char array)
+        ipc_handle_buffer = (ctypes.c_char * 64)()
+
+        warp.context.runtime.core.cuda_ipc_get_mem_handle(self.ptr, ipc_handle_buffer)
+
+        return ipc_handle_buffer.raw
+
 
 # aliases for arrays with small dimensions
 def array1d(*args, **kwargs):
@@ -2785,6 +2832,51 @@ def from_ptr(ptr, length, dtype=None, shape=None, device=None):
     )
 
 
+def _close_cuda_ipc_handle(ptr, size):
+    warp.context.runtime.core.cuda_ipc_close_mem_handle(ptr)
+
+
+def from_ipc_handle(
+    handle: bytes, dtype, shape: Tuple[int, ...], strides: Optional[Tuple[int, ...]] = None, device=None
+) -> array:
+    """Create an array from an IPC handle.
+
+    The ``dtype``, ``shape``, and optional ``strides`` arguments should
+    match the values from the :class:`array` from which ``handle`` was created.
+
+    Args:
+        handle: The interprocess memory handle for an existing device memory allocation.
+        dtype: One of the available `data types <#data-types>`_, such as :class:`warp.float32`, :class:`warp.mat33`, or a custom `struct <#structs>`_.
+        shape: Dimensions of the array.
+        strides: Number of bytes in each dimension between successive elements of the array.
+        device (Devicelike): Device to associate with the array.
+
+    Returns:
+        An array created from the existing memory allocation described by the interprocess memory handle ``handle``.
+
+        A copy of the underlying data is not made. Modifications to the array's data will be reflected in the
+        original process from which ``handle`` was exported.
+
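+    Example:
+        Import an array exported by another process (``array_handle``, ``dtype``,
+        and ``shape`` are assumed to have been received from that process, e.g.
+        via ``multiprocessing``)::
+
+            a = wp.from_ipc_handle(array_handle, dtype, shape, device="cuda:0")
+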
+    Raises:
+        RuntimeError: IPC is not supported on ``device``.
+    """
+
+    try:
+        # Performance note: try first, ask questions later
+        device = warp.context.runtime.get_device(device)
+    except Exception:
+        # Fall back to using the public API for retrieving the device,
+        # which takes care of initializing Warp if needed.
+        device = warp.context.get_device(device)
+
+    if not device.is_ipc_supported:
+        raise RuntimeError(f"IPC is not supported on device {device}.")
+
+    ptr = warp.context.runtime.core.cuda_ipc_open_mem_handle(device.context, handle)
+
+    return array(ptr=ptr, dtype=dtype, shape=shape, strides=strides, device=device, deleter=_close_cuda_ipc_handle)
+
+
 # A base class for non-contiguous arrays, providing the implementation of common methods like
 # contiguous(), to(), numpy(), list(), assign(), zero_(), and fill_().
 class noncontiguous_array_base(Generic[T]):