Skip to content

Commit

Permalink
fix invalid size for getByteBufferView of 2G block
Browse files Browse the repository at this point in the history
  • Loading branch information
JeynmannZ committed Jan 4, 2024
1 parent e5a6742 commit 8efbc24
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ case class UcxWorkerWrapper(worker: UcpWorker, transport: UcxShuffleTransport, i
override def onSuccess(r: UcpRequest): Unit = {
request.completed = true
stats.endTime = System.nanoTime()
logDebug(s"Received rndv data of size: ${mem.size} for tag $i in " +
s"${stats.getElapsedTimeNs} ns " +
logDebug(s"Received rndv data of size: ${ucpAmData.getLength}" +
s" for tag $i in ${stats.getElapsedTimeNs} ns " +
s"time from amHandle: ${System.nanoTime() - stats.amHandleTime} ns")
for (b <- 0 until numBlocks) {
val blockSize = headerBuffer.getInt
Expand Down Expand Up @@ -280,10 +280,11 @@ case class UcxWorkerWrapper(worker: UcpWorker, transport: UcxShuffleTransport, i

def handleFetchBlockRequest(blocks: Seq[Block], replyTag: Int, replyExecutor: Long): Unit = try {
val tagAndSizes = UnsafeUtils.INT_SIZE + UnsafeUtils.INT_SIZE * blocks.length
val resultMemory = transport.hostBounceBufferMemoryPool.get(tagAndSizes + blocks.map(_.getSize).sum)
val msgSize = tagAndSizes + blocks.map(_.getSize).sum
val resultMemory = transport.hostBounceBufferMemoryPool.get(msgSize)
.asInstanceOf[UcxBounceBufferMemoryBlock]
val resultBuffer = UcxUtils.getByteBufferView(resultMemory.address,
resultMemory.size)
msgSize)
resultBuffer.putInt(replyTag)

var offset = 0
Expand Down Expand Up @@ -311,9 +312,9 @@ case class UcxWorkerWrapper(worker: UcpWorker, transport: UcxShuffleTransport, i

val startTime = System.nanoTime()
val req = connections(replyExecutor).sendAmNonBlocking(1, resultMemory.address, tagAndSizes,
resultMemory.address + tagAndSizes, resultMemory.size - tagAndSizes, 0, new UcxCallback {
resultMemory.address + tagAndSizes, msgSize - tagAndSizes, 0, new UcxCallback {
override def onSuccess(request: UcpRequest): Unit = {
logTrace(s"Sent ${blocks.length} blocks of size: ${resultMemory.size} " +
logTrace(s"Sent ${blocks.length} blocks of size: ${msgSize} " +
s"to tag $replyTag in ${System.nanoTime() - startTime} ns.")
transport.hostBounceBufferMemoryPool.put(resultMemory)
}
Expand Down

0 comments on commit 8efbc24

Please sign in to comment.