15
15
*/
16
16
package io .servicetalk .data .jackson ;
17
17
18
+ import com .fasterxml .jackson .core .async .NonBlockingInputFeeder ;
18
19
import io .servicetalk .buffer .api .Buffer ;
19
20
import io .servicetalk .buffer .api .BufferAllocator ;
20
21
import io .servicetalk .concurrent .PublisherSource .Subscriber ;
29
30
import com .fasterxml .jackson .core .JsonToken ;
30
31
import com .fasterxml .jackson .core .async .ByteArrayFeeder ;
31
32
import com .fasterxml .jackson .core .async .ByteBufferFeeder ;
32
- import com .fasterxml .jackson .core .async .NonBlockingInputFeeder ;
33
33
import com .fasterxml .jackson .core .type .TypeReference ;
34
34
import com .fasterxml .jackson .databind .JavaType ;
35
35
import com .fasterxml .jackson .databind .JsonNode ;
@@ -101,37 +101,58 @@ private DeserializeOperator(ObjectReader reader) {
101
101
102
102
@ Override
103
103
public Subscriber <? super Buffer > apply (final Subscriber <? super Iterable <T >> subscriber ) {
104
- final JsonParser parser ;
105
- try {
106
- // TODO(scott): ByteBufferFeeder is currently not supported by jackson, and the current API throws
107
- // UnsupportedOperationException if not supported. When jackson does support two NonBlockingInputFeeder
108
- // types we need an approach which doesn't involve catching UnsupportedOperationException to try to get
109
- // ByteBufferFeeder and then ByteArrayFeeder.
110
- parser = reader .getFactory ().createNonBlockingByteArrayParser ();
111
- } catch (IOException e ) {
112
- throw new SerializationException (e );
113
- }
114
- NonBlockingInputFeeder feeder = parser .getNonBlockingInputFeeder ();
115
- if (feeder instanceof ByteBufferFeeder ) {
116
- return new ByteBufferDeserializeSubscriber <>(subscriber , reader , parser , (ByteBufferFeeder ) feeder );
117
- } else if (feeder instanceof ByteArrayFeeder ) {
118
- return new ByteArrayDeserializeSubscriber <>(subscriber , reader , parser , (ByteArrayFeeder ) feeder );
119
- }
120
- return new FailedSubscriber <>(subscriber , new SerializationException ("unsupported feeder type: " + feeder ));
104
+ return new DeserializeSubscriber <>(subscriber , reader );
121
105
}
122
106
123
- private static final class ByteArrayDeserializeSubscriber <T > extends DeserializeSubscriber <T > {
124
- private final ByteArrayFeeder feeder ;
107
+ private static class DeserializeSubscriber <T > implements Subscriber <Buffer > {
108
+ private final ObjectReader reader ;
109
+ private final Deque <JsonNode > tokenStack = new ArrayDeque <>(8 );
110
+ private final Subscriber <? super Iterable <T >> subscriber ;
111
+ @ Nullable
112
+ private Subscription subscription ;
113
+ @ Nullable
114
+ private String fieldName ;
115
+
116
+ @ Nullable
117
+ private JsonParser parser ;
118
+ @ Nullable
119
+ private NonBlockingInputFeeder feeder ;
120
+
121
+ private boolean isByteArrayParser ;
122
+
123
+ private DeserializeSubscriber (final Subscriber <? super Iterable <T >> subscriber , final ObjectReader reader ) {
124
+ this .reader = reader ;
125
+ this .subscriber = subscriber ;
126
+ }
127
+
128
+ /**
129
+ * Consumer the buffer from {@link #onNext(Buffer)}.
130
+ * @param buffer The bytes to append.
131
+ * @return {@code true} if more data is required to parse an object. {@code false} if object(s) should be
132
+ * parsed after this method returns.
133
+ * @throws IOException If an exception occurs while appending {@link Buffer}.
134
+ */
135
+ private boolean consumeOnNext (final Buffer buffer ) throws IOException {
136
+ if (feeder == null ) {
137
+ throw new NullPointerException ("The NonBlockingInputFeeder is null when it should not be " +
138
+ "- this is a bug." );
139
+ }
125
140
126
- private ByteArrayDeserializeSubscriber (final Subscriber <? super Iterable <T >> subscriber ,
127
- final ObjectReader reader , final JsonParser parser ,
128
- final ByteArrayFeeder feeder ) {
129
- super (subscriber , reader , parser );
130
- this .feeder = feeder ;
141
+ if (isByteArrayParser ) {
142
+ feedByteArrayParser (buffer , (ByteArrayFeeder ) feeder );
143
+ } else {
144
+ feedByteBufferParser (buffer , (ByteBufferFeeder ) feeder );
145
+ }
146
+ return feeder .needMoreInput ();
131
147
}
132
148
133
- @ Override
134
- boolean consumeOnNext (final Buffer buffer ) throws IOException {
149
+ /**
150
+ * Feeds the buffer into the {@link ByteArrayFeeder}.
151
+ * @param buffer the buffer to feed into the streaming JSON parser.
152
+ * @param feeder the feeder into which the buffer should be fed.
153
+ * @throws IOException if feeding the buffer failed.
154
+ */
155
+ private void feedByteArrayParser (final Buffer buffer , final ByteArrayFeeder feeder ) throws IOException {
135
156
if (buffer .hasArray ()) {
136
157
final int start = buffer .arrayOffset () + buffer .readerIndex ();
137
158
feeder .feedInput (buffer .array (), start , start + buffer .readableBytes ());
@@ -143,53 +164,33 @@ boolean consumeOnNext(final Buffer buffer) throws IOException {
143
164
feeder .feedInput (copy , 0 , copy .length );
144
165
}
145
166
}
146
- return feeder .needMoreInput ();
147
- }
148
- }
149
-
150
- private static final class ByteBufferDeserializeSubscriber <T > extends DeserializeSubscriber <T > {
151
- private final ByteBufferFeeder feeder ;
152
-
153
- private ByteBufferDeserializeSubscriber (final Subscriber <? super Iterable <T >> subscriber ,
154
- final ObjectReader reader , final JsonParser parser ,
155
- final ByteBufferFeeder feeder ) {
156
- super (subscriber , reader , parser );
157
- this .feeder = feeder ;
158
167
}
159
168
160
- @ Override
161
- boolean consumeOnNext (final Buffer buffer ) throws IOException {
169
+ /**
170
+ * Feeds the buffer into the {@link ByteBufferFeeder}.
171
+ * @param buffer the buffer to feed into the streaming JSON parser.
172
+ * @param feeder the feeder into which the buffer should be fed.
173
+ * @throws IOException if feeding the buffer failed.
174
+ */
175
+ private void feedByteBufferParser (final Buffer buffer , final ByteBufferFeeder feeder ) throws IOException {
162
176
feeder .feedInput (buffer .toNioBuffer ());
163
- return feeder .needMoreInput ();
164
- }
165
- }
166
-
167
- private abstract static class DeserializeSubscriber <T > implements Subscriber <Buffer > {
168
- private final JsonParser parser ;
169
- private final ObjectReader reader ;
170
- private final Deque <JsonNode > tokenStack = new ArrayDeque <>(8 );
171
- private final Subscriber <? super Iterable <T >> subscriber ;
172
- @ Nullable
173
- private Subscription subscription ;
174
- @ Nullable
175
- private String fieldName ;
176
-
177
- private DeserializeSubscriber (final Subscriber <? super Iterable <T >> subscriber ,
178
- final ObjectReader reader ,
179
- final JsonParser parser ) {
180
- this .reader = reader ;
181
- this .parser = parser ;
182
- this .subscriber = subscriber ;
183
177
}
184
178
185
179
/**
186
- * Consumer the buffer from {@link #onNext(Buffer)}.
187
- * @param buffer The bytes to append.
188
- * @return {@code true} if more data is required to parse an object. {@code false} if object(s) should be
189
- * parsed after this method returns.
190
- * @throws IOException If an exception occurs while appending {@link Buffer}.
180
+ * Initializes the JSON streaming parser and feeder based on the buffer type.
181
+ * @param arrayBackedBuffer if the buffer on the operator is backed by an array.
182
+ * @throws IOException if creating the non-blocking {@link JsonParser} failed.
191
183
*/
192
- abstract boolean consumeOnNext (Buffer buffer ) throws IOException ;
184
+ private void initParserAndFeeder (final boolean arrayBackedBuffer ) throws IOException {
185
+ if (arrayBackedBuffer ) {
186
+ parser = reader .getFactory ().createNonBlockingByteArrayParser ();
187
+ isByteArrayParser = true ;
188
+ } else {
189
+ parser = reader .getFactory ().createNonBlockingByteBufferParser ();
190
+ isByteArrayParser = false ;
191
+ }
192
+ feeder = parser .getNonBlockingInputFeeder ();
193
+ }
193
194
194
195
@ Override
195
196
public final void onSubscribe (final Subscription subscription ) {
@@ -201,6 +202,10 @@ public final void onSubscribe(final Subscription subscription) {
201
202
public final void onNext (@ Nullable final Buffer buffer ) {
202
203
assert subscription != null ;
203
204
try {
205
+ if (parser == null && buffer != null ) {
206
+ initParserAndFeeder (buffer .hasArray ());
207
+ }
208
+
204
209
if (buffer == null || consumeOnNext (buffer )) {
205
210
subscription .request (1 );
206
211
} else {
@@ -238,11 +243,17 @@ public final void onNext(@Nullable final Buffer buffer) {
238
243
239
244
@ Override
240
245
public final void onError (final Throwable t ) {
246
+ if (feeder != null ) {
247
+ feeder .endOfInput ();
248
+ }
241
249
subscriber .onError (t );
242
250
}
243
251
244
252
@ Override
245
253
public final void onComplete () {
254
+ if (feeder != null ) {
255
+ feeder .endOfInput ();
256
+ }
246
257
if (tokenStack .isEmpty ()) {
247
258
subscriber .onComplete ();
248
259
} else {
0 commit comments