Skip to content

Commit 81d540d

Browse files
authored
Merge pull request #428 from msgpack/fix-issue-426
Fix a bug ChannelBufferInput#next blocks until the buffer is filled
2 parents 88722a4 + fdddf49 commit 81d540d

File tree

2 files changed

+47
-7
lines changed

2 files changed

+47
-7
lines changed

msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,12 @@ public MessageBuffer next()
6262
throws IOException
6363
{
6464
ByteBuffer b = buffer.sliceAsByteBuffer();
65-
while (b.remaining() > 0) {
66-
int ret = channel.read(b);
67-
if (ret == -1) {
68-
break;
69-
}
65+
int ret = channel.read(b);
66+
if (ret == -1) {
67+
return null;
7068
}
7169
b.flip();
72-
return b.remaining() == 0 ? null : buffer.slice(0, b.limit());
70+
return buffer.slice(0, b.limit());
7371
}
7472

7573
@Override

msgpack-core/src/test/scala/org/msgpack/core/buffer/MessageBufferInputTest.scala

+43-1
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
package org.msgpack.core.buffer
1717

1818
import java.io._
19+
import java.net.{InetSocketAddress}
1920
import java.nio.ByteBuffer
21+
import java.nio.channels.{ServerSocketChannel, SocketChannel}
22+
import java.util.concurrent.{Callable, Executors, TimeUnit}
2023
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
2124

22-
import org.msgpack.core.{MessagePack, MessagePackSpec, MessageUnpacker}
25+
import org.msgpack.core.{MessagePack, MessagePackSpec}
2326
import xerial.core.io.IOUtil._
2427

2528
import scala.util.Random
@@ -201,5 +204,44 @@ class MessageBufferInputTest
201204
buf.reset(in1)
202205
readInt(buf) shouldBe 42
203206
}
207+
208+
"unpack without blocking" in {
209+
val server = ServerSocketChannel.open.bind(new InetSocketAddress("localhost", 0))
210+
val executorService = Executors.newCachedThreadPool
211+
212+
try {
213+
executorService.execute(new Runnable {
214+
override def run {
215+
val server_ch = server.accept
216+
val packer = MessagePack.newDefaultPacker(server_ch)
217+
packer.packString("0123456789")
218+
packer.flush
219+
// Keep the connection open
220+
while (!executorService.isShutdown) {
221+
TimeUnit.SECONDS.sleep(1)
222+
}
223+
packer.close
224+
}
225+
})
226+
227+
val future = executorService.submit(new Callable[String] {
228+
override def call: String = {
229+
val conn_ch = SocketChannel.open(new InetSocketAddress("localhost", server.socket.getLocalPort))
230+
val unpacker = MessagePack.newDefaultUnpacker(conn_ch)
231+
val s = unpacker.unpackString
232+
unpacker.close
233+
s
234+
}
235+
})
236+
237+
future.get(5, TimeUnit.SECONDS) shouldBe "0123456789"
238+
}
239+
finally {
240+
executorService.shutdown
241+
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
242+
executorService.shutdownNow
243+
}
244+
}
245+
}
204246
}
205247
}

0 commit comments

Comments
 (0)