Skip to content

Commit 56ac516

Browse files
committed
[#691] Improve memory usage via reuse InputStreamBufferInput
1 parent f1ac3dd commit 56ac516

File tree

13 files changed

+218
-28
lines changed

13 files changed

+218
-28
lines changed

msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
* MessageBufferInput adapter for byte arrays
2222
*/
2323
public class ArrayBufferInput
24-
implements MessageBufferInput
24+
implements MessageBufferInput<byte[]>
2525
{
2626
private MessageBuffer buffer;
2727
private boolean isEmpty;
@@ -66,9 +66,14 @@ public MessageBuffer reset(MessageBuffer buf)
6666
return old;
6767
}
6868

69-
public void reset(byte[] arr)
69+
@Override
70+
public byte[] reset(byte[] arr)
7071
{
71-
reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null")));
72+
final MessageBuffer messageBuffer = reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null")));
73+
if (messageBuffer == null) {
74+
return null;
75+
}
76+
return messageBuffer.array();
7277
}
7378

7479
public void reset(byte[] arr, int offset, int len)

msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
* {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer}
2424
*/
2525
public class ByteBufferInput
26-
implements MessageBufferInput
26+
implements MessageBufferInput<ByteBuffer>
2727
{
2828
private ByteBuffer input;
2929
private boolean isRead = false;
@@ -39,6 +39,7 @@ public ByteBufferInput(ByteBuffer input)
3939
* @param input new buffer
4040
* @return the old buffer
4141
*/
42+
@Override
4243
public ByteBuffer reset(ByteBuffer input)
4344
{
4445
ByteBuffer old = this.input;

msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* {@link MessageBufferInput} adapter for {@link java.nio.channels.ReadableByteChannel}
2727
*/
2828
public class ChannelBufferInput
29-
implements MessageBufferInput
29+
implements MessageBufferInput<ReadableByteChannel>
3030
{
3131
private ReadableByteChannel channel;
3232
private final MessageBuffer buffer;
@@ -49,8 +49,8 @@ public ChannelBufferInput(ReadableByteChannel channel, int bufferSize)
4949
* @param channel new channel
5050
* @return the old resource
5151
*/
52+
@Override
5253
public ReadableByteChannel reset(ReadableByteChannel channel)
53-
throws IOException
5454
{
5555
ReadableByteChannel old = this.channel;
5656
this.channel = channel;

msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* {@link MessageBufferInput} adapter for {@link InputStream}
2727
*/
2828
public class InputStreamBufferInput
29-
implements MessageBufferInput
29+
implements MessageBufferInput<InputStream>
3030
{
3131
private InputStream in;
3232
private final byte[] buffer;
@@ -60,8 +60,8 @@ public InputStreamBufferInput(InputStream in, int bufferSize)
6060
* @param in new stream
6161
* @return the old resource
6262
*/
63+
@Override
6364
public InputStream reset(InputStream in)
64-
throws IOException
6565
{
6666
InputStream old = this.in;
6767
this.in = in;

msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* A MessageBufferInput implementation has control of lifecycle of the memory so that it can reuse previously
2525
* allocated memory, use memory pools, or use memory-mapped files.
2626
*/
27-
public interface MessageBufferInput
27+
public interface MessageBufferInput<T>
2828
extends Closeable
2929
{
3030
/**
@@ -40,6 +40,8 @@ public interface MessageBufferInput
4040
MessageBuffer next()
4141
throws IOException;
4242

43+
T reset(T input);
44+
4345
/**
4446
* Closes the input.
4547
* <p>

msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* {@link MessageBufferInput} adapter for {@link MessageBufferInput} Enumeration
2525
*/
2626
public class SequenceMessageBufferInput
27-
implements MessageBufferInput
27+
implements MessageBufferInput<Void>
2828
{
2929
private Enumeration<? extends MessageBufferInput> sequence;
3030
private MessageBufferInput input;
@@ -54,6 +54,11 @@ public MessageBuffer next() throws IOException
5454
return buffer;
5555
}
5656

57+
@Override
58+
public Void reset(Void input) {
59+
throw new UnsupportedOperationException("reset");
60+
}
61+
5762
private void nextInput() throws IOException
5863
{
5964
if (input != null) {

msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
2929
import scala.util.Random
3030

3131
object MessageUnpackerTest {
32-
class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput {
32+
class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput[Void] {
3333
var cursor = 0
3434
override def next(): MessageBuffer = {
3535
if (cursor < array.length) {
@@ -41,6 +41,9 @@ object MessageUnpackerTest {
4141
}
4242
}
4343

44+
45+
override def reset(input: Void): Void = throw new UnsupportedOperationException("reset")
46+
4447
override def close(): Unit = {}
4548
}
4649
}

msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class ByteStringTest extends AirSpec {
2626
private val byteString = ByteString(createMessagePackData(_.packString(unpackedString)))
2727

2828
private def unpackString(messageBuffer: MessageBuffer) = {
29-
val input = new MessageBufferInput {
29+
val input = new MessageBufferInput[Void] {
3030

3131
private var isRead = false
3232

@@ -38,6 +38,8 @@ class ByteStringTest extends AirSpec {
3838
messageBuffer
3939
}
4040
override def close(): Unit = {}
41+
42+
override def reset(input: Void): Void = throw new UnsupportedOperationException("reset")
4143
}
4244

4345
MessagePack.newDefaultUnpacker(input).unpackString()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
//
2+
// MessagePack for Java
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
package org.msgpack.jackson.dataformat;
17+
18+
import org.msgpack.core.buffer.MessageBufferInput;
19+
20+
public interface MessageBufferInputLocator
21+
{
22+
MessageBufferInput get(Class clazz);
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
//
2+
// MessagePack for Java
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
package org.msgpack.jackson.dataformat;
17+
18+
import org.msgpack.core.buffer.MessageBufferInput;
19+
20+
interface MessageBufferInputProvider
21+
{
22+
MessageBufferInput provide();
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
//
2+
// MessagePack for Java
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
package org.msgpack.jackson.dataformat;
17+
18+
import org.msgpack.core.buffer.MessageBufferInput;
19+
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import java.util.Objects;
23+
24+
public class MessageBufferInputRegistry implements MessageBufferInputLocator
25+
{
26+
private final Map<Class, MessageBufferInput> messageBufferInputMap = new HashMap<>(1);
27+
28+
@Override
29+
public MessageBufferInput get(Class clazz)
30+
{
31+
return messageBufferInputMap.get(clazz);
32+
}
33+
34+
public boolean register(Class clazz, MessageBufferInputProvider provider)
35+
{
36+
Objects.requireNonNull(clazz, "clazz");
37+
Objects.requireNonNull(provider, "provider");
38+
39+
if (messageBufferInputMap.containsKey(clazz)) {
40+
return false;
41+
}
42+
43+
messageBufferInputMap.put(clazz, provider.provide());
44+
return true;
45+
}
46+
}

msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java

+47-16
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
public class MessagePackParser
4848
extends ParserMinimalBase
4949
{
50-
private static final ThreadLocal<Tuple<Object, MessageUnpacker>> messageUnpackerHolder =
51-
new ThreadLocal<Tuple<Object, MessageUnpacker>>();
50+
private static final ThreadLocal<Triple<Object, MessageUnpacker, MessageBufferInputLocator>> reuseObjectHolder =
51+
new ThreadLocal<>();
5252
private final MessageUnpacker messageUnpacker;
5353

5454
private static final BigInteger LONG_MIN = BigInteger.valueOf((long) Long.MIN_VALUE);
@@ -126,11 +126,17 @@ public MessagePackParser(
126126
IOContext ctxt,
127127
int features,
128128
ObjectCodec objectCodec,
129-
InputStream in,
129+
final InputStream in,
130130
boolean reuseResourceInParser)
131131
throws IOException
132132
{
133-
this(ctxt, features, new InputStreamBufferInput(in), objectCodec, in, reuseResourceInParser);
133+
this(ctxt, features, new MessageBufferInputProvider() {
134+
@Override
135+
public MessageBufferInput provide()
136+
{
137+
return new InputStreamBufferInput(in);
138+
}
139+
}, objectCodec, in, reuseResourceInParser);
134140
}
135141

136142
public MessagePackParser(IOContext ctxt, int features, ObjectCodec objectCodec, byte[] bytes)
@@ -143,16 +149,22 @@ public MessagePackParser(
143149
IOContext ctxt,
144150
int features,
145151
ObjectCodec objectCodec,
146-
byte[] bytes,
152+
final byte[] bytes,
147153
boolean reuseResourceInParser)
148154
throws IOException
149155
{
150-
this(ctxt, features, new ArrayBufferInput(bytes), objectCodec, bytes, reuseResourceInParser);
156+
this(ctxt, features, new MessageBufferInputProvider() {
157+
@Override
158+
public MessageBufferInput provide()
159+
{
160+
return new ArrayBufferInput(bytes);
161+
}
162+
}, objectCodec, bytes, reuseResourceInParser);
151163
}
152164

153165
private MessagePackParser(IOContext ctxt,
154166
int features,
155-
MessageBufferInput input,
167+
MessageBufferInputProvider bufferInputProvider,
156168
ObjectCodec objectCodec,
157169
Object src,
158170
boolean reuseResourceInParser)
@@ -167,29 +179,48 @@ private MessagePackParser(IOContext ctxt,
167179
parsingContext = JsonReadContext.createRootContext(dups);
168180
this.reuseResourceInParser = reuseResourceInParser;
169181
if (!reuseResourceInParser) {
170-
this.messageUnpacker = MessagePack.newDefaultUnpacker(input);
182+
this.messageUnpacker = MessagePack.newDefaultUnpacker(bufferInputProvider.provide());
171183
return;
172184
}
173185
else {
174186
this.messageUnpacker = null;
175187
}
176188

177189
MessageUnpacker messageUnpacker;
178-
Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
179-
if (messageUnpackerTuple == null) {
180-
messageUnpacker = MessagePack.newDefaultUnpacker(input);
190+
MessageBufferInputLocator messageBufferInputLocator;
191+
Triple<Object, MessageUnpacker, MessageBufferInputLocator> messageUnpackerTriple = reuseObjectHolder.get();
192+
if (messageUnpackerTriple == null) {
193+
final MessageBufferInputRegistry messageBufferInputRegistry = new MessageBufferInputRegistry();
194+
messageBufferInputRegistry.register(src.getClass(), bufferInputProvider);
195+
messageBufferInputLocator = messageBufferInputRegistry;
196+
messageUnpacker = MessagePack.newDefaultUnpacker(messageBufferInputRegistry.get(src.getClass()));
181197
}
182198
else {
183199
// Considering to reuse InputStream with JsonParser.Feature.AUTO_CLOSE_SOURCE,
184200
// MessagePackParser needs to use the MessageUnpacker that has the same InputStream
185201
// since it has buffer which has loaded the InputStream data ahead.
186202
// However, it needs to call MessageUnpacker#reset when the source is different from the previous one.
187-
if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTuple.first() != src) {
188-
messageUnpackerTuple.second().reset(input);
203+
if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTriple.first() != src) {
204+
final MessageBufferInputLocator bufferInputLocator = messageUnpackerTriple.third();
205+
MessageBufferInput messageBufferInput = bufferInputLocator.get(src.getClass());
206+
if (messageBufferInput != null) {
207+
messageBufferInput.reset(src);
208+
}
209+
else {
210+
if (bufferInputLocator instanceof MessageBufferInputRegistry) {
211+
((MessageBufferInputRegistry) bufferInputLocator).register(src.getClass(), bufferInputProvider);
212+
messageBufferInput = bufferInputLocator.get(src.getClass());
213+
}
214+
else {
215+
messageBufferInput = bufferInputProvider.provide();
216+
}
217+
}
218+
messageUnpackerTriple.second().reset(messageBufferInput);
189219
}
190-
messageUnpacker = messageUnpackerTuple.second();
220+
messageUnpacker = messageUnpackerTriple.second();
221+
messageBufferInputLocator = messageUnpackerTriple.third();
191222
}
192-
messageUnpackerHolder.set(new Tuple<Object, MessageUnpacker>(src, messageUnpacker));
223+
reuseObjectHolder.set(new Triple<Object, MessageUnpacker, MessageBufferInputLocator>(src, messageUnpacker, messageBufferInputLocator));
193224
}
194225

195226
public void setExtensionTypeCustomDeserializers(ExtensionTypeCustomDeserializers extTypeCustomDesers)
@@ -690,7 +721,7 @@ private MessageUnpacker getMessageUnpacker()
690721
return this.messageUnpacker;
691722
}
692723

693-
Tuple<Object, MessageUnpacker> messageUnpackerTuple = messageUnpackerHolder.get();
724+
Triple<Object, MessageUnpacker, MessageBufferInputLocator> messageUnpackerTuple = reuseObjectHolder.get();
694725
if (messageUnpackerTuple == null) {
695726
throw new IllegalStateException("messageUnpacker is null");
696727
}

0 commit comments

Comments
 (0)