Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions examples/send_recv.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/**
* InfiniCCL Example: SendRecv
*
* This example demonstrates point-to-point `infiniSend` and `infiniRecv`
* operations between rank 0 and rank 1 on accelerator memory.
*/

#include <unistd.h>

#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>

#include "backend_manifest.h"
#include "device.h"
#include "infiniccl.h"
#include "runtime.h"
#include "traits.h"
#include "utils.h"

namespace ccl = infini::ccl;

void RunSendRecvExample(int argc, char **argv, int warmup_iter,
int profile_iter, size_t num_elements) {
constexpr ccl::Device::Type kDevType =
ccl::ListGetBest<ccl::DevicePriority>(ccl::EnabledDevices{});
using Rt = ccl::Runtime<kDevType>;

CHECK_INFINI(infiniInit(&argc, &argv));

int rank = 0;
int size = 0;
CHECK_INFINI(infiniGetRank(&rank));
CHECK_INFINI(infiniGetSize(&size));

char hostname[256];
gethostname(hostname, sizeof(hostname));

const char *local_rank_str = std::getenv("OMPI_COMM_WORLD_LOCAL_RANK");
int local_rank = 0;
if (local_rank_str != nullptr) {
local_rank = std::atoi(local_rank_str);
}

std::cout << "[Rank " << rank << "] Host: " << hostname
<< " | GPU: " << ccl::Device::StringFromType(kDevType) << " "
<< " | Device " << local_rank << std::endl;

constexpr int kSender = 0;
constexpr int kReceiver = 1;
constexpr int kRequiredRanks = 2;
constexpr float kSendValue = 7.0f;

if (size < kRequiredRanks) {
if (rank == kSender) {
std::cerr << "Send/Recv example requires at least 2 ranks." << std::endl;
}

CHECK_INFINI(infiniFinalize());
return;
}

infiniComm_t comm = nullptr;
CHECK_INFINI(infiniCommInitAll(&comm, size, nullptr));

std::vector<float> h_send(num_elements, kSendValue);
std::vector<float> h_recv(num_elements, 0.0f);

float *d_send = nullptr;
float *d_recv = nullptr;
size_t total_bytes = num_elements * sizeof(*d_send);

CHECK_RT(Rt, Rt::Malloc(&d_send, total_bytes));
CHECK_RT(Rt, Rt::Malloc(&d_recv, total_bytes));
CHECK_RT(Rt, Rt::Memcpy(d_send, h_send.data(), total_bytes,
Rt::MemcpyHostToDevice));
CHECK_RT(Rt, Rt::Memcpy(d_recv, h_recv.data(), total_bytes,
Rt::MemcpyHostToDevice));

if (rank == kSender) {
std::cout << "\n=== Performing Send/Recv on GPU Memory ===" << std::endl;
std::cout << "Sender rank: " << kSender << std::endl;
std::cout << "Receiver rank: " << kReceiver << std::endl;
std::cout << "Data size: " << num_elements << " floats ("
<< total_bytes / 1024 / 1024 << " MB)" << std::endl;
std::cout << "Warm-up iterations: " << warmup_iter << std::endl;
std::cout << "Profile iterations: " << profile_iter << std::endl;
}

CHECK_RT(Rt, Rt::StreamSynchronize(nullptr));

auto send_recv_call = [&]() {
if (rank == kSender) {
return infiniSend(d_send, num_elements, infiniFloat32, kReceiver, comm,
nullptr);
}

if (rank == kReceiver) {
return infiniRecv(d_recv, num_elements, infiniFloat32, kSender, comm,
nullptr);
}

return infiniSuccess;
};

CHECK_INFINI(send_recv_call());
if (rank == kReceiver) {
CHECK_RT(Rt, Rt::Memcpy(h_recv.data(), d_recv, total_bytes,
Rt::MemcpyDeviceToHost));
}

for (int i = 1; i < warmup_iter; ++i) {
CHECK_INFINI(send_recv_call());
}
CHECK_RT(Rt, Rt::StreamSynchronize(nullptr));

Timer timer;

for (int i = 0; i < profile_iter; ++i) {
CHECK_INFINI(send_recv_call());
}

CHECK_RT(Rt, Rt::StreamSynchronize(nullptr));
if (rank == kReceiver) {
CHECK_RT(Rt, Rt::Memcpy(h_recv.data(), d_recv, total_bytes,
Rt::MemcpyDeviceToHost));
}

double elapsed = timer.elapsed_ms() / static_cast<double>(profile_iter);

if (rank == kReceiver) {
bool correct = Validator::ValidateResult(
h_recv.data(), num_elements, kSendValue, rank, false, "SendRecv");

const char *kGreen = "\033[32m";
const char *kRed = "\033[31m";
const char *kReset = "\033[0m";

std::cout << "\n=== Send/Recv Results ===" << std::endl;
std::cout << "Correct: "
<< (correct ? (kGreen + std::string("YES") + kReset)
: (kRed + std::string("NO") + kReset))
<< std::endl;
std::cout << "Expect: " << kSendValue << std::endl;
std::cout << "Actual: " << h_recv[0] << std::endl;

if (!correct) {
CHECK_RT(Rt, Rt::Free(d_send));
CHECK_RT(Rt, Rt::Free(d_recv));
CHECK_INFINI(infiniCommDestroy(comm));
CHECK_INFINI(infiniFinalize());
std::exit(EXIT_FAILURE);
}
}

if (rank == kSender) {
Metrics metrics{elapsed, total_bytes, kRequiredRanks};
metrics.Print();
}

CHECK_RT(Rt, Rt::Free(d_send));
CHECK_RT(Rt, Rt::Free(d_recv));

CHECK_INFINI(infiniCommDestroy(comm));
CHECK_INFINI(infiniFinalize());

if (rank == kSender) {
std::cout << "InfiniCCL finalized." << std::endl;
}
}

int main(int argc, char **argv) {
int warmup_iters = 2;
int profile_iters = 20;
size_t num_elements = 1 << 20;

RunSendRecvExample(argc, argv, warmup_iters, profile_iters, num_elements);

return EXIT_SUCCESS;
}
8 changes: 8 additions & 0 deletions include/comm.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ infiniResult_t infiniAllToAll(const void *sendbuff, void *recvbuff,
size_t count, infiniDataType_t datatype,
infiniComm_t comm, void *stream);

infiniResult_t infiniSend(const void *sendbuff, size_t count,
infiniDataType_t datatype, int peer,
infiniComm_t comm, void *stream);

infiniResult_t infiniRecv(void *recvbuff, size_t count,
infiniDataType_t datatype, int peer,
infiniComm_t comm, void *stream);

#ifdef __cplusplus
}
#endif
Expand Down
61 changes: 61 additions & 0 deletions src/base/recv.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#ifndef INFINI_CCL_BASE_RECV_H_
#define INFINI_CCL_BASE_RECV_H_

#include "communicator.h"
#include "data_type_impl.h"
#include "logging.h"
#include "operation.h"
#include "return_status_impl.h"

namespace infini::ccl {

template <BackendType backend_type, Device::Type device_type>
struct RecvImpl;

class Recv : public Operation<Recv> {
public:
template <BackendType backend_type, Device::Type device_type>
static ReturnStatus Execute(void *recv_buff, size_t count, DataType datatype,
int peer, void *comm_handle, void *stream) {
if (!comm_handle) {
LOG("Invalid communicator handle for `Recv`.");
return ReturnStatus::kInvalidArgument;
}

auto *comm = static_cast<Communicator *>(comm_handle);
if (HasInvalidArgs(recv_buff, count, datatype, peer, comm)) {
return ReturnStatus::kInvalidArgument;
}
if (count == 0) {
return ReturnStatus::kSuccess;
}

return RecvImpl<backend_type, device_type>::Apply(
recv_buff, count, datatype, peer, comm, stream);
}

private:
static bool HasInvalidArgs(const void *recv_buff, size_t count,
DataType datatype, int peer, Communicator *comm) {
if (datatype < DataType::kChar || datatype >= DataType::kNumTypes) {
LOG("Invalid data type for `Recv`.");
return true;
}
if (peer < 0 || peer >= comm->size()) {
LOG("Invalid peer rank for `Recv`.");
return true;
}
if (count == 0) {
return false;
}
if (!recv_buff) {
LOG("Invalid receive buffer pointer for `Recv`.");
return true;
}
return false;
}
};

} // namespace infini::ccl

#endif // INFINI_CCL_BASE_RECV_H_
62 changes: 62 additions & 0 deletions src/base/send.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#ifndef INFINI_CCL_BASE_SEND_H_
#define INFINI_CCL_BASE_SEND_H_

#include "communicator.h"
#include "data_type_impl.h"
#include "logging.h"
#include "operation.h"
#include "return_status_impl.h"

namespace infini::ccl {

template <BackendType backend_type, Device::Type device_type>
struct SendImpl;

class Send : public Operation<Send> {
public:
template <BackendType backend_type, Device::Type device_type>
static ReturnStatus Execute(const void *send_buff, size_t count,
DataType datatype, int peer, void *comm_handle,
void *stream) {
if (!comm_handle) {
LOG("Invalid communicator handle for `Send`.");
return ReturnStatus::kInvalidArgument;
}

auto *comm = static_cast<Communicator *>(comm_handle);
if (HasInvalidArgs(send_buff, count, datatype, peer, comm)) {
return ReturnStatus::kInvalidArgument;
}
if (count == 0) {
return ReturnStatus::kSuccess;
}

return SendImpl<backend_type, device_type>::Apply(
send_buff, count, datatype, peer, comm, stream);
}

private:
static bool HasInvalidArgs(const void *send_buff, size_t count,
DataType datatype, int peer, Communicator *comm) {
if (datatype < DataType::kChar || datatype >= DataType::kNumTypes) {
LOG("Invalid data type for `Send`.");
return true;
}
if (peer < 0 || peer >= comm->size()) {
LOG("Invalid peer rank for `Send`.");
return true;
}
if (count == 0) {
return false;
}
if (!send_buff) {
LOG("Invalid send buffer pointer for `Send`.");
return true;
}
return false;
}
};

} // namespace infini::ccl

#endif // INFINI_CCL_BASE_SEND_H_
72 changes: 72 additions & 0 deletions src/ompi/impl/recv.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#ifndef INFINI_CCL_OMPI_IMPL_RECV_H_
#define INFINI_CCL_OMPI_IMPL_RECV_H_

#include <cstdlib>
#include <limits>

#include "base/recv.h"
#include "communicator.h"
#include "data_type_impl.h"
#include "logging.h"
#include "ompi/checks.h"
#include "ompi/comm_instance.h"

namespace infini::ccl {

template <Device::Type device_type>
class RecvImpl<BackendType::kOmpi, device_type> {
public:
static ReturnStatus Apply(void *recv_buff, size_t count, DataType data_type,
int peer, Communicator *comm, void *stream) {
constexpr Device::Type kDev =
ListGetBest<DevicePriority>(ActiveDevices<Recv>{});

auto *inst = static_cast<OmpiInstance *>(comm->inter_comm());
if (!inst || inst->handle == MPI_COMM_NULL) {
LOG("Invalid `OpenMPI` communicator instance for `Recv`.");
return ReturnStatus::kInternalError;
}

size_t type_size = kDataTypeToSize.at(data_type);
if (count > std::numeric_limits<size_t>::max() / type_size) {
LOG("Byte size overflow for `Recv`.");
return ReturnStatus::kInvalidArgument;
}

size_t total_bytes = count * type_size;
void *host_buf = std::malloc(total_bytes);
if (!host_buf) {
LOG("Failed to allocate host buffer for `Recv` staging.");
return ReturnStatus::kSystemError;
}

auto *bytes = static_cast<char *>(host_buf);
size_t offset = 0;
constexpr size_t kMaxMpiCount =
static_cast<size_t>(std::numeric_limits<int>::max());
constexpr int kTag = 0;
while (offset < total_bytes) {
size_t chunk = total_bytes - offset;
if (chunk > kMaxMpiCount) {
chunk = kMaxMpiCount;
}
INFINI_CHECK_MPI(MPI_Recv(bytes + offset, static_cast<int>(chunk),
MPI_BYTE, peer, kTag, inst->handle,
MPI_STATUS_IGNORE));
offset += chunk;
}

Runtime<kDev>::Memcpy(recv_buff, host_buf, total_bytes,
Runtime<kDev>::MemcpyHostToDevice);
Comment on lines +59 to +60
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Propagate failed host-to-device receives

When the receive destination is an invalid/stale device pointer or the H2D copy fails, infiniRecv still frees the staging buffer and returns success because the Runtime<kDev>::Memcpy result is ignored. In that scenario the MPI receive completed but the user's device buffer was not updated, so callers get silent data corruption instead of an error status.

Useful? React with 👍 / 👎.


std::free(host_buf);
return ReturnStatus::kSuccess;
}
};

template <>
struct BackendEnabled<Recv, BackendType::kOmpi> : std::true_type {};

} // namespace infini::ccl

#endif // INFINI_CCL_OMPI_IMPL_RECV_H_
Loading