Skip to content

Commit 79481dc

Browse files
committed
New Serializer APIs, consolidation of ContentCodec, and gRPC MethodDescriptor
Motivation: Supporting a custom transport for gRPC currently requires reflection and special knowledge of types for serialization. The generated code has information related to types and serialization that can be exposed to the runtime which also allows supporting custom transports. This task requires exposing serializer APIs into the public API. Our existing serializer APIs were heavily influced by streaming use cases and JSON serialization which can serialize any type and generally doesn't have type restrictions. This lead to a general SerializationProvider API which is impossible to constrain types due to the generics being at the method level, which doesn't account for implementing serialization for type constrained approached (protobuf MessageLite). Also the serialization APIs couple scalar and streaming use cases which in practice are more commonly decoupled into scalar (individual objects) and streaming (which applies some framing around individual objects). These concepts are coupled which leads to confusing specializations (deserializeAggregatedSingle, deserializeAggregated) which are easy to misuse and lead to invalid serialization in practice (streaming {form url, string} encoding are not properly framed and may not be deserialized correctly). The existing Serializer APIs also do not promote composability in that if you can serialize one object you should be able to re-use that serialization for a stream of objects with some frameing applied on top (required for gRPC). Modifications: - Introduce a new servicetalk-serializer-api package with new Serializer APIs. - Introduce a new servicetalk-serializer-utils package with common utilities for serialization such as streaming framing (fixed and varint length prefixed) - Introduce BufferEncoder APIs in servicetalk-encoding-api for compression and decompression. This also comes with NettyBufferEncoders and NettyCompression to create Netty backed implementations. - Introduce HttpStreamingSerializer and new HttpSerializer APIs in servicetalk-http-api, and modify all request/response APIs to use the new serializer APIs. Introduce new methods to deserialize and process trailers. - Introduce HttpSerializers in servicetalk-http-api for implementations of the new APIs. - Introduce JacksonSerializerCache in servicetalk-http-api as the new entry point to create/cache serializers for JSON. - Introduce ProtobufSerializerCache in servicetalk-data-protobuf as the new entry point to create/cache serializers for protobuf. - Introduce MethodDescriptor API into servicetalk-grpc-api and update code generation to use this new API. - Update all examples avoid usage of deprecated APIs and leverage new APIs. - Deprecate the following APIs and related methods in favor of the new types descrbied above: - SerializationProvider, JacksonSerializationProvider, ProtobufSerializationProvider, GrpcSerializationProvider, HttpSerializationProvider - ContentCodec, CodecDecodingException, CodecEncodingException, ContentCodings - GrpcMetadata#path(), code generated types that extend DefaultGrpcClientMetadata Result: gRPC API supports MethodDescriptor, which uses new Serializer APIs, and the entire code base has been updated to use the new APIs consistently. make control flow in ConnectionCloseHeaderHandlingTest and GracefulConnectionClosureHandlingTest more robust
1 parent 4eef639 commit 79481dc

File tree

303 files changed

+10670
-2626
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

303 files changed

+10670
-2626
lines changed

servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/Buffer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,6 +1657,21 @@ default boolean tryEnsureWritable(int minWritableBytes, boolean force) {
16571657
*/
16581658
Buffer writeUtf8(CharSequence seq, int ensureWritable);
16591659

1660+
/**
1661+
* Encode a {@link CharSequence} encoded in {@link Charset} and write it to this buffer starting at
1662+
* {@code writerIndex} and increases the {@code writerIndex} by the number of the transferred bytes.
1663+
*
1664+
* @param seq the source of the data.
1665+
* @param charset the charset used for encoding.
1666+
* @return self.
1667+
* @throws ReadOnlyBufferException if this buffer is read-only
1668+
*/
1669+
default Buffer writeCharSequence(CharSequence seq, Charset charset) {
1670+
byte[] bytes = seq.toString().getBytes(charset);
1671+
writeBytes(bytes);
1672+
return this;
1673+
}
1674+
16601675
/**
16611676
* Locates the first occurrence of the specified {@code value} in this
16621677
* buffer. The search takes place from the specified {@code fromIndex}

servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/EmptyBuffer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,11 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
644644
throw new IndexOutOfBoundsException();
645645
}
646646

647+
@Override
648+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
649+
throw new IndexOutOfBoundsException();
650+
}
651+
647652
@Override
648653
public int indexOf(int fromIndex, int toIndex, byte value) {
649654
checkIndex(fromIndex);

servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/ReadOnlyByteBuffer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,11 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
375375
throw new ReadOnlyBufferException();
376376
}
377377

378+
@Override
379+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
380+
throw new ReadOnlyBufferException();
381+
}
382+
378383
@Override
379384
public Buffer readSlice(int length) {
380385
checkReadableBytes0(length);

servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/NettyBuffer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,12 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
710710
return this;
711711
}
712712

713+
@Override
714+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
715+
buffer.writeCharSequence(seq, charset);
716+
return this;
717+
}
718+
713719
@Override
714720
public int indexOf(int fromIndex, int toIndex, byte value) {
715721
return buffer.indexOf(fromIndex, toIndex, value);

servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/NettyCompositeBuffer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.buffer.CompositeByteBuf;
2222

2323
import java.nio.ByteBuffer;
24+
import java.nio.charset.Charset;
2425

2526
final class NettyCompositeBuffer extends NettyBuffer<CompositeByteBuf> implements CompositeBuffer {
2627

@@ -363,4 +364,10 @@ public CompositeBuffer writeUtf8(CharSequence seq, int ensureWritable) {
363364
super.writeUtf8(seq, ensureWritable);
364365
return this;
365366
}
367+
368+
@Override
369+
public CompositeBuffer writeCharSequence(CharSequence seq, Charset charset) {
370+
super.writeCharSequence(seq, charset);
371+
return this;
372+
}
366373
}

servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/ReadOnlyBuffer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.InputStream;
2121
import java.nio.ByteBuffer;
2222
import java.nio.ReadOnlyBufferException;
23+
import java.nio.charset.Charset;
2324

2425
final class ReadOnlyBuffer extends WrappedBuffer {
2526

@@ -302,6 +303,11 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
302303
throw new ReadOnlyBufferException();
303304
}
304305

306+
@Override
307+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
308+
throw new ReadOnlyBufferException();
309+
}
310+
305311
@Override
306312
public Buffer readSlice(int length) {
307313
return buffer.readSlice(length).asReadOnly();

servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/WrappedBuffer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,12 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
658658
return this;
659659
}
660660

661+
@Override
662+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
663+
buffer.writeCharSequence(seq, charset);
664+
return this;
665+
}
666+
661667
@Override
662668
public int indexOf(int fromIndex, int toIndex, byte value) {
663669
return buffer.indexOf(fromIndex, toIndex, value);

servicetalk-data-jackson-jersey/src/main/java/io/servicetalk/data/jackson/jersey/JacksonSerializationProviderContextResolver.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import javax.annotation.Nullable;
2121
import javax.ws.rs.ext.ContextResolver;
2222

23+
/**
24+
* @deprecated Use {@link JacksonSerializerCacheContextResolver}.
25+
*/
26+
@Deprecated
2327
final class JacksonSerializationProviderContextResolver implements ContextResolver<JacksonSerializationProvider> {
2428
private final JacksonSerializationProvider jacksonSerializationProvider;
2529

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.data.jackson.jersey;
17+
18+
import io.servicetalk.data.jackson.JacksonSerializerCache;
19+
20+
import javax.annotation.Nullable;
21+
import javax.ws.rs.ext.ContextResolver;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
final class JacksonSerializerCacheContextResolver implements ContextResolver<JacksonSerializerCache> {
26+
private final JacksonSerializerCache cache;
27+
28+
JacksonSerializerCacheContextResolver(final JacksonSerializerCache cache) {
29+
this.cache = requireNonNull(cache);
30+
}
31+
32+
@Nullable
33+
@Override
34+
public JacksonSerializerCache getContext(final Class<?> aClass) {
35+
if (!JacksonSerializerCache.class.isAssignableFrom(aClass)) {
36+
return null;
37+
}
38+
39+
return cache;
40+
}
41+
}

servicetalk-data-jackson-jersey/src/main/java/io/servicetalk/data/jackson/jersey/JacksonSerializerMessageBodyReaderWriter.java

Lines changed: 40 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
import io.servicetalk.buffer.api.BufferAllocator;
2020
import io.servicetalk.concurrent.api.Publisher;
2121
import io.servicetalk.concurrent.api.Single;
22-
import io.servicetalk.data.jackson.JacksonSerializationProvider;
22+
import io.servicetalk.data.jackson.JacksonSerializerCache;
2323
import io.servicetalk.http.router.jersey.internal.SourceWrappers.PublisherSource;
2424
import io.servicetalk.http.router.jersey.internal.SourceWrappers.SingleSource;
25-
import io.servicetalk.serialization.api.DefaultSerializer;
26-
import io.servicetalk.serialization.api.SerializationException;
27-
import io.servicetalk.serialization.api.Serializer;
25+
import io.servicetalk.serializer.api.Deserializer;
26+
import io.servicetalk.serializer.api.SerializationException;
27+
import io.servicetalk.serializer.api.Serializer;
28+
import io.servicetalk.serializer.api.StreamingSerializer;
2829
import io.servicetalk.transport.api.ConnectionContext;
2930
import io.servicetalk.transport.api.ExecutionContext;
3031

@@ -68,9 +69,6 @@
6869
@Consumes(WILDCARD)
6970
@Produces(WILDCARD)
7071
final class JacksonSerializerMessageBodyReaderWriter implements MessageBodyReader<Object>, MessageBodyWriter<Object> {
71-
private static final JacksonSerializationProvider DEFAULT_JACKSON_SERIALIZATION_PROVIDER =
72-
new JacksonSerializationProvider();
73-
7472
// We can not use `@Context ConnectionContext` directly because we would not see the latest version
7573
// in case it has been rebound as part of offloading.
7674
@Context
@@ -95,86 +93,81 @@ public boolean isReadable(final Class<?> type, final Type genericType, final Ann
9593
public Object readFrom(final Class<Object> type, final Type genericType, final Annotation[] annotations,
9694
final MediaType mediaType, final MultivaluedMap<String, String> httpHeaders,
9795
final InputStream entityStream) throws WebApplicationException {
98-
99-
final Serializer serializer = getSerializer(mediaType);
96+
final JacksonSerializerCache serializerCache = getJacksonSerializerCache(mediaType);
10097
final ExecutionContext executionContext = ctxRefProvider.get().get().executionContext();
10198
final BufferAllocator allocator = executionContext.bufferAllocator();
10299
final int contentLength = requestCtxProvider.get().getLength();
103100

104101
if (Single.class.isAssignableFrom(type)) {
105102
return handleEntityStream(entityStream, allocator,
106-
(p, a) -> deserialize(p, serializer, getSourceClass(genericType), contentLength, a),
107-
(is, a) -> new SingleSource<>(deserialize(toBufferPublisher(is, a), serializer,
108-
getSourceClass(genericType), contentLength, a)));
103+
(p, a) -> deserialize(p, serializerCache.serializerDeserializer(getSourceClass(genericType)),
104+
contentLength, a),
105+
(is, a) -> new SingleSource<>(deserialize(toBufferPublisher(is, a),
106+
serializerCache.serializerDeserializer(getSourceClass(genericType)), contentLength, a)));
109107
} else if (Publisher.class.isAssignableFrom(type)) {
110108
return handleEntityStream(entityStream, allocator,
111-
(p, a) -> serializer.deserialize(p, getSourceClass(genericType)),
112-
(is, a) -> new PublisherSource<>(serializer.deserialize(toBufferPublisher(is, a),
113-
getSourceClass(genericType))));
109+
(p, a) -> serializerCache.streamingSerializerDeserializer(
110+
getSourceClass(genericType)).deserialize(p, a),
111+
(is, a) -> new PublisherSource<>(serializerCache.streamingSerializerDeserializer(
112+
getSourceClass(genericType)).deserialize(toBufferPublisher(is, a), a)));
114113
}
115114

116115
return handleEntityStream(entityStream, allocator,
117-
(p, a) -> deserializeObject(p, serializer, type, contentLength, a),
118-
(is, a) -> deserializeObject(toBufferPublisher(is, a), serializer, type, contentLength, a));
116+
(p, a) -> deserializeObject(p, serializerCache.serializerDeserializer(type), contentLength, a),
117+
(is, a) -> deserializeObject(toBufferPublisher(is, a), serializerCache.serializerDeserializer(type),
118+
contentLength, a));
119119
}
120120

121121
@Override
122122
public boolean isWriteable(final Class<?> type, final Type genericType, final Annotation[] annotations,
123123
final MediaType mediaType) {
124-
125124
return !isSse(requestCtxProvider.get()) && isSupportedMediaType(mediaType);
126125
}
127126

127+
@SuppressWarnings({"rawtypes", "unchecked"})
128128
@Override
129129
public void writeTo(final Object o, final Class<?> type, final Type genericType, final Annotation[] annotations,
130130
final MediaType mediaType, final MultivaluedMap<String, Object> httpHeaders,
131131
final OutputStream entityStream) throws WebApplicationException {
132-
132+
final BufferAllocator allocator = ctxRefProvider.get().get().executionContext().bufferAllocator();
133133
final Publisher<Buffer> bufferPublisher;
134134
if (o instanceof Single) {
135-
bufferPublisher = getResponseBufferPublisher(((Single) o).toPublisher(), genericType, mediaType);
135+
final Class<?> clazz = genericType instanceof Class ? (Class) genericType : getSourceClass(genericType);
136+
Serializer serializer = getJacksonSerializerCache(mediaType).serializerDeserializer(clazz);
137+
bufferPublisher = ((Single) o).map(t -> serializer.serialize(t, allocator)).toPublisher();
136138
} else if (o instanceof Publisher) {
137-
bufferPublisher = getResponseBufferPublisher((Publisher) o, genericType, mediaType);
139+
final Class<?> clazz = genericType instanceof Class ? (Class) genericType : getSourceClass(genericType);
140+
StreamingSerializer serializer = getJacksonSerializerCache(mediaType)
141+
.streamingSerializerDeserializer(clazz);
142+
bufferPublisher = serializer.serialize((Publisher) o, allocator);
138143
} else {
139-
bufferPublisher = getResponseBufferPublisher(Publisher.from(o), o.getClass(), mediaType);
144+
Serializer serializer = getJacksonSerializerCache(mediaType).serializerDeserializer(o.getClass());
145+
bufferPublisher = Publisher.from(serializer.serialize(o, allocator));
140146
}
141147

142148
setResponseBufferPublisher(bufferPublisher, requestCtxProvider.get());
143149
}
144150

145-
@SuppressWarnings("unchecked")
146-
private Publisher<Buffer> getResponseBufferPublisher(final Publisher publisher, final Type type,
147-
final MediaType mediaType) {
148-
final BufferAllocator allocator = ctxRefProvider.get().get().executionContext().bufferAllocator();
149-
return getSerializer(mediaType).serialize(publisher, allocator,
150-
type instanceof Class ? (Class) type : getSourceClass(type));
151-
}
152-
153-
private Serializer getSerializer(final MediaType mediaType) {
154-
return new DefaultSerializer(getJacksonSerializationProvider(mediaType));
155-
}
151+
private JacksonSerializerCache getJacksonSerializerCache(final MediaType mediaType) {
152+
final ContextResolver<JacksonSerializerCache> contextResolver =
153+
providers.getContextResolver(JacksonSerializerCache.class, mediaType);
156154

157-
private JacksonSerializationProvider getJacksonSerializationProvider(final MediaType mediaType) {
158-
final ContextResolver<JacksonSerializationProvider> contextResolver =
159-
providers.getContextResolver(JacksonSerializationProvider.class, mediaType);
160-
161-
return contextResolver != null ? contextResolver.getContext(JacksonSerializationProvider.class) :
162-
DEFAULT_JACKSON_SERIALIZATION_PROVIDER;
155+
return contextResolver != null ? contextResolver.getContext(JacksonSerializerCache.class) :
156+
JacksonSerializerCache.INSTANCE;
163157
}
164158

165159
private static Publisher<Buffer> toBufferPublisher(final InputStream is, final BufferAllocator a) {
166160
return fromInputStream(is).map(a::wrap);
167161
}
168162

169-
private static <T> Single<T> deserialize(final Publisher<Buffer> bufferPublisher, final Serializer ser,
170-
final Class<T> type, final int contentLength,
171-
final BufferAllocator allocator) {
172-
163+
private static <T> Single<T> deserialize(
164+
final Publisher<Buffer> bufferPublisher, final Deserializer<T> deserializer, final int contentLength,
165+
final BufferAllocator allocator) {
173166
return bufferPublisher
174167
.collect(() -> newBufferForRequestContent(contentLength, allocator), Buffer::writeBytes)
175168
.map(buf -> {
176169
try {
177-
return ser.deserializeAggregatedSingle(buf, type);
170+
return deserializer.deserialize(buf, allocator);
178171
} catch (final NoSuchElementException e) {
179172
throw new BadRequestException("No deserializable JSON content", e);
180173
} catch (final SerializationException e) {
@@ -192,10 +185,9 @@ static Buffer newBufferForRequestContent(final int contentLength,
192185
}
193186

194187
// visible for testing
195-
static <T> T deserializeObject(final Publisher<Buffer> bufferPublisher, final Serializer ser,
196-
final Class<T> type, final int contentLength,
197-
final BufferAllocator allocator) {
198-
return awaitResult(deserialize(bufferPublisher, ser, type, contentLength, allocator).toFuture());
188+
static <T> T deserializeObject(final Publisher<Buffer> bufferPublisher, final Deserializer<T> deserializer,
189+
final int contentLength, final BufferAllocator allocator) {
190+
return awaitResult(deserialize(bufferPublisher, deserializer, contentLength, allocator).toFuture());
199191
}
200192

201193
private static boolean isSse(ContainerRequestContext requestCtx) {

servicetalk-data-jackson-jersey/src/main/java/io/servicetalk/data/jackson/jersey/SerializationExceptionMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package io.servicetalk.data.jackson.jersey;
1717

18-
import io.servicetalk.serialization.api.SerializationException;
18+
import io.servicetalk.serializer.api.SerializationException;
1919

2020
import com.fasterxml.jackson.core.JsonParseException;
2121
import com.fasterxml.jackson.databind.JsonMappingException;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright © 2018 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.data.jackson.jersey;
17+
18+
import io.servicetalk.serialization.api.SerializationException;
19+
20+
import com.fasterxml.jackson.core.JsonParseException;
21+
import com.fasterxml.jackson.databind.JsonMappingException;
22+
23+
import javax.ws.rs.core.Response;
24+
import javax.ws.rs.ext.ExceptionMapper;
25+
26+
import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
27+
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
28+
import static javax.ws.rs.core.Response.status;
29+
30+
/**
31+
* @deprecated Use {@link SerializationExceptionMapper}.
32+
*/
33+
@Deprecated
34+
final class SerializationExceptionMapperDeprecated implements ExceptionMapper<SerializationException> {
35+
@Override
36+
public Response toResponse(final SerializationException e) {
37+
return status(isDueToBadUserData(e) ? BAD_REQUEST : INTERNAL_SERVER_ERROR).build();
38+
}
39+
40+
private static boolean isDueToBadUserData(final SerializationException e) {
41+
return e.getCause() instanceof JsonMappingException || e.getCause() instanceof JsonParseException;
42+
}
43+
}

0 commit comments

Comments
 (0)