Skip to content

Commit e3e1ffc

Browse files
committed
Encoder/Decoder based payload serialization
See gh-21987
1 parent bcf4f39 commit e3e1ffc

File tree

12 files changed

+869
-7
lines changed

12 files changed

+869
-7
lines changed

spring-messaging/spring-messaging.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies {
2525
}
2626
testCompile("org.apache.activemq:activemq-stomp:5.8.0")
2727
testCompile("io.projectreactor:reactor-test")
28+
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
2829
testCompile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
2930
testCompile("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
3031
testCompile("org.xmlunit:xmlunit-matchers:2.6.2")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.handler.annotation.support.reactive;
17+
18+
import java.lang.annotation.Annotation;
19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.List;
22+
import java.util.function.Consumer;
23+
24+
import org.apache.commons.logging.Log;
25+
import org.apache.commons.logging.LogFactory;
26+
import org.reactivestreams.Publisher;
27+
import reactor.core.publisher.Flux;
28+
import reactor.core.publisher.Mono;
29+
30+
import org.springframework.core.Conventions;
31+
import org.springframework.core.MethodParameter;
32+
import org.springframework.core.ReactiveAdapter;
33+
import org.springframework.core.ReactiveAdapterRegistry;
34+
import org.springframework.core.ResolvableType;
35+
import org.springframework.core.annotation.AnnotationUtils;
36+
import org.springframework.core.codec.Decoder;
37+
import org.springframework.core.codec.DecodingException;
38+
import org.springframework.core.io.buffer.DataBuffer;
39+
import org.springframework.lang.Nullable;
40+
import org.springframework.messaging.Message;
41+
import org.springframework.messaging.MessageHeaders;
42+
import org.springframework.messaging.handler.annotation.Payload;
43+
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
44+
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
45+
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
46+
import org.springframework.util.Assert;
47+
import org.springframework.util.CollectionUtils;
48+
import org.springframework.util.MimeType;
49+
import org.springframework.util.MimeTypeUtils;
50+
import org.springframework.util.ObjectUtils;
51+
import org.springframework.util.StringUtils;
52+
import org.springframework.validation.BeanPropertyBindingResult;
53+
import org.springframework.validation.SmartValidator;
54+
import org.springframework.validation.Validator;
55+
import org.springframework.validation.annotation.Validated;
56+
57+
/**
58+
* A resolver to extract and decode the payload of a message using a
59+
* {@link Decoder}, where the payload is expected to be a {@link Publisher} of
60+
* {@link DataBuffer DataBuffer}.
61+
*
62+
* <p>Validation is applied if the method argument is annotated with
63+
* {@code @javax.validation.Valid} or
64+
* {@link org.springframework.validation.annotation.Validated}. Validation
65+
* failure results in an {@link MethodArgumentNotValidException}.
66+
*
67+
* <p>This resolver should be ordered last if {@link #useDefaultResolution} is
68+
* set to {@code true} since in that case it supports all types and does not
69+
* require the presence of {@link Payload}.
70+
*
71+
* @author Rossen Stoyanchev
72+
* @since 5.2
73+
*/
74+
public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResolver {
75+
76+
protected final Log logger = LogFactory.getLog(getClass());
77+
78+
79+
private final List<Decoder<?>> decoders;
80+
81+
@Nullable
82+
private final Validator validator;
83+
84+
private final ReactiveAdapterRegistry adapterRegistry;
85+
86+
private final boolean useDefaultResolution;
87+
88+
89+
public PayloadMethodArgumentResolver(List<? extends Decoder<?>> decoders, @Nullable Validator validator,
90+
@Nullable ReactiveAdapterRegistry registry, boolean useDefaultResolution) {
91+
92+
Assert.isTrue(!CollectionUtils.isEmpty(decoders), "At least one Decoder is required.");
93+
this.decoders = Collections.unmodifiableList(new ArrayList<>(decoders));
94+
this.validator = validator;
95+
this.adapterRegistry = registry != null ? registry : ReactiveAdapterRegistry.getSharedInstance();
96+
this.useDefaultResolution = useDefaultResolution;
97+
}
98+
99+
100+
/**
101+
* Return a read-only list of the configured decoders.
102+
*/
103+
public List<Decoder<?>> getDecoders() {
104+
return this.decoders;
105+
}
106+
107+
/**
108+
* Return the configured validator, if any.
109+
*/
110+
@Nullable
111+
public Validator getValidator() {
112+
return this.validator;
113+
}
114+
115+
/**
116+
* Return the configured {@link ReactiveAdapterRegistry}.
117+
*/
118+
public ReactiveAdapterRegistry getAdapterRegistry() {
119+
return this.adapterRegistry;
120+
}
121+
122+
/**
123+
* Whether this resolver is configured to use default resolution, i.e.
124+
* works for any argument type regardless of whether {@code @Payload} is
125+
* present or not.
126+
*/
127+
public boolean isUseDefaultResolution() {
128+
return this.useDefaultResolution;
129+
}
130+
131+
132+
@Override
133+
public boolean supportsParameter(MethodParameter parameter) {
134+
return parameter.hasParameterAnnotation(Payload.class) || this.useDefaultResolution;
135+
}
136+
137+
138+
/**
139+
* Decode the content of the given message payload through a compatible
140+
* {@link Decoder}.
141+
*
142+
* <p>Validation is applied if the method argument is annotated with
143+
* {@code @javax.validation.Valid} or
144+
* {@link org.springframework.validation.annotation.Validated}. Validation
145+
* failure results in an {@link MethodArgumentNotValidException}.
146+
*
147+
* @param parameter the target method argument that we are decoding to
148+
* @param message the message from which the content was extracted
149+
* @return a Mono with the result of argument resolution
150+
*
151+
* @see #extractPayloadContent(MethodParameter, Message)
152+
* @see #getMimeType(Message)
153+
*/
154+
@Override
155+
public final Mono<Object> resolveArgument(MethodParameter parameter, Message<?> message) {
156+
Payload ann = parameter.getParameterAnnotation(Payload.class);
157+
if (ann != null && StringUtils.hasText(ann.expression())) {
158+
throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver");
159+
}
160+
Publisher<DataBuffer> content = extractPayloadContent(parameter, message);
161+
return decodeContent(parameter, message, ann == null || ann.required(), content, getMimeType(message));
162+
}
163+
164+
/**
165+
* Extract the content to decode from the message. By default, the message
166+
* payload is expected to be {@code Publisher<DataBuffer>}. Sub-classes can
167+
* override this method to change that assumption.
168+
* @param parameter the target method parameter we're decoding to
169+
* @param message the input message with the content
170+
* @return the content to decode
171+
*/
172+
@SuppressWarnings("unchecked")
173+
protected Publisher<DataBuffer> extractPayloadContent(MethodParameter parameter, Message<?> message) {
174+
Publisher<DataBuffer> content;
175+
try {
176+
content = (Publisher<DataBuffer>) message.getPayload();
177+
}
178+
catch (ClassCastException ex) {
179+
throw new MethodArgumentResolutionException(
180+
message, parameter, "Expected Publisher<DataBuffer> payload", ex);
181+
}
182+
return content;
183+
}
184+
185+
/**
186+
* Return the mime type for the content. By default this method checks the
187+
* {@link MessageHeaders#CONTENT_TYPE} header expecting to find a
188+
* {@link MimeType} value or a String to parse to a {@link MimeType}.
189+
* @param message the input message
190+
*/
191+
@Nullable
192+
protected MimeType getMimeType(Message<?> message) {
193+
Object headerValue = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
194+
if (headerValue == null) {
195+
return null;
196+
}
197+
else if (headerValue instanceof String) {
198+
return MimeTypeUtils.parseMimeType((String) headerValue);
199+
}
200+
else if (headerValue instanceof MimeType) {
201+
return (MimeType) headerValue;
202+
}
203+
else {
204+
throw new IllegalArgumentException("Unexpected MimeType value: " + headerValue);
205+
}
206+
}
207+
208+
private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message,
209+
boolean isContentRequired, Publisher<DataBuffer> content, @Nullable MimeType mimeType) {
210+
211+
ResolvableType targetType = ResolvableType.forMethodParameter(parameter);
212+
Class<?> resolvedType = targetType.resolve();
213+
ReactiveAdapter adapter = (resolvedType != null ? getAdapterRegistry().getAdapter(resolvedType) : null);
214+
ResolvableType elementType = (adapter != null ? targetType.getGeneric() : targetType);
215+
isContentRequired = isContentRequired || (adapter != null && !adapter.supportsEmpty());
216+
Consumer<Object> validator = getValidator(message, parameter);
217+
218+
if (logger.isDebugEnabled()) {
219+
logger.debug("Mime type:" + mimeType);
220+
}
221+
mimeType = mimeType != null ? mimeType : MimeTypeUtils.APPLICATION_OCTET_STREAM;
222+
223+
for (Decoder<?> decoder : this.decoders) {
224+
if (decoder.canDecode(elementType, mimeType)) {
225+
if (adapter != null && adapter.isMultiValue()) {
226+
if (logger.isDebugEnabled()) {
227+
logger.debug("0..N [" + elementType + "]");
228+
}
229+
Flux<?> flux = decoder.decode(content, elementType, mimeType, Collections.emptyMap());
230+
flux = flux.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
231+
if (isContentRequired) {
232+
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
233+
}
234+
if (validator != null) {
235+
flux = flux.doOnNext(validator::accept);
236+
}
237+
return Mono.just(adapter.fromPublisher(flux));
238+
}
239+
else {
240+
if (logger.isDebugEnabled()) {
241+
logger.debug("0..1 [" + elementType + "]");
242+
}
243+
// Single-value (with or without reactive type wrapper)
244+
Mono<?> mono = decoder.decodeToMono(content, targetType, mimeType, Collections.emptyMap());
245+
mono = mono.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
246+
if (isContentRequired) {
247+
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));
248+
}
249+
if (validator != null) {
250+
mono = mono.doOnNext(validator::accept);
251+
}
252+
return (adapter != null ? Mono.just(adapter.fromPublisher(mono)) : Mono.from(mono));
253+
}
254+
}
255+
}
256+
257+
return Mono.error(new MethodArgumentResolutionException(
258+
message, parameter, "Cannot decode to [" + targetType + "]" + message));
259+
}
260+
261+
private Throwable handleReadError(MethodParameter parameter, Message<?> message, Throwable ex) {
262+
return ex instanceof DecodingException ?
263+
new MethodArgumentResolutionException(message, parameter, "Failed to read HTTP message", ex) : ex;
264+
}
265+
266+
private MethodArgumentResolutionException handleMissingBody(MethodParameter param, Message<?> message) {
267+
return new MethodArgumentResolutionException(message, param,
268+
"Payload content is missing: " + param.getExecutable().toGenericString());
269+
}
270+
271+
@Nullable
272+
private Consumer<Object> getValidator(Message<?> message, MethodParameter parameter) {
273+
if (this.validator == null) {
274+
return null;
275+
}
276+
for (Annotation ann : parameter.getParameterAnnotations()) {
277+
Validated validatedAnn = AnnotationUtils.getAnnotation(ann, Validated.class);
278+
if (validatedAnn != null || ann.annotationType().getSimpleName().startsWith("Valid")) {
279+
Object hints = (validatedAnn != null ? validatedAnn.value() : AnnotationUtils.getValue(ann));
280+
Object[] validationHints = (hints instanceof Object[] ? (Object[]) hints : new Object[] {hints});
281+
String name = Conventions.getVariableNameForParameter(parameter);
282+
return target -> {
283+
BeanPropertyBindingResult bindingResult = new BeanPropertyBindingResult(target, name);
284+
if (!ObjectUtils.isEmpty(validationHints) && this.validator instanceof SmartValidator) {
285+
((SmartValidator) this.validator).validate(target, bindingResult, validationHints);
286+
}
287+
else {
288+
this.validator.validate(target, bindingResult);
289+
}
290+
if (bindingResult.hasErrors()) {
291+
throw new MethodArgumentNotValidException(message, parameter, bindingResult);
292+
}
293+
};
294+
}
295+
}
296+
return null;
297+
}
298+
299+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/**
2+
* Support classes for working with annotated message-handling methods with
3+
* non-blocking, reactive contracts.
4+
*/
5+
@NonNullApi
6+
@NonNullFields
7+
package org.springframework.messaging.handler.annotation.support.reactive;
8+
9+
import org.springframework.lang.NonNullApi;
10+
import org.springframework.lang.NonNullFields;

spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/MethodArgumentResolutionException.java

+12
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.messaging.handler.invocation;
1818

1919
import org.springframework.core.MethodParameter;
20+
import org.springframework.lang.Nullable;
2021
import org.springframework.messaging.Message;
2122
import org.springframework.messaging.MessagingException;
2223

@@ -51,6 +52,17 @@ public MethodArgumentResolutionException(Message<?> message, MethodParameter par
5152
this.parameter = parameter;
5253
}
5354

55+
/**
56+
* Create a new instance providing the invalid {@code MethodParameter},
57+
* prepared description, and a cause.
58+
*/
59+
public MethodArgumentResolutionException(
60+
Message<?> message, MethodParameter parameter, String description, @Nullable Throwable cause) {
61+
62+
super(message, getMethodParameterMessage(parameter) + ": " + description, cause);
63+
this.parameter = parameter;
64+
}
65+
5466

5567
/**
5668
* Return the MethodParameter that was rejected.

0 commit comments

Comments
 (0)