diff --git a/examples/send_recv.cc b/examples/send_recv.cc new file mode 100644 index 0000000..ad25337 --- /dev/null +++ b/examples/send_recv.cc @@ -0,0 +1,181 @@ +/** + * InfiniCCL Example: SendRecv + * + * This example demonstrates point-to-point `infiniSend` and `infiniRecv` + * operations between rank 0 and rank 1 on accelerator memory. + */ + +#include + +#include +#include +#include +#include + +#include "backend_manifest.h" +#include "device.h" +#include "infiniccl.h" +#include "runtime.h" +#include "traits.h" +#include "utils.h" + +namespace ccl = infini::ccl; + +void RunSendRecvExample(int argc, char **argv, int warmup_iter, + int profile_iter, size_t num_elements) { + constexpr ccl::Device::Type kDevType = + ccl::ListGetBest(ccl::EnabledDevices{}); + using Rt = ccl::Runtime; + + CHECK_INFINI(infiniInit(&argc, &argv)); + + int rank = 0; + int size = 0; + CHECK_INFINI(infiniGetRank(&rank)); + CHECK_INFINI(infiniGetSize(&size)); + + char hostname[256]; + gethostname(hostname, sizeof(hostname)); + + const char *local_rank_str = std::getenv("OMPI_COMM_WORLD_LOCAL_RANK"); + int local_rank = 0; + if (local_rank_str != nullptr) { + local_rank = std::atoi(local_rank_str); + } + + std::cout << "[Rank " << rank << "] Host: " << hostname + << " | GPU: " << ccl::Device::StringFromType(kDevType) << " " + << " | Device " << local_rank << std::endl; + + constexpr int kSender = 0; + constexpr int kReceiver = 1; + constexpr int kRequiredRanks = 2; + constexpr float kSendValue = 7.0f; + + if (size < kRequiredRanks) { + if (rank == kSender) { + std::cerr << "Send/Recv example requires at least 2 ranks." << std::endl; + } + + CHECK_INFINI(infiniFinalize()); + return; + } + + infiniComm_t comm = nullptr; + CHECK_INFINI(infiniCommInitAll(&comm, size, nullptr)); + + std::vector h_send(num_elements, kSendValue); + std::vector h_recv(num_elements, 0.0f); + + float *d_send = nullptr; + float *d_recv = nullptr; + size_t total_bytes = num_elements * sizeof(*d_send); + + CHECK_RT(Rt, Rt::Malloc(&d_send, total_bytes)); + CHECK_RT(Rt, Rt::Malloc(&d_recv, total_bytes)); + CHECK_RT(Rt, Rt::Memcpy(d_send, h_send.data(), total_bytes, + Rt::MemcpyHostToDevice)); + CHECK_RT(Rt, Rt::Memcpy(d_recv, h_recv.data(), total_bytes, + Rt::MemcpyHostToDevice)); + + if (rank == kSender) { + std::cout << "\n=== Performing Send/Recv on GPU Memory ===" << std::endl; + std::cout << "Sender rank: " << kSender << std::endl; + std::cout << "Receiver rank: " << kReceiver << std::endl; + std::cout << "Data size: " << num_elements << " floats (" + << total_bytes / 1024 / 1024 << " MB)" << std::endl; + std::cout << "Warm-up iterations: " << warmup_iter << std::endl; + std::cout << "Profile iterations: " << profile_iter << std::endl; + } + + CHECK_RT(Rt, Rt::StreamSynchronize(nullptr)); + + auto send_recv_call = [&]() { + if (rank == kSender) { + return infiniSend(d_send, num_elements, infiniFloat32, kReceiver, comm, + nullptr); + } + + if (rank == kReceiver) { + return infiniRecv(d_recv, num_elements, infiniFloat32, kSender, comm, + nullptr); + } + + return infiniSuccess; + }; + + CHECK_INFINI(send_recv_call()); + if (rank == kReceiver) { + CHECK_RT(Rt, Rt::Memcpy(h_recv.data(), d_recv, total_bytes, + Rt::MemcpyDeviceToHost)); + } + + for (int i = 1; i < warmup_iter; ++i) { + CHECK_INFINI(send_recv_call()); + } + CHECK_RT(Rt, Rt::StreamSynchronize(nullptr)); + + Timer timer; + + for (int i = 0; i < profile_iter; ++i) { + CHECK_INFINI(send_recv_call()); + } + + CHECK_RT(Rt, Rt::StreamSynchronize(nullptr)); + if (rank == kReceiver) { + CHECK_RT(Rt, Rt::Memcpy(h_recv.data(), d_recv, total_bytes, + Rt::MemcpyDeviceToHost)); + } + + double elapsed = timer.elapsed_ms() / static_cast(profile_iter); + + if (rank == kReceiver) { + bool correct = Validator::ValidateResult( + h_recv.data(), num_elements, kSendValue, rank, false, "SendRecv"); + + const char *kGreen = "\033[32m"; + const char *kRed = "\033[31m"; + const char *kReset = "\033[0m"; + + std::cout << "\n=== Send/Recv Results ===" << std::endl; + std::cout << "Correct: " + << (correct ? (kGreen + std::string("YES") + kReset) + : (kRed + std::string("NO") + kReset)) + << std::endl; + std::cout << "Expect: " << kSendValue << std::endl; + std::cout << "Actual: " << h_recv[0] << std::endl; + + if (!correct) { + CHECK_RT(Rt, Rt::Free(d_send)); + CHECK_RT(Rt, Rt::Free(d_recv)); + CHECK_INFINI(infiniCommDestroy(comm)); + CHECK_INFINI(infiniFinalize()); + std::exit(EXIT_FAILURE); + } + } + + if (rank == kSender) { + Metrics metrics{elapsed, total_bytes, kRequiredRanks}; + metrics.Print(); + } + + CHECK_RT(Rt, Rt::Free(d_send)); + CHECK_RT(Rt, Rt::Free(d_recv)); + + CHECK_INFINI(infiniCommDestroy(comm)); + CHECK_INFINI(infiniFinalize()); + + if (rank == kSender) { + std::cout << "InfiniCCL finalized." << std::endl; + } +} + +int main(int argc, char **argv) { + int warmup_iters = 2; + int profile_iters = 20; + size_t num_elements = 1 << 20; + + RunSendRecvExample(argc, argv, warmup_iters, profile_iters, num_elements); + + return EXIT_SUCCESS; +} diff --git a/include/comm.h b/include/comm.h index fae0c04..65a38e2 100644 --- a/include/comm.h +++ b/include/comm.h @@ -61,6 +61,14 @@ infiniResult_t infiniAllToAll(const void *sendbuff, void *recvbuff, size_t count, infiniDataType_t datatype, infiniComm_t comm, void *stream); +infiniResult_t infiniSend(const void *sendbuff, size_t count, + infiniDataType_t datatype, int peer, + infiniComm_t comm, void *stream); + +infiniResult_t infiniRecv(void *recvbuff, size_t count, + infiniDataType_t datatype, int peer, + infiniComm_t comm, void *stream); + #ifdef __cplusplus } #endif diff --git a/src/base/recv.h b/src/base/recv.h new file mode 100644 index 0000000..43d89c1 --- /dev/null +++ b/src/base/recv.h @@ -0,0 +1,61 @@ +#ifndef INFINI_CCL_BASE_RECV_H_ +#define INFINI_CCL_BASE_RECV_H_ + +#include "communicator.h" +#include "data_type_impl.h" +#include "logging.h" +#include "operation.h" +#include "return_status_impl.h" + +namespace infini::ccl { + +template +struct RecvImpl; + +class Recv : public Operation { + public: + template + static ReturnStatus Execute(void *recv_buff, size_t count, DataType datatype, + int peer, void *comm_handle, void *stream) { + if (!comm_handle) { + LOG("Invalid communicator handle for `Recv`."); + return ReturnStatus::kInvalidArgument; + } + + auto *comm = static_cast(comm_handle); + if (HasInvalidArgs(recv_buff, count, datatype, peer, comm)) { + return ReturnStatus::kInvalidArgument; + } + if (count == 0) { + return ReturnStatus::kSuccess; + } + + return RecvImpl::Apply( + recv_buff, count, datatype, peer, comm, stream); + } + + private: + static bool HasInvalidArgs(const void *recv_buff, size_t count, + DataType datatype, int peer, Communicator *comm) { + if (datatype < DataType::kChar || datatype >= DataType::kNumTypes) { + LOG("Invalid data type for `Recv`."); + return true; + } + if (peer < 0 || peer >= comm->size()) { + LOG("Invalid peer rank for `Recv`."); + return true; + } + if (count == 0) { + return false; + } + if (!recv_buff) { + LOG("Invalid receive buffer pointer for `Recv`."); + return true; + } + return false; + } +}; + +} // namespace infini::ccl + +#endif // INFINI_CCL_BASE_RECV_H_ diff --git a/src/base/send.h b/src/base/send.h new file mode 100644 index 0000000..fbc3657 --- /dev/null +++ b/src/base/send.h @@ -0,0 +1,62 @@ +#ifndef INFINI_CCL_BASE_SEND_H_ +#define INFINI_CCL_BASE_SEND_H_ + +#include "communicator.h" +#include "data_type_impl.h" +#include "logging.h" +#include "operation.h" +#include "return_status_impl.h" + +namespace infini::ccl { + +template +struct SendImpl; + +class Send : public Operation { + public: + template + static ReturnStatus Execute(const void *send_buff, size_t count, + DataType datatype, int peer, void *comm_handle, + void *stream) { + if (!comm_handle) { + LOG("Invalid communicator handle for `Send`."); + return ReturnStatus::kInvalidArgument; + } + + auto *comm = static_cast(comm_handle); + if (HasInvalidArgs(send_buff, count, datatype, peer, comm)) { + return ReturnStatus::kInvalidArgument; + } + if (count == 0) { + return ReturnStatus::kSuccess; + } + + return SendImpl::Apply( + send_buff, count, datatype, peer, comm, stream); + } + + private: + static bool HasInvalidArgs(const void *send_buff, size_t count, + DataType datatype, int peer, Communicator *comm) { + if (datatype < DataType::kChar || datatype >= DataType::kNumTypes) { + LOG("Invalid data type for `Send`."); + return true; + } + if (peer < 0 || peer >= comm->size()) { + LOG("Invalid peer rank for `Send`."); + return true; + } + if (count == 0) { + return false; + } + if (!send_buff) { + LOG("Invalid send buffer pointer for `Send`."); + return true; + } + return false; + } +}; + +} // namespace infini::ccl + +#endif // INFINI_CCL_BASE_SEND_H_ diff --git a/src/ompi/impl/recv.h b/src/ompi/impl/recv.h new file mode 100644 index 0000000..5b70ad4 --- /dev/null +++ b/src/ompi/impl/recv.h @@ -0,0 +1,72 @@ +#ifndef INFINI_CCL_OMPI_IMPL_RECV_H_ +#define INFINI_CCL_OMPI_IMPL_RECV_H_ + +#include +#include + +#include "base/recv.h" +#include "communicator.h" +#include "data_type_impl.h" +#include "logging.h" +#include "ompi/checks.h" +#include "ompi/comm_instance.h" + +namespace infini::ccl { + +template +class RecvImpl { + public: + static ReturnStatus Apply(void *recv_buff, size_t count, DataType data_type, + int peer, Communicator *comm, void *stream) { + constexpr Device::Type kDev = + ListGetBest(ActiveDevices{}); + + auto *inst = static_cast(comm->inter_comm()); + if (!inst || inst->handle == MPI_COMM_NULL) { + LOG("Invalid `OpenMPI` communicator instance for `Recv`."); + return ReturnStatus::kInternalError; + } + + size_t type_size = kDataTypeToSize.at(data_type); + if (count > std::numeric_limits::max() / type_size) { + LOG("Byte size overflow for `Recv`."); + return ReturnStatus::kInvalidArgument; + } + + size_t total_bytes = count * type_size; + void *host_buf = std::malloc(total_bytes); + if (!host_buf) { + LOG("Failed to allocate host buffer for `Recv` staging."); + return ReturnStatus::kSystemError; + } + + auto *bytes = static_cast(host_buf); + size_t offset = 0; + constexpr size_t kMaxMpiCount = + static_cast(std::numeric_limits::max()); + constexpr int kTag = 0; + while (offset < total_bytes) { + size_t chunk = total_bytes - offset; + if (chunk > kMaxMpiCount) { + chunk = kMaxMpiCount; + } + INFINI_CHECK_MPI(MPI_Recv(bytes + offset, static_cast(chunk), + MPI_BYTE, peer, kTag, inst->handle, + MPI_STATUS_IGNORE)); + offset += chunk; + } + + Runtime::Memcpy(recv_buff, host_buf, total_bytes, + Runtime::MemcpyHostToDevice); + + std::free(host_buf); + return ReturnStatus::kSuccess; + } +}; + +template <> +struct BackendEnabled : std::true_type {}; + +} // namespace infini::ccl + +#endif // INFINI_CCL_OMPI_IMPL_RECV_H_ diff --git a/src/ompi/impl/send.h b/src/ompi/impl/send.h new file mode 100644 index 0000000..23b38db --- /dev/null +++ b/src/ompi/impl/send.h @@ -0,0 +1,74 @@ +#ifndef INFINI_CCL_OMPI_IMPL_SEND_H_ +#define INFINI_CCL_OMPI_IMPL_SEND_H_ + +#include +#include + +#include "base/send.h" +#include "communicator.h" +#include "data_type_impl.h" +#include "logging.h" +#include "ompi/checks.h" +#include "ompi/comm_instance.h" + +namespace infini::ccl { + +template +class SendImpl { + public: + static ReturnStatus Apply(const void *send_buff, size_t count, + DataType data_type, int peer, Communicator *comm, + void *stream) { + constexpr Device::Type kDev = + ListGetBest(ActiveDevices{}); + + auto *inst = static_cast(comm->inter_comm()); + if (!inst || inst->handle == MPI_COMM_NULL) { + LOG("Invalid `OpenMPI` communicator instance for `Send`."); + return ReturnStatus::kInternalError; + } + + size_t type_size = kDataTypeToSize.at(data_type); + if (count > std::numeric_limits::max() / type_size) { + LOG("Byte size overflow for `Send`."); + return ReturnStatus::kInvalidArgument; + } + + size_t total_bytes = count * type_size; + void *host_buf = std::malloc(total_bytes); + if (!host_buf) { + LOG("Failed to allocate host buffer for `Send` staging."); + return ReturnStatus::kSystemError; + } + + Runtime::Memcpy(host_buf, send_buff, total_bytes, + Runtime::MemcpyDeviceToHost); + Runtime::StreamSynchronize( + static_cast::Stream>(stream)); + + auto *bytes = static_cast(host_buf); + size_t offset = 0; + constexpr size_t kMaxMpiCount = + static_cast(std::numeric_limits::max()); + constexpr int kTag = 0; + while (offset < total_bytes) { + size_t chunk = total_bytes - offset; + if (chunk > kMaxMpiCount) { + chunk = kMaxMpiCount; + } + INFINI_CHECK_MPI(MPI_Send(bytes + offset, static_cast(chunk), + MPI_BYTE, peer, kTag, inst->handle)); + offset += chunk; + } + + std::free(host_buf); + return ReturnStatus::kSuccess; + } +}; + +template <> +struct BackendEnabled : std::true_type {}; + +} // namespace infini::ccl + +#endif // INFINI_CCL_OMPI_IMPL_SEND_H_