11package io .rsocket .integration ;
22
3- import java .time .Duration ;
4- import java .util .concurrent .atomic .AtomicBoolean ;
5- import java .util .function .Function ;
6-
73import io .rsocket .Payload ;
84import io .rsocket .RSocket ;
95import io .rsocket .core .RSocketClient ;
1410import io .rsocket .transport .netty .server .CloseableChannel ;
1511import io .rsocket .transport .netty .server .TcpServerTransport ;
1612import io .rsocket .util .DefaultPayload ;
13+ import java .time .Duration ;
14+ import java .util .concurrent .atomic .AtomicBoolean ;
15+ import java .util .function .Function ;
1716import org .junit .jupiter .api .Test ;
1817import org .slf4j .Logger ;
1918import org .slf4j .LoggerFactory ;
2726
2827public class KeepaliveTest {
2928
30- private static final Logger LOG = LoggerFactory .getLogger (KeepaliveTest .class );
31- private static final int PORT = 23200 ;
32-
33- @ Test
34- void keepAliveTest () {
35- createServer ().block ();
36- RSocketClient rsocketClient = createClient ();
37-
38- int expectedCount = 4 ;
39- AtomicBoolean sleepOnce = new AtomicBoolean (true );
40- StepVerifier .create (
41- Flux .range (0 , expectedCount )
42- .delayElements (Duration .ofMillis (2000 ))
43- .concatMap (i ->
44- rsocketClient .requestResponse (Mono .just (DefaultPayload .create ("" )))
45- .doOnNext (__ -> {
46- if (sleepOnce .getAndSet (false )) {
47- try {
48- LOG .info ("Sleeping..." );
49- Thread .sleep (1_000 );
50- LOG .info ("Waking up." );
51- } catch (InterruptedException e ) {
52- throw new RuntimeException (e );
53- }
54- }
55- })
56- .log ("id " + i )
57- .onErrorComplete ()
58- ))
59- .expectSubscription ()
60- .expectNextCount (expectedCount )
61- .verifyComplete ();
62- }
63-
64- @ Test
65- void keepAliveTestLazy () {
66- createServer ().block ();
67- Mono <RSocket > rsocketMono = createClientLazy ();
68-
69- int expectedCount = 4 ;
70- AtomicBoolean sleepOnce = new AtomicBoolean (true );
71- StepVerifier .create (
72- Flux .range (0 , expectedCount )
73- .delayElements (Duration .ofMillis (2000 ))
74- .concatMap (i ->
75- rsocketMono .flatMap (rsocket -> rsocket .requestResponse (DefaultPayload .create ("" ))
76- .doOnNext (__ -> {
77- if (sleepOnce .getAndSet (false )) {
78- try {
79- LOG .info ("Sleeping..." );
80- Thread .sleep (1_000 );
81- LOG .info ("Waking up." );
82- } catch (InterruptedException e ) {
83- throw new RuntimeException (e );
84- }
85- }
86- })
87- .log ("id " + i )
88- .onErrorComplete ()
89- )
90- ))
91- .expectSubscription ()
92- .expectNextCount (expectedCount )
93- .verifyComplete ();
94- }
95-
96- private static Mono <CloseableChannel > createServer () {
97- LOG .info ("Starting server at port {}" , PORT );
98-
99- TcpServer tcpServer = TcpServer .create ().host ("localhost" ).port (PORT );
100-
101- return RSocketServer .create ((setupPayload , rSocket ) -> {
102- rSocket .onClose ()
103- .doFirst (() -> LOG .info ("Connected on server side." ))
104- .doOnTerminate (() -> LOG .info ("Connection closed on server side." ))
105- .subscribe ();
106-
107- return Mono .just (new MyServerRsocket ());
108- })
109- .payloadDecoder (PayloadDecoder .ZERO_COPY )
110- .bind (TcpServerTransport .create (tcpServer ))
111- .doOnNext (closeableChannel -> LOG .info ("RSocket server started." ));
112- }
113-
114- private static RSocketClient createClient () {
115- LOG .info ("Connecting...." );
116-
117- Function <String , RetryBackoffSpec > reconnectSpec = reason -> Retry .backoff (Long .MAX_VALUE , Duration .ofSeconds (10L ))
118- .doBeforeRetry (retrySignal -> LOG .info ("Reconnecting. Reason: {}" , reason ));
119-
120- Mono <RSocket > rsocketMono = RSocketConnector .create ()
121- .fragment (16384 )
122- .reconnect (reconnectSpec .apply ("connector-close" ))
123- .keepAlive (Duration .ofMillis (100L ), Duration .ofMillis (900L ))
124- .connect (TcpClientTransport .create (TcpClient .create ().host ("localhost" ).port (PORT )));
125-
126- RSocketClient client = RSocketClient .from (rsocketMono );
127-
128- client
129- .source ()
130- .doOnNext (r -> LOG .info ("Got RSocket" ))
131- .flatMap (RSocket ::onClose )
132- .doOnError (err -> LOG .error ("Error during onClose." , err ))
133- .retryWhen (reconnectSpec .apply ("client-close" ))
134- .doFirst (() -> LOG .info ("Connected on client side." ))
135- .doOnTerminate (() -> LOG .info ("Connection closed on client side." ))
136- .repeat ()
137- .subscribe ();
138-
139- return client ;
140- }
141-
142-
143- private static Mono <RSocket > createClientLazy () {
144- LOG .info ("Connecting...." );
145-
146- Function <String , RetryBackoffSpec > reconnectSpec = reason -> Retry .backoff (Long .MAX_VALUE , Duration .ofSeconds (10L ))
147- .doBeforeRetry (retrySignal -> LOG .info ("Reconnecting. Reason: {}" , reason ));
148-
149- return RSocketConnector .create ()
150- .fragment (16384 )
151- .reconnect (reconnectSpec .apply ("connector-close" ))
152- .keepAlive (Duration .ofMillis (100L ), Duration .ofMillis (900L ))
153- .connect (TcpClientTransport .create (TcpClient .create ().host ("localhost" ).port (PORT )));
154-
155- // RSocketClient client = RSocketClient.from(rsocketMono);
156-
157- // client
158- // .source()
159- // .doOnNext(r -> LOG.info("Got RSocket"))
160- // .flatMap(RSocket::onClose)
161- // .doOnError(err -> LOG.error("Error during onClose.", err))
162- // .retryWhen(reconnectSpec.apply("client-close"))
163- // .doFirst(() -> LOG.info("Connected on client side."))
164- // .doOnTerminate(() -> LOG.info("Connection closed on client side."))
165- // .repeat()
166- // .subscribe();
167-
168- // return client;
169- }
170-
171- public static class MyServerRsocket implements RSocket {
172-
173- @ Override
174- public Mono <Payload > requestResponse (Payload payload ) {
175- return Mono .just ("Pong" ).map (DefaultPayload ::create );
176- }
177- }
178- }
29+ private static final Logger LOG = LoggerFactory .getLogger (KeepaliveTest .class );
30+ private static final int PORT = 23200 ;
31+
32+ @ Test
33+ void keepAliveTest () {
34+ createServer ().block ();
35+ RSocketClient rsocketClient = createClient ();
36+
37+ int expectedCount = 4 ;
38+ AtomicBoolean sleepOnce = new AtomicBoolean (true );
39+ StepVerifier .create (
40+ Flux .range (0 , expectedCount )
41+ .delayElements (Duration .ofMillis (2000 ))
42+ .concatMap (
43+ i ->
44+ rsocketClient
45+ .requestResponse (Mono .just (DefaultPayload .create ("" )))
46+ .doOnNext (
47+ __ -> {
48+ if (sleepOnce .getAndSet (false )) {
49+ try {
50+ LOG .info ("Sleeping..." );
51+ Thread .sleep (1_000 );
52+ LOG .info ("Waking up." );
53+ } catch (InterruptedException e ) {
54+ throw new RuntimeException (e );
55+ }
56+ }
57+ })
58+ .log ("id " + i )
59+ .onErrorComplete ()))
60+ .expectSubscription ()
61+ .expectNextCount (expectedCount )
62+ .verifyComplete ();
63+ }
64+
65+ @ Test
66+ void keepAliveTestLazy () {
67+ createServer ().block ();
68+ Mono <RSocket > rsocketMono = createClientLazy ();
69+
70+ int expectedCount = 4 ;
71+ AtomicBoolean sleepOnce = new AtomicBoolean (true );
72+ StepVerifier .create (
73+ Flux .range (0 , expectedCount )
74+ .delayElements (Duration .ofMillis (2000 ))
75+ .concatMap (
76+ i ->
77+ rsocketMono .flatMap (
78+ rsocket ->
79+ rsocket
80+ .requestResponse (DefaultPayload .create ("" ))
81+ .doOnNext (
82+ __ -> {
83+ if (sleepOnce .getAndSet (false )) {
84+ try {
85+ LOG .info ("Sleeping..." );
86+ Thread .sleep (1_000 );
87+ LOG .info ("Waking up." );
88+ } catch (InterruptedException e ) {
89+ throw new RuntimeException (e );
90+ }
91+ }
92+ })
93+ .log ("id " + i )
94+ .onErrorComplete ())))
95+ .expectSubscription ()
96+ .expectNextCount (expectedCount )
97+ .verifyComplete ();
98+ }
99+
100+ private static Mono <CloseableChannel > createServer () {
101+ LOG .info ("Starting server at port {}" , PORT );
102+
103+ TcpServer tcpServer = TcpServer .create ().host ("localhost" ).port (PORT );
104+
105+ return RSocketServer .create (
106+ (setupPayload , rSocket ) -> {
107+ rSocket
108+ .onClose ()
109+ .doFirst (() -> LOG .info ("Connected on server side." ))
110+ .doOnTerminate (() -> LOG .info ("Connection closed on server side." ))
111+ .subscribe ();
112+
113+ return Mono .just (new MyServerRsocket ());
114+ })
115+ .payloadDecoder (PayloadDecoder .ZERO_COPY )
116+ .bind (TcpServerTransport .create (tcpServer ))
117+ .doOnNext (closeableChannel -> LOG .info ("RSocket server started." ));
118+ }
119+
120+ private static RSocketClient createClient () {
121+ LOG .info ("Connecting...." );
122+
123+ Function <String , RetryBackoffSpec > reconnectSpec =
124+ reason ->
125+ Retry .backoff (Long .MAX_VALUE , Duration .ofSeconds (10L ))
126+ .doBeforeRetry (retrySignal -> LOG .info ("Reconnecting. Reason: {}" , reason ));
127+
128+ Mono <RSocket > rsocketMono =
129+ RSocketConnector .create ()
130+ .fragment (16384 )
131+ .reconnect (reconnectSpec .apply ("connector-close" ))
132+ .keepAlive (Duration .ofMillis (100L ), Duration .ofMillis (900L ))
133+ .connect (TcpClientTransport .create (TcpClient .create ().host ("localhost" ).port (PORT )));
134+
135+ RSocketClient client = RSocketClient .from (rsocketMono );
136+
137+ client
138+ .source ()
139+ .doOnNext (r -> LOG .info ("Got RSocket" ))
140+ .flatMap (RSocket ::onClose )
141+ .doOnError (err -> LOG .error ("Error during onClose." , err ))
142+ .retryWhen (reconnectSpec .apply ("client-close" ))
143+ .doFirst (() -> LOG .info ("Connected on client side." ))
144+ .doOnTerminate (() -> LOG .info ("Connection closed on client side." ))
145+ .repeat ()
146+ .subscribe ();
147+
148+ return client ;
149+ }
150+
151+ private static Mono <RSocket > createClientLazy () {
152+ LOG .info ("Connecting...." );
153+
154+ Function <String , RetryBackoffSpec > reconnectSpec =
155+ reason ->
156+ Retry .backoff (Long .MAX_VALUE , Duration .ofSeconds (10L ))
157+ .doBeforeRetry (retrySignal -> LOG .info ("Reconnecting. Reason: {}" , reason ));
158+
159+ return RSocketConnector .create ()
160+ .fragment (16384 )
161+ .reconnect (reconnectSpec .apply ("connector-close" ))
162+ .keepAlive (Duration .ofMillis (100L ), Duration .ofMillis (900L ))
163+ .connect (TcpClientTransport .create (TcpClient .create ().host ("localhost" ).port (PORT )));
164+
165+ // RSocketClient client = RSocketClient.from(rsocketMono);
166+
167+ // client
168+ // .source()
169+ // .doOnNext(r -> LOG.info("Got RSocket"))
170+ // .flatMap(RSocket::onClose)
171+ // .doOnError(err -> LOG.error("Error during onClose.", err))
172+ // .retryWhen(reconnectSpec.apply("client-close"))
173+ // .doFirst(() -> LOG.info("Connected on client side."))
174+ // .doOnTerminate(() -> LOG.info("Connection closed on client side."))
175+ // .repeat()
176+ // .subscribe();
177+
178+ // return client;
179+ }
180+
181+ public static class MyServerRsocket implements RSocket {
182+
183+ @ Override
184+ public Mono <Payload > requestResponse (Payload payload ) {
185+ return Mono .just ("Pong" ).map (DefaultPayload ::create );
186+ }
187+ }
188+ }
0 commit comments