diff --git a/examples/rambo.py b/examples/rambo.py
new file mode 100644
index 0000000..1a84009
--- /dev/null
+++ b/examples/rambo.py
@@ -0,0 +1,209 @@
+"""
+Rambo benchmark
+
+Examples:
+
+    # run 1000 iterations of 10 events and 100 outputs on the sharpy backend
+    python rambo.py -nevts 10 -nout 100 -b sharpy -i 1000
+
+    # MPI parallel run
+    mpiexec -n 3 python rambo.py -nevts 64 -nout 64 -b sharpy -i 1000
+
+"""
+
+import argparse
+import time as time_mod
+
+import numpy
+
+import sharpy
+
+try:
+    import mpi4py
+
+    mpi4py.rc.finalize = False
+    from mpi4py import MPI
+
+    comm_rank = MPI.COMM_WORLD.Get_rank()
+    comm = MPI.COMM_WORLD
+except ImportError:
+    comm_rank = 0
+    comm = None
+
+
+def info(s):
+    # print only on MPI rank 0
+    if comm_rank == 0:
+        print(s)
+
+
+def sp_rambo(sp, sp_C1, sp_F1, sp_Q1, sp_output, nevts, nout):
+    sp_C = 2.0 * sp_C1 - 1.0
+    sp_S = sp.sqrt(1 - sp.square(sp_C))
+    sp_F = 2.0 * sp.pi * sp_F1
+    sp_Q = -sp.log(sp_Q1)
+
+    sp_output[:, :, 0] = sp.reshape(sp_Q, (nevts, nout, 1))
+    sp_output[:, :, 1] = sp.reshape(
+        sp_Q * sp_S * sp.sin(sp_F), (nevts, nout, 1)
+    )
+    sp_output[:, :, 2] = sp.reshape(
+        sp_Q * sp_S * sp.cos(sp_F), (nevts, nout, 1)
+    )
+    sp_output[:, :, 3] = sp.reshape(sp_Q * sp_C, (nevts, nout, 1))
+
+    sharpy.sync()
+
+
+def np_rambo(np, C1, F1, Q1, output, nevts, nout):
+    C = 2.0 * C1 - 1.0
+    S = np.sqrt(1 - np.square(C))
+    F = 2.0 * np.pi * F1
+    Q = -np.log(Q1)
+
+    output[:, :, 0] = Q
+    output[:, :, 1] = Q * S * np.sin(F)
+    output[:, :, 2] = Q * S * np.cos(F)
+    output[:, :, 3] = Q * C
+
+
+def initialize(np, nevts, nout, seed, dtype):
+    np.random.seed(seed)
+    C1 = np.random.rand(nevts, nout)
+    F1 = np.random.rand(nevts, nout)
+    Q1 = np.random.rand(nevts, nout) * np.random.rand(nevts, nout)
+    return (C1, F1, Q1, np.zeros((nevts, nout, 4), dtype))
+
+
+def run(nevts, nout, backend, iterations, datatype):
+    if backend == "sharpy":
+        import sharpy as np
+        from sharpy import fini, init, sync
+
+        rambo = sp_rambo
+
+        init(False)
+    elif backend == "numpy":
+        import numpy as np
+
+        if comm is not None:
+            assert (
+                comm.Get_size() == 1
+            ), "The numpy backend only supports serial execution."
+
+        fini = sync = lambda x=None: None
+        rambo = np_rambo
+    else:
+        raise ValueError(f'Unknown backend: "{backend}"')
+
+    dtype = {
+        "f32": np.float32,
+        "f64": np.float64,
+    }[datatype]
+
+    info(f"Using backend: {backend}")
+    info(f"Number of events: {nevts}")
+    info(f"Number of outputs: {nout}")
+    info(f"Datatype: {datatype}")
+
+    seed = 7777
+    C1, F1, Q1, output = initialize(np, nevts, nout, seed, dtype)
+    sync()
+
+    # verify
+    if backend == "sharpy":
+        sp_rambo(sharpy, C1, F1, Q1, output, nevts, nout)
+        # FIXME: calling sync() here does not work
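+        # cross-check: gather the inputs to NumPy and recompute with the
+        # plain NumPy kernel; both backends must agree on the result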
+        np_C1 = sharpy.to_numpy(C1)
+        np_F1 = sharpy.to_numpy(F1)
+        np_Q1 = sharpy.to_numpy(Q1)
+        np_output = numpy.zeros((nevts, nout, 4))
+        np_rambo(numpy, np_C1, np_F1, np_Q1, np_output, nevts, nout)
+        assert numpy.allclose(sharpy.to_numpy(output), np_output)
+
+    def eval_once():
+        tic = time_mod.perf_counter()
+        rambo(np, C1, F1, Q1, output, nevts, nout)
+        toc = time_mod.perf_counter()
+        return toc - tic
+
+    # warm-up run
+    t_warm = eval_once()
+
+    # evaluate
+    info(f"Running {iterations} iterations")
+    time_list = []
+    for _ in range(iterations):
+        time_list.append(eval_once())
+
+    # get max time over mpi ranks
+    if comm is not None:
+        t_warm = comm.allreduce(t_warm, MPI.MAX)
+        time_list = comm.allreduce(time_list, MPI.MAX)
+
+    t_min = numpy.min(time_list)
+    t_max = numpy.max(time_list)
+    t_med = numpy.median(time_list)
+    init_overhead = t_warm - t_med
+    if backend == "sharpy":
+        info(f"Estimated initialization overhead: {init_overhead:.5f} s")
+    info(f"Min. duration: {t_min:.5f} s")
+    info(f"Max. duration: {t_max:.5f} s")
+    info(f"Median duration: {t_med:.5f} s")
+
+    fini()
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Run rambo benchmark",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+
+    parser.add_argument(
+        "-nevts",
+        "--num_events",
+        type=int,
+        default=10,
+        help="Number of events to evaluate.",
+    )
+    parser.add_argument(
+        "-nout",
+        "--num_outputs",
+        type=int,
+        default=10,
+        help="Number of outputs to evaluate.",
+    )
+    parser.add_argument(
+        "-b",
+        "--backend",
+        type=str,
+        default="sharpy",
+        choices=["sharpy", "numpy"],
+        help="Backend to use.",
+    )
+    parser.add_argument(
+        "-i",
+        "--iterations",
+        type=int,
+        default=10,
+        help="Number of iterations to run.",
+    )
+    parser.add_argument(
+        "-d",
+        "--datatype",
+        type=str,
+        default="f64",
+        choices=["f32", "f64"],
+        help="Datatype for model state variables.",
+    )
+    args = parser.parse_args()
+    nevts, nout = args.num_events, args.num_outputs
+    run(
+        nevts,
+        nout,
+        args.backend,
+        args.iterations,
+        args.datatype,
+    )
diff --git a/imex_version.txt b/imex_version.txt
index 7c99ccd..798b8b8 100644
--- a/imex_version.txt
+++ b/imex_version.txt
@@ -1 +1 @@
-199f9456fd31b96930395ab650fdb6fea42769dd
+a02f09350a8eba081c92a7d0117334eb56c9fb5a
diff --git a/setup.py b/setup.py
index f3df165..3b8c3e6 100644
--- a/setup.py
+++ b/setup.py
@@ -54,7 +54,7 @@ def build_cmake(self, ext):
     name="sharpy",
     version="0.2",
     description="Distributed array and more",
-    packages=["sharpy", "sharpy.numpy"],  # "sharpy.torch"],
+    packages=["sharpy", "sharpy.numpy", "sharpy.random"],  # "sharpy.torch"],
     ext_modules=[CMakeExtension("sharpy/_sharpy")],
     cmdclass=dict(
         # Enable the CMakeExtension entries defined above
diff --git a/sharpy/__init__.py b/sharpy/__init__.py
index 56e10cc..b7784c0 100644
--- a/sharpy/__init__.py
+++ b/sharpy/__init__.py
@@ -38,6 +38,22 @@
 from ._sharpy import sync
 from .ndarray import ndarray
 
+
+# Lazy-load submodules on first attribute access (PEP 562)
+def __getattr__(name):
+    if name == "random":
+        import sharpy.random as random
+
+        return random
+    elif name == "numpy":
+        import sharpy.numpy as numpy
+
+        return numpy
+
+    if "_fallback" in globals():
+        return _fallback(name)
+
+    raise AttributeError(f"module 'sharpy' has no attribute '{name}'")
+
 _sharpy_cw = bool(int(getenv("SHARPY_CW", False)))
 
 pi = 3.1415926535897932384626433
@@ -185,7 +201,3 @@ def __getattr__(self, name):
         dt.linalg.norm(...)
         """
         return _fallback(name, self._func)
-
-    def __getattr__(name):
-        "Attempt to find a fallback in fallback-lib"
-        return _fallback(name)
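Note on the hunk above: the module-level `__getattr__` (PEP 562) defers the submodule imports, so `import sharpy` stays cheap. A minimal sketch of the resulting behavior, assuming the runtime has been initialized as in examples/rambo.py:

    import sharpy as sp

    # First attribute access calls sharpy.__getattr__("random"), which
    # imports sharpy/random/__init__.py and returns the module object.
    sp.random.seed(42)
    a = sp.random.rand(4, 3)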
""" return _fallback(name, self._func) - - def __getattr__(name): - "Attempt to find a fallback in fallback-lib" - return _fallback(name) diff --git a/sharpy/random.py b/sharpy/random.py deleted file mode 100644 index 9e5db06..0000000 --- a/sharpy/random.py +++ /dev/null @@ -1,11 +0,0 @@ -from . import _sharpy as _csp -from . import float64 -from .sharpy import ndarray - - -def uniform(low, high, size, dtype=float64): - return ndarray(_csp.Random.uniform(dtype, size, low, high)) - - -def seed(s): - _csp.Random.seed(s) diff --git a/sharpy/random/__init__.py b/sharpy/random/__init__.py new file mode 100644 index 0000000..9fd4db7 --- /dev/null +++ b/sharpy/random/__init__.py @@ -0,0 +1,36 @@ +import numpy as np + +import sharpy as sp +from sharpy import float64 +from sharpy.numpy import fromfunction + + +def uniform(low, high, size, device="", team=1): + data = np.random.uniform(low, high, size) + if len(data.shape) == 0: + sp_data = sp.full((), data[()], device=device, team=team) + return sp_data + return fromfunction( + lambda *index: data[index], + data.shape, + dtype=float64, + device=device, + team=team, + ) + + +def rand(*shape, device="", team=1): + data = np.random.rand(*shape) + if isinstance(data, float): + return data + return fromfunction( + lambda *index: data[index], + data.shape, + dtype=float64, + device=device, + team=team, + ) + + +def seed(s): + np.random.seed(s) diff --git a/src/Service.cpp b/src/Service.cpp index 74b52ec..a197fd8 100644 --- a/src/Service.cpp +++ b/src/Service.cpp @@ -51,7 +51,10 @@ struct DeferredService : public DeferredT_a == guid); + Registry::del(guid); + }); break; } case RUN: diff --git a/src/idtr.cpp b/src/idtr.cpp index 4b3e1fb..3a42b26 100644 --- a/src/idtr.cpp +++ b/src/idtr.cpp @@ -241,10 +241,11 @@ void bufferize(void *cptr, SHARPY::DTypeId dtype, const int64_t *sizes, }); } -/// copy contiguous block of data into a possibly strided array -void unpack(void *in, SHARPY::DTypeId dtype, const int64_t *sizes, - const int64_t *strides, const int64_t *tStarts, - const int64_t *tSizes, uint64_t nd, uint64_t N, void *out) { +/// copy contiguous block of data into a possibly strided array distributed to N +/// ranks +void unpackN(void *in, SHARPY::DTypeId dtype, const int64_t *sizes, + const int64_t *strides, const int64_t *tStarts, + const int64_t *tSizes, uint64_t nd, uint64_t N, void *out) { if (!in || !sizes || !strides || !tStarts || !tSizes || !out) { return; } @@ -268,6 +269,21 @@ void unpack(void *in, SHARPY::DTypeId dtype, const int64_t *sizes, }); } +/// copy contiguous block of data into a possibly strided array +void unpack(void *in, SHARPY::DTypeId dtype, const int64_t *sizes, + const int64_t *strides, uint64_t ndim, void *out) { + if (!in || !sizes || !strides || !out) { + return; + } + dispatch(dtype, out, [sizes, strides, ndim, in](auto *out_) { + auto in_ = static_cast(in); + SHARPY::forall(0, out_, sizes, strides, ndim, [&in_](auto *out) { + *out = *in_; + ++in_; + }); + }); +} + template void copy_(uint64_t d, uint64_t &pos, T *cptr, const int64_t *sizes, const int64_t *strides, const uint64_t *chunks, uint64_t nd, @@ -489,23 +505,37 @@ WaitHandleBase *_idtr_copy_reshape(SHARPY::DTypeId sharpytype, } } + bool isStrided = + !SHARPY::is_contiguous(oDataShapePtr, oDataStridesPtr, oNDims); + void *rBuff = + isStrided ? 
diff --git a/test/test_random.py b/test/test_random.py
new file mode 100644
index 0000000..567dd73
--- /dev/null
+++ b/test/test_random.py
@@ -0,0 +1,39 @@
+import numpy as np
+import pytest
+from utils import device
+
+import sharpy as sp
+
+
+@pytest.fixture(params=[(), (6,), (6, 5), (6, 5, 4)])
+def shape(request):
+    return request.param
+
+
+@pytest.fixture(params=[0, 42, 66, 890, 1000])
+def seed(request):
+    return request.param
+
+
+def test_random_rand(shape, seed):
+    sp.random.seed(seed)
+    sp_data = sp.random.rand(*shape, device=device)
+
+    np.random.seed(seed)
+    np_data = np.random.rand(*shape)
+
+    if isinstance(np_data, float):
+        assert isinstance(sp_data, float) and sp_data == np_data
+    else:
+        assert np.allclose(sp.to_numpy(sp_data), np_data)
+
+
+@pytest.mark.parametrize("low,high", [(0, 1), (4, 10), (-100, 100)])
+def test_random_uniform(low, high, shape, seed):
+    sp.random.seed(seed)
+    sp_data = sp.random.uniform(low, high, shape, device=device)
+
+    np.random.seed(seed)
+    np_data = np.random.uniform(low, high, shape)
+
+    assert np.allclose(sp.to_numpy(sp_data), np_data)
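The `()` entry in the `shape` fixture above exercises the scalar corner cases handled in sharpy/random/__init__.py: `rand()` without a shape returns a plain float, while `uniform(..., size=())` produces a 0-d array that gets wrapped via `sp.full`. In plain NumPy terms:

    import numpy as np

    data = np.random.uniform(0.0, 1.0, size=())
    assert data.ndim == 0            # a 0-d array, not a float
    scalar = data[()]                # extract the scalar, as uniform() does
    assert isinstance(np.random.rand(), float)  # no shape -> plain float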
diff --git a/test/test_setget.py b/test/test_setget.py
index d68b012..f57a453 100644
--- a/test/test_setget.py
+++ b/test/test_setget.py
@@ -116,6 +116,22 @@ def doit(aapi, **kwargs):
 
         assert runAndCompare(doit)
 
+    def test_setitem9(self):
+        N = 3
+        r1 = sp.random.rand(N)
+
+        a = sp.zeros((N, 2))
+        r11 = sp.reshape(r1, (N, 1))
+
+        # assign into a strided (non-contiguous) view
+        a[:, 0] = r11
+
+        np_r1 = sp.to_numpy(r1)
+        b = numpy.zeros((N, 2))
+        b[:, 0] = np_r1
+
+        assert numpy.allclose(sp.to_numpy(a), b)
+
     def test_colon(self):
         a = sp.ones((16, 16), sp.float32, device=device)
         b = sp.zeros((16, 16), sp.float32, device=device)
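Taken together, these changes let code like `initialize()` in examples/rambo.py run unchanged on either backend, because `sharpy.random` mirrors the small `numpy.random` subset it needs (`seed`, `rand`). A sketch, assuming the same init/fini protocol as the rambo example:

    import numpy
    import sharpy

    sharpy.init(False)
    for np in (numpy, sharpy):
        np.random.seed(7777)
        C1 = np.random.rand(4, 3)  # the same call works on either backend
    sharpy.fini()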