Dual test for NumPy and CuPy in tests #165


Merged: 6 commits, Aug 8, 2025
6 changes: 5 additions & 1 deletion Makefile
@@ -29,7 +29,7 @@ dev-install:

dev-install_nccl:
make pipcheck
$(PIP) install -r requirements-dev.txt && $(PIP) install cupy-cuda12x nvidia-nccl-cu12 $(PIP) install -e .
$(PIP) install -r requirements-dev.txt && $(PIP) install cupy-cuda12x nvidia-nccl-cu12 && $(PIP) install -e .

install_conda:
conda env create -f environment.yml && conda activate pylops_mpi && pip install .
@@ -49,6 +49,10 @@ lint:
tests:
mpiexec -n $(NUM_PROCESSES) pytest tests/ --with-mpi

# assuming NUM_PROCESSES <= number of gpus available
tests_gpu:
export TEST_CUPY_PYLOPS=1 && mpiexec -n $(NUM_PROCESSES) pytest tests/ --with-mpi

# assuming NUM_PROCESSES <= number of gpus available
tests_nccl:
mpiexec -n $(NUM_PROCESSES) pytest tests_nccl/ --with-mpi
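The new tests_gpu target only exports TEST_CUPY_PYLOPS=1 before invoking pytest; each test module reads this variable at import time and aliases np to either NumPy or CuPy (see the test diffs below). A minimal standalone sketch of that switch, assuming CuPy is installed:

import os

# Backend switch driven by the environment variable set in `make tests_gpu`
if int(os.environ.get("TEST_CUPY_PYLOPS", 0)):
    import cupy as np                          # run the same tests on the GPU
    from cupy.testing import assert_allclose
    backend = "cupy"
else:
    import numpy as np                         # default CPU path
    from numpy.testing import assert_allclose
    backend = "numpy"

print(f"running tests with the {backend} backend")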
19 changes: 17 additions & 2 deletions docs/source/contributing.rst
@@ -69,6 +69,18 @@ that both the old and new tests pass successfully:

>> make tests

If you run PyLops-MPI with GPUs, you may also do:

.. code-block:: bash

>> make tests_gpu

Additionally, if you have an NCCL-enabled environment, you may also check:

.. code-block:: bash

>> make tests_nccl

4. Make sure the ``examples`` python scripts are executed using 3 processes without any errors:

.. code-block:: bash
@@ -123,8 +135,11 @@ Project structure
This repository is organized as follows:

* **pylops_mpi**: Python library containing various mpi linear operators.
* **tests**: Set of tests using pytest-mpi.
* **tests**: Set of tests using pytest-mpi for both CPU and GPU.
* **tests_nccl**: Set of tests for NCCL-enabled environments using pytest-mpi.
* **testdata**: Sample datasets used in tests and documentation.
* **docs**: Sphinx documentation.
* **examples**: Set of python script examples for each mpi linear operator to be embedded in documentation using sphinx-gallery.
* **tutorials**: Set of python script tutorials to be embedded in documentation using sphinx-gallery.
* **tutorials**: Set of python script tutorials (NumPy & MPI) to be embedded in documentation using sphinx-gallery.
* **tutorials_cupy**: Same set of scripts as above but with CuPy & MPI.
* **tutorials_nccl**: Same set of scripts as above but with CuPy & NCCL.
23 changes: 17 additions & 6 deletions pylops_mpi/DistributedArray.py
@@ -694,14 +694,25 @@ def _compute_vector_norm(self, local_array: NDArray,
recv_buf = self._allreduce_subcomm(ncp.count_nonzero(local_array, axis=axis).astype(ncp.float64))
elif ord == ncp.inf:
# Calculate max followed by max reduction
recv_buf = self._allreduce_subcomm(ncp.max(ncp.abs(local_array), axis=axis).astype(ncp.float64),
recv_buf, op=MPI.MAX)
recv_buf = ncp.squeeze(recv_buf, axis=axis)
# TODO (tharitt): CuPy + MPI currently does not work well with buffered communication, particularly
# with the MAX and MIN operators. Here we copy the array to the CPU, communicate over MPI, and copy the result back to the GPU
send_buf = ncp.max(ncp.abs(local_array), axis=axis).astype(ncp.float64)
if self.engine == "cupy" and self.base_comm_nccl is None:
recv_buf = self._allreduce_subcomm(send_buf.get(), recv_buf.get(), op=MPI.MAX)
recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))
else:
recv_buf = self._allreduce_subcomm(send_buf, recv_buf, op=MPI.MAX)
recv_buf = ncp.squeeze(recv_buf, axis=axis)
elif ord == -ncp.inf:
# Calculate min followed by min reduction
recv_buf = self._allreduce_subcomm(ncp.min(ncp.abs(local_array), axis=axis).astype(ncp.float64),
recv_buf, op=MPI.MIN)
recv_buf = ncp.squeeze(recv_buf, axis=axis)
# TODO (tharitt): see the comment above in infinity norm
send_buf = ncp.min(ncp.abs(local_array), axis=axis).astype(ncp.float64)
if self.engine == "cupy" and self.base_comm_nccl is None:
recv_buf = self._allreduce_subcomm(send_buf.get(), recv_buf.get(), op=MPI.MIN)
recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))
else:
recv_buf = self._allreduce_subcomm(send_buf, recv_buf, op=MPI.MIN)
recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))

else:
recv_buf = self._allreduce_subcomm(ncp.sum(ncp.abs(ncp.float_power(local_array, ord)), axis=axis))
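The hunks above stage MAX/MIN reductions through host memory when the engine is cupy and no NCCL communicator is present, since buffered mpi4py reductions with MPI.MAX/MPI.MIN on CuPy arrays currently misbehave. A standalone sketch of the same host-staging idea (the helper name is hypothetical, not part of pylops-mpi):

import numpy as np
import cupy as cp
from mpi4py import MPI

def allreduce_max_host_staged(comm, send_buf_gpu):
    # Hypothetical helper: device -> host copy, reduce on host buffers,
    # then copy the reduced result back to the GPU.
    send_host = send_buf_gpu.get()                     # CuPy array to CPU
    recv_host = np.empty_like(send_host)
    comm.Allreduce(send_host, recv_host, op=MPI.MAX)   # plain MPI reduction on host buffers
    return cp.asarray(recv_host)                       # result back on the GPU

# usage sketch: each rank contributes its local maxima
# result = allreduce_max_host_staged(MPI.COMM_WORLD, cp.abs(local_array).max(axis=0))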
6 changes: 4 additions & 2 deletions pylops_mpi/basicoperators/MatrixMult.py
@@ -232,7 +232,8 @@ def _matvec(self, x: DistributedArray) -> DistributedArray:
mask=x.mask,
partition=Partition.SCATTER,
dtype=self.dtype,
base_comm=self.base_comm
base_comm=self.base_comm,
engine=x.engine
)

my_own_cols = self._rank_col_lens[self.rank]
@@ -257,7 +258,8 @@ def _rmatvec(self, x: DistributedArray) -> DistributedArray:
mask=x.mask,
partition=Partition.SCATTER,
dtype=self.dtype,
base_comm=self.base_comm
base_comm=self.base_comm,
engine=x.engine
)

x_arr = x.local_array.reshape((self.N, self._local_ncols)).astype(self.dtype)
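Both hunks forward x.engine when building the output DistributedArray, so a CuPy input yields a CuPy output instead of silently falling back to the default numpy engine. A minimal sketch of the pattern (the helper below is illustrative only, not part of the operator):

from pylops_mpi import DistributedArray, Partition

def make_output_like(x, global_shape):
    # Illustrative helper: allocate the result with the same backend as the input
    return DistributedArray(
        global_shape=global_shape,
        partition=Partition.SCATTER,
        dtype=x.dtype,
        base_comm=x.base_comm,
        engine=x.engine,   # "numpy" or "cupy", matching x.local_array
    )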
30 changes: 22 additions & 8 deletions tests/test_blockdiag.py
@@ -2,9 +2,19 @@
Designed to run with n processes
$ mpiexec -n 10 pytest test_blockdiag.py --with-mpi
"""
import os

if int(os.environ.get("TEST_CUPY_PYLOPS", 0)):
import cupy as np
from cupy.testing import assert_allclose

backend = "cupy"
else:
import numpy as np
from numpy.testing import assert_allclose

backend = "numpy"
from mpi4py import MPI
import numpy as np
from numpy.testing import assert_allclose
import pytest

import pylops
@@ -17,6 +27,10 @@
par2j = {'ny': 301, 'nx': 101, 'dtype': np.complex128}

np.random.seed(42)
rank = MPI.COMM_WORLD.Get_rank()
if backend == "cupy":
device_id = rank % np.cuda.runtime.getDeviceCount()
np.cuda.Device(device_id).use()


@pytest.mark.mpi(min_size=2)
@@ -27,11 +41,11 @@ def test_blockdiag(par):
Op = pylops.MatrixMult(A=((rank + 1) * np.ones(shape=(par['ny'], par['nx']))).astype(par['dtype']))
BDiag_MPI = pylops_mpi.MPIBlockDiag(ops=[Op, ])

x = pylops_mpi.DistributedArray(global_shape=size * par['nx'], dtype=par['dtype'])
x = pylops_mpi.DistributedArray(global_shape=size * par['nx'], dtype=par['dtype'], engine=backend)
x[:] = np.ones(shape=par['nx'], dtype=par['dtype'])
x_global = x.asarray()

y = pylops_mpi.DistributedArray(global_shape=size * par['ny'], dtype=par['dtype'])
y = pylops_mpi.DistributedArray(global_shape=size * par['ny'], dtype=par['dtype'], engine=backend)
y[:] = np.ones(shape=par['ny'], dtype=par['dtype'])
y_global = y.asarray()

@@ -68,16 +82,16 @@ def test_stacked_blockdiag(par):
FirstDeriv_MPI = pylops_mpi.MPIFirstDerivative(dims=(par['ny'], par['nx']), dtype=par['dtype'])
StackedBDiag_MPI = pylops_mpi.MPIStackedBlockDiag(ops=[BDiag_MPI, FirstDeriv_MPI])

dist1 = pylops_mpi.DistributedArray(global_shape=size * par['nx'], dtype=par['dtype'])
dist1 = pylops_mpi.DistributedArray(global_shape=size * par['nx'], dtype=par['dtype'], engine=backend)
dist1[:] = np.ones(dist1.local_shape, dtype=par['dtype'])
dist2 = pylops_mpi.DistributedArray(global_shape=par['nx'] * par['ny'], dtype=par['dtype'])
dist2 = pylops_mpi.DistributedArray(global_shape=par['nx'] * par['ny'], dtype=par['dtype'], engine=backend)
dist2[:] = np.ones(dist2.local_shape, dtype=par['dtype'])
x = pylops_mpi.StackedDistributedArray(distarrays=[dist1, dist2])
x_global = x.asarray()

dist1 = pylops_mpi.DistributedArray(global_shape=size * par['ny'], dtype=par['dtype'])
dist1 = pylops_mpi.DistributedArray(global_shape=size * par['ny'], dtype=par['dtype'], engine=backend)
dist1[:] = np.ones(dist1.local_shape, dtype=par['dtype'])
dist2 = pylops_mpi.DistributedArray(global_shape=par['nx'] * par['ny'], dtype=par['dtype'])
dist2 = pylops_mpi.DistributedArray(global_shape=par['nx'] * par['ny'], dtype=par['dtype'], engine=backend)
dist2[:] = np.ones(dist2.local_shape, dtype=par['dtype'])
y = pylops_mpi.StackedDistributedArray(distarrays=[dist1, dist2])
y_global = y.asarray()
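When the cupy backend is active, each MPI rank pins itself to a GPU in round-robin fashion, which is also why the Makefile assumes NUM_PROCESSES does not exceed the number of available GPUs. The assignment shown standalone (assuming CuPy and at least one visible GPU):

import cupy as cp
from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()
ngpus = cp.cuda.runtime.getDeviceCount()   # GPUs visible to this process
cp.cuda.Device(rank % ngpus).use()         # round-robin rank-to-device mapping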
69 changes: 42 additions & 27 deletions tests/test_derivative.py
@@ -2,9 +2,20 @@
Designed to run with n processes
$ mpiexec -n 10 pytest test_derivative.py --with-mpi
"""
import numpy as np
import os

if int(os.environ.get("TEST_CUPY_PYLOPS", 0)):
import cupy as np
from cupy.testing import assert_allclose

backend = "cupy"
else:
import numpy as np
from numpy.testing import assert_allclose

backend = "numpy"
import numpy as npp
from mpi4py import MPI
from numpy.testing import assert_allclose
import pytest

import pylops
@@ -14,6 +25,10 @@
np.random.seed(42)
rank = MPI.COMM_WORLD.Get_rank()
size = MPI.COMM_WORLD.Get_size()
if backend == "cupy":
device_id = rank % np.cuda.runtime.getDeviceCount()
np.cuda.Device(device_id).use()


par1 = {
"nz": 600,
@@ -189,8 +204,8 @@ def test_first_derivative_forward(par):
Fop_MPI = pylops_mpi.MPIFirstDerivative(dims=par['nz'], sampling=par['dz'],
kind="forward", edge=par['edge'],
dtype=par['dtype'])
x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'])
x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'], engine=backend)
x[:] = np.random.normal(rank, 10, x.local_shape)
x_global = x.asarray()
# Forward
@@ -200,7 +215,7 @@
y_adj_dist = Fop_MPI.H @ x
y_adj = y_adj_dist.asarray()
# Dot test
dottest(Fop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
dottest(Fop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

if rank == 0:
Fop = pylops.FirstDerivative(dims=par['nz'], axis=0,
@@ -223,8 +238,8 @@ def test_first_derivative_backward(par):
Fop_MPI = pylops_mpi.MPIFirstDerivative(dims=par['nz'], sampling=par['dz'],
kind="backward", edge=par['edge'],
dtype=par['dtype'])
x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'])
x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'], engine=backend)
x[:] = np.random.normal(rank, 10, x.local_shape)
x_global = x.asarray()
# Forward
@@ -234,7 +249,7 @@
y_adj_dist = Fop_MPI.H @ x
y_adj = y_adj_dist.asarray()
# Dot test
dottest(Fop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
dottest(Fop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

if rank == 0:
Fop = pylops.FirstDerivative(dims=par['nz'], axis=0,
@@ -258,8 +273,8 @@ def test_first_derivative_centered(par):
Fop_MPI = pylops_mpi.MPIFirstDerivative(dims=par['nz'], sampling=par['dz'],
kind="centered", edge=par['edge'],
order=order, dtype=par['dtype'])
x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'])
x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'], engine=backend)
x[:] = np.random.normal(rank, 10, x.local_shape)
x_global = x.asarray()
# Forward
@@ -269,7 +284,7 @@
y_adj_dist = Fop_MPI.H @ x
y_adj = y_adj_dist.asarray()
# Dot test
dottest(Fop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
dottest(Fop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

if rank == 0:
Fop = pylops.FirstDerivative(dims=par['nz'], axis=0,
@@ -292,8 +307,8 @@ def test_second_derivative_forward(par):
Sop_MPI = pylops_mpi.basicoperators.MPISecondDerivative(dims=par['nz'], sampling=par['dz'],
kind="forward", edge=par['edge'],
dtype=par['dtype'])
x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'])
x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'], engine=backend)
x[:] = np.random.normal(rank, 10, x.local_shape)
x_global = x.asarray()
# Forward
@@ -303,7 +318,7 @@
y_adj_dist = Sop_MPI.H @ x
y_adj = y_adj_dist.asarray()
# Dot test
dottest(Sop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
dottest(Sop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

if rank == 0:
Sop = pylops.SecondDerivative(dims=par['nz'], axis=0,
@@ -326,8 +341,8 @@ def test_second_derivative_backward(par):
Sop_MPI = pylops_mpi.basicoperators.MPISecondDerivative(dims=par['nz'], sampling=par['dz'],
kind="backward", edge=par['edge'],
dtype=par['dtype'])
x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'])
x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'], engine=backend)
x[:] = np.random.normal(rank, 10, x.local_shape)
x_global = x.asarray()
# Forward
@@ -337,7 +352,7 @@
y_adj_dist = Sop_MPI.H @ x
y_adj = y_adj_dist.asarray()
# Dot test
dottest(Sop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
dottest(Sop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

if rank == 0:
Sop = pylops.SecondDerivative(dims=par['nz'], axis=0,
@@ -360,8 +375,8 @@ def test_second_derivative_centered(par):
Sop_MPI = pylops_mpi.basicoperators.MPISecondDerivative(dims=par['nz'], sampling=par['dz'],
kind="centered", edge=par['edge'],
dtype=par['dtype'])
x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'])
x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
partition=par['partition'], engine=backend)
x[:] = np.random.normal(rank, 10, x.local_shape)
x_global = x.asarray()
# Forward
@@ -371,7 +386,7 @@
y_adj_dist = Sop_MPI.H @ x
y_adj = y_adj_dist.asarray()
# Dot test
dottest(Sop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
dottest(Sop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

if rank == 0:
Sop = pylops.SecondDerivative(dims=par['nz'], axis=0,
@@ -394,7 +409,7 @@ def test_laplacian(par):
weights=par['weights'], sampling=par['sampling'],
kind=kind, edge=par['edge'],
dtype=par['dtype'])
x = pylops_mpi.DistributedArray(global_shape=np.prod(par['n']), dtype=par['dtype'])
x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['n']), dtype=par['dtype'], engine=backend)
x[:] = np.random.normal(rank, 10, x.local_shape)
x_global = x.asarray()
# Forward
@@ -404,7 +419,7 @@
y_adj_dist = Lop_MPI.H @ x
y_adj = y_adj_dist.asarray()
# Dot test
dottest(Lop_MPI, x, y_dist, np.prod(par['n']), np.prod(par['n']))
dottest(Lop_MPI, x, y_dist, npp.prod(par['n']), npp.prod(par['n']))

if rank == 0:
Lop = pylops.Laplacian(dims=par['n'], axes=par['axes'],
@@ -426,7 +441,7 @@ def test_gradient(par):
Gop_MPI = pylops_mpi.basicoperators.MPIGradient(dims=par['n'], sampling=par['sampling'],
kind=kind, edge=par['edge'],
dtype=par['dtype'])
x_fwd = pylops_mpi.DistributedArray(global_shape=np.prod(par['n']), dtype=par['dtype'])
x_fwd = pylops_mpi.DistributedArray(global_shape=npp.prod(par['n']), dtype=par['dtype'], engine=backend)
x_fwd[:] = np.random.normal(rank, 10, x_fwd.local_shape)
x_global = x_fwd.asarray()

@@ -436,11 +451,11 @@
y = y_dist.asarray()

# Adjoint
x_adj_dist1 = pylops_mpi.DistributedArray(global_shape=int(np.prod(par['n'])), dtype=par['dtype'])
x_adj_dist1 = pylops_mpi.DistributedArray(global_shape=int(npp.prod(par['n'])), dtype=par['dtype'], engine=backend)
x_adj_dist1[:] = np.random.normal(rank, 10, x_adj_dist1.local_shape)
x_adj_dist2 = pylops_mpi.DistributedArray(global_shape=int(np.prod(par['n'])), dtype=par['dtype'])
x_adj_dist2 = pylops_mpi.DistributedArray(global_shape=int(npp.prod(par['n'])), dtype=par['dtype'], engine=backend)
x_adj_dist2[:] = np.random.normal(rank, 20, x_adj_dist2.local_shape)
x_adj_dist3 = pylops_mpi.DistributedArray(global_shape=int(np.prod(par['n'])), dtype=par['dtype'])
x_adj_dist3 = pylops_mpi.DistributedArray(global_shape=int(npp.prod(par['n'])), dtype=par['dtype'], engine=backend)
x_adj_dist3[:] = np.random.normal(rank, 30, x_adj_dist3.local_shape)
x_adj = pylops_mpi.StackedDistributedArray(distarrays=[x_adj_dist1, x_adj_dist2, x_adj_dist3])
x_adj_global = x_adj.asarray()
@@ -449,7 +464,7 @@
y_adj = y_adj_dist.asarray()

# Dot test
dottest(Gop_MPI, x_fwd, y_dist, len(par['n']) * np.prod(par['n']), np.prod(par['n']))
dottest(Gop_MPI, x_fwd, y_dist, len(par['n']) * npp.prod(par['n']), npp.prod(par['n']))

if rank == 0:
Gop = pylops.Gradient(dims=par['n'], sampling=par['sampling'],
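Note the extra import numpy as npp alias: shape arithmetic such as npp.prod(par['nz']) deliberately stays on host NumPy even when np is aliased to CuPy, so that global shapes and dot-test sizes remain plain host numbers rather than 0-d device arrays. A small sketch of the distinction, assuming CuPy is installed:

import numpy as npp   # always host NumPy, used only for shape arithmetic
import cupy as np     # backend alias used for the actual array data

dims = (21, 11)
n_host = int(npp.prod(dims))        # plain Python int, safe as a global shape
n_dev = np.prod(np.asarray(dims))   # 0-d CuPy array living on the GPU
print(type(n_host), type(n_dev))    # <class 'int'> vs <class 'cupy.ndarray'>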