diff --git a/src/common/net/ib/IBConnect.cc b/src/common/net/ib/IBConnect.cc index 92a3c981..cc71ae93 100644 --- a/src/common/net/ib/IBConnect.cc +++ b/src/common/net/ib/IBConnect.cc @@ -435,12 +435,15 @@ CoTryTask IBSocket::connect(serde::ClientContext &ctx, Duration timeout) { co_return makeError(RPCCode::kConnectFailed); } - // post a empty msg to generate a event and notify remote side, + // post a 1-byte msg to generate a event and notify remote side, // after this msg send success, state will change to State::READY - auto bufIdx = sendBufs_.front().first; + // empty RDMA packet might be silently dropped in some certain virtualized + // RDMA environment on cloud. + auto [bufIdx, sendBuf] = sendBufs_.front(); + memset(sendBuf.data(), 0, 1); sendBufs_.pop(); - if (auto ret = postSend(bufIdx, 0, IBV_SEND_SIGNALED) != 0; ret != 0) { - XLOGF(CRITICAL, "IBSocket {} failed to send empty msg, errno {}", describe(), ret); + if (auto ret = postSend(bufIdx, 1, IBV_SEND_SIGNALED) != 0; ret != 0) { + XLOGF(CRITICAL, "IBSocket {} failed to send 1-byte msg, errno {}", describe(), ret); co_return Void{}; } XLOGF(INFO, "IBSocket {} connected", describe()); @@ -485,7 +488,7 @@ Result IBSocket::accept(folly::IPAddressV4 ip, const IBConnectReq &req, Du return makeError(RPCCode::kConnectFailed, "IBSocket failed to modify QP to RTR."); } - // client will send a empty msg to server, then server's QP will turn to READY + // client will send a 1-byte msg to server, then server's QP will turn to READY auto state = state_.exchange(State::ACCEPTED); XLOGF_IF(FATAL, state != State::INIT, "State is not INIT!!!"); if (UNLIKELY(closed_)) { diff --git a/src/common/net/ib/IBSocket.cc b/src/common/net/ib/IBSocket.cc index 45e97bcc..bf346c8d 100644 --- a/src/common/net/ib/IBSocket.cc +++ b/src/common/net/ib/IBSocket.cc @@ -516,8 +516,9 @@ int IBSocket::onSended(const ibv_wc &wc, Events &events) { int IBSocket::onRecved(const ibv_wc &wc, Events &events) { WRId wr(wc.wr_id); + bool isConnectMsg = (state_.load(std::memory_order_seq_cst) == State::ACCEPTED); - if (UNLIKELY(state_.load(std::memory_order_seq_cst) == State::ACCEPTED)) { + if (isConnectMsg) { // turn QP from ACCEPTED to RTS when receive first msg from peer. XLOGF(INFO, "IBSocket {} turn to READY from ACCEPTED.", describe()); if (qpReadyToSend() != 0) { @@ -538,6 +539,12 @@ int IBSocket::onRecved(const ibv_wc &wc, Events &events) { return postRecv(recvBufIdx); } + // The connecting message will be consumed by the transport layer and not be + // passed to the application. + if (isConnectMsg) { + return postRecv(recvBufIdx); + } + XLOGF_IF(FATAL, wc.byte_len > recvBufs_.getBufSize(), "{} > {}", wc.byte_len, recvBufs_.getBufSize()); recvBufs_.push(recvBufIdx, wc.byte_len); events |= kEventReadableFlag;