diff --git a/charm4py/channel.py b/charm4py/channel.py
index aaf058ec..c5c5eed7 100644
--- a/charm4py/channel.py
+++ b/charm4py/channel.py
@@ -1,4 +1,6 @@
 from .threads import LocalFuture
+from .charm import charm
+import time


 class Channel(object):
@@ -52,15 +54,15 @@ def ready(self):
     def waitReady(self, f):
         self.wait_ready = f

-    def send(self, *msg):
+    def send(self, *msg, **kwargs):
         if not self.established:
             self.established_fut = LocalFuture()
             self.established_fut.get()
             self.setEstablished()
-        self.remote._channelRecv__(self.remote_port, self.send_seqno, *msg)
+        self.remote._channelRecv__(self.remote_port, self.send_seqno, *msg, **kwargs)
         self.send_seqno = (self.send_seqno + 1) % CHAN_BUF_SIZE

-    def recv(self):
+    def recv(self, *post_buffers, post_addresses=None, post_sizes=None, stream_ptrs=None):
         if self.recv_seqno in self.data:
             ret = self.data.pop(self.recv_seqno)
         else:
@@ -68,4 +70,39 @@ def recv(self):
             ret = self.recv_fut.get()
             self.recv_fut = None
         self.recv_seqno = (self.recv_seqno + 1) % CHAN_BUF_SIZE
+
+        # split out the GPU buffer descriptors that the sender appended as the
+        # last message element, then issue the direct receives into the posted
+        # buffers (either device arrays or raw address/size pairs)
+        if post_buffers or post_addresses is not None:
+            if isinstance(ret, tuple):
+                gpu_recv_bufs = ret[-1]
+                ret = ret[0:-1]
+                if len(ret) == 1:
+                    ret = ret[0]
+            else:
+                gpu_recv_bufs = ret
+
+            if post_buffers:
+                assert len(post_buffers) == len(gpu_recv_bufs)
+                recv_future = charm.getGPUDirectData(post_buffers, gpu_recv_bufs,
+                                                     stream_ptrs)
+            else:
+                assert len(post_addresses) == len(gpu_recv_bufs)
+                assert post_sizes is not None
+                recv_future = charm.getGPUDirectDataFromAddresses(post_addresses,
+                                                                  post_sizes,
+                                                                  gpu_recv_bufs,
+                                                                  stream_ptrs)
+            recv_future.get()
+
+        return ret
diff --git a/charm4py/chare.py b/charm4py/chare.py
index a1447eb4..4bfa6245 100644
--- a/charm4py/chare.py
+++ b/charm4py/chare.py
@@ -461,6 +461,9 @@ def proxy_entry_method(proxy, *args, **kwargs):
         for i in range(num_args, argcount):
             argname = argnames[i]
             # first look for argument in kwargs
+            # GPU-direct control kwargs are consumed by the send path, not the entry method
+            if argname in {'stream_ptrs', 'src_ptrs', 'src_sizes'}:
+                continue
             if argname in kwargs:
                 args.append(kwargs[argname])
             else:
@@ -485,8 +488,28 @@ def proxy_entry_method(proxy, *args, **kwargs):
             gid = proxy.gid
             if Options.local_msg_optim and (elemIdx == charm._myPe) and (len(args) > 0):
                 destObj = charm.groups[gid]
-            msg = charm.packMsg(destObj, args, header)
-            charm.CkGroupSend(gid, elemIdx, ep, msg)
+            # pack device buffers found in the arguments, unless the caller
+            # passed raw device pointers explicitly via 'src_ptrs'
+            should_pack_gpu = 'src_ptrs' not in kwargs
+            msg = charm.packMsg(destObj, args, header, pack_gpu=should_pack_gpu)
+            if msg[1] or not should_pack_gpu:
+                stream_ptrs = kwargs.get('stream_ptrs', None)
+                if should_pack_gpu:
+                    charm.CkGroupSendWithDeviceData(gid, elemIdx, ep, msg, stream_ptrs)
+                else:
+                    charm.CkGroupSendWithDeviceDataFromPointers(gid, elemIdx, ep, msg,
+                                                                kwargs['src_ptrs'],
+                                                                kwargs['src_sizes'],
+                                                                stream_ptrs)
+            else:
+                charm.CkGroupSend(gid, elemIdx, ep, msg)
         else:
             root, sid = proxy.section
             header[b'sid'] = sid
@@ -721,7 +744,10 @@ def proxy_entry_method(proxy, *args, **kwargs):
         for i in range(num_args, argcount):
             argname = argnames[i]
             # first look for argument in kwargs
-            if argname in kwargs:
+            # GPU-direct control kwargs are consumed by the send path, not the entry method
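+            # Example (sketch; 'work', 'addrs', 'sizes' and 'streams' are
+            # hypothetical names):
+            #     proxy[idx].work(data, src_ptrs=addrs, src_sizes=sizes,
+            #                     stream_ptrs=streams)
+            # 'data' is delivered to work() as usual, while the three control
+            # kwargs only steer the GPU-direct send branch further below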
+            if argname in {'stream_ptrs', 'src_ptrs', 'src_sizes'}:
+                continue
+            if argname in kwargs:
                 args.append(kwargs[argname])
             else:
                 # if not there, see if there is a default value
@@ -741,15 +767,39 @@ def proxy_entry_method(proxy, *args, **kwargs):
         if elemIdx == ():
             header[b'bcast'] = True
         if not proxy.issec or elemIdx != ():
+            # TODO: check that this is a channel proxy method?
             destObj = None
             aid = proxy.aid
             if Options.local_msg_optim and (len(args) > 0):
                 array = charm.arrays[aid]
                 if elemIdx in array:
                     destObj = array[elemIdx]
-            msg = charm.packMsg(destObj, args, header)
-            charm.CkArraySend(aid, elemIdx, ep, msg)
+            # as in the group path above: pack device buffers found in the
+            # arguments, unless raw device pointers were passed via 'src_ptrs'
+            should_pack_gpu = 'src_ptrs' not in kwargs
+            msg = charm.packMsg(destObj, args, header, pack_gpu=should_pack_gpu)
+            if msg[1] or not should_pack_gpu:
+                stream_ptrs = kwargs.get('stream_ptrs', None)
+                if should_pack_gpu:
+                    charm.CkArraySendWithDeviceData(aid, elemIdx, ep, msg, stream_ptrs)
+                else:
+                    charm.CkArraySendWithDeviceDataFromPointers(aid, elemIdx, ep, msg,
+                                                                kwargs['src_ptrs'],
+                                                                kwargs['src_sizes'],
+                                                                stream_ptrs)
+            else:
+                charm.CkArraySend(aid, elemIdx, ep, msg)
         else:
+            # TODO: raise an error if trying to send ZC (GPU-direct) data to a section
             root, sid = proxy.section
             header[b'sid'] = sid
             if Options.local_msg_optim and root == charm._myPe:
diff --git a/charm4py/charm.py b/charm4py/charm.py
index a13935fe..4e626a9e 100644
--- a/charm4py/charm.py
+++ b/charm4py/charm.py
@@ -28,6 +28,8 @@
 from . import reduction
 from . import wait
 import array
+import greenlet
 try:
     import numpy
 except ImportError:
@@ -40,6 +42,14 @@ class NumpyDummy:
 def SECTION_ALL(obj):
     return 0

+def getDeviceDataInfo(devArray):
+    return devArray.__cuda_array_interface__['data']
+
+def getDeviceDataAddress(devArray):
+    return getDeviceDataInfo(devArray)[0]
+
+def getDeviceDataSizeInBytes(devArray):
+    return devArray.nbytes

 class Options(object):

@@ -106,11 +116,13 @@ def __init__(self):
         self.options.interactive.verbose = 1
         self.options.interactive.broadcast_imports = True

+        '''
         if 'OMPI_COMM_WORLD_SIZE' in os.environ:
             # this is needed for OpenMPI, see:
             # https://svn.open-mpi.org/trac/ompi/wiki/Linkers
             import ctypes
             self.__libmpi__ = ctypes.CDLL('libmpi.so', mode=ctypes.RTLD_GLOBAL)
+        '''
         self.lib = load_charm_library(self)
         self.ReducerType = self.lib.ReducerType
         self.CkContributeToChare = self.lib.CkContributeToChare
@@ -120,6 +132,13 @@ def __init__(self):
         self.CkChareSend = self.lib.CkChareSend
         self.CkGroupSend = self.lib.CkGroupSend
         self.CkArraySend = self.lib.CkArraySend
+        self.CkArraySendWithDeviceData = self.lib.CkArraySendWithDeviceData
+        self.CkGroupSendWithDeviceData = self.lib.CkGroupSendWithDeviceData
+        self.CkArraySendWithDeviceDataFromPointersArray = self.lib.CkArraySendWithDeviceDataFromPointersArray
+        self.CkArraySendWithDeviceDataFromPointersOther = self.lib.CkArraySendWithDeviceDataFromPointersOther
+        self.CkGroupSendWithDeviceDataFromPointersArray = self.lib.CkGroupSendWithDeviceDataFromPointersArray
+        self.CkGroupSendWithDeviceDataFromPointersOther = self.lib.CkGroupSendWithDeviceDataFromPointersOther
+        self.CkCudaEnabled = self.lib.CkCudaEnabled
         self.reducers = reduction.ReducerContainer(self)
         self.redMgr = reduction.ReductionManager(self, self.reducers)
         self.mainchareRegistered = False
@@ -305,6 +324,21 @@ def recvArrayMsg(self, aid, index, ep, msg, dcopy_start):
             self.arrays[aid][index] = obj
             em.run(obj, header, args)  # now call the user's array element __init__

+    def recvGPUDirectArrayMsg(self, aid, index, ep, devBuf_ptrs, msg, dcopy_start):
+        obj = self.arrays[aid][index]
+        header, args = self.unpackMsg(msg, dcopy_start, obj)
+        # the addresses of the incoming device-buffer descriptors are handed
+        # to the entry method as an extra trailing argument
+        args.append(devBuf_ptrs)
+        self.invokeEntryMethod(obj, ep, header, args)
+
+    def recvGPUDirectGroupMsg(self, gid, ep, devBuf_ptrs, msg, dcopy_start):
+        obj = self.groups[gid]
+        header, args = self.unpackMsg(msg, dcopy_start, obj)
+        args.append(devBuf_ptrs)
+        self.invokeEntryMethod(obj, ep, header, args)
+
     def recvArrayBcast(self, aid, indexes, ep, msg, dcopy_start):
         header, args = self.unpackMsg(msg, dcopy_start, None)
         array = self.arrays[aid]
@@ -332,6 +366,70 @@ def unpackMsg(self, msg, dcopy_start, dest_obj):

         return header, args

+    def getGPUDirectData(self, post_buffers, remote_bufs, stream_ptrs):
+        # this future will only be satisfied when all buffers have been received
+        return_fut = self.Future()
+        post_buf_data = [getDeviceDataAddress(buf) for buf in post_buffers]
+        post_buf_sizes = [getDeviceDataSizeInBytes(buf) for buf in post_buffers]
+        if not stream_ptrs:
+            stream_ptrs = [0] * len(post_buffers)
+        self.lib.getGPUDirectData(post_buf_data, post_buf_sizes, remote_bufs,
+                                  stream_ptrs, return_fut)
+        return return_fut
+
+    def getGPUDirectDataFromAddresses(self, post_buf_ptrs, post_buf_sizes,
+                                      remote_bufs, stream_ptrs):
+        # this future will only be satisfied when all buffers have been received
+        return_fut = self.Future()
+        if not stream_ptrs:
+            stream_ptrs = array.array('L', [0] * len(post_buf_ptrs))
+        self.lib.getGPUDirectDataFromAddresses(post_buf_ptrs, post_buf_sizes,
+                                               remote_bufs, stream_ptrs, return_fut)
+        return return_fut
+
+    def CkArraySendWithDeviceDataFromPointers(self, array_id, index, ep, msg,
+                                              gpu_src_ptrs, gpu_src_sizes,
+                                              stream_ptrs):
+        # dispatch on the container type: array.array pointers can be handed
+        # to the C layer directly, other buffer types go through memoryviews
+        if isinstance(gpu_src_ptrs, array.array):
+            assert isinstance(gpu_src_sizes, array.array), \
+                "GPU source pointers and sizes must be of the same type."
+            self.CkArraySendWithDeviceDataFromPointersArray(array_id, index, ep, msg,
+                                                            gpu_src_ptrs, gpu_src_sizes,
+                                                            stream_ptrs,
+                                                            len(gpu_src_ptrs))
+        else:
+            self.CkArraySendWithDeviceDataFromPointersOther(array_id, index, ep, msg,
+                                                            gpu_src_ptrs, gpu_src_sizes,
+                                                            stream_ptrs,
+                                                            len(gpu_src_ptrs))
+
+    def CkGroupSendWithDeviceDataFromPointers(self, gid, elemIdx, ep, msg,
+                                              gpu_src_ptrs, gpu_src_sizes,
+                                              stream_ptrs):
+        if isinstance(gpu_src_ptrs, array.array):
+            assert isinstance(gpu_src_sizes, array.array), \
+                "GPU source pointers and sizes must be of the same type."
+            self.CkGroupSendWithDeviceDataFromPointersArray(gid, elemIdx, ep, msg,
+                                                            gpu_src_ptrs, gpu_src_sizes,
+                                                            stream_ptrs,
+                                                            len(gpu_src_ptrs))
+        else:
+            self.CkGroupSendWithDeviceDataFromPointersOther(gid, elemIdx, ep, msg,
+                                                            gpu_src_ptrs, gpu_src_sizes,
+                                                            stream_ptrs,
+                                                            len(gpu_src_ptrs))
+
+    # deposit the value of one of the futures that was created on this PE
+    def _future_deposit_result(self, fid, result=None):
+        self.threadMgr.depositFuture(fid, result)
+
     def packMsg(self, destObj, msgArgs, header):
         """Prepares a message for sending, given arguments to an entry method invocation.
@@ -815,6 +913,7 @@ def triggerCallable(self, tag): def iwait(self, objs): n = len(objs) f = LocalFuture() + for obj in objs: if obj.ready(): n -= 1 @@ -826,6 +925,43 @@ def iwait(self, objs): n -= 1 yield obj + def iwait_map(self, func, objs): + n = len(objs) + f = LocalFuture() + remaining_grs = [n] + + def map_func(remaining, obj): + gr = greenlet.getcurrent() + gr.notify = gr.parent.notify + gr.obj = gr.parent.obj + gr.fu = 1 + func(obj) + remaining[0] -= 1 + + def gr_func(): + return map_func(remaining_grs, obj) + + for obj in objs: + if obj.ready(): + new_gr = greenlet.greenlet(gr_func) + n -= 1 + obj = new_gr.switch() + while obj: + new_gr = greenlet.greenlet(gr_func) + n -= 1 + obj = new_gr.switch() + else: + obj.waitReady(f) + while n > 0: + obj = self.threadMgr.pauseThread() + while obj: + new_gr = greenlet.greenlet(gr_func) + n -= 1 + obj = new_gr.switch() + + while remaining_grs[0]: + self.threadMgr.pauseThread() + def wait(self, objs): for o in self.iwait(objs): pass @@ -1156,6 +1292,5 @@ def rebuildNumpyArray(data, shape, dt): a.shape = shape return a.copy() - charm = Charm() readonlies = __ReadOnlies() diff --git a/charm4py/charmlib/ccharm.pxd b/charm4py/charmlib/ccharm.pxd index 5bbe1b05..2de91517 100644 --- a/charm4py/charmlib/ccharm.pxd +++ b/charm4py/charmlib/ccharm.pxd @@ -1,5 +1,7 @@ # libcharm wrapper for Cython +cdef extern from "cuda_runtime.h": + ctypedef long cudaStream_t cdef extern from "charm.h": @@ -51,6 +53,7 @@ cdef extern from "charm.h": void registerReadOnlyRecvExtCallback(void (*cb)(int, char*)); void registerChareMsgRecvExtCallback(void (*cb)(int, void*, int, int, char*, int)); void registerGroupMsgRecvExtCallback(void (*cb)(int, int, int, char *, int)); + void registerGroupMsgGPUDirectRecvExtCallback(void (*cb)(int, int, int, int *, void *, int, char *, int)); void registerArrayMsgRecvExtCallback(void (*cb)(int, int, int *, int, int, char *, int)); void registerArrayBcastRecvExtCallback(void (*cb)(int, int, int, int, int *, int, int, char *, int)); void registerArrayElemLeaveExtCallback(int (*cb)(int, int, int *, char**, int)); @@ -70,8 +73,39 @@ cdef extern from "charm.h": void CkStartQDExt_SectionCallback(int sid_pe, int sid_cnt, int rootPE, int ep); void CcdCallFnAfter(void (*CcdVoidFn)(void *userParam,double curWallTime), void *arg, double msecs); + # TODO: Organize these to place them near their related functions + int CkCudaEnabled(); + int CUDAPointerOnDevice(const void *ptr); + void CkArrayExtSendWithDeviceData(int aid, int *idx, int ndims, + int epIdx, int num_bufs, char **bufs, + int *buf_sizes, + long *devBufPtrs, + int *devBufSizesInBytes, + long *streamPtrs, int numDevBufs + ); + void CkGroupExtSendWithDeviceData(int gid, int pe, int epIdx, int num_bufs, char **bufs, + int *buf_sizes, long *devBufPtrs, + int *devBufSizesInBytes, + long *streamPtrs, int numDevBufs + ); + + + void registerArrayMsgGPUDirectRecvExtCallback(void (*cb)(int, int, int*, int, int, int*, void *, int, char*, int)); + void CkGetGPUDirectData(int numBuffers, void *recvBufPtrs, int *arrSizes, + void *remoteBufInfo, void *streamPtrs, int futureId); + + int CkDeviceBufferSizeInBytes(); + + void registerDepositFutureWithIdFn(void (*cb)(void*, void*)); + + void CkCUDAHtoD(void *dest, void *src, int nbytes, cudaStream_t stream); + void CkCUDADtoH(void *dest, void *src, int nbytes, cudaStream_t stream); + void CkCUDAStreamSynchronize(cudaStream_t stream); + + cdef extern from "spanningTree.h": void getPETopoTreeEdges(int pe, int rootPE, int *pes, int numpes, unsigned int 
bfactor, int *parent, int *child_count, int **children); + diff --git a/charm4py/charmlib/charmlib_cython.pyx b/charm4py/charmlib/charmlib_cython.pyx index aad5f323..0183ea10 100644 --- a/charm4py/charmlib/charmlib_cython.pyx +++ b/charm4py/charmlib/charmlib_cython.pyx @@ -1,6 +1,6 @@ from ccharm cimport * from libc.stdlib cimport malloc, free -from libc.string cimport memcpy +from libc.string cimport memcpy, memset from libc.stdint cimport uintptr_t from cpython.version cimport PY_MAJOR_VERSION from cpython.buffer cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_ANY_CONTIGUOUS, PyBUF_SIMPLE @@ -30,7 +30,7 @@ ELSE: np = NumpyDummyModule() cdef object np_number = np.number - +cdef int CK_DEVICEBUFFER_SIZE_IN_BYTES = CkDeviceBufferSizeInBytes() # ------ global constants ------ @@ -285,19 +285,32 @@ cdef inline object array_index_to_tuple(int ndims, int *arrayIndex): PyTuple_SET_ITEM(arrIndex, i, d) return arrIndex - cdef extern const char * const CmiCommitID +# cdef class PyCkDeviceBuffer: +# cdef CkDeviceBuffer c_buff + +# @staticmethod +# cdef PyCkDeviceBuffer from_ptr(CkDeviceBuffer buf): +# cdef PyCkDeviceBuffer newBuf = PyCkDeviceBuffer.__new__(PyCkDeviceBuffer) +# newBuf.c_buff = buf +# return newBuf + + # supports up to NUM_DCOPY_BUFS direct-copy entry method arguments cdef (char*)[NUM_DCOPY_BUFS] send_bufs # ?TODO bounds checking is needed where this is used cdef int[NUM_DCOPY_BUFS] send_buf_sizes # ?TODO bounds checking is needed where this is used cdef int cur_buf = 1 +cdef int gpu_direct_buf_idx = 0 cdef int[MAX_INDEX_LEN] c_index cdef Py_buffer send_buffer cdef ReceiveMsgBuffer recv_buffer = ReceiveMsgBuffer() cdef c_type_table_typecodes = [None] * 13 cdef int c_type_table_sizes[13] cdef int[SECTION_MAX_BFACTOR] section_children +cdef long[NUM_DCOPY_BUFS] gpu_direct_device_ptrs +cdef int[NUM_DCOPY_BUFS] gpu_direct_buff_sizes +cdef long[NUM_DCOPY_BUFS] gpu_direct_stream_ptrs cdef object charm cdef object charm_reducer_to_ctype @@ -449,6 +462,193 @@ class CharmLib(object): CkGroupExtSend_multi(group_id, num_pes, section_children, ep, cur_buf, send_bufs, send_buf_sizes) cur_buf = 1 + def CkGroupSendWithDeviceData(self, int group_id, int index, int ep, + msg not None, stream_ptrs): + global gpu_direct_buf_idx + cdef int i = 0 + msg0, dcopy = msg + dcopy = None + cdef int num_direct_buffers = gpu_direct_buf_idx + # TODO: Message on assertion failure + assert num_direct_buffers <= NUM_DCOPY_BUFS + global gpu_direct_device_ptrs + global gpu_direct_stream_ptrs + global cur_buf + + if stream_ptrs and isinstance(stream_ptrs, list): + for i in range(num_direct_buffers): + gpu_direct_stream_ptrs[i] = stream_ptrs[i] + elif not stream_ptrs: + memset(gpu_direct_stream_ptrs, 0, sizeof(long) * num_direct_buffers) + + send_bufs[0] = msg0 + send_buf_sizes[0] = len(msg0) + CkGroupExtSendWithDeviceData(group_id, index, ep, + cur_buf, send_bufs, send_buf_sizes, + gpu_direct_device_ptrs, + gpu_direct_buff_sizes, + gpu_direct_stream_ptrs, + num_direct_buffers + ) + cur_buf = 1 + gpu_direct_buf_idx = 0 + + def CkGroupSendWithDeviceDataFromPointersArray(self, int gid, int index, int ep, + msg not None, array.array gpu_src_ptrs, + array.array gpu_src_sizes, stream_ptrs, + num_bufs): + cdef int i = 0 + global cur_buf + msg0, dcopy = msg + dcopy = None + + if stream_ptrs and isinstance(stream_ptrs, list): + for i in range(num_bufs): + gpu_direct_stream_ptrs[i] = stream_ptrs[i] + else: + memset(gpu_direct_stream_ptrs, 0, sizeof(long) * num_bufs) + + send_bufs[0] = msg0 + send_buf_sizes[0] = len(msg0) + 
CkGroupExtSendWithDeviceData(gid, index, ep, + cur_buf, send_bufs, send_buf_sizes, + gpu_src_ptrs.data.as_voidptr, + gpu_src_sizes.data.as_voidptr, + gpu_direct_stream_ptrs, + num_bufs + ) + cur_buf = 1 + gpu_direct_buf_idx = 0 + + def CkGroupSendWithDeviceDataFromPointersOther(self, int gid, int index, int ep, + msg not None, gpu_src_ptrs, + gpu_src_sizes, stream_ptrs, + num_bufs): + cdef int i = 0 + global cur_buf + msg0, dcopy = msg + dcopy = None + cdef unsigned long[:] gpu_addresses = gpu_src_ptrs + cdef int[:] gpu_buffer_sizes = gpu_src_sizes + + if stream_ptrs and isinstance(stream_ptrs, list): + for i in range(num_bufs): + gpu_direct_stream_ptrs[i] = stream_ptrs[i] + else: + memset(gpu_direct_stream_ptrs, 0, sizeof(long) * num_bufs) + + send_bufs[0] = msg0 + send_buf_sizes[0] = len(msg0) + CkGroupExtSendWithDeviceData(gid, index, ep, + cur_buf, send_bufs, send_buf_sizes, + &gpu_addresses[0], + &gpu_buffer_sizes[0], + gpu_direct_stream_ptrs, + num_bufs + ) + cur_buf = 1 + gpu_direct_buf_idx = 0 + + def CkArraySendWithDeviceData(self, int array_id, index not None, int ep, + msg not None, stream_ptrs): + + global gpu_direct_buf_idx + cdef int i = 0 + cdef int ndims = len(index) + # assert ndims == 1 + for i in range(ndims): c_index[i] = index[i] + msg0, dcopy = msg + dcopy = None + cdef int num_direct_buffers = gpu_direct_buf_idx + # TODO: Message on assertion failure + assert num_direct_buffers <= NUM_DCOPY_BUFS + global gpu_direct_device_ptrs + global gpu_direct_stream_ptrs + global cur_buf + + if stream_ptrs and isinstance(stream_ptrs, list): + for i in range(num_direct_buffers): + gpu_direct_stream_ptrs[i] = stream_ptrs[i] + elif not stream_ptrs: + memset(gpu_direct_stream_ptrs, 0, sizeof(long) * num_direct_buffers) + + send_bufs[0] = msg0 + send_buf_sizes[0] = len(msg0) + CkArrayExtSendWithDeviceData(array_id, c_index, ndims, ep, + cur_buf, send_bufs, send_buf_sizes, + gpu_direct_device_ptrs, + gpu_direct_buff_sizes, + gpu_direct_stream_ptrs, + num_direct_buffers + ) + cur_buf = 1 + gpu_direct_buf_idx = 0 + + def CkArraySendWithDeviceDataFromPointersArray(self, int array_id, index not None, int ep, + msg not None, array.array gpu_src_ptrs, + array.array gpu_src_sizes, + stream_ptrs, int num_bufs): + + cdef int i = 0 + cdef int ndims = len(index) + global cur_buf + for i in range(ndims): c_index[i] = index[i] + msg0, dcopy = msg + dcopy = None + + if stream_ptrs and isinstance(stream_ptrs, list): + for i in range(num_bufs): + gpu_direct_stream_ptrs[i] = stream_ptrs[i] + else: + memset(gpu_direct_stream_ptrs, 0, sizeof(long) * num_bufs) + + send_bufs[0] = msg0 + send_buf_sizes[0] = len(msg0) + CkArrayExtSendWithDeviceData(array_id, c_index, ndims, ep, + cur_buf, send_bufs, send_buf_sizes, + gpu_src_ptrs.data.as_voidptr, + gpu_src_sizes.data.as_voidptr, + gpu_direct_stream_ptrs, + num_bufs + ) + cur_buf = 1 + gpu_direct_buf_idx = 0 + + def CkArraySendWithDeviceDataFromPointersOther(self, int array_id, index not None, int ep, + msg not None, gpu_src_ptrs, + gpu_src_sizes, + stream_ptrs, int num_bufs + ): + cdef int i = 0 + cdef int ndims = len(index) + global cur_buf + for i in range(ndims): c_index[i] = index[i] + msg0, dcopy = msg + dcopy = None + cdef unsigned long[:] gpu_addresses = gpu_src_ptrs + cdef int[:] gpu_buffer_sizes = gpu_src_sizes + + if stream_ptrs and isinstance(stream_ptrs, list): + for i in range(num_bufs): + gpu_direct_stream_ptrs[i] = stream_ptrs[i] + else: + memset(gpu_direct_stream_ptrs, 0, sizeof(long) * num_bufs) + + send_bufs[0] = msg0 + send_buf_sizes[0] = 
len(msg0) + CkArrayExtSendWithDeviceData(array_id, c_index, ndims, ep, + cur_buf, send_bufs, send_buf_sizes, + &gpu_addresses[0], + &gpu_buffer_sizes[0], + gpu_direct_stream_ptrs, + num_bufs + ) + cur_buf = 1 + gpu_direct_buf_idx = 0 + + def CkCudaEnabled(self): + return bool(CkCudaEnabled()) + def CkArraySend(self, int array_id, index not None, int ep, msg not None): global cur_buf msg0, dcopy = msg @@ -506,6 +706,9 @@ class CharmLib(object): CkRegisterArrayExt(self.chareNames[-1], numEntryMethods, &chareIdx, &startEpIdx) return chareIdx, startEpIdx + def CUDAPointerOnDevice(self, long address): + return CUDAPointerOnDevice(address) + def CkCreateGroup(self, int chareIdx, int epIdx, msg not None): global cur_buf msg0, dcopy = msg @@ -702,12 +905,15 @@ class CharmLib(object): registerReadOnlyRecvExtCallback(recvReadOnly) registerChareMsgRecvExtCallback(recvChareMsg) registerGroupMsgRecvExtCallback(recvGroupMsg) + registerGroupMsgGPUDirectRecvExtCallback(recvGPUDirectGroupMsg) registerArrayMsgRecvExtCallback(recvArrayMsg) + registerArrayMsgGPUDirectRecvExtCallback(recvGPUDirectArrayMsg) registerArrayBcastRecvExtCallback(recvArrayBcast) registerArrayMapProcNumExtCallback(arrayMapProcNum) registerArrayElemJoinExtCallback(arrayElemJoin) registerPyReductionExtCallback(pyReduction) registerCreateCallbackMsgExtCallback(createCallbackMsg) + registerDepositFutureWithIdFn(depositFutureWithId); def CkMyPe(self): return CkMyPeHook() def CkNumPes(self): return CkNumPesHook() @@ -768,12 +974,13 @@ class CharmLib(object): return header, args - def packMsg(self, destObj, msgArgs not None, dict header): + def packMsg(self, destObj, msgArgs not None, dict header, pack_gpu=True): cdef int i = 0 cdef int localTag cdef array.array a IF HAVE_NUMPY: cdef np.ndarray np_array + cuda_dev_info = None dcopy_size = 0 if destObj is not None: # if dest obj is local localTag = destObj.__addLocal__((header, msgArgs)) @@ -783,46 +990,118 @@ class CharmLib(object): msg = emptyMsg else: direct_copy_hdr = [] # goes to header - args = list(msgArgs) global cur_buf + global gpu_direct_buf_idx + global gpu_direct_device_ptrs cur_buf = 1 - for i in range(len(args)): + gpu_direct_buf_idx = 0 + # GPU-direct buffers will not be sent + args_to_send = list() + n_gpu_bufs = 0 + for i in range(len(msgArgs)): arg = msgArgs[i] - if isinstance(arg, np.ndarray) and not arg.dtype.hasobject: + if CkCudaEnabled() and hasattr(arg, '__cuda_array_interface__'): + if pack_gpu: + # we want to take the args that implement the cuda array interface and make them into ckdevicebuffers + # assumption: we can get nbytes from the arg directly + # TODO: verify this assertion for other types + # gpu_direct_device_ptrs[gpu_direct_buf_idx] = arg.__cuda_array_interface__['data'][0] + gpu_direct_device_ptrs[gpu_direct_buf_idx] = arg.__cuda_array_interface__['data'][0] + gpu_direct_buff_sizes[gpu_direct_buf_idx] = arg.nbytes + cuda_dev_info = True + gpu_direct_buf_idx += 1 + n_gpu_bufs += 1 + continue + elif isinstance(arg, np.ndarray) and not arg.dtype.hasobject: np_array = arg nbytes = np_array.nbytes - direct_copy_hdr.append((i, 2, (arg.shape, np_array.dtype.name), nbytes)) + direct_copy_hdr.append((i-n_gpu_bufs, 2, (arg.shape, np_array.dtype.name), nbytes)) send_bufs[cur_buf] = np_array.data elif isinstance(arg, bytes): nbytes = len(arg) - direct_copy_hdr.append((i, 0, (), nbytes)) + direct_copy_hdr.append((i-n_gpu_bufs, 0, (), nbytes)) send_bufs[cur_buf] = arg elif isinstance(arg, array.array): a = arg #nbytes = arg.buffer_info()[1] * arg.itemsize nbytes = 
len(a) * a.itemsize # NOTE that cython's array C interface doesn't expose itemsize attribute - direct_copy_hdr.append((i, 1, (a.typecode), nbytes)) + direct_copy_hdr.append((i-n_gpu_bufs, 1, (a.typecode), nbytes)) send_bufs[cur_buf] = a.data.as_voidptr else: + args_to_send.append(arg) continue - args[i] = None # will direct-copy this arg so remove from args list + args_to_send.append(None) send_buf_sizes[cur_buf] = nbytes if PROFILING: dcopy_size += nbytes cur_buf += 1 if len(direct_copy_hdr) > 0: header[b'dcopy'] = direct_copy_hdr try: - msg = dumps((header, args), PICKLE_PROTOCOL) + msg = dumps((header, args_to_send), PICKLE_PROTOCOL) except: global cur_buf + global gpu_direct_buf_idx cur_buf = 1 + gpu_direct_buf_idx = 0 raise if PROFILING: charm.recordSend(len(msg) + dcopy_size) - return msg, None + return msg, cuda_dev_info def scheduleTagAfter(self, int tag, double msecs): CcdCallFnAfter(CcdCallFnAfterCallback, tag, msecs) + def getGPUDirectData(self, list post_buf_data, list post_buf_sizes, array.array remote_bufs, list stream_ptrs, return_fut): + cdef int num_buffers = len(post_buf_data) + cdef int future_id = return_fut.fid + cdef array.array long_array_template = array.array('L', []) + cdef array.array int_array_template = array.array('i', []) + cdef array.array recv_buf_sizes + cdef array.array recv_buf_ptrs + # pointers from the remote that we will be issuing Rgets for + # these are pointers to type CkDeviceBuffer + cdef array.array remote_buf_ptrs + + recv_buf_sizes = array.clone(int_array_template, num_buffers, zero=False) + recv_buf_ptrs = array.clone(long_array_template, num_buffers, zero=False) + stream_ptrs_forc = array.clone(long_array_template, num_buffers, zero=False) + remote_buf_ptrs = array.clone(long_array_template, num_buffers, zero=False) + + for idx in range(num_buffers): + recv_buf_ptrs[idx] = post_buf_data[idx] + recv_buf_sizes[idx] = post_buf_sizes[idx] + remote_buf_ptrs[idx] = remote_bufs[idx] + stream_ptrs_forc[idx] = stream_ptrs[idx] + # what do we do about the return future? Need to turn it into some callback. 
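+        # (answer, grounded in the rest of this diff) only return_fut.fid
+        # crosses into C++; when all transfers complete, the runtime invokes
+        # the depositFutureWithId callback registered at startup, which calls
+        # charm._future_deposit_result(fid) to wake the coroutine blocked on it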
+ CkGetGPUDirectData(num_buffers, recv_buf_ptrs.data.as_voidptr, + recv_buf_sizes.data.as_voidptr, + remote_buf_ptrs.data.as_voidptr, + stream_ptrs_forc.data.as_voidptr, + future_id + ) + + def getGPUDirectDataFromAddresses(self, array.array post_buf_ptrs, array.array post_buf_sizes, array.array remote_bufs, array.array stream_ptrs, return_fut): + cdef int num_buffers = len(post_buf_ptrs) + cdef int future_id = return_fut.fid + # pointers from the remote that we will be issuing Rgets for + # these are pointers to type CkDeviceBuffer + CkGetGPUDirectData(num_buffers, post_buf_ptrs.data.as_voidptr, + post_buf_sizes.data.as_voidptr, + remote_bufs.data.as_voidptr, + stream_ptrs.data.as_voidptr, + future_id + ) + def CudaHtoD(self, long destAddr, long srcAddr, int nbytes, long streamAddr): + CkCUDAHtoD(destAddr, srcAddr,nbytes, 0); + + def CudaDtoH(self, long destAddr, long srcAddr, int nbytes, long streamAddr): + CkCUDADtoH(destAddr, srcAddr,int(nbytes), 0); + + def CudaStreamSynchronize(self, long streamAddr): + CkCUDAStreamSynchronize(0) + + + + # first callback from Charm++ shared library cdef void registerMainModule(): try: @@ -864,6 +1143,24 @@ cdef void recvGroupMsg(int gid, int ep, int msgSize, char *msg, int dcopy_start) except: charm.handleGeneralError() +cdef void recvGPUDirectGroupMsg(int gid, int ep, int numDevBuffs, + int *devBufSizes, void *devBufs, int msgSize, + char *msg, int dcopy_start + ): + try: + if PROFILING: + charm._precvtime = time.time() + charm.recordReceive(msgSize) + devBufInfo = array.array('L', [0] * numDevBuffs) + for idx in range(numDevBuffs): + # Add the buffer's address to the list + devBufInfo[idx] = devBufs+(CK_DEVICEBUFFER_SIZE_IN_BYTES*idx) + recv_buffer.setMsg(msg, msgSize) + charm.recvGPUDirectGroupMsg(gid, ep, devBufInfo, recv_buffer, dcopy_start) + except: + charm.handleGeneralError() + + cdef void recvArrayMsg(int aid, int ndims, int *arrayIndex, int ep, int msgSize, char *msg, int dcopy_start): try: if PROFILING: @@ -874,6 +1171,26 @@ cdef void recvArrayMsg(int aid, int ndims, int *arrayIndex, int ep, int msgSize, except: charm.handleGeneralError() +cdef void recvGPUDirectArrayMsg(int aid, int ndims, int *arrayIndex, int ep, int numDevBuffs, + int *devBufSizes, void *devBufs, int msgSize, + char *msg, int dcopy_start): + + cdef int idx = 0 + try: + if PROFILING: + charm._precvtime = time.time() + charm.recordReceive(msgSize) + devBufInfo = array.array('L', [0] * numDevBuffs) + for idx in range(numDevBuffs): + # Add the buffer's address to the list + devBufInfo[idx] = devBufs+(CK_DEVICEBUFFER_SIZE_IN_BYTES*idx) + recv_buffer.setMsg(msg, msgSize) + # TODO: Can this be the same for array and groups? 
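+        # devBufs points at a contiguous C array of opaque CkDeviceBuffer
+        # descriptors; devBufInfo above records the address of element idx by
+        # stepping in strides of CK_DEVICEBUFFER_SIZE_IN_BYTES, and these
+        # addresses are later handed back to CkGetGPUDirectData when the user
+        # posts receive buffers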
+ charm.recvGPUDirectArrayMsg(aid, array_index_to_tuple(ndims, arrayIndex), ep, devBufInfo, recv_buffer, dcopy_start) + + except: + charm.handleGeneralError() + cdef void recvArrayBcast(int aid, int ndims, int nInts, int numElems, int *arrayIndexes, int ep, int msgSize, char *msg, int dcopy_start): cdef int i = 0 try: @@ -931,6 +1248,11 @@ cdef void resumeFromSync(int aid, int ndims, int *arrayIndex): except: charm.handleGeneralError() +cdef void depositFutureWithId(void *param, void *msg): + cdef int futureId = param + charm._future_deposit_result(futureId) + + cdef void createCallbackMsg(void *data, int dataSize, int reducerType, int fid, int *sectionInfo, char **returnBuffers, int *returnBufferSizes): cdef int numElems @@ -1056,3 +1378,4 @@ cdef void CcdCallFnAfterCallback(void *userParam, double curWallTime): charm.triggerCallable(userParam) except: charm.handleGeneralError() + diff --git a/charm4py/threads.py b/charm4py/threads.py index e05678cf..fd7cec3e 100644 --- a/charm4py/threads.py +++ b/charm4py/threads.py @@ -166,16 +166,21 @@ def pauseThread(self): if gr.notify: obj = gr.obj obj._thread_notify_target.threadPaused(obj._thread_notify_data) - if gr.parent != main_gr: + if False and gr.parent != main_gr: # this can happen with threaded chare constructors that are called # "inline" by Charm++ on the PE where the collection is created. # Initially it will switch back to the parent thread, but after that # we make the parent to be the main thread + # try: + # if gr.fu: + # return main_gr.switch() + # except: parent = gr.parent gr.parent = main_gr return parent.switch() else: - return main_gr.switch() + ret_val = main_gr.switch() + return ret_val def _resumeThread(self, gr, arg): """ Deposit a result or signal that a local entry method thread is waiting on, diff --git a/examples/cuda/gpudirect/jacobi3d/block.py b/examples/cuda/gpudirect/jacobi3d/block.py new file mode 100644 index 00000000..4732ecb7 --- /dev/null +++ b/examples/cuda/gpudirect/jacobi3d/block.py @@ -0,0 +1,367 @@ +from charm4py import * +import array +from numba import cuda +import numpy as np +import time +import kernels + +def getArrayAddress(arr): + return arr.__cuda_array_interface__['data'][0] + +def getArraySize(arr): + return arr.nbytes + +def getArrayData(arr): + return (getArrayAddress(arr), getArraySize(arr)) + +class Block(Chare): + def __init__(self, init_done_future): + self.my_iter = 0 + self.neighbors = 0 + self.remote_count = 0 + self.x = self.thisIndex[0] + self.y = self.thisIndex[1] + self.z = self.thisIndex[2] + + self.ghost_sizes = (x_surf_size, x_surf_size, + y_surf_size, y_surf_size, + z_surf_size, z_surf_size + ) + + self.ghost_counts = (x_surf_count, x_surf_count, + y_surf_count, y_surf_count, + z_surf_count, z_surf_count + ) + + self.bounds = [False] * kernels.DIR_COUNT + + empty = lambda x: [0] * x + + self.neighbor_channels = empty(kernels.DIR_COUNT) + self.acive_neighbor_channels = None + + self.h_temperature = None + self.d_temperature = None + self.d_new_temperature = None + self.h_ghosts = empty(kernels.DIR_COUNT) + self.d_ghosts = empty(kernels.DIR_COUNT) + self.d_send_ghosts = empty(kernels.DIR_COUNT) + self.d_recv_ghosts = empty(kernels.DIR_COUNT) + self.d_ghosts_addr = empty(kernels.DIR_COUNT) + self.d_send_ghosts_addr = empty(kernels.DIR_COUNT) + self.d_recv_ghosts_addr = empty(kernels.DIR_COUNT) + self.d_send_ghosts_size = empty(kernels.DIR_COUNT) + self.d_recv_ghosts_size = empty(kernels.DIR_COUNT) + + self.stream = cuda.default_stream() + + self.init() + + 
self.reduce(init_done_future)
+
+    def init(self):
+        self.init_bounds(self.x, self.y, self.z)
+        self.init_device_data()
+        self.init_neighbor_channels()
+
+    def init_neighbor_channels(self):
+        active_neighbors = []
+
+        if not self.bounds[kernels.LEFT]:
+            new_c = Channel(self, self.thisProxy[(self.x-1, self.y, self.z)])
+            self.neighbor_channels[kernels.LEFT] = new_c
+            # NOTE: we attach a new member 'recv_direction' to this channel
+            new_c.recv_direction = kernels.LEFT
+            active_neighbors.append(new_c)
+
+        if not self.bounds[kernels.RIGHT]:
+            new_c = Channel(self, self.thisProxy[(self.x+1, self.y, self.z)])
+            self.neighbor_channels[kernels.RIGHT] = new_c
+            new_c.recv_direction = kernels.RIGHT
+            active_neighbors.append(new_c)
+
+        if not self.bounds[kernels.TOP]:
+            new_c = Channel(self, self.thisProxy[(self.x, self.y-1, self.z)])
+            self.neighbor_channels[kernels.TOP] = new_c
+            new_c.recv_direction = kernels.TOP
+            active_neighbors.append(new_c)
+
+        if not self.bounds[kernels.BOTTOM]:
+            new_c = Channel(self, self.thisProxy[(self.x, self.y+1, self.z)])
+            self.neighbor_channels[kernels.BOTTOM] = new_c
+            new_c.recv_direction = kernels.BOTTOM
+            active_neighbors.append(new_c)
+
+        if not self.bounds[kernels.FRONT]:
+            new_c = Channel(self, self.thisProxy[(self.x, self.y, self.z-1)])
+            self.neighbor_channels[kernels.FRONT] = new_c
+            new_c.recv_direction = kernels.FRONT
+            active_neighbors.append(new_c)
+
+        if not self.bounds[kernels.BACK]:
+            new_c = Channel(self, self.thisProxy[(self.x, self.y, self.z+1)])
+            self.neighbor_channels[kernels.BACK] = new_c
+            new_c.recv_direction = kernels.BACK
+            active_neighbors.append(new_c)
+
+        self.active_neighbor_channels = active_neighbors
+
+    def init_device_data(self):
+        temp_size = (block_width+2) * (block_height+2) * (block_depth+2)
+        self.h_temperature = cuda.pinned_array(temp_size, dtype=np.float64)
+        self.d_temperature = cuda.device_array(temp_size, dtype=np.float64)
+        self.d_new_temperature = cuda.device_array(temp_size, dtype=np.float64)
+
+        if use_zerocopy:
+            for i in range(kernels.DIR_COUNT):
+                # allocate element counts (cuda.device_array takes a shape,
+                # not a byte size)
+                self.d_send_ghosts[i] = cuda.device_array(self.ghost_counts[i],
+                                                          dtype=np.float64)
+                self.d_recv_ghosts[i] = cuda.device_array(self.ghost_counts[i],
+                                                          dtype=np.float64)
+
+                d_send_data = getArrayData(self.d_send_ghosts[i])
+                d_recv_data = getArrayData(self.d_recv_ghosts[i])
+
+                # addresses are unsigned longs ('L'); the C layer reads the
+                # transfer sizes as C ints, so use typecode 'i' for them
+                self.d_send_ghosts_addr[i] = array.array('L', [d_send_data[0]])
+                self.d_recv_ghosts_addr[i] = array.array('L', [d_recv_data[0]])
+
+                self.d_send_ghosts_size[i] = array.array('i', [d_send_data[1]])
+                self.d_recv_ghosts_size[i] = array.array('i', [d_recv_data[1]])
+        else:
+            for i in range(kernels.DIR_COUNT):
+                self.h_ghosts[i] = cuda.pinned_array(self.ghost_counts[i],
+                                                     dtype=np.float64)
+                self.d_ghosts[i] = cuda.device_array(self.ghost_counts[i],
+                                                     dtype=np.float64)
+
+        kernels.invokeInitKernel(self.d_temperature, block_width, block_height,
+                                 block_depth, self.stream)
+        kernels.invokeInitKernel(self.d_new_temperature, block_width, block_height,
+                                 block_depth, self.stream)
+        if use_zerocopy:
+            kernels.invokeGhostInitKernels(self.d_send_ghosts, self.ghost_counts,
+                                           self.stream)
+            kernels.invokeGhostInitKernels(self.d_recv_ghosts, self.ghost_counts,
+                                           self.stream)
+        else:
+            kernels.invokeGhostInitKernels(self.d_ghosts, self.ghost_counts,
+                                           self.stream)
+            for i in range(kernels.DIR_COUNT):
+                self.h_ghosts[i].fill(0)
+
+        kernels.invokeBoundaryKernels(self.d_temperature, block_width,
+                                      block_height, block_depth, self.bounds,
+                                      self.stream)
+        kernels.invokeBoundaryKernels(self.d_new_temperature, block_width,
+                                      block_height, block_depth, self.bounds,
+                                      self.stream)
+        self.stream.synchronize()
+
+    def init_bounds(self, x, y, z):
+        neighbors = 0
+
+        if x == 0:
+            self.bounds[kernels.LEFT] = True
+        else:
+            neighbors += 1
+        if x == n_chares_x - 1:
+            self.bounds[kernels.RIGHT] = True
+        else:
+            neighbors += 1
+        if y == 0:
+            self.bounds[kernels.TOP] = True
+        else:
+            neighbors += 1
+        if y == n_chares_y - 1:
+            self.bounds[kernels.BOTTOM] = True
+        else:
+            neighbors += 1
+        if z == 0:
+            self.bounds[kernels.FRONT] = True
+        else:
+            neighbors += 1
+        if z == n_chares_z - 1:
+            self.bounds[kernels.BACK] = True
+        else:
+            neighbors += 1
+
+        self.neighbors = neighbors
+
+    def updateAndPack(self):
+        kernels.invokeJacobiKernel(self.d_temperature, self.d_new_temperature,
+                                   block_width, block_height, block_depth,
+                                   self.stream)
+
+        for i in range(kernels.DIR_COUNT):
+            if not self.bounds[i]:
+                ghosts = self.d_send_ghosts[i] if use_zerocopy else self.d_ghosts[i]
+
+                kernels.invokePackingKernel(self.d_temperature, ghosts, i,
+                                            block_width, block_height,
+                                            block_depth, self.stream)
+                if not use_zerocopy:
+                    # self.d_ghosts[i].copy_to_host(self.h_ghosts[i], self.stream)
+                    charm.lib.CudaDtoH(self.h_ghosts[i].__array_interface__['data'][0],
+                                       self.d_ghosts[i].__cuda_array_interface__['data'][0],
+                                       self.d_ghosts[i].nbytes, 0)
+
+        self.stream.synchronize()
+
+    def sendGhosts(self):
+        for dir in range(kernels.DIR_COUNT):
+            if not self.bounds[dir]:
+                self.sendGhost(dir)
+
+    def sendGhost(self, direction):
+        send_ch = self.neighbor_channels[direction]
+
+        if use_zerocopy:
+            # the keyword names must match what the proxy send path inspects
+            # ('src_ptrs'/'src_sizes'); otherwise the runtime would treat them
+            # as ordinary entry method arguments and the call would fail
+            send_ch.send(src_ptrs=self.d_send_ghosts_addr[direction],
+                         src_sizes=self.d_send_ghosts_size[direction])
+        else:
+            send_ch.send(self.h_ghosts[direction])
+
+    def recvGhosts(self):
+        # not using charm.iwait as it errors out with more than 2 processes;
+        # iwait_map runs self.recvGhost on each channel as it becomes ready
+        charm.iwait_map(self.recvGhost, self.active_neighbor_channels)
+
+    def recvGhost(self, recv_ch):
+        direction = recv_ch.recv_direction
+        if use_zerocopy:
+            # recv() matches the 'post_addresses'/'post_sizes' keywords of
+            # Channel.recv; it blocks until the posted buffers are filled
+            recv_ch.recv(post_addresses=self.d_recv_ghosts_addr[direction],
+                         post_sizes=self.d_recv_ghosts_size[direction])
+            recv_ghost = self.d_recv_ghosts[direction]
+        else:
+            self.h_ghosts[direction] = recv_ch.recv()
+            '''
+
self.d_ghosts[direction].copy_to_device(self.h_ghosts[direction], + stream=self.stream + ) + ''' + charm.lib.CudaHtoD(self.d_ghosts[direction].__cuda_array_interface__['data'][0], + self.h_ghosts[direction].__array_interface__['data'][0], + self.d_ghosts[direction].nbytes, 0 + ) + recv_ghost = self.d_ghosts[direction] + + kernels.invokeUnpackingKernel(self.d_temperature, + recv_ghost, + direction, + block_width, + block_height, + block_depth, + self.stream + ) + self.stream.synchronize() + + # @coro + def exchangeGhosts(self): + self.d_temperature, self.d_new_temperature = \ + self.d_new_temperature, self.d_temperature + + self.sendGhosts() + self.recvGhosts() + + @coro + def run(self, done_future): + tstart = time.time() + comm_time = 0 + for current_iter in range(n_iters + warmup_iters): + if current_iter == warmup_iters: + tstart = time.time() + + self.my_iter = current_iter + self.updateAndPack() + + comm_start_time = time.time() + + self.exchangeGhosts() + + if current_iter >= warmup_iters: + comm_time += time.time() - comm_start_time + + tend = time.time() + + if self.thisIndex == (0, 0, 0): + elapsed_time = tend-tstart + print(f'Elapsed time: {round(elapsed_time,3)} s') + print(f'Average time per iteration: {round(((elapsed_time/n_iters)*1e3),3)} ms') + print(f'Communication time per iteration: {round(((comm_time/n_iters)*1e3),3)} ms') + self.reduce(done_future) diff --git a/examples/cuda/gpudirect/jacobi3d/jacobi3d.py b/examples/cuda/gpudirect/jacobi3d/jacobi3d.py new file mode 100644 index 00000000..b3f9bcd7 --- /dev/null +++ b/examples/cuda/gpudirect/jacobi3d/jacobi3d.py @@ -0,0 +1,174 @@ +from charm4py import * +from numba import cuda +import numpy as np +from argparse import ArgumentParser +from enum import Enum +from functools import reduce +from block import Block +import time + +class Defaults(Enum): + GRID_WIDTH = 512 + GRID_HEIGHT = 512 + GRID_DEPTH = 512 + NUM_ITERS = 100 + WARMUP_ITERS = 10 + USE_ZEROCOPY = False + PRINT_ELEMENTS = False + + +def main(args): + Defaults.NUM_CHARES = charm.numPes() + argp = ArgumentParser(description ="Jacobi3D implementation in Charm4Py using " + "CUDA and GPU-Direct communication" + ) + argp.add_argument('-x', '--grid_width', help = "Grid width", + type = int, + default = Defaults.GRID_WIDTH.value + ) + argp.add_argument('-y', '--grid_height', help = "Grid height", + type = int, + default = Defaults.GRID_HEIGHT.value + ) + argp.add_argument('-z', '--grid_depth', help = "Grid depth", + type = int, + default = Defaults.GRID_DEPTH.value + ) + argp.add_argument('-c', '--num_chares', help = "Number of chares", + type = int, + default = Defaults.NUM_CHARES + ) + argp.add_argument('-i', '--iterations', help = "Number of iterations", + type = int, + default = Defaults.NUM_ITERS.value + ) + argp.add_argument('-w', '--warmup_iterations', help = "Number of warmup iterations", + type = int, + default = Defaults.WARMUP_ITERS.value + ) + argp.add_argument('-d', '--use_zerocopy', action = "store_true", + help = "Use zerocopy when performing data transfers", + default = Defaults.USE_ZEROCOPY.value + ) + argp.add_argument('-p', '--print_blocks', help = "Print blocks", + action = "store_true", + default = Defaults.PRINT_ELEMENTS.value + ) + + # only parse the known args because argparse sees the Charm++ args for some reason + args, _ = argp.parse_known_args() + + grid_width = args.grid_width + grid_height = args.grid_height + grid_depth = args.grid_depth + num_chares = args.num_chares + n_iters = args.iterations + warmup_iters = args.warmup_iterations + 
use_zerocopy = args.use_zerocopy + print_elements = args.print_blocks + + + num_chares_per_dim = calc_num_chares_per_dim(num_chares, + grid_width, + grid_height, + grid_depth + ) + n_chares_x, n_chares_y, n_chares_z = num_chares_per_dim + + if reduce(lambda x, y: x*y, num_chares_per_dim) != num_chares: + print(f"ERROR: Bad grid of chares: {n_chares_x} x {n_chares_y} x " + f"{n_chares_z} != {num_chares}" + ) + charm.exit(-1) + + # Calculate block size + block_width = grid_width // n_chares_x + block_height = grid_height // n_chares_y + block_depth = grid_depth // n_chares_z + + # Calculate surf count, sizes + x_surf_count = block_height * block_depth + y_surf_count = block_width * block_depth + z_surf_count = block_width * block_height + x_surf_size = x_surf_count * np.dtype(np.float64).itemsize + y_surf_size = y_surf_count * np.dtype(np.float64).itemsize + z_surf_size = z_surf_count * np.dtype(np.float64).itemsize + + + # print configuration + print("\n[CUDA 3D Jacobi example]\n") + print(f"Grid: {grid_width} x {grid_height} x {grid_depth}, " + f"Block: {block_width} x {block_height} x {block_depth}, " + f"Chares: {n_chares_x} x {n_chares_y} x {n_chares_z}, " + f"Iterations: {n_iters}, Warm-up: {warmup_iters}, " + f"Zerocopy: {use_zerocopy}, Print: {print_elements}\n\n", + ) + + charm.thisProxy.updateGlobals({'num_chares': num_chares, + 'grid_width': grid_width, + 'grid_height': grid_height, + 'grid_depth': grid_depth, + 'block_width': block_width, + 'block_height': block_height, + 'block_depth': block_depth, + 'x_surf_count': x_surf_count, + 'y_surf_count': y_surf_count, + 'z_surf_count': z_surf_count, + 'x_surf_size': x_surf_size, + 'y_surf_size': y_surf_size, + 'z_surf_size': z_surf_size, + 'n_chares_x': n_chares_x, + 'n_chares_y': n_chares_y, + 'n_chares_z': n_chares_z, + 'n_iters': n_iters, + 'warmup_iters': warmup_iters, + 'use_zerocopy': use_zerocopy, + 'print_elements': print_elements + }, awaitable = True, module_name = 'block' + ).get() + + init_done_future = Future() + run_future = Future() + block_proxy = Array(Block, + dims=[n_chares_x, n_chares_y, n_chares_z], + args = [init_done_future] + ) + init_done_future.get() + + block_proxy.run(run_future) + run_future.get() + charm.exit() + + +def calc_num_chares_per_dim(num_chares, grid_w, grid_h, grid_d): + n_chares = [0, 0, 0] + area = [0.0, 0.0, 0.0] + area[0] = grid_w * grid_h + area[1] = grid_w * grid_d + area[2] = grid_h * grid_d + + bestsurf = 2.0 * sum(area) + + ipx = 1 + + while ipx <= num_chares: + if not num_chares % ipx: + nremain = num_chares // ipx + ipy = 1 + + while ipy <= nremain: + if not nremain % ipy: + ipz = nremain // ipy + surf = area[0] / ipx / ipy + area[1] / ipx / ipz + area[2] / ipy / ipz + + if surf < bestsurf: + bestsurf = surf + n_chares[0] = ipx + n_chares[1] = ipy + n_chares[2] = ipz + ipy += 1 + ipx += 1 + + return n_chares + +charm.start(main, modules = ['block']) diff --git a/examples/cuda/gpudirect/jacobi3d/kernels.py b/examples/cuda/gpudirect/jacobi3d/kernels.py new file mode 100644 index 00000000..ce489757 --- /dev/null +++ b/examples/cuda/gpudirect/jacobi3d/kernels.py @@ -0,0 +1,396 @@ +from numba import cuda + +TILE_SIZE_3D = 8 +TILE_SIZE_2D = 16 + +LEFT = 0 +RIGHT = 1 +TOP = 2 +BOTTOM = 3 +FRONT = 4 +BACK = 5 +DIR_COUNT = 6 + +@cuda.jit(device=True) +def IDX(i,j,k, block_width, block_height): + return ((block_width+2)*(block_height+2)*(k)+(block_width+2)*(j)+(i)) + +@cuda.jit +def initKernel(temperature, block_width, block_height, block_depth): + i = cuda.blockDim.x * cuda.blockIdx.x + 
cuda.threadIdx.x
+    j = cuda.blockDim.y * cuda.blockIdx.y + cuda.threadIdx.y
+    k = cuda.blockDim.z * cuda.blockIdx.z + cuda.threadIdx.z
+
+    if i < block_width + 2 and j < block_height + 2 and k < block_depth + 2:
+        temperature[IDX(i, j, k, block_width, block_height)] = 0
+
+@cuda.jit
+def ghostInitKernel(ghost, ghost_count):
+    i = cuda.blockDim.x * cuda.blockIdx.x + cuda.threadIdx.x
+    if i < ghost_count:
+        ghost[i] = 0
+
+@cuda.jit
+def leftBoundaryKernel(temperature, block_width, block_height, block_depth):
+    j = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if j < block_height and k < block_depth:
+        temperature[IDX(0,1+j,1+k, block_width, block_height)] = 1
+
+@cuda.jit
+def rightBoundaryKernel(temperature, block_width, block_height, block_depth):
+    j = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if j < block_height and k < block_depth:
+        temperature[IDX(block_width+1,1+j,1+k, block_width, block_height)] = 1
+
+@cuda.jit
+def topBoundaryKernel(temperature, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and k < block_depth:
+        temperature[IDX(1+i,0,1+k, block_width, block_height)] = 1
+
+@cuda.jit
+def bottomBoundaryKernel(temperature, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and k < block_depth:
+        temperature[IDX(1+i,block_height+1,1+k, block_width, block_height)] = 1
+
+@cuda.jit
+def frontBoundaryKernel(temperature, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    j = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and j < block_height:
+        temperature[IDX(1+i,1+j,0, block_width, block_height)] = 1
+
+@cuda.jit
+def backBoundaryKernel(temperature, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    j = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and j < block_height:
+        temperature[IDX(1+i,1+j,block_depth+1, block_width, block_height)] = 1
+
+@cuda.jit
+def jacobiKernel(temperature, new_temperature, block_width, block_height, block_depth):
+    i = (cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x)+1
+    j = (cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y)+1
+    k = (cuda.blockDim.z*cuda.blockIdx.z+cuda.threadIdx.z)+1
+
+    if (i <= block_width and j <= block_height and k <= block_depth):
+        new_temperature[IDX(i,j,k, block_width, block_height)] = \
+            (temperature[IDX(i,j,k, block_width, block_height)] + \
+             temperature[IDX(i-1,j,k, block_width, block_height)] + \
+             temperature[IDX(i+1,j,k, block_width, block_height)] + \
+             temperature[IDX(i,j-1,k, block_width, block_height)] + \
+             temperature[IDX(i,j+1,k, block_width, block_height)] + \
+             temperature[IDX(i,j,k-1, block_width, block_height)] + \
+             temperature[IDX(i,j,k+1, block_width, block_height)]) * \
+            0.142857  # approximately 1/7, the 7-point stencil average
+
+@cuda.jit
+def leftPackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    # pack the x = 1 interior plane for the left neighbor
+    j = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if j < block_height and k < block_depth:
+        ghost[block_height*k+j] = \
+            temperature[IDX(1,1+j,1+k, block_width, block_height)]
+
+@cuda.jit
+def rightPackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    # pack the x = block_width interior plane for the right neighbor
+    j = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if j < block_height and k < block_depth:
+        ghost[block_height*k+j] = \
+            temperature[IDX(block_width,1+j,1+k, block_width, block_height)]
+
+@cuda.jit
+def topPackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and k < block_depth:
+        ghost[block_width*k+i] = \
+            temperature[IDX(1+i,1,1+k, block_width, block_height)]
+
+@cuda.jit
+def bottomPackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and k < block_depth:
+        ghost[block_width*k+i] = \
+            temperature[IDX(1+i,block_height,1+k, block_width, block_height)]
+
+@cuda.jit
+def frontPackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    # pack the z = 1 interior plane for the front neighbor; packing reads
+    # temperature into the ghost (the inverse assignment is the unpacking kernel)
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    j = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and j < block_height:
+        ghost[block_width*j+i] = \
+            temperature[IDX(1+i,1+j,1, block_width, block_height)]
+
+@cuda.jit
+def backPackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    # pack the z = block_depth interior plane for the back neighbor
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    j = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and j < block_height:
+        ghost[block_width*j+i] = \
+            temperature[IDX(1+i,1+j,block_depth, block_width, block_height)]
+
+@cuda.jit
+def leftUnpackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    j = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if j < block_height and k < block_depth:
+        temperature[IDX(0,1+j,1+k, block_width, block_height)] = ghost[block_height*k+j]
+
+@cuda.jit
+def rightUnpackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    j = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if j < block_height and k < block_depth:
+        temperature[IDX(block_width+1,1+j,1+k, block_width, block_height)] = ghost[block_height*k+j]
+
+@cuda.jit
+def topUnpackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and k < block_depth:
+        temperature[IDX(1+i,0,1+k, block_width, block_height)] = ghost[block_width*k+i]
+
+@cuda.jit
+def bottomUnpackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    k = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and k < block_depth:
+        temperature[IDX(1+i,block_height+1,1+k, block_width, block_height)] = ghost[block_width*k+i]
+
+@cuda.jit
+def frontUnpackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    j = cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y
+    if i < block_width and j < block_height:
+        temperature[IDX(1+i,1+j,0, block_width, block_height)] = ghost[block_width*j+i]
+
+@cuda.jit
+def backUnpackingKernel(temperature, ghost, block_width, block_height, block_depth):
+    i = cuda.blockDim.x*cuda.blockIdx.x+cuda.threadIdx.x
+    j =
cuda.blockDim.y*cuda.blockIdx.y+cuda.threadIdx.y + if i < block_width and j < block_height: + temperature[IDX(1+i,1+j,block_depth+1, block_width, block_height)] = ghost[block_width*j+i] + +def invokeInitKernel(temp_dev_array, block_width, block_height, block_depth, stream): + block_dim = (TILE_SIZE_3D, TILE_SIZE_3D, TILE_SIZE_3D) + grid_dim = (((block_width+2)+(block_dim[0]-1))//block_dim[0], # x + ((block_height+2)+(block_dim[1]-1))//block_dim[1], # y + ((block_depth+2)+(block_dim[2]-1))//block_dim[2]) # z + + initKernel[grid_dim, block_dim, stream](temp_dev_array, + block_width, block_height, + block_depth) + + +def invokeGhostInitKernels(ghosts, ghost_counts, stream): + #TODO: this fn will probably have to change if the ghosts/counts can't + # be transferred automatically + # https://docs.nvidia.com/cuda/cuda-c-programming-guide/#dim3 + block_dim = (256, 1, 1) + for i in range(len(ghosts)): + ghost = ghosts[i] + ghost_count = ghost_counts[i] + grid_dim = ((ghost_count+block_dim[0]-1)//block_dim[0], 1, 1) + + ghostInitKernel[grid_dim, block_dim, stream](ghost, ghost_count) + +def invokeBoundaryKernels(d_temperature, block_width, block_height, block_depth, bounds, stream): + block_dim = (TILE_SIZE_2D, TILE_SIZE_2D, 1) + + if bounds[LEFT]: + grid_dim = ((block_height+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + leftBoundaryKernel[grid_dim, block_dim, stream](d_temperature, + block_width, + block_height, + block_depth + ) + if bounds[RIGHT]: + grid_dim = ((block_height+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + rightBoundaryKernel[grid_dim, block_dim, stream](d_temperature, + block_width, + block_height, + block_depth + ) + + if bounds[TOP]: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + topBoundaryKernel[grid_dim, block_dim, stream](d_temperature, + block_width, + block_height, + block_depth + ) + + if bounds[BOTTOM]: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + bottomBoundaryKernel[grid_dim, block_dim, stream](d_temperature, + block_width, + block_height, + block_depth + ) + + if bounds[FRONT]: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_height+(block_dim[1]-1))//block_dim[1], 1) + frontBoundaryKernel[grid_dim, block_dim, stream](d_temperature, + block_width, + block_height, + block_depth + ) + + if bounds[BACK]: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_height+(block_dim[1]-1))//block_dim[1], 1) + backBoundaryKernel[grid_dim, block_dim, stream](d_temperature, + block_width, + block_height, + block_depth + ) + + +def invokeJacobiKernel(d_temperature, d_new_temperature, block_width, block_height, block_depth, stream): + block_dim = (TILE_SIZE_3D, TILE_SIZE_3D, TILE_SIZE_3D) + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_height+(block_dim[1]-1))//block_dim[1], + (block_depth+(block_dim[2]-1))//block_dim[2]) + + jacobiKernel[grid_dim, block_dim, stream](d_temperature, + d_new_temperature, + block_width, + block_height, + block_depth + ) + + +def invokePackingKernel(d_temperature, d_ghost, dir, block_width, block_height, block_depth, stream): + block_dim = (TILE_SIZE_2D, TILE_SIZE_2D, 1) + + if dir == LEFT: + grid_dim = ((block_height+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + leftPackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, 
+ block_height, + block_depth + ) + elif dir == RIGHT: + grid_dim = ((block_height+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + rightPackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + elif dir == TOP: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + topPackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + elif dir == BOTTOM: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + bottomPackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + elif dir == FRONT: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_height+(block_dim[1]-1))//block_dim[1], 1) + frontPackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + elif dir == BACK: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_height+(block_dim[1]-1))//block_dim[1], 1) + backPackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + +def invokeUnpackingKernel(d_temperature, d_ghost, dir, block_width, block_height, block_depth, stream): + block_dim = (TILE_SIZE_2D, TILE_SIZE_2D, 1) + + if dir == LEFT: + grid_dim = ((block_height+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + leftUnpackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + if dir == RIGHT: + grid_dim = ((block_height+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + rightUnpackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + if dir == TOP: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + topUnpackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + if dir == BOTTOM: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_depth+(block_dim[1]-1))//block_dim[1], 1) + bottomUnpackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + if dir == FRONT: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_height+(block_dim[1]-1))//block_dim[1], 1) + frontUnpackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) + if dir == BACK: + grid_dim = ((block_width+(block_dim[0]-1))//block_dim[0], + (block_height+(block_dim[1]-1))//block_dim[1], 1) + backUnpackingKernel[grid_dim, block_dim, stream](d_temperature, + d_ghost, + block_width, + block_height, + block_depth + ) diff --git a/examples/cuda/gpudirect/jacobi3d/scripts/charm4py.sh b/examples/cuda/gpudirect/jacobi3d/scripts/charm4py.sh new file mode 100755 index 00000000..54fcaa78 --- /dev/null +++ b/examples/cuda/gpudirect/jacobi3d/scripts/charm4py.sh @@ -0,0 +1,48 @@ +#!/bin/bash +#BSUB -W 30 +#BSUB -P csc357 +#BSUB -nnodes 256 +#BSUB -J jacobi3d-charm4py-strong-n256 + +# These need to be changed between submissions +file=jacobi3d.py +n_nodes=256 +n_procs=$((n_nodes * 6)) +grid_width=3072 +grid_height=3072 +grid_depth=3072 + +# Function to 
display commands
+exe() { echo "\$ $@" ; "$@" ; }
+
+cd $HOME/work/charm4py/examples/cuda/gpudirect/jacobi3d
+
+#exe conda init bash
+#exe conda activate charm4py
+exe source activate charm4py
+
+exe export LD_LIBRARY_PATH=$HOME/work/ucx/install/lib:$HOME/work/pmix-3.1.5/install/lib:/sw/summit/gdrcopy/2.0/lib64:$LD_LIBRARY_PATH
+exe export UCX_MEMTYPE_CACHE=n
+
+ppn=1
+pemap="L0,4,8,84,88,92"
+n_iters=100
+warmup_iters=10
+
+echo "# Charm4py Jacobi3D Performance Benchmarking (GPUDirect off)"
+
+for iter in 1 2 3
+do
+    date
+    echo "# Run $iter"
+    exe jsrun -n$n_procs -a1 -c$ppn -g1 -K3 -r6 --smpiargs="-disable_gpu_hooks" python3 ./$file -x $grid_width -y $grid_height -z $grid_depth -w $warmup_iters -i $n_iters +ppn $ppn +pemap $pemap
+done
+
+echo "# Charm4py Jacobi3D Performance Benchmarking (GPUDirect on)"
+
+for iter in 1 2 3
+do
+    date
+    echo "# Run $iter"
+    exe jsrun -n$n_procs -a1 -c$ppn -g1 -K3 -r6 --smpiargs="-disable_gpu_hooks" python3 ./$file -x $grid_width -y $grid_height -z $grid_depth -w $warmup_iters -i $n_iters +ppn $ppn +pemap $pemap -d
+done
diff --git a/examples/cuda/gpudirect/jacobi3d/scripts/process_times.py b/examples/cuda/gpudirect/jacobi3d/scripts/process_times.py
new file mode 100755
index 00000000..0063b79d
--- /dev/null
+++ b/examples/cuda/gpudirect/jacobi3d/scripts/process_times.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python3
+import os
+import sys
+import csv
+import statistics
+
+if len(sys.argv) != 4:
+    print('Please use', sys.argv[0], '[job name] [start node count] [end node count]')
+    exit()
+
+job_name = sys.argv[1]
+start_node_count = int(sys.argv[2])
+end_node_count = int(sys.argv[3])
+
+csv_filename = job_name + '.csv'
+csv_file = open(csv_filename, 'w', newline='')
+writer = csv.writer(csv_file)
+writer.writerow(['Number of GPUs', 'Charm4py-H-Total', 'error', 'Charm4py-H-Comm', 'error', 'Charm4py-D-Total', 'error', 'Charm4py-D-Comm', 'error'])
+
+def is_host(index):
+    return index % 6 == 0 or index % 6 == 1 or index % 6 == 2
+
+node_count_list = []
+cur_node_count = start_node_count
+while cur_node_count <= end_node_count:
+    node_count_list.append(cur_node_count)
+    cur_node_count *= 2
+
+for node_count in node_count_list:
+    print('Node count:', str(node_count))
+    total_str = 'grep -ir "Average time per" ' + job_name + '-n' + str(node_count) + '.* | cut -d " " -f5'
+    comm_str = 'grep -ir "Communication time" ' + job_name + '-n' + str(node_count) + '.* | cut -d " " -f5'
+
+    total_stream = os.popen(total_str)
+    total_lines = total_stream.readlines()
+    total_times = [float(line.rstrip()) for line in total_lines]
+    comm_stream = os.popen(comm_str)
+    comm_lines = comm_stream.readlines()
+    comm_times = [float(line.rstrip()) for line in comm_lines]
+
+    h_total_times = [total_times[i] for i in range(len(total_times)) if is_host(i)]
+    h_comm_times = [comm_times[i] for i in range(len(comm_times)) if is_host(i)]
+    d_total_times = [total_times[i] for i in range(len(total_times)) if not is_host(i)]
+    d_comm_times = [comm_times[i] for i in range(len(comm_times)) if not is_host(i)]
+    print('H total:', h_total_times)
+    print('H comm:', h_comm_times)
+    print('D total:', d_total_times)
+    print('D comm:', d_comm_times)
+
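+    # statistics.stdev below raises StatisticsError on fewer than two samples;
+    # charm4py.sh performs three runs per node count, so this holds for complete logs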
+    h_total_avg = round(statistics.mean(h_total_times), 2)
+    h_total_stdev = round(statistics.stdev(h_total_times), 2)
+    h_comm_avg = round(statistics.mean(h_comm_times), 2)
+    h_comm_stdev = round(statistics.stdev(h_comm_times), 2)
+    d_total_avg = round(statistics.mean(d_total_times), 2)
+    d_total_stdev = round(statistics.stdev(d_total_times), 2)
+    d_comm_avg = round(statistics.mean(d_comm_times), 2)
+    d_comm_stdev = round(statistics.stdev(d_comm_times), 2)
+    print('H total avg:', h_total_avg, 'stdev:', h_total_stdev)
+    print('H comm avg:', h_comm_avg, 'stdev:', h_comm_stdev)
+    print('D total avg:', d_total_avg, 'stdev:', d_total_stdev)
+    print('D comm avg:', d_comm_avg, 'stdev:', d_comm_stdev)
+
+    writer.writerow([str(node_count), str(h_total_avg), str(h_total_stdev), str(h_comm_avg), str(h_comm_stdev), str(d_total_avg), str(d_total_stdev), str(d_comm_avg), str(d_comm_stdev)])
diff --git a/tests/benchmark/bandwidth.py b/tests/benchmark/bandwidth.py
new file mode 100644
index 00000000..69da8180
--- /dev/null
+++ b/tests/benchmark/bandwidth.py
@@ -0,0 +1,149 @@
+from charm4py import charm, Chare, Array, coro, Future, Channel, Group, ArrayMap
+import time
+import numpy as np
+from numba import cuda
+import array
+
+USE_PINNED = True
+# provide the address/size data for GPU-direct addresses. Saves ~11us per iteration
+USE_ADDRESS_OPTIMIZATION = True
+LOW_ITER_THRESHOLD = 8192
+WARMUP_ITERS = 10
+
+
+class Block(Chare):
+    def __init__(self, use_gpudirect):
+        self.gpu_direct = use_gpudirect
+        self.num_chares = charm.numPes()
+        self.am_low_chare = self.thisIndex[0] == 0
+
+        if self.am_low_chare:
+            print("Msg Size, Iterations, Bandwidth (MB/s)")
+
+    @coro
+    def do_iteration(self, message_size, windows, num_iters, done_future):
+        if USE_PINNED:
+            h_local_data = cuda.pinned_array(message_size, dtype='int8')
+            h_remote_data = cuda.pinned_array(message_size, dtype='int8')
+        else:
+            if self.am_low_chare:
+                h_local_data = np.ones(message_size, dtype='int8')
+                h_remote_data = np.ones(message_size, dtype='int8')
+            else:
+                h_local_data = np.zeros(message_size, dtype='int8')
+                h_remote_data = np.zeros(message_size, dtype='int8')
+
+        d_local_data = cuda.device_array(message_size, dtype='int8')
+        d_remote_data = cuda.device_array(message_size, dtype='int8')
+
+        my_stream = cuda.stream()
+        stream_address = my_stream.handle.value
+
+        d_local_data_addr = d_local_data.__cuda_array_interface__['data'][0]
+        h_local_data_addr = h_local_data.__array_interface__['data'][0]
+
+        d_remote_data_addr = d_remote_data.__cuda_array_interface__['data'][0]
+        h_remote_data_addr = h_remote_data.__array_interface__['data'][0]
+
+        if self.gpu_direct:
+            d_local_data_addr = array.array('L', [0])
+            d_local_data_size = array.array('L', [0])
+
+            d_local_data_addr[0] = d_local_data.__cuda_array_interface__['data'][0]
+            d_local_data_size[0] = d_local_data.nbytes
+
+        partner_idx = int(not self.thisIndex[0])
+        partner = self.thisProxy[partner_idx]
+        partner_channel = Channel(self, partner)
+        partner_ack_channel = Channel(self, partner)
+
+        tstart = 0
+
+        for idx in range(num_iters + WARMUP_ITERS):
+            if idx == WARMUP_ITERS:
+                tstart = time.time()
+            if self.am_low_chare:
+                if not self.gpu_direct:
+                    for _ in range(windows):
+                        charm.lib.CudaDtoH(h_local_data_addr, d_local_data_addr, message_size, stream_address)
+                    charm.lib.CudaStreamSynchronize(stream_address)
+                    # d_local_data.copy_to_host(h_local_data)
+                    for _ in range(windows):
+                        partner_channel.send(h_local_data)
+                else:
+                    for _ in range(windows):
+                        partner_channel.send(src_ptrs = d_local_data_addr,
+                                             src_sizes = d_local_data_size
+                                             )
+
+                partner_ack_channel.recv()
+            else:
+                if not self.gpu_direct:
+                    for _ in range(windows):
+                        received = partner_channel.recv()
+                        charm.lib.CudaHtoD(d_remote_data_addr, received.__array_interface__['data'][0],
+                                           message_size, stream_address
+                                           )
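+                        # block until the staged host-to-device copy has completed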
+                        charm.lib.CudaStreamSynchronize(stream_address)
+                        # d_local_data.copy_to_device(received)
+                else:
+                    for _ in range(windows):
+                        partner_channel.recv(post_addresses = d_local_data_addr,
+                                             post_sizes = d_local_data_size)
+                partner_ack_channel.send(1)
+
+        tend = time.time()
+        elapsed_time = tend - tstart
+        if self.am_low_chare:
+            self.display_iteration_data(elapsed_time, num_iters, windows, message_size)
+
+        self.reduce(done_future)
+
+    def display_iteration_data(self, elapsed_time, num_iters, windows, message_size):
+        data_sent = message_size / 1e6 * num_iters * windows
+        print(f'{message_size},{num_iters},{data_sent/elapsed_time}')
+
+
+class ArrMap(ArrayMap):
+    def procNum(self, index):
+        return index[0] % 2
+
+
+def main(args):
+    if len(args) < 7:
+        print("Doesn't have the required input params. Usage:"
+              " <min-msg-size> <max-msg-size> <window-size>"
+              " <low-iter> <high-iter> <use-gpudirect>"
+              "\n"
+              )
+        charm.exit(-1)
+
+    min_msg_size = int(args[1])
+    max_msg_size = int(args[2])
+    window_size = int(args[3])
+    low_iter = int(args[4])
+    high_iter = int(args[5])
+    use_gpudirect = int(args[6])
+
+    peMap = Group(ArrMap)
+    blocks = Array(Block, 2, args=[use_gpudirect], map = peMap)
+    charm.awaitCreation(blocks)
+    msg_size = min_msg_size
+
+    while msg_size <= max_msg_size:
+        if msg_size <= LOW_ITER_THRESHOLD:
+            iter = low_iter
+        else:
+            iter = high_iter
+        done_future = Future()
+        blocks.do_iteration(msg_size, window_size, iter, done_future)
+        done_future.get()
+        msg_size *= 2
+
+    charm.exit()
+
+
+charm.start(main)
diff --git a/tests/benchmark/pingpong.py b/tests/benchmark/pingpong.py
index cb6060c2..6c027fb9 100644
--- a/tests/benchmark/pingpong.py
+++ b/tests/benchmark/pingpong.py
@@ -1,63 +1,102 @@
-from charm4py import charm, Chare, Array, coro, Future
-from time import time
-#import numpy as np
+from charm4py import charm, Chare, Array, coro, Future, Channel, Group
+import time
+import numpy as np
 
-PAYLOAD = 100 # number of bytes
-NITER = 10000
 
+class Ping(Chare):
+    def __init__(self, use_zerocopy, print_format):
+        self.zero_copy = use_zerocopy
+        self.num_chares = charm.numPes()
+        self.print_format = print_format
+        self.am_low_chare = self.thisIndex == 0
+
+        if self.am_low_chare:
+            if print_format == 0:
+                print("Msg Size, Iterations, One-way Time (us), "
+                      "Bandwidth (bytes/us)"
+                      )
+            else:
+                print(f'{"Msg Size": <30} {"Iterations": <25} '
+                      f'{"One-way Time (us)": <20} {"Bandwidth (bytes/us)": <20}'
+                      )
 
-class Ping(Chare):
+    @coro
+    def do_iteration(self, message_size, num_iters, done_future):
+        data = np.zeros(message_size, dtype='int8')
+        partner_idx = int(not self.thisIndex)
+        partner = self.thisProxy[partner_idx]
+        partner_channel = Channel(self, partner)
 
-    def __init__(self):
-        self.myIndex = self.thisIndex[0]
-        if self.myIndex == 0:
-            self.neighbor = self.thisProxy[1]
-        else:
-            self.neighbor = self.thisProxy[0]
-
-    def start(self, done_future, threaded=False):
-        self.done_future = done_future
-        self.iter = 0
-        #data = np.zeros(PAYLOAD, dtype='int8')
-        data = 3
-        self.startTime = time()
-        if threaded:
-            self.neighbor.recv_th(data)
-        else:
-            self.neighbor.recv(data)
+        tstart = time.time()
 
-    def recv(self, data):
-        if self.myIndex == 0:
-            self.iter += 1
-            if self.iter == NITER:
-                totalTime = time() - self.startTime
-                self.done_future.send(totalTime)
-                return
-        self.neighbor.recv(data)
+        for _ in range(num_iters):
+            if self.am_low_chare:
+                if not self.zero_copy:
+                    partner_channel.send(data)
+                    partner_channel.recv()
+                else:
+                    raise NotImplementedError("TODO: ZeroCopy")
 
-    @coro
-    def recv_th(self, data):
-        if self.myIndex == 0:
-            self.iter += 1
-            if self.iter == NITER:
-                totalTime = time() - self.startTime
-                self.done_future.send(totalTime)
-                return
-        self.neighbor.recv_th(data)
+            else:
+                if not self.zero_copy:
+                    partner_channel.recv()
+                    partner_channel.send(data)
+                else:
+                    raise NotImplementedError("TODO: ZeroCopy")
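+
+        # both chares time the loop locally; only the low chare reports results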
+        tend = time.time()
+
+        elapsed_time = tend - tstart
+        if self.am_low_chare:
+            self.display_iteration_data(elapsed_time, num_iters, message_size)
+
+        self.reduce(done_future)
+
+    def display_iteration_data(self, elapsed_time, num_iters, message_size):
+        elapsed_time /= 2  # 1-way performance, not RTT
+        elapsed_time /= num_iters  # Time for each message
+        bandwidth = message_size / elapsed_time
+        if self.print_format == 0:
+            print(f'{message_size},{num_iters},{elapsed_time * 1e6},'
+                  f'{bandwidth / 1e6}'
+                  )
+        else:
+            print(f'{message_size: <30} {num_iters: <25} '
+                  f'{elapsed_time * 1e6: <20} {bandwidth / 1e6: <20}'
+                  )
 
 
 def main(args):
-    threaded = False
-    if len(args) > 1 and args[1] == '-t':
-        threaded = True
-    pings = Array(Ping, 2)
+    if len(args) < 7:
+        print("Doesn't have the required input params. Usage:"
+              " <min-msg-size> <max-msg-size>"
+              " <low-iter> <high-iter> <print-format> <use-zerocopy>\n"
+              )
+        charm.exit(-1)
+
+    min_msg_size = int(args[1])
+    max_msg_size = int(args[2])
+    low_iter = int(args[3])
+    high_iter = int(args[4])
+    print_format = int(args[5])
+    use_zerocopy = int(args[6])
+
+    pings = Group(Ping, args=[use_zerocopy, print_format])
     charm.awaitCreation(pings)
-    for _ in range(2):
+    msg_size = min_msg_size
+
+    while msg_size <= max_msg_size:
+        if msg_size <= 1048576:
+            iter = low_iter
+        else:
+            iter = high_iter
         done_future = Future()
-        pings[0].start(done_future, threaded)
-        totalTime = done_future.get()
-        print("ping pong time per iter (us)=", totalTime / NITER * 1000000)
-    exit()
+        pings.do_iteration(msg_size, iter, done_future)
+        done_future.get()
+        msg_size *= 2
+
+    charm.exit()
 
 
 charm.start(main)
diff --git a/tests/benchmark/pingpong_gpu.py b/tests/benchmark/pingpong_gpu.py
new file mode 100644
index 00000000..4a771794
--- /dev/null
+++ b/tests/benchmark/pingpong_gpu.py
@@ -0,0 +1,177 @@
+from charm4py import charm, Chare, Array, coro, Future, Channel, Group, ArrayMap
+import time
+import numpy as np
+from numba import cuda
+import array
+
+USE_PINNED = True
+# provide the address/size data for GPU-direct addresses. 
Saves ~11us per iteration +USE_ADDRESS_OPTIMIZATION = True +LOW_ITER_THRESHOLD = 8192 +WARMUP_ITERS = 10 + +class Ping(Chare): + def __init__(self, use_gpudirect, print_format): + self.gpu_direct = use_gpudirect + self.num_chares = charm.numPes() + self.print_format = print_format + self.am_low_chare = self.thisIndex[0] == 0 + + if self.am_low_chare: + if print_format == 0: + print("Msg Size, Iterations, One-way Time (us), " + "Bandwidth (bytes/us)" + ) + else: + print(f'{"Msg Size": <30} {"Iterations": <25} ' + f'{"One-way Time (us)": <20} {"Bandwidth (bytes/us)": <20}' + ) + + @coro + def do_iteration(self, message_size, num_iters, done_future): + if USE_PINNED: + h_data_send = cuda.pinned_array(message_size, dtype='int8') + h_data_recv = cuda.pinned_array(message_size, dtype='int8') + else: + if self.am_low_chare: + h_data_send = np.ones(message_size, dtype='int8') + h_data_recv = np.ones(message_size, dtype='int8') + else: + h_data_send = np.zeros(message_size, dtype='int8') + h_data_recv = np.zeros(message_size, dtype='int8') + + d_data_send = cuda.device_array(message_size, dtype='int8') + d_data_recv = cuda.device_array(message_size, dtype='int8') + d_data_send.copy_to_device(h_data_send) + d_data_recv.copy_to_device(h_data_recv) + partner_idx = int(not self.thisIndex[0]) + partner = self.thisProxy[partner_idx] + partner_channel = Channel(self, partner) + + my_stream = cuda.stream() + stream_address = my_stream.handle.value + d_data_send_addr = d_data_send.__cuda_array_interface__['data'][0] + h_data_send_addr = h_data_send.__array_interface__['data'][0] + + d_data_recv_addr = d_data_recv.__cuda_array_interface__['data'][0] + h_data_recv_addr = h_data_recv.__array_interface__['data'][0] + + if self.gpu_direct and USE_ADDRESS_OPTIMIZATION: + d_data_recv_addr = array.array('L', [0]) + d_data_recv_size = array.array('i', [0]) + d_data_send_addr = array.array('L', [0]) + d_data_send_size = array.array('i', [0]) + + d_data_recv_addr[0] = d_data_recv.__cuda_array_interface__['data'][0] + d_data_recv_size[0] = d_data_recv.nbytes + d_data_send_addr[0] = d_data_send.__cuda_array_interface__['data'][0] + d_data_send_size[0] = d_data_send.nbytes + + + tstart = time.time() + + for iternum in range(num_iters + WARMUP_ITERS): + if iternum == WARMUP_ITERS: + tstart = time.time() + if self.am_low_chare: + if not self.gpu_direct: + charm.lib.CudaDtoH(h_data_send_addr, d_data_send_addr, message_size, stream_address) + charm.lib.CudaStreamSynchronize(stream_address) + + partner_channel.send(h_data_send) + received = partner_channel.recv() + + charm.lib.CudaHtoD(d_data_recv_addr, received.__array_interface__['data'][0], message_size, stream_address) + charm.lib.CudaStreamSynchronize(stream_address) + else: + if USE_ADDRESS_OPTIMIZATION: + partner_channel.send(src_ptrs = d_data_send_addr, src_sizes = d_data_send_size) + partner_channel.recv(post_addresses = d_data_recv_addr, + post_sizes = d_data_recv_size + ) + else: + partner_channel.send(d_data_send) + partner_channel.recv(d_data_recv) + else: + if not self.gpu_direct: + received = partner_channel.recv() + + charm.lib.CudaHtoD(d_data_recv_addr, received.__array_interface__['data'][0], message_size, stream_address) + charm.lib.CudaDtoH(h_data_send_addr, d_data_send_addr, message_size, stream_address) + charm.lib.CudaStreamSynchronize(stream_address) + + partner_channel.send(h_data_send) + else: + if USE_ADDRESS_OPTIMIZATION: + partner_channel.recv(post_addresses = d_data_recv_addr, + post_sizes = d_data_recv_size + ) + partner_channel.send(src_ptrs = 
d_data_send_addr, src_sizes = d_data_send_size)
+                    else:
+                        partner_channel.recv(d_data_recv)
+                        partner_channel.send(d_data_send)
+
+        tend = time.time()
+
+        elapsed_time = tend - tstart
+
+        if self.am_low_chare:
+            self.display_iteration_data(elapsed_time, num_iters, message_size)
+
+        self.reduce(done_future)
+
+    def display_iteration_data(self, elapsed_time, num_iters, message_size):
+        elapsed_time /= 2  # 1-way performance, not RTT
+        elapsed_time /= num_iters  # Time for each message
+        bandwidth = message_size / elapsed_time
+        if self.print_format == 0:
+            print(f'{message_size},{num_iters},{elapsed_time * 1e6},'
+                  f'{bandwidth / 1e6}'
+                  )
+        else:
+            print(f'{message_size: <30} {num_iters: <25} '
+                  f'{elapsed_time * 1e6: <20} {bandwidth / 1e6: <20}'
+                  )
+
+
+class ArrMap(ArrayMap):
+    def procNum(self, index):
+        return index[0] % 2
+
+
+def main(args):
+    if len(args) < 7:
+        print("Doesn't have the required input params. Usage:"
+              " <min-msg-size> <max-msg-size>"
+              " <low-iter> <high-iter> <print-format> <use-gpudirect>\n"
+              )
+        charm.exit(-1)
+
+    min_msg_size = int(args[1])
+    max_msg_size = int(args[2])
+    low_iter = int(args[3])
+    high_iter = int(args[4])
+    print_format = int(args[5])
+    use_gpudirect = int(args[6])
+
+    peMap = Group(ArrMap)
+    pings = Array(Ping, 2, args=[use_gpudirect, print_format], map = peMap)
+    charm.awaitCreation(pings)
+    msg_size = min_msg_size
+
+    while msg_size <= max_msg_size:
+        if msg_size <= LOW_ITER_THRESHOLD:
+            iter = low_iter
+        else:
+            iter = high_iter
+        done_future = Future()
+        pings.do_iteration(msg_size, iter, done_future)
+        done_future.get()
+        msg_size *= 2
+
+    charm.exit()
+
+
+charm.start(main)
diff --git a/tests/cuda/multi_array.py b/tests/cuda/multi_array.py
new file mode 100644
index 00000000..93c7876a
--- /dev/null
+++ b/tests/cuda/multi_array.py
@@ -0,0 +1,100 @@
+from charm4py import charm, Chare, Array, coro, Future, Channel, Group, ArrayMap
+import numpy as np
+from numba import cuda
+import array
+
+
+class A(Chare):
+    def __init__(self, msg_size):
+        self.msg_size = msg_size
+        if type(self.thisIndex) is tuple:
+            self.idx = int(self.thisIndex[0])
+        else:
+            self.idx = self.thisIndex
+
+    @coro
+    def run(self, done_future, addr_optimization=False):
+        partner = self.thisProxy[int(not self.idx)]
+        partner_channel = Channel(self, partner)
+
+        device_data = cuda.device_array(self.msg_size, dtype='int8')
+        device_data2 = cuda.device_array(self.msg_size, dtype='int8')
+        # if addr_optimization:
+        d_addr = array.array('L', [0, 0])
+        d_size = array.array('i', [0, 0])
+
+        d_addr[0] = device_data.__cuda_array_interface__['data'][0]
+        d_addr[1] = device_data2.__cuda_array_interface__['data'][0]
+
+        d_size[0] = device_data.nbytes
+        d_size[1] = device_data2.nbytes
+
+        host_array = np.empty(self.msg_size, dtype='int32')
+        host_array.fill(42)
+
+        if self.idx:
+            h1 = np.ones(self.msg_size, dtype='int8')
+            h2 = np.zeros(self.msg_size, dtype='int8')
+            device_data.copy_to_device(h1)
+            device_data2.copy_to_device(h2)
+            if addr_optimization:
+                partner_channel.send(20, host_array, src_ptrs=d_addr,
+                                     src_sizes=d_size
+                                     )
+                partner_channel.recv()
+            else:
+                partner_channel.send(20, host_array, device_data, device_data2)
+        else:
+            if addr_optimization:
+                f, g = partner_channel.recv(post_addresses=d_addr,
+                                            post_sizes=d_size
+                                            )
+            else:
+                f, g = partner_channel.recv(device_data, device_data2)
+            partner_channel.send(1)
+            h1 = device_data.copy_to_host()
+            h2 = device_data2.copy_to_host()
+
+            assert f == 20
+            assert np.array_equal(host_array, g)
+            assert np.array_equal(h1, np.ones(self.msg_size, dtype='int8'))
+            assert np.array_equal(h2, np.zeros(self.msg_size, dtype='int8'))
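+
+        # both elements contribute to the reduction that completes done_future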
+        self.reduce(done_future)
+
+
+class ArrMap(ArrayMap):
+    def procNum(self, index):
+        return index[0] % 2
+
+
+def main(args):
+    # if this is not a cuda-aware build,
+    # vacuously pass the test
+    if not charm.CkCudaEnabled():
+        print("WARNING: Charm4Py was not built with CUDA-enabled Charm++. "
+              "GPU-Direct functionality will not be tested"
+              )
+        charm.exit(0)
+
+    peMap = Group(ArrMap)
+    chares = Array(A, 2, args=[(1<<30)], map=peMap)
+    done_fut = Future()
+    chares.run(done_fut, addr_optimization=False)
+    done_fut.get()
+
+    done_fut = Future()
+    chares.run(done_fut, addr_optimization=True)
+    done_fut.get()
+
+    chares = Group(A, args=[(1<<30)])
+    done_fut = Future()
+    chares.run(done_fut, addr_optimization=False)
+    done_fut.get()
+
+    done_fut = Future()
+    chares.run(done_fut, addr_optimization=True)
+    done_fut.get()
+    charm.exit(0)
+
+
+charm.start(main)
diff --git a/tests/cuda/single_array.py b/tests/cuda/single_array.py
new file mode 100644
index 00000000..1c3a3692
--- /dev/null
+++ b/tests/cuda/single_array.py
@@ -0,0 +1,139 @@
+from charm4py import charm, Chare, Array, coro, Future, Channel, Group, ArrayMap
+import numpy as np
+from numba import cuda
+import array
+
+
+class A(Chare):
+    def __init__(self, msg_size):
+        self.msg_size = msg_size
+        if type(self.thisIndex) is tuple:
+            self.idx = int(self.thisIndex[0])
+        else:
+            self.idx = self.thisIndex
+
+    @coro
+    def run(self, done_future, addr_optimization=False):
+        partner = self.thisProxy[int(not self.idx)]
+        partner_channel = Channel(self, partner)
+
+        device_data = cuda.device_array(self.msg_size, dtype='int8')
+
+        d_addr = array.array('L', [0])
+        d_size = array.array('i', [0])
+
+        d_addr[0] = device_data.__cuda_array_interface__['data'][0]
+        d_size[0] = device_data.nbytes
+
+        my_stream = cuda.stream()
+        stream_addr = array.array('L', [my_stream.handle.value])
+
+        if self.idx:
+            host_data = np.zeros(self.msg_size, dtype='int8')
+            host_data.fill(5)
+            device_data.copy_to_device(host_data)
+            if addr_optimization:
+                partner_channel.send(1, 2, "hello",
+                                     np.ones(self.msg_size, dtype='int8'),
+                                     src_ptrs=d_addr, src_sizes=d_size,
+                                     stream_ptrs=stream_addr
+                                     )
+                p_data = partner_channel.recv(post_addresses=d_addr,
+                                              post_sizes=d_size,
+                                              stream_ptrs=stream_addr
+                                              )
+            else:
+                partner_channel.send(1, 2, "hello",
+                                     device_data,
+                                     np.ones(self.msg_size, dtype='int8'),
+                                     stream_ptrs=stream_addr
+                                     )
+                p_data = partner_channel.recv(device_data)
+
+            assert p_data == (2, 3)
+            h_ary = device_data.copy_to_host()
+            assert np.array_equal(h_ary, host_data)
+
+            if addr_optimization:
+                partner_channel.send(src_ptrs=d_addr, src_sizes=d_size)
+                partner_channel.recv(post_addresses=d_addr,
+                                     post_sizes=d_size
+                                     )
+            else:
+                partner_channel.send(device_data)
+                partner_channel.recv(device_data)
+
+            h_ary = device_data.copy_to_host()
+            assert np.array_equal(h_ary, host_data)
+        else:
+            if addr_optimization:
+                p_data = partner_channel.recv(post_addresses=d_addr,
+                                              post_sizes=d_size
+                                              )
+            else:
+                p_data = partner_channel.recv(device_data)
+            p_data, p_host_arr = p_data[0:-1], p_data[-1]
+            recvd = device_data.copy_to_host()
+
+            compare = np.zeros(self.msg_size, dtype='int8')
+            compare.fill(5)
+            assert np.array_equal(recvd, compare)
+            assert np.array_equal(np.ones(self.msg_size, dtype='int8'),
+                                  p_host_arr
+                                  )
+            assert p_data == (1, 2, "hello")
+
+            if addr_optimization:
+                partner_channel.send(2, 3, src_ptrs=d_addr,
+                                     src_sizes=d_size
+                                     )
+            else:
+                partner_channel.send(2, 3, device_data)
+
+            if addr_optimization:
+                partner_channel.recv(post_addresses=d_addr,
+                                     post_sizes=d_size
+                                     )
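+                # echo the payload back so the sender can verify the round trip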
+                partner_channel.send(src_ptrs=d_addr, src_sizes=d_size)
+            else:
+                partner_channel.recv(device_data)
+                partner_channel.send(device_data)
+
+        self.reduce(done_future)
+
+
+class ArrMap(ArrayMap):
+    def procNum(self, index):
+        return index[0] % 2
+
+
+def main(args):
+    # if this is not a cuda-aware build,
+    # vacuously pass the test
+    if not charm.CkCudaEnabled():
+        print("WARNING: Charm4Py was not built with CUDA-enabled Charm++. "
+              "GPU-Direct functionality will not be tested"
+              )
+        charm.exit(0)
+
+    peMap = Group(ArrMap)
+    chares = Array(A, 2, args=[1<<20], map=peMap)
+    done_fut = Future()
+    chares.run(done_fut, addr_optimization=False)
+    done_fut.get()
+
+    done_fut = Future()
+    chares.run(done_fut, addr_optimization=True)
+    done_fut.get()
+
+    chares = Group(A, args=[1<<20])
+    done_fut = Future()
+    chares.run(done_fut, addr_optimization=False)
+    done_fut.get()
+
+    done_fut = Future()
+    chares.run(done_fut, addr_optimization=True)
+    done_fut.get()
+    charm.exit(0)
+
+
+charm.start(main)