Skip to content

Commit 4e78b5d

Browse files
committed
RSocket @MessageMapping handling
See gh-21987
1 parent f2bb95b commit 4e78b5d

10 files changed

+1153
-0
lines changed

spring-messaging/spring-messaging.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ dependencyManagement {
77
}
88
}
99

10+
def rsocketVersion = "0.11.15"
11+
1012
dependencies {
1113
compile(project(":spring-beans"))
1214
compile(project(":spring-core"))
1315
optional(project(":spring-context"))
1416
optional(project(":spring-oxm"))
1517
optional("io.projectreactor.netty:reactor-netty")
18+
optional("io.rsocket:rsocket-core:${rsocketVersion}")
1619
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
1720
optional("javax.xml.bind:jaxb-api:2.3.1")
1821
testCompile("javax.inject:javax.inject-tck:1")
@@ -26,6 +29,7 @@ dependencies {
2629
testCompile("org.apache.activemq:activemq-stomp:5.8.0")
2730
testCompile("io.projectreactor:reactor-test")
2831
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
32+
testCompile("io.rsocket:rsocket-transport-netty:${rsocketVersion}")
2933
testCompile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
3034
testCompile("org.jetbrains.kotlin:kotlin-stdlib:${kotlinVersion}")
3135
testCompile("org.xmlunit:xmlunit-matchers:2.6.2")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.rsocket;
17+
18+
import java.util.function.Function;
19+
import java.util.function.Predicate;
20+
21+
import io.netty.buffer.PooledByteBufAllocator;
22+
import io.rsocket.ConnectionSetupPayload;
23+
import io.rsocket.RSocket;
24+
import io.rsocket.SocketAcceptor;
25+
import reactor.core.publisher.Mono;
26+
27+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
28+
import org.springframework.lang.Nullable;
29+
import org.springframework.messaging.Message;
30+
import org.springframework.messaging.ReactiveMessageChannel;
31+
import org.springframework.util.Assert;
32+
import org.springframework.util.MimeType;
33+
import org.springframework.util.MimeTypeUtils;
34+
35+
/**
36+
* RSocket acceptor for
37+
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function) client} or
38+
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server}
39+
* side use. It wraps requests with a {@link Message} envelope and sends them
40+
* to a {@link ReactiveMessageChannel} for handling, e.g. via
41+
* {@code @MessageMapping} method.
42+
*
43+
* @author Rossen Stoyanchev
44+
* @since 5.2
45+
*/
46+
public final class MessagingAcceptor implements SocketAcceptor, Function<RSocket, RSocket> {
47+
48+
private final ReactiveMessageChannel messageChannel;
49+
50+
private NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
51+
52+
@Nullable
53+
private MimeType defaultDataMimeType;
54+
55+
56+
/**
57+
* Constructor with a message channel to send messages to.
58+
* @param messageChannel the message channel to use
59+
* <p>This assumes a Spring configuration setup with a
60+
* {@code ReactiveMessageChannel} and an {@link RSocketMessageHandler} which
61+
* by default auto-detects {@code @MessageMapping} methods in
62+
* {@code @Controller} classes, but can also be configured with a
63+
* {@link RSocketMessageHandler#setHandlerPredicate(Predicate) handlerPredicate}
64+
* or with handler instances.
65+
*/
66+
public MessagingAcceptor(ReactiveMessageChannel messageChannel) {
67+
Assert.notNull(messageChannel, "ReactiveMessageChannel is required");
68+
this.messageChannel = messageChannel;
69+
}
70+
71+
72+
/**
73+
* Configure the default content type for data payloads. For server
74+
* acceptors this is available from the {@link ConnectionSetupPayload} but
75+
* for client acceptors it's not and must be provided here.
76+
* <p>By default this is not set.
77+
* @param defaultDataMimeType the MimeType to use
78+
*/
79+
public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
80+
this.defaultDataMimeType = defaultDataMimeType;
81+
}
82+
83+
/**
84+
* Configure the buffer factory to use.
85+
* <p>By default this is initialized with the allocator instance
86+
* {@link PooledByteBufAllocator#DEFAULT}.
87+
* @param bufferFactory the bufferFactory to use
88+
*/
89+
public void setNettyDataBufferFactory(NettyDataBufferFactory bufferFactory) {
90+
Assert.notNull(bufferFactory, "DataBufferFactory is required");
91+
this.bufferFactory = bufferFactory;
92+
}
93+
94+
95+
@Override
96+
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) {
97+
98+
MimeType mimeType = setupPayload.dataMimeType() != null ?
99+
MimeTypeUtils.parseMimeType(setupPayload.dataMimeType()) : this.defaultDataMimeType;
100+
101+
MessagingRSocket rsocket = createRSocket(sendingRSocket, mimeType);
102+
return rsocket.afterConnectionEstablished(setupPayload).then(Mono.just(rsocket));
103+
}
104+
105+
@Override
106+
public RSocket apply(RSocket sendingRSocket) {
107+
return createRSocket(sendingRSocket, this.defaultDataMimeType);
108+
}
109+
110+
private MessagingRSocket createRSocket(RSocket sendingRSocket, @Nullable MimeType dataMimeType) {
111+
return new MessagingRSocket(this.messageChannel, this.bufferFactory, sendingRSocket, dataMimeType);
112+
}
113+
114+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.messaging.rsocket;
17+
18+
import java.util.function.Function;
19+
20+
import io.rsocket.ConnectionSetupPayload;
21+
import io.rsocket.Payload;
22+
import io.rsocket.RSocket;
23+
import org.reactivestreams.Publisher;
24+
import reactor.core.publisher.Flux;
25+
import reactor.core.publisher.Mono;
26+
import reactor.core.publisher.MonoProcessor;
27+
28+
import org.springframework.core.io.buffer.DataBufferUtils;
29+
import org.springframework.core.io.buffer.NettyDataBuffer;
30+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
31+
import org.springframework.core.io.buffer.PooledDataBuffer;
32+
import org.springframework.lang.Nullable;
33+
import org.springframework.messaging.Message;
34+
import org.springframework.messaging.MessageDeliveryException;
35+
import org.springframework.messaging.MessageHeaders;
36+
import org.springframework.messaging.ReactiveMessageChannel;
37+
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
38+
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
39+
import org.springframework.messaging.support.MessageBuilder;
40+
import org.springframework.messaging.support.MessageHeaderAccessor;
41+
import org.springframework.util.Assert;
42+
import org.springframework.util.MimeType;
43+
44+
/**
45+
* Package private implementation of {@link RSocket} used from
46+
* {@link MessagingAcceptor}.
47+
*
48+
* @author Rossen Stoyanchev
49+
* @since 5.2
50+
*/
51+
class MessagingRSocket implements RSocket {
52+
53+
private final ReactiveMessageChannel messageChannel;
54+
55+
private final NettyDataBufferFactory bufferFactory;
56+
57+
private final RSocket sendingRSocket;
58+
59+
@Nullable
60+
private final MimeType dataMimeType;
61+
62+
63+
MessagingRSocket(ReactiveMessageChannel messageChannel, NettyDataBufferFactory bufferFactory,
64+
RSocket sendingRSocket, @Nullable MimeType dataMimeType) {
65+
66+
Assert.notNull(messageChannel, "'messageChannel' is required");
67+
Assert.notNull(bufferFactory, "'bufferFactory' is required");
68+
Assert.notNull(sendingRSocket, "'sendingRSocket' is required");
69+
this.messageChannel = messageChannel;
70+
this.bufferFactory = bufferFactory;
71+
this.sendingRSocket = sendingRSocket;
72+
this.dataMimeType = dataMimeType;
73+
}
74+
75+
76+
public Mono<Void> afterConnectionEstablished(ConnectionSetupPayload payload) {
77+
return execute(payload).flatMap(flux -> flux.take(0).then());
78+
}
79+
80+
81+
@Override
82+
public Mono<Void> fireAndForget(Payload payload) {
83+
return execute(payload).flatMap(flux -> flux.take(0).then());
84+
}
85+
86+
@Override
87+
public Mono<Payload> requestResponse(Payload payload) {
88+
return execute(payload).flatMap(Flux::next);
89+
}
90+
91+
@Override
92+
public Flux<Payload> requestStream(Payload payload) {
93+
return execute(payload).flatMapMany(Function.identity());
94+
}
95+
96+
@Override
97+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
98+
return Flux.from(payloads)
99+
.switchOnFirst((signal, inner) -> {
100+
Payload first = signal.get();
101+
return first != null ? execute(first, inner).flatMapMany(Function.identity()) : inner;
102+
});
103+
}
104+
105+
@Override
106+
public Mono<Void> metadataPush(Payload payload) {
107+
return null;
108+
}
109+
110+
private Mono<Flux<Payload>> execute(Payload payload) {
111+
return execute(payload, Flux.just(payload));
112+
}
113+
114+
private Mono<Flux<Payload>> execute(Payload firstPayload, Flux<Payload> payloads) {
115+
116+
// TODO:
117+
// Since we do retain(), we need to ensure buffers are released if not consumed,
118+
// e.g. error before Flux subscribed to, no handler found, @MessageMapping ignores payload, etc.
119+
120+
Flux<NettyDataBuffer> payloadDataBuffers = payloads
121+
.map(payload -> this.bufferFactory.wrap(payload.retain().sliceData()))
122+
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
123+
124+
MonoProcessor<Flux<Payload>> replyMono = MonoProcessor.create();
125+
MessageHeaders headers = createHeaders(firstPayload, replyMono);
126+
127+
Message<?> message = MessageBuilder.createMessage(payloadDataBuffers, headers);
128+
129+
return this.messageChannel.send(message).flatMap(result -> result ?
130+
replyMono.isTerminated() ? replyMono : Mono.empty() :
131+
Mono.error(new MessageDeliveryException("RSocket interaction not handled")));
132+
}
133+
134+
private MessageHeaders createHeaders(Payload payload, MonoProcessor<?> replyMono) {
135+
136+
// For now treat the metadata as a simple string with routing information.
137+
// We'll have to get more sophisticated once the routing extension is completed.
138+
// https://github.com/rsocket/rsocket-java/issues/568
139+
140+
MessageHeaderAccessor headers = new MessageHeaderAccessor();
141+
142+
String destination = payload.getMetadataUtf8();
143+
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination);
144+
145+
if (this.dataMimeType != null) {
146+
headers.setContentType(this.dataMimeType);
147+
}
148+
149+
headers.setHeader(SendingRSocketMethodArgumentResolver.SENDING_RSOCKET_HEADER, this.sendingRSocket);
150+
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono);
151+
headers.setHeader(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, this.bufferFactory);
152+
153+
return headers.getMessageHeaders();
154+
}
155+
156+
@Override
157+
public Mono<Void> onClose() {
158+
return null;
159+
}
160+
161+
@Override
162+
public void dispose() {
163+
}
164+
165+
}

0 commit comments

Comments
 (0)