Skip to content

Commit ce13c87

Browse files
committed
Add support for Jackson's new ByteBufferFeeder (apple#1711)
Motivation: In 2.14.0, Jackson added support for feeding ByteBuffers directly into the streaming parser engine (where previously only byte arrays would be supported). This changeset incorporates this enhancement into the JacksonStreamingSerializer infrastructure. Modifications: Before this changeset, the ByteArrayParser (and corresponding feeder) were the only choice when using the streaming deserialization infrastructure in Jackson, so the code was able to eaglerly initialize the parser and feeder. Now there is a choice that can be made at runtime which parser might be the best one based on the incoming buffer in the stream. As such, the code now checks the first Buffer and then decides if either the array-backed or the bytebuffer-backed parser should be initalized. This also has the advantage that if no items are emitted on an empty stream, no parser needs to be initialized at all. Note that the code still preserves the runtime "backed by type" checks since (while likely not common but possible) buffers with different backing types can arrive one after another and need to be handled. In this case it is possible that a sub-optimal parser is chosen, but the code optimizes for the likely scenario that all buffers on one stream are backed by the same type. Also a MethodHandle-based runtime check has been added which makes sure that even if a user overrides the Jackson version to an older one on the Classpath the code is still functional. Result: Added support for Jackson's new ByteBufferFeeder and choosing the best strategy at runtime depending on the first buffer type that arrives.
1 parent 97f9a21 commit ce13c87

File tree

5 files changed

+325
-74
lines changed

5 files changed

+325
-74
lines changed

servicetalk-benchmarks/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies {
3232
implementation project(":servicetalk-http-netty")
3333
implementation project(":servicetalk-transport-netty-internal")
3434
implementation project(":servicetalk-loadbalancer")
35+
implementation project(":servicetalk-data-jackson")
3536
implementation "com.google.code.findbugs:jsr305"
3637
implementation "io.netty:netty-codec-http"
3738
implementation "org.openjdk.jmh:jmh-core"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
/*
2+
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.data.jackson;
17+
18+
import io.servicetalk.buffer.api.Buffer;
19+
import io.servicetalk.buffer.netty.BufferAllocators;
20+
21+
import com.fasterxml.jackson.core.JsonProcessingException;
22+
import com.fasterxml.jackson.core.type.TypeReference;
23+
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import org.openjdk.jmh.annotations.Benchmark;
25+
import org.openjdk.jmh.annotations.BenchmarkMode;
26+
import org.openjdk.jmh.annotations.Fork;
27+
import org.openjdk.jmh.annotations.Measurement;
28+
import org.openjdk.jmh.annotations.Mode;
29+
import org.openjdk.jmh.annotations.Scope;
30+
import org.openjdk.jmh.annotations.State;
31+
import org.openjdk.jmh.annotations.Warmup;
32+
33+
import java.util.ArrayList;
34+
import java.util.Collections;
35+
import java.util.HashMap;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Random;
39+
40+
import static io.servicetalk.concurrent.api.Publisher.fromIterable;
41+
import static io.servicetalk.data.jackson.JacksonSerializerFactory.JACKSON;
42+
43+
/**
44+
* Performs (de)serialization benchmarks on the {@link JacksonStreamingSerializer}.
45+
* <p>
46+
* Benchmark Mode Cnt Score Error Units
47+
* JacksonStreamingSerializerBenchmark.deserializeLargeBackedByArray thrpt 5 102204,872 ± 391,655 ops/s
48+
* JacksonStreamingSerializerBenchmark.deserializeLargeBackedByDirect thrpt 5 102117,249 ± 396,363 ops/s
49+
* JacksonStreamingSerializerBenchmark.deserializeMidBackedByArray thrpt 5 232416,219 ± 597,317 ops/s
50+
* JacksonStreamingSerializerBenchmark.deserializeMidBackedByDirect thrpt 5 277589,410 ± 574,916 ops/s
51+
* JacksonStreamingSerializerBenchmark.deserializeSmallBackedByArray thrpt 5 1123782,216 ± 3798,021 ops/s
52+
* JacksonStreamingSerializerBenchmark.deserializeSmallBackedByDirect thrpt 5 1137170,185 ± 5476,999 ops/s
53+
* <p>
54+
* The numbers above show streaming deserialization of different payload sizes, divided into a total of 8 chunks per
55+
* stream decode. Small is 10 bytes, Mid is 1KB and Large is 1MB of a single JSON object with a random number of
56+
* fields and values inside.
57+
*/
58+
@Fork(value = 1)
59+
@State(Scope.Benchmark)
60+
@Warmup(iterations = 5, time = 3)
61+
@Measurement(iterations = 5, time = 3)
62+
@BenchmarkMode(Mode.Throughput)
63+
public class JacksonStreamingSerializerBenchmark {
64+
65+
/**
66+
* Allows to customize the number of chunks the payload is split up to simulate JSON chunk stream decoding.
67+
*/
68+
private static final int NUM_CHUNKS = 8;
69+
70+
/**
71+
* Type reference used for deserialization mapping.
72+
*/
73+
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE =
74+
new TypeReference<Map<String, Object>>() {
75+
};
76+
77+
private static final Buffer SMALL_JSON_HEAP = BufferAllocators.PREFER_HEAP_ALLOCATOR.fromUtf8(generateJson(10));
78+
private static final Buffer SMALL_JSON_DIRECT =
79+
BufferAllocators.PREFER_DIRECT_ALLOCATOR.wrap(SMALL_JSON_HEAP.toNioBuffer());
80+
81+
private static final Buffer MID_JSON_HEAP = BufferAllocators.PREFER_HEAP_ALLOCATOR.fromUtf8(generateJson(1_000));
82+
private static final Buffer MID_JSON_DIRECT =
83+
BufferAllocators.PREFER_DIRECT_ALLOCATOR.wrap(MID_JSON_HEAP.toNioBuffer());
84+
85+
private static final Buffer LARGE_JSON_HEAP =
86+
BufferAllocators.PREFER_HEAP_ALLOCATOR.fromUtf8(generateJson(1_000_000));
87+
private static final Buffer LARGE_JSON_DIRECT =
88+
BufferAllocators.PREFER_DIRECT_ALLOCATOR.wrap(LARGE_JSON_HEAP.toNioBuffer());
89+
90+
@Benchmark
91+
public Map<String, Object> deserializeSmallBackedByArray() {
92+
return deserialize(SMALL_JSON_HEAP.duplicate());
93+
}
94+
95+
@Benchmark
96+
public Map<String, Object> deserializeSmallBackedByDirect() {
97+
return deserialize(SMALL_JSON_DIRECT.duplicate());
98+
}
99+
100+
@Benchmark
101+
public Map<String, Object> deserializeMidBackedByArray() {
102+
return deserialize(MID_JSON_HEAP.duplicate());
103+
}
104+
105+
@Benchmark
106+
public Map<String, Object> deserializeMidBackedByDirect() {
107+
return deserialize(MID_JSON_DIRECT.duplicate());
108+
}
109+
110+
@Benchmark
111+
public Map<String, Object> deserializeLargeBackedByArray() {
112+
return deserialize(LARGE_JSON_HEAP.duplicate());
113+
}
114+
115+
@Benchmark
116+
public Map<String, Object> deserializeLargeBackedByDirect() {
117+
return deserialize(LARGE_JSON_DIRECT.duplicate());
118+
}
119+
120+
private static Map<String, Object> deserialize(final Buffer fromBuffer) {
121+
int chunkSize = fromBuffer.readableBytes() / NUM_CHUNKS;
122+
List<Buffer> chunks = new ArrayList<>();
123+
while (fromBuffer.readableBytes() > chunkSize) {
124+
chunks.add(fromBuffer.readSlice(chunkSize));
125+
}
126+
chunks.add(fromBuffer.readSlice(fromBuffer.readableBytes()));
127+
128+
try {
129+
return JACKSON
130+
.streamingSerializerDeserializer(MAP_TYPE_REFERENCE)
131+
.deserialize(fromIterable(chunks), BufferAllocators.DEFAULT_ALLOCATOR)
132+
.firstOrError()
133+
.toFuture()
134+
.get();
135+
} catch (Exception e) {
136+
throw new RuntimeException(e);
137+
}
138+
}
139+
140+
/**
141+
* Generates a primitive encoded JSON object with roughly the size provided as argument.
142+
* <p>
143+
* Note that the returned encoded JSON does not have exactly the size provided, since the size is used to randomly
144+
* generate keys and values. The additional bytes in the returned string are JSON structure symbols. Also, a maximum
145+
* of 50 bytes per key and value is chosen to prevent large payloads from looking very unbalanced in
146+
* key/value sizes.
147+
*
148+
* @param size the approx size for the generated JSON payload.
149+
* @return the encoded JSON object.
150+
*/
151+
private static String generateJson(final int size) {
152+
final Map<String, Object> payload = new HashMap<>();
153+
154+
int remainingBytes = size;
155+
Random random = new Random();
156+
while (remainingBytes > 2) {
157+
int keyLen = random.nextInt(1, Math.min(50, remainingBytes));
158+
remainingBytes -= keyLen;
159+
if (remainingBytes <= 2) {
160+
break;
161+
}
162+
int bodyLen = random.nextInt(1, Math.min(50, remainingBytes));
163+
remainingBytes -= bodyLen;
164+
165+
payload.put(
166+
String.join("", Collections.nCopies(keyLen, "k")),
167+
String.join("", Collections.nCopies(bodyLen, "v"))
168+
);
169+
}
170+
171+
try {
172+
return new ObjectMapper().writeValueAsString(payload);
173+
} catch (JsonProcessingException e) {
174+
throw new RuntimeException(e);
175+
}
176+
}
177+
}

servicetalk-data-jackson/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies {
2929
implementation project(":servicetalk-concurrent-internal")
3030
implementation project(":servicetalk-utils-internal")
3131
implementation "com.google.code.findbugs:jsr305"
32+
implementation "org.slf4j:slf4j-api"
3233

3334
testImplementation testFixtures(project(":servicetalk-concurrent-api"))
3435
testImplementation testFixtures(project(":servicetalk-concurrent-internal"))

0 commit comments

Comments
 (0)