Skip to content

Commit 4e4f24a

Browse files
committed
Add support for Jackson's new ByteBufferFeeder (apple#1711)
Motivation: In 2.14.0, Jackson added support for feeding ByteBuffers directly into the streaming parser engine (where previously only byte arrays would be supported). This changeset incorporates this enhancement into the JacksonStreamingSerializer infrastructure. Modifications: Before this changeset, the ByteArrayParser (and corresponding feeder) were the only choice when using the streaming deserialization infrastructure in Jackson, so the code was able to eaglerly initialize the parser and feeder. Now there is a choice that can be made at runtime which parser might be the best one based on the incoming buffer in the stream. As such, the code now checks the first Buffer and then decides if either the array-backed or the bytebuffer-backed parser should be initalized. This also has the advantage that if no items are emitted on an empty stream, no parser needs to be initialized at all. Note that the code still preserves the runtime "backed by type" checks since (while likely not common but possible) buffers with different backing types can arrive one after another and need to be handled. In this case it is possible that a sub-optimal parser is chosen, but the code optimizes for the likely scenario that all buffers on one stream are backed by the same type. Result: Added support for Jackson's new ByteBufferFeeder and choosing the best strategy at runtime depending on the first buffer type that arrives.
1 parent 97f9a21 commit 4e4f24a

File tree

4 files changed

+247
-70
lines changed

4 files changed

+247
-70
lines changed

servicetalk-benchmarks/build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies {
3232
implementation project(":servicetalk-http-netty")
3333
implementation project(":servicetalk-transport-netty-internal")
3434
implementation project(":servicetalk-loadbalancer")
35+
implementation project(":servicetalk-data-jackson")
3536
implementation "com.google.code.findbugs:jsr305"
3637
implementation "io.netty:netty-codec-http"
3738
implementation "org.openjdk.jmh:jmh-core"
@@ -43,7 +44,7 @@ dependencies {
4344
}
4445

4546
jmh {
46-
includes = [".*Benchmark"]
47+
includes = ["JacksonStreamingSerializerBenchmark"]
4748
jmhVersion = "$jmhCoreVersion"
4849
jvmArgsPrepend = ["-Dio.netty.maxDirectMemory=9223372036854775807 " +
4950
"-Djmh.executor=CUSTOM " +
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.data.jackson;
17+
18+
import com.fasterxml.jackson.core.type.TypeReference;
19+
import io.servicetalk.buffer.api.Buffer;
20+
import io.servicetalk.buffer.netty.BufferAllocators;
21+
import org.openjdk.jmh.annotations.Benchmark;
22+
import org.openjdk.jmh.annotations.BenchmarkMode;
23+
import org.openjdk.jmh.annotations.Fork;
24+
import org.openjdk.jmh.annotations.Level;
25+
import org.openjdk.jmh.annotations.Measurement;
26+
import org.openjdk.jmh.annotations.Mode;
27+
import org.openjdk.jmh.annotations.Scope;
28+
import org.openjdk.jmh.annotations.Setup;
29+
import org.openjdk.jmh.annotations.State;
30+
import org.openjdk.jmh.annotations.Warmup;
31+
32+
import java.util.Map;
33+
34+
import static io.servicetalk.concurrent.api.Publisher.from;
35+
import static io.servicetalk.data.jackson.JacksonSerializerFactory.JACKSON;
36+
37+
/**
38+
* Performs (de)serialization benchmarks on the JacksonStreamingSerializer.
39+
*
40+
* Benchmark Mode Cnt Score Error Units
41+
* JacksonStreamingSerializerBenchmark.deserializeLargeBackedByArray thrpt 5 93522,586 ± 968,326 ops/s
42+
* JacksonStreamingSerializerBenchmark.deserializeLargeBackedByDirect thrpt 5 66786,239 ± 21883,864 ops/s
43+
* JacksonStreamingSerializerBenchmark.deserializeMidBackedByArray thrpt 5 388610,250 ± 51973,334 ops/s
44+
* JacksonStreamingSerializerBenchmark.deserializeMidBackedByDirect thrpt 5 363756,018 ± 92330,915 ops/s
45+
* JacksonStreamingSerializerBenchmark.deserializeSmallBackedByArray thrpt 5 1058117,567 ± 65362,818 ops/s
46+
* JacksonStreamingSerializerBenchmark.deserializeSmallBackedByDirect thrpt 5 917872,015 ± 140051,402 ops/s
47+
*/
48+
@Fork(value = 1)
49+
@State(Scope.Benchmark)
50+
@Warmup(iterations = 5, time = 3)
51+
@Measurement(iterations = 5, time = 3)
52+
@BenchmarkMode(Mode.Throughput)
53+
public class JacksonStreamingSerializerBenchmark {
54+
55+
/**
56+
* Type reference used for deserialization mapping.
57+
*/
58+
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE =
59+
new TypeReference<Map<String, Object>>() {};
60+
61+
/**
62+
* Represents a 10-byte JSON object (encoded).
63+
*/
64+
private static final String SMALL_JSON = "{\"a\":true}";
65+
66+
/**
67+
* Represents a 100-byte JSON object (encoded).
68+
*/
69+
private static final String MID_JSON = "{\"_id\":\"6371eb6af678bb79174\",\"a\":false,\"balance\":\"$2,087.12\"," +
70+
"\"picture\":\"https://placehold.it/32x32\"}";
71+
72+
/**
73+
* Represents a 1000-byte JSON object (encoded).
74+
*/
75+
private static final String LARGE_JSON = "{\"_id\":\"6371eaaf236316f8be8b44cc\",\"index\":0,\"guid\":" +
76+
"\"12cdb7e4-b133-4da7-94b3-b1c58c4b3ef6\",\"isActive\":true,\"balance\":\"$3,777.31\",\"picture\":" +
77+
"\"http://placehold.it/32x32\",\"age\":24,\"eyeColor\":\"blue\",\"name\":\"Nikki Randall\"," +
78+
"\"gender\":\"female\",\"company\":\"PROVIDCO\",\"email\":\"[email protected]\",\"phone\":" +
79+
"\"+1 (981) 475-2422\",\"address\":\"914 Butler Street, Eggertsville, Maryland, 5970\",\"about\":" +
80+
"\"Nulla proident velit culpa magna sit duis in in deserunt. Irure consectetur ea veniam quis amet " +
81+
"nulla tempor in esse ipsum. Do aute magna in qui qui dolor adipisicing ipsum nulla deserunt. Labore " +
82+
"voluptate sint laborum adipisicing dolor anim amet pariatur sint. Ad nulla commodo ipsum aliqua " +
83+
"est.\\r\\n\",\"registered\":\"2022-01-01T03:44:39 -01:00\",\"latitude\":61.061004,\"longitude\":38.544479," +
84+
"\"tags\":[\"qui\",\"nulla\",\"cillum\",\"dolor\",\"deserunt\",\"amet\",\"magna\"],\"friends\":[{\"id\":0," +
85+
"\"name\":\"Gibson Benson\"},{\"id\":1,\"name\":\"Marshall Atkins\"},{\"id\":2,\"name\":\"Gabriel Clay\"}," +
86+
"{\"id\":3,\"name\":\"Aguilar\"},{\"id\":4,\"name\":\"Bauer\"}]}";
87+
88+
private Buffer smallBackedByArray;
89+
private Buffer smallBackedByDirect;
90+
91+
private Buffer midBackedByArray;
92+
private Buffer midBackedByDirect;
93+
94+
private Buffer largeBackedByArray;
95+
private Buffer largeBackedByDirect;
96+
97+
@Setup(Level.Trial)
98+
public void setup() {
99+
smallBackedByArray = BufferAllocators.PREFER_HEAP_ALLOCATOR.fromUtf8(SMALL_JSON);
100+
smallBackedByDirect = BufferAllocators.PREFER_DIRECT_ALLOCATOR.fromUtf8(SMALL_JSON);
101+
midBackedByArray = BufferAllocators.PREFER_HEAP_ALLOCATOR.fromUtf8(MID_JSON);
102+
midBackedByDirect = BufferAllocators.PREFER_DIRECT_ALLOCATOR.fromUtf8(MID_JSON);
103+
largeBackedByArray = BufferAllocators.PREFER_HEAP_ALLOCATOR.fromUtf8(LARGE_JSON);
104+
largeBackedByDirect = BufferAllocators.PREFER_DIRECT_ALLOCATOR.fromUtf8(LARGE_JSON);
105+
}
106+
107+
@Benchmark
108+
public Map<String, Object> deserializeSmallBackedByArray() throws Exception {
109+
return deserialize(smallBackedByArray);
110+
}
111+
112+
@Benchmark
113+
public Map<String, Object> deserializeSmallBackedByDirect() throws Exception {
114+
return deserialize(smallBackedByDirect);
115+
}
116+
117+
@Benchmark
118+
public Map<String, Object> deserializeMidBackedByArray() throws Exception {
119+
return deserialize(midBackedByArray);
120+
}
121+
122+
@Benchmark
123+
public Map<String, Object> deserializeMidBackedByDirect() throws Exception {
124+
return deserialize(midBackedByDirect);
125+
}
126+
127+
@Benchmark
128+
public Map<String, Object> deserializeLargeBackedByArray() throws Exception {
129+
return deserialize(largeBackedByArray);
130+
}
131+
132+
@Benchmark
133+
public Map<String, Object> deserializeLargeBackedByDirect() throws Exception {
134+
return deserialize(largeBackedByDirect);
135+
}
136+
137+
private static Map<String, Object> deserialize(final Buffer fromBuffer) throws Exception {
138+
return JACKSON
139+
.streamingSerializerDeserializer(MAP_TYPE_REFERENCE)
140+
.deserialize(from(fromBuffer), BufferAllocators.DEFAULT_ALLOCATOR)
141+
.firstOrError()
142+
.toFuture()
143+
.get();
144+
}
145+
146+
}

servicetalk-data-jackson/src/main/java/io/servicetalk/data/jackson/JacksonStreamingSerializer.java

+78-67
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.servicetalk.data.jackson;
1717

18+
import com.fasterxml.jackson.core.async.NonBlockingInputFeeder;
1819
import io.servicetalk.buffer.api.Buffer;
1920
import io.servicetalk.buffer.api.BufferAllocator;
2021
import io.servicetalk.concurrent.PublisherSource.Subscriber;
@@ -29,7 +30,6 @@
2930
import com.fasterxml.jackson.core.JsonToken;
3031
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
3132
import com.fasterxml.jackson.core.async.ByteBufferFeeder;
32-
import com.fasterxml.jackson.core.async.NonBlockingInputFeeder;
3333
import com.fasterxml.jackson.core.type.TypeReference;
3434
import com.fasterxml.jackson.databind.JavaType;
3535
import com.fasterxml.jackson.databind.JsonNode;
@@ -101,37 +101,58 @@ private DeserializeOperator(ObjectReader reader) {
101101

102102
@Override
103103
public Subscriber<? super Buffer> apply(final Subscriber<? super Iterable<T>> subscriber) {
104-
final JsonParser parser;
105-
try {
106-
// TODO(scott): ByteBufferFeeder is currently not supported by jackson, and the current API throws
107-
// UnsupportedOperationException if not supported. When jackson does support two NonBlockingInputFeeder
108-
// types we need an approach which doesn't involve catching UnsupportedOperationException to try to get
109-
// ByteBufferFeeder and then ByteArrayFeeder.
110-
parser = reader.getFactory().createNonBlockingByteArrayParser();
111-
} catch (IOException e) {
112-
throw new SerializationException(e);
113-
}
114-
NonBlockingInputFeeder feeder = parser.getNonBlockingInputFeeder();
115-
if (feeder instanceof ByteBufferFeeder) {
116-
return new ByteBufferDeserializeSubscriber<>(subscriber, reader, parser, (ByteBufferFeeder) feeder);
117-
} else if (feeder instanceof ByteArrayFeeder) {
118-
return new ByteArrayDeserializeSubscriber<>(subscriber, reader, parser, (ByteArrayFeeder) feeder);
119-
}
120-
return new FailedSubscriber<>(subscriber, new SerializationException("unsupported feeder type: " + feeder));
104+
return new DeserializeSubscriber<>(subscriber, reader);
121105
}
122106

123-
private static final class ByteArrayDeserializeSubscriber<T> extends DeserializeSubscriber<T> {
124-
private final ByteArrayFeeder feeder;
107+
private static class DeserializeSubscriber<T> implements Subscriber<Buffer> {
108+
private final ObjectReader reader;
109+
private final Deque<JsonNode> tokenStack = new ArrayDeque<>(8);
110+
private final Subscriber<? super Iterable<T>> subscriber;
111+
@Nullable
112+
private Subscription subscription;
113+
@Nullable
114+
private String fieldName;
115+
116+
@Nullable
117+
private JsonParser parser;
118+
@Nullable
119+
private NonBlockingInputFeeder feeder;
120+
121+
private boolean isByteArrayParser;
122+
123+
private DeserializeSubscriber(final Subscriber<? super Iterable<T>> subscriber, final ObjectReader reader) {
124+
this.reader = reader;
125+
this.subscriber = subscriber;
126+
}
127+
128+
/**
129+
* Consumer the buffer from {@link #onNext(Buffer)}.
130+
* @param buffer The bytes to append.
131+
* @return {@code true} if more data is required to parse an object. {@code false} if object(s) should be
132+
* parsed after this method returns.
133+
* @throws IOException If an exception occurs while appending {@link Buffer}.
134+
*/
135+
private boolean consumeOnNext(final Buffer buffer) throws IOException {
136+
if (feeder == null) {
137+
throw new NullPointerException("The NonBlockingInputFeeder is null when it should not be " +
138+
"- this is a bug.");
139+
}
125140

126-
private ByteArrayDeserializeSubscriber(final Subscriber<? super Iterable<T>> subscriber,
127-
final ObjectReader reader, final JsonParser parser,
128-
final ByteArrayFeeder feeder) {
129-
super(subscriber, reader, parser);
130-
this.feeder = feeder;
141+
if (isByteArrayParser) {
142+
feedByteArrayParser(buffer, (ByteArrayFeeder) feeder);
143+
} else {
144+
feedByteBufferParser(buffer, (ByteBufferFeeder) feeder);
145+
}
146+
return feeder.needMoreInput();
131147
}
132148

133-
@Override
134-
boolean consumeOnNext(final Buffer buffer) throws IOException {
149+
/**
150+
* Feeds the buffer into the {@link ByteArrayFeeder}.
151+
* @param buffer the buffer to feed into the streaming JSON parser.
152+
* @param feeder the feeder into which the buffer should be fed.
153+
* @throws IOException if feeding the buffer failed.
154+
*/
155+
private void feedByteArrayParser(final Buffer buffer, final ByteArrayFeeder feeder) throws IOException {
135156
if (buffer.hasArray()) {
136157
final int start = buffer.arrayOffset() + buffer.readerIndex();
137158
feeder.feedInput(buffer.array(), start, start + buffer.readableBytes());
@@ -143,53 +164,33 @@ boolean consumeOnNext(final Buffer buffer) throws IOException {
143164
feeder.feedInput(copy, 0, copy.length);
144165
}
145166
}
146-
return feeder.needMoreInput();
147-
}
148-
}
149-
150-
private static final class ByteBufferDeserializeSubscriber<T> extends DeserializeSubscriber<T> {
151-
private final ByteBufferFeeder feeder;
152-
153-
private ByteBufferDeserializeSubscriber(final Subscriber<? super Iterable<T>> subscriber,
154-
final ObjectReader reader, final JsonParser parser,
155-
final ByteBufferFeeder feeder) {
156-
super(subscriber, reader, parser);
157-
this.feeder = feeder;
158167
}
159168

160-
@Override
161-
boolean consumeOnNext(final Buffer buffer) throws IOException {
169+
/**
170+
* Feeds the buffer into the {@link ByteBufferFeeder}.
171+
* @param buffer the buffer to feed into the streaming JSON parser.
172+
* @param feeder the feeder into which the buffer should be fed.
173+
* @throws IOException if feeding the buffer failed.
174+
*/
175+
private void feedByteBufferParser(final Buffer buffer, final ByteBufferFeeder feeder) throws IOException {
162176
feeder.feedInput(buffer.toNioBuffer());
163-
return feeder.needMoreInput();
164-
}
165-
}
166-
167-
private abstract static class DeserializeSubscriber<T> implements Subscriber<Buffer> {
168-
private final JsonParser parser;
169-
private final ObjectReader reader;
170-
private final Deque<JsonNode> tokenStack = new ArrayDeque<>(8);
171-
private final Subscriber<? super Iterable<T>> subscriber;
172-
@Nullable
173-
private Subscription subscription;
174-
@Nullable
175-
private String fieldName;
176-
177-
private DeserializeSubscriber(final Subscriber<? super Iterable<T>> subscriber,
178-
final ObjectReader reader,
179-
final JsonParser parser) {
180-
this.reader = reader;
181-
this.parser = parser;
182-
this.subscriber = subscriber;
183177
}
184178

185179
/**
186-
* Consumer the buffer from {@link #onNext(Buffer)}.
187-
* @param buffer The bytes to append.
188-
* @return {@code true} if more data is required to parse an object. {@code false} if object(s) should be
189-
* parsed after this method returns.
190-
* @throws IOException If an exception occurs while appending {@link Buffer}.
180+
* Initializes the JSON streaming parser and feeder based on the buffer type.
181+
* @param arrayBackedBuffer if the buffer on the operator is backed by an array.
182+
* @throws IOException if creating the non-blocking {@link JsonParser} failed.
191183
*/
192-
abstract boolean consumeOnNext(Buffer buffer) throws IOException;
184+
private void initParserAndFeeder(final boolean arrayBackedBuffer) throws IOException {
185+
if (arrayBackedBuffer) {
186+
parser = reader.getFactory().createNonBlockingByteArrayParser();
187+
isByteArrayParser = true;
188+
} else {
189+
parser = reader.getFactory().createNonBlockingByteBufferParser();
190+
isByteArrayParser = false;
191+
}
192+
feeder = parser.getNonBlockingInputFeeder();
193+
}
193194

194195
@Override
195196
public final void onSubscribe(final Subscription subscription) {
@@ -201,6 +202,10 @@ public final void onSubscribe(final Subscription subscription) {
201202
public final void onNext(@Nullable final Buffer buffer) {
202203
assert subscription != null;
203204
try {
205+
if (parser == null && buffer != null) {
206+
initParserAndFeeder(buffer.hasArray());
207+
}
208+
204209
if (buffer == null || consumeOnNext(buffer)) {
205210
subscription.request(1);
206211
} else {
@@ -238,11 +243,17 @@ public final void onNext(@Nullable final Buffer buffer) {
238243

239244
@Override
240245
public final void onError(final Throwable t) {
246+
if (feeder != null) {
247+
feeder.endOfInput();
248+
}
241249
subscriber.onError(t);
242250
}
243251

244252
@Override
245253
public final void onComplete() {
254+
if (feeder != null) {
255+
feeder.endOfInput();
256+
}
246257
if (tokenStack.isEmpty()) {
247258
subscriber.onComplete();
248259
} else {

0 commit comments

Comments
 (0)