Skip to content

Commit fa6c5ff

Browse files
committed
MessagingAcceptor/RSocket refinements + upgrade to 0.11.17
See spring-projectsgh-21987
1 parent a9d3343 commit fa6c5ff

10 files changed

+105
-116
lines changed

spring-messaging/spring-messaging.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ dependencyManagement {
77
}
88
}
99

10-
def rsocketVersion = "0.11.15"
10+
def rsocketVersion = "0.11.17"
1111

1212
dependencies {
1313
compile(project(":spring-beans"))

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,14 @@ else if (adapter != null) {
149149
.concatMap(value -> encodeValue(value, dataType, encoder))
150150
.switchOnFirst((signal, inner) -> {
151151
DataBuffer data = signal.get();
152-
return data != null ?
153-
Flux.concat(Mono.just(firstPayload(data)), inner.skip(1).map(PayloadUtils::asPayload)) :
154-
inner.map(PayloadUtils::asPayload);
152+
if (data != null) {
153+
return Flux.concat(
154+
Mono.just(firstPayload(data)),
155+
inner.skip(1).map(PayloadUtils::createPayload));
156+
}
157+
else {
158+
return inner.map(PayloadUtils::createPayload);
159+
}
155160
})
156161
.switchIfEmpty(emptyPayload());
157162
return new DefaultResponseSpec(payloadFlux);
@@ -167,7 +172,7 @@ private <T> Mono<DataBuffer> encodeValue(T value, ResolvableType valueType, @Nul
167172
}
168173

169174
private Payload firstPayload(DataBuffer data) {
170-
return PayloadUtils.asPayload(getMetadata(), data);
175+
return PayloadUtils.createPayload(getMetadata(), data);
171176
}
172177

173178
private Mono<Payload> emptyPayload() {
@@ -239,7 +244,7 @@ private <T> Mono<T> retrieveMono(ResolvableType elementType) {
239244

240245
Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
241246
return (Mono<T>) decoder.decodeToMono(
242-
payloadMono.map(this::asDataBuffer), elementType, dataMimeType, EMPTY_HINTS);
247+
payloadMono.map(this::wrapPayloadData), elementType, dataMimeType, EMPTY_HINTS);
243248
}
244249

245250
@SuppressWarnings("unchecked")
@@ -255,12 +260,12 @@ private <T> Flux<T> retrieveFlux(ResolvableType elementType) {
255260

256261
Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
257262

258-
return payloadFlux.map(this::asDataBuffer).concatMap(dataBuffer ->
263+
return payloadFlux.map(this::wrapPayloadData).concatMap(dataBuffer ->
259264
(Mono<T>) decoder.decodeToMono(Mono.just(dataBuffer), elementType, dataMimeType, EMPTY_HINTS));
260265
}
261266

262-
private DataBuffer asDataBuffer(Payload payload) {
263-
return PayloadUtils.asDataBuffer(payload, strategies.dataBufferFactory());
267+
private DataBuffer wrapPayloadData(Payload payload) {
268+
return PayloadUtils.wrapPayloadData(payload, strategies.dataBufferFactory());
264269
}
265270
}
266271

spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingAcceptor.java

+9-14
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.springframework.messaging.ReactiveMessageChannel;
2929
import org.springframework.util.Assert;
3030
import org.springframework.util.MimeType;
31-
import org.springframework.util.MimeTypeUtils;
3231

3332
/**
3433
* RSocket acceptor for
@@ -79,10 +78,9 @@ public MessagingAcceptor(ReactiveMessageChannel messageChannel, RSocketStrategie
7978

8079

8180
/**
82-
* Configure the default content type for data payloads. For server
83-
* acceptors this is available from the {@link ConnectionSetupPayload} but
84-
* for client acceptors it's not and must be provided here.
85-
* <p>By default this is not set.
81+
* Configure the default content type to use for data payloads.
82+
* <p>By default this is not set. However a server acceptor will use the
83+
* content type from the {@link ConnectionSetupPayload}.
8684
* @param defaultDataMimeType the MimeType to use
8785
*/
8886
public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
@@ -92,21 +90,18 @@ public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
9290

9391
@Override
9492
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) {
95-
96-
MimeType mimeType = setupPayload.dataMimeType() != null ?
97-
MimeTypeUtils.parseMimeType(setupPayload.dataMimeType()) : this.defaultDataMimeType;
98-
99-
MessagingRSocket rsocket = createRSocket(sendingRSocket, mimeType);
100-
return rsocket.afterConnectionEstablished(setupPayload).then(Mono.just(rsocket));
93+
MessagingRSocket rsocket = createRSocket(sendingRSocket);
94+
rsocket.handleConnectionSetupPayload(setupPayload).subscribe();
95+
return Mono.just(rsocket);
10196
}
10297

10398
@Override
10499
public RSocket apply(RSocket sendingRSocket) {
105-
return createRSocket(sendingRSocket, this.defaultDataMimeType);
100+
return createRSocket(sendingRSocket);
106101
}
107102

108-
private MessagingRSocket createRSocket(RSocket sendingRSocket, @Nullable MimeType dataMimeType) {
109-
return new MessagingRSocket(this.messageChannel, sendingRSocket, dataMimeType, this.rsocketStrategies);
103+
private MessagingRSocket createRSocket(RSocket rsocket) {
104+
return new MessagingRSocket(this.messageChannel, rsocket, this.defaultDataMimeType, this.rsocketStrategies);
110105
}
111106

112107
}

spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java

+45-38
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.function.Function;
1919

20+
import io.rsocket.AbstractRSocket;
2021
import io.rsocket.ConnectionSetupPayload;
2122
import io.rsocket.Payload;
2223
import io.rsocket.RSocket;
@@ -40,6 +41,8 @@
4041
import org.springframework.messaging.support.MessageHeaderAccessor;
4142
import org.springframework.util.Assert;
4243
import org.springframework.util.MimeType;
44+
import org.springframework.util.MimeTypeUtils;
45+
import org.springframework.util.StringUtils;
4346

4447
/**
4548
* Package private implementation of {@link RSocket} that is is hooked into an
@@ -49,90 +52,96 @@
4952
* @author Rossen Stoyanchev
5053
* @since 5.2
5154
*/
52-
class MessagingRSocket implements RSocket {
55+
class MessagingRSocket extends AbstractRSocket {
5356

5457
private final ReactiveMessageChannel messageChannel;
5558

5659
private final RSocketRequester requester;
5760

5861
@Nullable
59-
private final MimeType dataMimeType;
62+
private MimeType dataMimeType;
6063

6164
private final RSocketStrategies strategies;
6265

6366

6467
MessagingRSocket(ReactiveMessageChannel messageChannel,
65-
RSocket sendingRSocket, @Nullable MimeType dataMimeType, RSocketStrategies strategies) {
68+
RSocket sendingRSocket, @Nullable MimeType defaultDataMimeType, RSocketStrategies strategies) {
6669

6770
Assert.notNull(messageChannel, "'messageChannel' is required");
6871
Assert.notNull(sendingRSocket, "'sendingRSocket' is required");
6972
this.messageChannel = messageChannel;
70-
this.requester = RSocketRequester.create(sendingRSocket, dataMimeType, strategies);
71-
this.dataMimeType = dataMimeType;
73+
this.requester = RSocketRequester.create(sendingRSocket, defaultDataMimeType, strategies);
74+
this.dataMimeType = defaultDataMimeType;
7275
this.strategies = strategies;
7376
}
7477

7578

76-
public Mono<Void> afterConnectionEstablished(ConnectionSetupPayload payload) {
77-
return execute(payload).flatMap(flux -> flux.take(0).then());
79+
80+
public Mono<Void> handleConnectionSetupPayload(ConnectionSetupPayload payload) {
81+
if (StringUtils.hasText(payload.dataMimeType())) {
82+
this.dataMimeType = MimeTypeUtils.parseMimeType(payload.dataMimeType());
83+
}
84+
return handle(payload);
7885
}
7986

8087

8188
@Override
8289
public Mono<Void> fireAndForget(Payload payload) {
83-
return execute(payload).flatMap(flux -> flux.take(0).then());
90+
return handle(payload);
8491
}
8592

8693
@Override
8794
public Mono<Payload> requestResponse(Payload payload) {
88-
return execute(payload).flatMap(Flux::next);
95+
return handleAndReply(payload, Flux.just(payload)).next();
8996
}
9097

9198
@Override
9299
public Flux<Payload> requestStream(Payload payload) {
93-
return execute(payload).flatMapMany(Function.identity());
100+
return handleAndReply(payload, Flux.just(payload));
94101
}
95102

96103
@Override
97104
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
98105
return Flux.from(payloads)
99-
.switchOnFirst((signal, inner) -> {
100-
Payload first = signal.get();
101-
return first != null ? execute(first, inner).flatMapMany(Function.identity()) : inner;
106+
.switchOnFirst((signal, innerFlux) -> {
107+
Payload firstPayload = signal.get();
108+
return firstPayload == null ? innerFlux : handleAndReply(firstPayload, innerFlux);
102109
});
103110
}
104111

105112
@Override
106113
public Mono<Void> metadataPush(Payload payload) {
107-
return null;
114+
// This won't be very useful until createHeaders starting doing something more with metadata..
115+
return handle(payload);
108116
}
109117

110-
private Mono<Flux<Payload>> execute(Payload payload) {
111-
return execute(payload, Flux.just(payload));
112-
}
113118

114-
private Mono<Flux<Payload>> execute(Payload firstPayload, Flux<Payload> payloads) {
119+
private Mono<Void> handle(Payload payload) {
115120

116-
// TODO:
117-
// Since we do retain(), we need to ensure buffers are released if not consumed,
118-
// e.g. error before Flux subscribed to, no handler found, @MessageMapping ignores payload, etc.
121+
Message<?> message = MessageBuilder.createMessage(
122+
Mono.fromCallable(() -> wrapPayloadData(payload)),
123+
createHeaders(payload, null));
119124

120-
Flux<DataBuffer> payloadDataBuffers = payloads
121-
.map(payload -> PayloadUtils.asDataBuffer(payload, this.strategies.dataBufferFactory()))
122-
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
125+
return this.messageChannel.send(message).flatMap(result -> result ?
126+
Mono.empty() : Mono.error(new MessageDeliveryException("RSocket request not handled")));
127+
}
128+
129+
private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payloads) {
123130

124131
MonoProcessor<Flux<Payload>> replyMono = MonoProcessor.create();
125-
MessageHeaders headers = createHeaders(firstPayload, replyMono);
126132

127-
Message<?> message = MessageBuilder.createMessage(payloadDataBuffers, headers);
133+
Message<?> message = MessageBuilder.createMessage(
134+
payloads.map(this::wrapPayloadData).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release),
135+
createHeaders(firstPayload, replyMono));
128136

129-
return this.messageChannel.send(message).flatMap(result -> result ?
130-
replyMono.isTerminated() ? replyMono : Mono.empty() :
131-
Mono.error(new MessageDeliveryException("RSocket interaction not handled")));
137+
return this.messageChannel.send(message).flatMapMany(result ->
138+
result && replyMono.isTerminated() ? replyMono.flatMapMany(Function.identity()) :
139+
Mono.error(new MessageDeliveryException("RSocket request not handled")));
132140
}
133141

134-
private MessageHeaders createHeaders(Payload payload, MonoProcessor<?> replyMono) {
142+
private MessageHeaders createHeaders(Payload payload, @Nullable MonoProcessor<?> replyMono) {
135143

144+
// TODO:
136145
// For now treat the metadata as a simple string with routing information.
137146
// We'll have to get more sophisticated once the routing extension is completed.
138147
// https://github.com/rsocket/rsocket-java/issues/568
@@ -147,21 +156,19 @@ private MessageHeaders createHeaders(Payload payload, MonoProcessor<?> replyMono
147156
}
148157

149158
headers.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, this.requester);
150-
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono);
159+
160+
if (replyMono != null) {
161+
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono);
162+
}
151163

152164
DataBufferFactory bufferFactory = this.strategies.dataBufferFactory();
153165
headers.setHeader(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, bufferFactory);
154166

155167
return headers.getMessageHeaders();
156168
}
157169

158-
@Override
159-
public Mono<Void> onClose() {
160-
return null;
161-
}
162-
163-
@Override
164-
public void dispose() {
170+
private DataBuffer wrapPayloadData(Payload payload) {
171+
return PayloadUtils.wrapPayloadData(payload, this.strategies.dataBufferFactory());
165172
}
166173

167174
}

spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java

+11-18
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
*/
1616
package org.springframework.messaging.rsocket;
1717

18-
import java.nio.ByteBuffer;
19-
20-
import io.netty.buffer.ByteBuf;
2118
import io.rsocket.Payload;
2219
import io.rsocket.util.ByteBufPayload;
2320
import io.rsocket.util.DefaultPayload;
@@ -44,7 +41,7 @@ abstract class PayloadUtils {
4441
* @param bufferFactory the BufferFactory to use to wrap
4542
* @return the DataBuffer wrapper
4643
*/
47-
public static DataBuffer asDataBuffer(Payload payload, DataBufferFactory bufferFactory) {
44+
public static DataBuffer wrapPayloadData(Payload payload, DataBufferFactory bufferFactory) {
4845
if (bufferFactory instanceof NettyDataBufferFactory) {
4946
return ((NettyDataBufferFactory) bufferFactory).wrap(payload.retain().sliceData());
5047
}
@@ -59,12 +56,16 @@ public static DataBuffer asDataBuffer(Payload payload, DataBufferFactory bufferF
5956
* @param data the data part for the payload
6057
* @return the created Payload
6158
*/
62-
public static Payload asPayload(DataBuffer metadata, DataBuffer data) {
59+
public static Payload createPayload(DataBuffer metadata, DataBuffer data) {
6360
if (metadata instanceof NettyDataBuffer && data instanceof NettyDataBuffer) {
64-
return ByteBufPayload.create(getByteBuf(data), getByteBuf(metadata));
61+
return ByteBufPayload.create(
62+
((NettyDataBuffer) data).getNativeBuffer(),
63+
((NettyDataBuffer) metadata).getNativeBuffer());
6564
}
6665
else if (metadata instanceof DefaultDataBuffer && data instanceof DefaultDataBuffer) {
67-
return DefaultPayload.create(getByteBuffer(data), getByteBuffer(metadata));
66+
return DefaultPayload.create(
67+
((DefaultDataBuffer) data).getNativeBuffer(),
68+
((DefaultDataBuffer) metadata).getNativeBuffer());
6869
}
6970
else {
7071
return DefaultPayload.create(data.asByteBuffer(), metadata.asByteBuffer());
@@ -76,24 +77,16 @@ else if (metadata instanceof DefaultDataBuffer && data instanceof DefaultDataBuf
7677
* @param data the data part for the payload
7778
* @return the created Payload
7879
*/
79-
public static Payload asPayload(DataBuffer data) {
80+
public static Payload createPayload(DataBuffer data) {
8081
if (data instanceof NettyDataBuffer) {
81-
return ByteBufPayload.create(getByteBuf(data));
82+
return ByteBufPayload.create(((NettyDataBuffer) data).getNativeBuffer());
8283
}
8384
else if (data instanceof DefaultDataBuffer) {
84-
return DefaultPayload.create(getByteBuffer(data));
85+
return DefaultPayload.create(((DefaultDataBuffer) data).getNativeBuffer());
8586
}
8687
else {
8788
return DefaultPayload.create(data.asByteBuffer());
8889
}
8990
}
9091

91-
private static ByteBuf getByteBuf(DataBuffer dataBuffer) {
92-
return ((NettyDataBuffer) dataBuffer).getNativeBuffer();
93-
}
94-
95-
private static
96-
ByteBuffer getByteBuffer(DataBuffer dataBuffer) {
97-
return ((DefaultDataBuffer) dataBuffer).getNativeBuffer();
98-
}
9992
}

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java

-13
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,10 @@
2121
import org.springframework.core.codec.Decoder;
2222
import org.springframework.core.codec.Encoder;
2323
import org.springframework.lang.Nullable;
24-
import org.springframework.messaging.Message;
25-
import org.springframework.messaging.MessageDeliveryException;
2624
import org.springframework.messaging.ReactiveSubscribableChannel;
2725
import org.springframework.messaging.handler.annotation.support.reactive.MessageMappingMessageHandler;
2826
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
2927
import org.springframework.util.Assert;
30-
import org.springframework.util.StringUtils;
3128

3229
/**
3330
* RSocket-specific extension of {@link MessageMappingMessageHandler}.
@@ -124,14 +121,4 @@ protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandler
124121
return handlers;
125122
}
126123

127-
128-
@Override
129-
protected void handleNoMatch(@Nullable String destination, Message<?> message) {
130-
// Ignore empty destination, probably the ConnectionSetupPayload
131-
if (!StringUtils.isEmpty(destination)) {
132-
super.handleNoMatch(destination, message);
133-
throw new MessageDeliveryException("No handler for '" + destination + "'");
134-
}
135-
}
136-
137124
}

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketPayloadReturnValueHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ protected Mono<Void> handleEncodedContent(
6363
Assert.isInstanceOf(MonoProcessor.class, headerValue, "Expected MonoProcessor");
6464

6565
MonoProcessor<Flux<Payload>> monoProcessor = (MonoProcessor<Flux<Payload>>) headerValue;
66-
monoProcessor.onNext(encodedContent.map(PayloadUtils::asPayload));
66+
monoProcessor.onNext(encodedContent.map(PayloadUtils::createPayload));
6767
monoProcessor.onComplete();
6868

6969
return Mono.empty();

spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public void rejectFluxToMono() {
199199
}
200200

201201
private Payload toPayload(String value) {
202-
return PayloadUtils.asPayload(bufferFactory.wrap(value.getBytes(StandardCharsets.UTF_8)));
202+
return PayloadUtils.createPayload(bufferFactory.wrap(value.getBytes(StandardCharsets.UTF_8)));
203203
}
204204

205205

0 commit comments

Comments
 (0)