Skip to content

Commit f2bb95b

Browse files
committed
Payload encoding/decoding and handling refinements
See gh-21987
1 parent 33682d7 commit f2bb95b

File tree

11 files changed

+370
-252
lines changed

11 files changed

+370
-252
lines changed

spring-messaging/src/main/java/org/springframework/messaging/handler/HandlerMethod.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ public HandlerMethod createWithResolvedBean() {
298298
*/
299299
public String getShortLogMessage() {
300300
int args = this.method.getParameterCount();
301-
return getBeanType().getName() + "#" + this.method.getName() + "[" + args + " args]";
301+
return getBeanType().getSimpleName() + "#" + this.method.getName() + "[" + args + " args]";
302302
}
303303

304304

spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/MessageMappingMessageHandler.java

+74-58
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,25 @@
1919
import java.lang.reflect.Method;
2020
import java.util.ArrayList;
2121
import java.util.Arrays;
22+
import java.util.Collections;
2223
import java.util.Comparator;
2324
import java.util.LinkedHashSet;
2425
import java.util.List;
2526
import java.util.Set;
27+
import java.util.function.Predicate;
2628

2729
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
2830
import org.springframework.context.ApplicationContext;
2931
import org.springframework.context.ConfigurableApplicationContext;
3032
import org.springframework.context.EmbeddedValueResolverAware;
33+
import org.springframework.context.SmartLifecycle;
3134
import org.springframework.core.annotation.AnnotatedElementUtils;
3235
import org.springframework.core.codec.Decoder;
3336
import org.springframework.core.convert.ConversionService;
3437
import org.springframework.format.support.DefaultFormattingConversionService;
3538
import org.springframework.lang.Nullable;
3639
import org.springframework.messaging.Message;
40+
import org.springframework.messaging.ReactiveSubscribableChannel;
3741
import org.springframework.messaging.handler.CompositeMessageCondition;
3842
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
3943
import org.springframework.messaging.handler.annotation.MessageMapping;
@@ -51,58 +55,57 @@
5155
import org.springframework.validation.Validator;
5256

5357
/**
54-
* Extension of {@link AbstractMethodMessageHandler} for
55-
* {@link MessageMapping @MessageMapping} methods.
58+
* Extension of {@link AbstractMethodMessageHandler} for reactive, non-blocking
59+
* handling of messages via {@link MessageMapping @MessageMapping} methods.
60+
* By default such methods are detected in {@code @Controller} Spring beans but
61+
* that can be changed via {@link #setHandlerPredicate(Predicate)}.
5662
*
57-
* <p>The payload of incoming messages is decoded through
58-
* {@link PayloadMethodArgumentResolver} using one of the configured
59-
* {@link #setDecoders(List)} decoders.
63+
* <p>Payloads for incoming messages are decoded through the configured
64+
* {@link #setDecoders(List)} decoders, with the help of
65+
* {@link PayloadMethodArgumentResolver}.
6066
*
61-
* <p>The {@link #setEncoderReturnValueHandler encoderReturnValueHandler}
62-
* property must be set to encode and handle return values from
63-
* {@code @MessageMapping} methods.
67+
* <p>There is no default handling for return values but
68+
* {@link #setReturnValueHandlerConfigurer} can be used to configure custom
69+
* return value handlers. Sub-classes may also override
70+
* {@link #initReturnValueHandlers()} to set up default return value handlers.
6471
*
6572
* @author Rossen Stoyanchev
6673
* @since 5.2
74+
* @see AbstractEncoderMethodReturnValueHandler
6775
*/
6876
public class MessageMappingMessageHandler extends AbstractMethodMessageHandler<CompositeMessageCondition>
69-
implements EmbeddedValueResolverAware {
77+
implements SmartLifecycle, EmbeddedValueResolverAware {
7078

71-
private PathMatcher pathMatcher = new AntPathMatcher();
79+
private final ReactiveSubscribableChannel inboundChannel;
7280

7381
private final List<Decoder<?>> decoders = new ArrayList<>();
7482

7583
@Nullable
7684
private Validator validator;
7785

78-
@Nullable
79-
private HandlerMethodReturnValueHandler encoderReturnValueHandler;
86+
private PathMatcher pathMatcher;
8087

8188
private ConversionService conversionService = new DefaultFormattingConversionService();
8289

8390
@Nullable
8491
private StringValueResolver valueResolver;
8592

93+
private volatile boolean running = false;
8694

87-
/**
88-
* Set the PathMatcher implementation to use for matching destinations
89-
* against configured destination patterns.
90-
* <p>By default, {@link AntPathMatcher} is used.
91-
*/
92-
public void setPathMatcher(PathMatcher pathMatcher) {
93-
Assert.notNull(pathMatcher, "PathMatcher must not be null");
94-
this.pathMatcher = pathMatcher;
95-
}
95+
private final Object lifecycleMonitor = new Object();
9696

97-
/**
98-
* Return the PathMatcher implementation to use for matching destinations.
99-
*/
100-
public PathMatcher getPathMatcher() {
101-
return this.pathMatcher;
97+
98+
public MessageMappingMessageHandler(ReactiveSubscribableChannel inboundChannel) {
99+
Assert.notNull(inboundChannel, "`inboundChannel` is required");
100+
this.inboundChannel = inboundChannel;
101+
this.pathMatcher = new AntPathMatcher();
102+
((AntPathMatcher) this.pathMatcher).setPathSeparator(".");
103+
setHandlerPredicate(beanType -> AnnotatedElementUtils.hasAnnotation(beanType, Controller.class));
102104
}
103105

106+
104107
/**
105-
* Configure the decoders to user for incoming payloads.
108+
* Configure the decoders to use for incoming payloads.
106109
*/
107110
public void setDecoders(List<? extends Decoder<?>> decoders) {
108111
this.decoders.addAll(decoders);
@@ -115,14 +118,6 @@ public List<? extends Decoder<?>> getDecoders() {
115118
return this.decoders;
116119
}
117120

118-
/**
119-
* Return the configured Validator instance.
120-
*/
121-
@Nullable
122-
public Validator getValidator() {
123-
return this.validator;
124-
}
125-
126121
/**
127122
* Set the Validator instance used for validating {@code @Payload} arguments.
128123
* @see org.springframework.validation.annotation.Validated
@@ -133,27 +128,28 @@ public void setValidator(@Nullable Validator validator) {
133128
}
134129

135130
/**
136-
* Configure the return value handler that will encode response content.
137-
* Consider extending {@link AbstractEncoderMethodReturnValueHandler} which
138-
* provides the infrastructure to encode and all that's left is to somehow
139-
* handle the encoded content, e.g. by wrapping as a message and passing it
140-
* to something or sending it somewhere.
141-
* <p>By default this is not configured in which case payload/content return
142-
* values from {@code @MessageMapping} methods will remain unhandled.
143-
* @param encoderReturnValueHandler the return value handler to use
144-
* @see AbstractEncoderMethodReturnValueHandler
131+
* Return the configured Validator instance.
145132
*/
146-
public void setEncoderReturnValueHandler(@Nullable HandlerMethodReturnValueHandler encoderReturnValueHandler) {
147-
this.encoderReturnValueHandler = encoderReturnValueHandler;
133+
@Nullable
134+
public Validator getValidator() {
135+
return this.validator;
148136
}
149137

150138
/**
151-
* Return the configured
152-
* {@link #setEncoderReturnValueHandler encoderReturnValueHandler}.
139+
* Set the PathMatcher implementation to use for matching destinations
140+
* against configured destination patterns.
141+
* <p>By default, {@link AntPathMatcher} is used with separator set to ".".
153142
*/
154-
@Nullable
155-
public HandlerMethodReturnValueHandler getEncoderReturnValueHandler() {
156-
return this.encoderReturnValueHandler;
143+
public void setPathMatcher(PathMatcher pathMatcher) {
144+
Assert.notNull(pathMatcher, "PathMatcher must not be null");
145+
this.pathMatcher = pathMatcher;
146+
}
147+
148+
/**
149+
* Return the PathMatcher implementation to use for matching destinations.
150+
*/
151+
public PathMatcher getPathMatcher() {
152+
return this.pathMatcher;
157153
}
158154

159155
/**
@@ -204,20 +200,40 @@ protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers()
204200

205201
@Override
206202
protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandlers() {
207-
List<HandlerMethodReturnValueHandler> handlers = new ArrayList<>();
208-
handlers.addAll(getReturnValueHandlerConfigurer().getCustomHandlers());
209-
if (this.encoderReturnValueHandler != null) {
210-
handlers.add(this.encoderReturnValueHandler);
203+
return Collections.emptyList();
204+
}
205+
206+
207+
@Override
208+
public final void start() {
209+
synchronized (this.lifecycleMonitor) {
210+
this.inboundChannel.subscribe(this);
211+
this.running = true;
211212
}
212-
return handlers;
213213
}
214214

215+
@Override
216+
public final void stop() {
217+
synchronized (this.lifecycleMonitor) {
218+
this.running = false;
219+
this.inboundChannel.unsubscribe(this);
220+
}
221+
}
222+
223+
@Override
224+
public final void stop(Runnable callback) {
225+
synchronized (this.lifecycleMonitor) {
226+
stop();
227+
callback.run();
228+
}
229+
}
215230

216231
@Override
217-
protected boolean isHandler(Class<?> beanType) {
218-
return AnnotatedElementUtils.hasAnnotation(beanType, Controller.class);
232+
public final boolean isRunning() {
233+
return this.running;
219234
}
220235

236+
221237
@Override
222238
protected CompositeMessageCondition getMappingForMethod(Method method, Class<?> handlerType) {
223239
CompositeMessageCondition methodCondition = getCondition(method);

spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java

+37-34
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.Collections;
2121
import java.util.List;
22+
import java.util.Map;
2223
import java.util.function.Consumer;
2324

2425
import org.apache.commons.logging.Log;
@@ -148,38 +149,47 @@ public boolean supportsParameter(MethodParameter parameter) {
148149
* @param message the message from which the content was extracted
149150
* @return a Mono with the result of argument resolution
150151
*
151-
* @see #extractPayloadContent(MethodParameter, Message)
152+
* @see #extractContent(MethodParameter, Message)
152153
* @see #getMimeType(Message)
153154
*/
154155
@Override
155156
public final Mono<Object> resolveArgument(MethodParameter parameter, Message<?> message) {
157+
156158
Payload ann = parameter.getParameterAnnotation(Payload.class);
157159
if (ann != null && StringUtils.hasText(ann.expression())) {
158160
throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver");
159161
}
160-
Publisher<DataBuffer> content = extractPayloadContent(parameter, message);
161-
return decodeContent(parameter, message, ann == null || ann.required(), content, getMimeType(message));
162+
163+
MimeType mimeType = getMimeType(message);
164+
mimeType = mimeType != null ? mimeType : MimeTypeUtils.APPLICATION_OCTET_STREAM;
165+
166+
Flux<DataBuffer> content = extractContent(parameter, message);
167+
return decodeContent(parameter, message, ann == null || ann.required(), content, mimeType);
162168
}
163169

164-
/**
165-
* Extract the content to decode from the message. By default, the message
166-
* payload is expected to be {@code Publisher<DataBuffer>}. Sub-classes can
167-
* override this method to change that assumption.
168-
* @param parameter the target method parameter we're decoding to
169-
* @param message the input message with the content
170-
* @return the content to decode
171-
*/
172170
@SuppressWarnings("unchecked")
173-
protected Publisher<DataBuffer> extractPayloadContent(MethodParameter parameter, Message<?> message) {
174-
Publisher<DataBuffer> content;
175-
try {
176-
content = (Publisher<DataBuffer>) message.getPayload();
171+
private Flux<DataBuffer> extractContent(MethodParameter parameter, Message<?> message) {
172+
Object payload = message.getPayload();
173+
if (payload instanceof DataBuffer) {
174+
return Flux.just((DataBuffer) payload);
177175
}
178-
catch (ClassCastException ex) {
179-
throw new MethodArgumentResolutionException(
180-
message, parameter, "Expected Publisher<DataBuffer> payload", ex);
176+
if (payload instanceof Publisher) {
177+
return Flux.from((Publisher<?>) payload).map(value -> {
178+
if (value instanceof DataBuffer) {
179+
return (DataBuffer) value;
180+
}
181+
String className = value.getClass().getName();
182+
throw getUnexpectedPayloadError(message, parameter, "Publisher<" + className + ">");
183+
});
181184
}
182-
return content;
185+
return Flux.error(getUnexpectedPayloadError(message, parameter, payload.getClass().getName()));
186+
}
187+
188+
private MethodArgumentResolutionException getUnexpectedPayloadError(
189+
Message<?> message, MethodParameter parameter, String actualType) {
190+
191+
return new MethodArgumentResolutionException(message, parameter,
192+
"Expected DataBuffer or Publisher<DataBuffer> for the Message payload, actual: " + actualType);
183193
}
184194

185195
/**
@@ -206,7 +216,7 @@ else if (headerValue instanceof MimeType) {
206216
}
207217

208218
private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message,
209-
boolean isContentRequired, Publisher<DataBuffer> content, @Nullable MimeType mimeType) {
219+
boolean isContentRequired, Flux<DataBuffer> content, MimeType mimeType) {
210220

211221
ResolvableType targetType = ResolvableType.forMethodParameter(parameter);
212222
Class<?> resolvedType = targetType.resolve();
@@ -215,19 +225,14 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
215225
isContentRequired = isContentRequired || (adapter != null && !adapter.supportsEmpty());
216226
Consumer<Object> validator = getValidator(message, parameter);
217227

218-
if (logger.isDebugEnabled()) {
219-
logger.debug("Mime type:" + mimeType);
220-
}
221-
mimeType = mimeType != null ? mimeType : MimeTypeUtils.APPLICATION_OCTET_STREAM;
228+
Map<String, Object> hints = Collections.emptyMap();
222229

223230
for (Decoder<?> decoder : this.decoders) {
224231
if (decoder.canDecode(elementType, mimeType)) {
225232
if (adapter != null && adapter.isMultiValue()) {
226-
if (logger.isDebugEnabled()) {
227-
logger.debug("0..N [" + elementType + "]");
228-
}
229-
Flux<?> flux = decoder.decode(content, elementType, mimeType, Collections.emptyMap());
230-
flux = flux.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
233+
Flux<?> flux = content
234+
.concatMap(buffer -> decoder.decode(Mono.just(buffer), elementType, mimeType, hints))
235+
.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
231236
if (isContentRequired) {
232237
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
233238
}
@@ -237,12 +242,10 @@ private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message
237242
return Mono.just(adapter.fromPublisher(flux));
238243
}
239244
else {
240-
if (logger.isDebugEnabled()) {
241-
logger.debug("0..1 [" + elementType + "]");
242-
}
243245
// Single-value (with or without reactive type wrapper)
244-
Mono<?> mono = decoder.decodeToMono(content, targetType, mimeType, Collections.emptyMap());
245-
mono = mono.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
246+
Mono<?> mono = decoder
247+
.decodeToMono(content.next(), targetType, mimeType, hints)
248+
.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
246249
if (isContentRequired) {
247250
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));
248251
}

0 commit comments

Comments
 (0)