Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/common/net/ib/IBConnect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,12 +435,15 @@ CoTryTask<void> IBSocket::connect(serde::ClientContext &ctx, Duration timeout) {
co_return makeError(RPCCode::kConnectFailed);
}

// post a empty msg to generate a event and notify remote side,
// post a 1-byte msg to generate a event and notify remote side,
// after this msg send success, state will change to State::READY
auto bufIdx = sendBufs_.front().first;
// empty RDMA packet might be silently droped in some certain virtualized
// RDMA environment on cloud.
auto [bufIdx, sendBuf] = sendBufs_.front();
memset(sendBuf.data(), 0, 1);
sendBufs_.pop();
if (auto ret = postSend(bufIdx, 0, IBV_SEND_SIGNALED) != 0; ret != 0) {
XLOGF(CRITICAL, "IBSocket {} failed to send empty msg, errno {}", describe(), ret);
if (auto ret = postSend(bufIdx, 1, IBV_SEND_SIGNALED) != 0; ret != 0) {
XLOGF(CRITICAL, "IBSocket {} failed to send 1-byte msg, errno {}", describe(), ret);
co_return Void{};
}
XLOGF(INFO, "IBSocket {} connected", describe());
Expand Down Expand Up @@ -485,7 +488,7 @@ Result<Void> IBSocket::accept(folly::IPAddressV4 ip, const IBConnectReq &req, Du
return makeError(RPCCode::kConnectFailed, "IBSocket failed to modify QP to RTR.");
}

// client will send a empty msg to server, then server's QP will turn to READY
// client will send a 1-byte msg to server, then server's QP will turn to READY
auto state = state_.exchange(State::ACCEPTED);
XLOGF_IF(FATAL, state != State::INIT, "State is not INIT!!!");
if (UNLIKELY(closed_)) {
Expand Down
9 changes: 8 additions & 1 deletion src/common/net/ib/IBSocket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,9 @@ int IBSocket::onSended(const ibv_wc &wc, Events &events) {

int IBSocket::onRecved(const ibv_wc &wc, Events &events) {
WRId wr(wc.wr_id);
bool isConnectMsg = (state_.load(std::memory_order_seq_cst) == State::ACCEPTED);

if (UNLIKELY(state_.load(std::memory_order_seq_cst) == State::ACCEPTED)) {
if (isConnectMsg) {
// turn QP from ACCEPTED to RTS when receive first msg from peer.
XLOGF(INFO, "IBSocket {} turn to READY from ACCEPTED.", describe());
if (qpReadyToSend() != 0) {
Expand All @@ -538,6 +539,12 @@ int IBSocket::onRecved(const ibv_wc &wc, Events &events) {
return postRecv(recvBufIdx);
}

// The connecting message will be consumed by the transport layer and not be
// passed to the application.
if (isConnectMsg) {
return postRecv(recvBufIdx);
}

XLOGF_IF(FATAL, wc.byte_len > recvBufs_.getBufSize(), "{} > {}", wc.byte_len, recvBufs_.getBufSize());
recvBufs_.push(recvBufIdx, wc.byte_len);
events |= kEventReadableFlag;
Expand Down