Skip to content

Commit 4ce1aa6

Browse files
authored
New Serializer APIs, consolidation of ContentCodec, and gRPC MethodDescriptor (#1673)
Motivation: Supporting a custom transport for gRPC currently requires reflection and special knowledge of types for serialization. The generated code has information related to types and serialization that can be exposed to the runtime which also allows supporting custom transports. This task requires exposing serializer APIs into the public API. Our existing serializer APIs were heavily influced by streaming use cases and JSON serialization which can serialize any type and generally doesn't have type restrictions. This lead to a general SerializationProvider API which is impossible to constrain types due to the generics being at the method level, which doesn't account for implementing serialization for type constrained approached (protobuf MessageLite). Also the serialization APIs couple scalar and streaming use cases which in practice are more commonly decoupled into scalar (individual objects) and streaming (which applies some framing around individual objects). These concepts are coupled which leads to confusing specializations (deserializeAggregatedSingle, deserializeAggregated) which are easy to misuse and lead to invalid serialization in practice (streaming {form url, string} encoding are not properly framed and may not be deserialized correctly). The existing Serializer APIs also do not promote composability in that if you can serialize one object you should be able to re-use that serialization for a stream of objects with some frameing applied on top (required for gRPC). Modifications: - Introduce a new servicetalk-serializer-api package with new Serializer APIs. - Introduce a new servicetalk-serializer-utils package with common utilities for serialization such as streaming framing (fixed and varint length prefixed) - Introduce BufferEncoder APIs in servicetalk-encoding-api for compression and decompression. This also comes with NettyBufferEncoders and NettyCompression to create Netty backed implementations. - Introduce HttpStreamingSerializer and new HttpSerializer APIs in servicetalk-http-api, and modify all request/response APIs to use the new serializer APIs. Introduce new methods to deserialize and process trailers. - Introduce HttpSerializers in servicetalk-http-api for implementations of the new APIs. - Introduce JacksonSerializerCache in servicetalk-http-api as the new entry point to create/cache serializers for JSON. - Introduce ProtobufSerializerCache in servicetalk-data-protobuf as the new entry point to create/cache serializers for protobuf. - Introduce MethodDescriptor API into servicetalk-grpc-api and update code generation to use this new API. - Update all examples avoid usage of deprecated APIs and leverage new APIs. - Deprecate the following APIs and related methods in favor of the new types descrbied above: - SerializationProvider, JacksonSerializationProvider, ProtobufSerializationProvider, GrpcSerializationProvider, HttpSerializationProvider - ContentCodec, CodecDecodingException, CodecEncodingException, ContentCodings - GrpcMetadata#path(), code generated types that extend DefaultGrpcClientMetadata Result: gRPC API supports MethodDescriptor, which uses new Serializer APIs, and the entire code base has been updated to use the new APIs consistently.
1 parent c1db214 commit 4ce1aa6

File tree

327 files changed

+12164
-3105
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

327 files changed

+12164
-3105
lines changed

servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/Buffer.java

+15
Original file line numberDiff line numberDiff line change
@@ -1657,6 +1657,21 @@ default boolean tryEnsureWritable(int minWritableBytes, boolean force) {
16571657
*/
16581658
Buffer writeUtf8(CharSequence seq, int ensureWritable);
16591659

1660+
/**
1661+
* Encode a {@link CharSequence} encoded in {@link Charset} and write it to this buffer starting at
1662+
* {@code writerIndex} and increases the {@code writerIndex} by the number of the transferred bytes.
1663+
*
1664+
* @param seq the source of the data.
1665+
* @param charset the charset used for encoding.
1666+
* @return self.
1667+
* @throws ReadOnlyBufferException if this buffer is read-only
1668+
*/
1669+
default Buffer writeCharSequence(CharSequence seq, Charset charset) {
1670+
byte[] bytes = seq.toString().getBytes(charset);
1671+
writeBytes(bytes);
1672+
return this;
1673+
}
1674+
16601675
/**
16611676
* Locates the first occurrence of the specified {@code value} in this
16621677
* buffer. The search takes place from the specified {@code fromIndex}

servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/EmptyBuffer.java

+5
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,11 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
644644
throw new IndexOutOfBoundsException();
645645
}
646646

647+
@Override
648+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
649+
throw new IndexOutOfBoundsException();
650+
}
651+
647652
@Override
648653
public int indexOf(int fromIndex, int toIndex, byte value) {
649654
checkIndex(fromIndex);

servicetalk-buffer-api/src/main/java/io/servicetalk/buffer/api/ReadOnlyByteBuffer.java

+5
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,11 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
375375
throw new ReadOnlyBufferException();
376376
}
377377

378+
@Override
379+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
380+
throw new ReadOnlyBufferException();
381+
}
382+
378383
@Override
379384
public Buffer readSlice(int length) {
380385
checkReadableBytes0(length);

servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/NettyBuffer.java

+6
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,12 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
710710
return this;
711711
}
712712

713+
@Override
714+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
715+
buffer.writeCharSequence(seq, charset);
716+
return this;
717+
}
718+
713719
@Override
714720
public int indexOf(int fromIndex, int toIndex, byte value) {
715721
return buffer.indexOf(fromIndex, toIndex, value);

servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/NettyCompositeBuffer.java

+7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.buffer.CompositeByteBuf;
2222

2323
import java.nio.ByteBuffer;
24+
import java.nio.charset.Charset;
2425

2526
final class NettyCompositeBuffer extends NettyBuffer<CompositeByteBuf> implements CompositeBuffer {
2627

@@ -363,4 +364,10 @@ public CompositeBuffer writeUtf8(CharSequence seq, int ensureWritable) {
363364
super.writeUtf8(seq, ensureWritable);
364365
return this;
365366
}
367+
368+
@Override
369+
public CompositeBuffer writeCharSequence(CharSequence seq, Charset charset) {
370+
super.writeCharSequence(seq, charset);
371+
return this;
372+
}
366373
}

servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/ReadOnlyBuffer.java

+6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.InputStream;
2121
import java.nio.ByteBuffer;
2222
import java.nio.ReadOnlyBufferException;
23+
import java.nio.charset.Charset;
2324

2425
final class ReadOnlyBuffer extends WrappedBuffer {
2526

@@ -302,6 +303,11 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
302303
throw new ReadOnlyBufferException();
303304
}
304305

306+
@Override
307+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
308+
throw new ReadOnlyBufferException();
309+
}
310+
305311
@Override
306312
public Buffer readSlice(int length) {
307313
return buffer.readSlice(length).asReadOnly();

servicetalk-buffer-netty/src/main/java/io/servicetalk/buffer/netty/WrappedBuffer.java

+6
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,12 @@ public Buffer writeUtf8(CharSequence seq, int ensureWritable) {
658658
return this;
659659
}
660660

661+
@Override
662+
public Buffer writeCharSequence(CharSequence seq, Charset charset) {
663+
buffer.writeCharSequence(seq, charset);
664+
return this;
665+
}
666+
661667
@Override
662668
public int indexOf(int fromIndex, int toIndex, byte value) {
663669
return buffer.indexOf(fromIndex, toIndex, value);

servicetalk-concurrent-reactivestreams/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ dependencies {
2121
api "org.reactivestreams:reactive-streams:$reactiveStreamsVersion"
2222

2323
implementation project(":servicetalk-annotations")
24+
implementation project(":servicetalk-serializer-utils")
25+
implementation project(":servicetalk-buffer-netty")
2426
implementation "com.google.code.findbugs:jsr305:$jsr305Version"
2527
implementation "org.slf4j:slf4j-api:$slf4jVersion"
2628

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.concurrent.reactivestreams.tck;
17+
18+
import io.servicetalk.concurrent.api.Publisher;
19+
import io.servicetalk.serializer.utils.FramedDeserializerOperator;
20+
21+
import org.testng.annotations.Test;
22+
23+
import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
24+
import static java.util.function.Function.identity;
25+
26+
@Test
27+
public class FramedDeserializerOperatorTckTest extends AbstractPublisherTckTest<Integer> {
28+
@Override
29+
public Publisher<Integer> createServiceTalkPublisher(final long elements) {
30+
return Publisher.range(0, TckUtils.requestNToInt(elements))
31+
.map(i -> DEFAULT_ALLOCATOR.newBuffer().writeInt(i))
32+
.liftSync(new FramedDeserializerOperator<>(
33+
(serializedData, allocator) -> serializedData.readInt(),
34+
() -> (buffer, bufferAllocator) ->
35+
buffer.readableBytes() < Integer.BYTES ? null : buffer.readBytes(Integer.BYTES),
36+
DEFAULT_ALLOCATOR))
37+
.flatMapConcatIterable(identity());
38+
}
39+
40+
@Override
41+
public long maxElementsFromPublisher() {
42+
return TckUtils.maxElementsFromPublisher();
43+
}
44+
}

servicetalk-data-jackson-jersey/src/main/java/io/servicetalk/data/jackson/jersey/JacksonSerializationProviderContextResolver.java

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
import javax.annotation.Nullable;
2121
import javax.ws.rs.ext.ContextResolver;
2222

23+
/**
24+
* @deprecated Use {@link JacksonSerializerFactoryContextResolver}.
25+
*/
26+
@Deprecated
2327
final class JacksonSerializationProviderContextResolver implements ContextResolver<JacksonSerializationProvider> {
2428
private final JacksonSerializationProvider jacksonSerializationProvider;
2529

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright © 2021 Apple Inc. and the ServiceTalk project authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.servicetalk.data.jackson.jersey;
17+
18+
import io.servicetalk.data.jackson.JacksonSerializerFactory;
19+
20+
import javax.annotation.Nullable;
21+
import javax.ws.rs.ext.ContextResolver;
22+
23+
import static java.util.Objects.requireNonNull;
24+
25+
final class JacksonSerializerFactoryContextResolver implements ContextResolver<JacksonSerializerFactory> {
26+
private final JacksonSerializerFactory factory;
27+
28+
JacksonSerializerFactoryContextResolver(final JacksonSerializerFactory factory) {
29+
this.factory = requireNonNull(factory);
30+
}
31+
32+
@Nullable
33+
@Override
34+
public JacksonSerializerFactory getContext(final Class<?> aClass) {
35+
if (!JacksonSerializerFactory.class.isAssignableFrom(aClass)) {
36+
return null;
37+
}
38+
39+
return factory;
40+
}
41+
}

servicetalk-data-jackson-jersey/src/main/java/io/servicetalk/data/jackson/jersey/JacksonSerializerMessageBodyReaderWriter.java

+40-48
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
import io.servicetalk.buffer.api.BufferAllocator;
2020
import io.servicetalk.concurrent.api.Publisher;
2121
import io.servicetalk.concurrent.api.Single;
22-
import io.servicetalk.data.jackson.JacksonSerializationProvider;
22+
import io.servicetalk.data.jackson.JacksonSerializerFactory;
2323
import io.servicetalk.http.router.jersey.internal.SourceWrappers.PublisherSource;
2424
import io.servicetalk.http.router.jersey.internal.SourceWrappers.SingleSource;
25-
import io.servicetalk.serialization.api.DefaultSerializer;
26-
import io.servicetalk.serialization.api.SerializationException;
27-
import io.servicetalk.serialization.api.Serializer;
25+
import io.servicetalk.serializer.api.Deserializer;
26+
import io.servicetalk.serializer.api.SerializationException;
27+
import io.servicetalk.serializer.api.Serializer;
28+
import io.servicetalk.serializer.api.StreamingSerializer;
2829
import io.servicetalk.transport.api.ConnectionContext;
2930
import io.servicetalk.transport.api.ExecutionContext;
3031

@@ -68,9 +69,6 @@
6869
@Consumes(WILDCARD)
6970
@Produces(WILDCARD)
7071
final class JacksonSerializerMessageBodyReaderWriter implements MessageBodyReader<Object>, MessageBodyWriter<Object> {
71-
private static final JacksonSerializationProvider DEFAULT_JACKSON_SERIALIZATION_PROVIDER =
72-
new JacksonSerializationProvider();
73-
7472
// We can not use `@Context ConnectionContext` directly because we would not see the latest version
7573
// in case it has been rebound as part of offloading.
7674
@Context
@@ -95,86 +93,81 @@ public boolean isReadable(final Class<?> type, final Type genericType, final Ann
9593
public Object readFrom(final Class<Object> type, final Type genericType, final Annotation[] annotations,
9694
final MediaType mediaType, final MultivaluedMap<String, String> httpHeaders,
9795
final InputStream entityStream) throws WebApplicationException {
98-
99-
final Serializer serializer = getSerializer(mediaType);
96+
final JacksonSerializerFactory serializerFactory = getJacksonSerializerFactory(mediaType);
10097
final ExecutionContext executionContext = ctxRefProvider.get().get().executionContext();
10198
final BufferAllocator allocator = executionContext.bufferAllocator();
10299
final int contentLength = requestCtxProvider.get().getLength();
103100

104101
if (Single.class.isAssignableFrom(type)) {
105102
return handleEntityStream(entityStream, allocator,
106-
(p, a) -> deserialize(p, serializer, getSourceClass(genericType), contentLength, a),
107-
(is, a) -> new SingleSource<>(deserialize(toBufferPublisher(is, a), serializer,
108-
getSourceClass(genericType), contentLength, a)));
103+
(p, a) -> deserialize(p, serializerFactory.serializerDeserializer(getSourceClass(genericType)),
104+
contentLength, a),
105+
(is, a) -> new SingleSource<>(deserialize(toBufferPublisher(is, a),
106+
serializerFactory.serializerDeserializer(getSourceClass(genericType)), contentLength, a)));
109107
} else if (Publisher.class.isAssignableFrom(type)) {
110108
return handleEntityStream(entityStream, allocator,
111-
(p, a) -> serializer.deserialize(p, getSourceClass(genericType)),
112-
(is, a) -> new PublisherSource<>(serializer.deserialize(toBufferPublisher(is, a),
113-
getSourceClass(genericType))));
109+
(p, a) -> serializerFactory.streamingSerializerDeserializer(
110+
getSourceClass(genericType)).deserialize(p, a),
111+
(is, a) -> new PublisherSource<>(serializerFactory.streamingSerializerDeserializer(
112+
getSourceClass(genericType)).deserialize(toBufferPublisher(is, a), a)));
114113
}
115114

116115
return handleEntityStream(entityStream, allocator,
117-
(p, a) -> deserializeObject(p, serializer, type, contentLength, a),
118-
(is, a) -> deserializeObject(toBufferPublisher(is, a), serializer, type, contentLength, a));
116+
(p, a) -> deserializeObject(p, serializerFactory.serializerDeserializer(type), contentLength, a),
117+
(is, a) -> deserializeObject(toBufferPublisher(is, a), serializerFactory.serializerDeserializer(type),
118+
contentLength, a));
119119
}
120120

121121
@Override
122122
public boolean isWriteable(final Class<?> type, final Type genericType, final Annotation[] annotations,
123123
final MediaType mediaType) {
124-
125124
return !isSse(requestCtxProvider.get()) && isSupportedMediaType(mediaType);
126125
}
127126

127+
@SuppressWarnings({"rawtypes", "unchecked"})
128128
@Override
129129
public void writeTo(final Object o, final Class<?> type, final Type genericType, final Annotation[] annotations,
130130
final MediaType mediaType, final MultivaluedMap<String, Object> httpHeaders,
131131
final OutputStream entityStream) throws WebApplicationException {
132-
132+
final BufferAllocator allocator = ctxRefProvider.get().get().executionContext().bufferAllocator();
133133
final Publisher<Buffer> bufferPublisher;
134134
if (o instanceof Single) {
135-
bufferPublisher = getResponseBufferPublisher(((Single) o).toPublisher(), genericType, mediaType);
135+
final Class<?> clazz = genericType instanceof Class ? (Class) genericType : getSourceClass(genericType);
136+
Serializer serializer = getJacksonSerializerFactory(mediaType).serializerDeserializer(clazz);
137+
bufferPublisher = ((Single) o).map(t -> serializer.serialize(t, allocator)).toPublisher();
136138
} else if (o instanceof Publisher) {
137-
bufferPublisher = getResponseBufferPublisher((Publisher) o, genericType, mediaType);
139+
final Class<?> clazz = genericType instanceof Class ? (Class) genericType : getSourceClass(genericType);
140+
StreamingSerializer serializer = getJacksonSerializerFactory(mediaType)
141+
.streamingSerializerDeserializer(clazz);
142+
bufferPublisher = serializer.serialize((Publisher) o, allocator);
138143
} else {
139-
bufferPublisher = getResponseBufferPublisher(Publisher.from(o), o.getClass(), mediaType);
144+
Serializer serializer = getJacksonSerializerFactory(mediaType).serializerDeserializer(o.getClass());
145+
bufferPublisher = Publisher.from(serializer.serialize(o, allocator));
140146
}
141147

142148
setResponseBufferPublisher(bufferPublisher, requestCtxProvider.get());
143149
}
144150

145-
@SuppressWarnings("unchecked")
146-
private Publisher<Buffer> getResponseBufferPublisher(final Publisher publisher, final Type type,
147-
final MediaType mediaType) {
148-
final BufferAllocator allocator = ctxRefProvider.get().get().executionContext().bufferAllocator();
149-
return getSerializer(mediaType).serialize(publisher, allocator,
150-
type instanceof Class ? (Class) type : getSourceClass(type));
151-
}
152-
153-
private Serializer getSerializer(final MediaType mediaType) {
154-
return new DefaultSerializer(getJacksonSerializationProvider(mediaType));
155-
}
151+
private JacksonSerializerFactory getJacksonSerializerFactory(final MediaType mediaType) {
152+
final ContextResolver<JacksonSerializerFactory> contextResolver =
153+
providers.getContextResolver(JacksonSerializerFactory.class, mediaType);
156154

157-
private JacksonSerializationProvider getJacksonSerializationProvider(final MediaType mediaType) {
158-
final ContextResolver<JacksonSerializationProvider> contextResolver =
159-
providers.getContextResolver(JacksonSerializationProvider.class, mediaType);
160-
161-
return contextResolver != null ? contextResolver.getContext(JacksonSerializationProvider.class) :
162-
DEFAULT_JACKSON_SERIALIZATION_PROVIDER;
155+
return contextResolver != null ? contextResolver.getContext(JacksonSerializerFactory.class) :
156+
JacksonSerializerFactory.JACKSON;
163157
}
164158

165159
private static Publisher<Buffer> toBufferPublisher(final InputStream is, final BufferAllocator a) {
166160
return fromInputStream(is).map(a::wrap);
167161
}
168162

169-
private static <T> Single<T> deserialize(final Publisher<Buffer> bufferPublisher, final Serializer ser,
170-
final Class<T> type, final int contentLength,
171-
final BufferAllocator allocator) {
172-
163+
private static <T> Single<T> deserialize(
164+
final Publisher<Buffer> bufferPublisher, final Deserializer<T> deserializer, final int contentLength,
165+
final BufferAllocator allocator) {
173166
return bufferPublisher
174167
.collect(() -> newBufferForRequestContent(contentLength, allocator), Buffer::writeBytes)
175168
.map(buf -> {
176169
try {
177-
return ser.deserializeAggregatedSingle(buf, type);
170+
return deserializer.deserialize(buf, allocator);
178171
} catch (final NoSuchElementException e) {
179172
throw new BadRequestException("No deserializable JSON content", e);
180173
} catch (final SerializationException e) {
@@ -192,10 +185,9 @@ static Buffer newBufferForRequestContent(final int contentLength,
192185
}
193186

194187
// visible for testing
195-
static <T> T deserializeObject(final Publisher<Buffer> bufferPublisher, final Serializer ser,
196-
final Class<T> type, final int contentLength,
197-
final BufferAllocator allocator) {
198-
return awaitResult(deserialize(bufferPublisher, ser, type, contentLength, allocator).toFuture());
188+
static <T> T deserializeObject(final Publisher<Buffer> bufferPublisher, final Deserializer<T> deserializer,
189+
final int contentLength, final BufferAllocator allocator) {
190+
return awaitResult(deserialize(bufferPublisher, deserializer, contentLength, allocator).toFuture());
199191
}
200192

201193
private static boolean isSse(ContainerRequestContext requestCtx) {

0 commit comments

Comments
 (0)