20
20
import static org .springframework .data .redis .util .ByteUtils .*;
21
21
22
22
import reactor .core .Disposable ;
23
- import reactor .core .publisher .DirectProcessor ;
24
23
import reactor .core .publisher .Flux ;
25
24
import reactor .core .publisher .Mono ;
26
- import reactor .core .publisher .MonoProcessor ;
25
+ import reactor .core .publisher .Sinks ;
27
26
import reactor .test .StepVerifier ;
28
27
29
28
import java .nio .ByteBuffer ;
@@ -92,8 +91,7 @@ void shouldSubscribeToMultiplePatterns() {
92
91
container = createContainer ();
93
92
94
93
container .receive (PatternTopic .of ("foo*" ), PatternTopic .of ("bar*" )).as (StepVerifier ::create ).thenRequest (1 )
95
- .thenAwait ()
96
- .thenCancel ().verify ();
94
+ .thenAwait ().thenCancel ().verify ();
97
95
98
96
verify (subscriptionMock ).pSubscribe (getByteBuffer ("foo*" ), getByteBuffer ("bar*" ));
99
97
}
@@ -124,15 +122,15 @@ void shouldSubscribeToMultipleChannels() {
124
122
@ Test // DATAREDIS-612
125
123
void shouldEmitChannelMessage () {
126
124
127
- DirectProcessor <Message <ByteBuffer , ByteBuffer >> processor = DirectProcessor . create ();
125
+ Sinks . Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks . many (). unicast (). onBackpressureBuffer ();
128
126
129
- when (subscriptionMock .receive ()).thenReturn (processor );
127
+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux () );
130
128
container = createContainer ();
131
129
132
130
Flux <Message <String , String >> messageStream = container .receive (ChannelTopic .of ("foo" ));
133
131
134
132
messageStream .as (StepVerifier ::create ).then (() -> {
135
- processor . onNext (createChannelMessage ("foo" , "message" ));
133
+ sink . tryEmitNext (createChannelMessage ("foo" , "message" ));
136
134
}).assertNext (msg -> {
137
135
138
136
assertThat (msg .getChannel ()).isEqualTo ("foo" );
@@ -143,15 +141,15 @@ void shouldEmitChannelMessage() {
143
141
@ Test // DATAREDIS-612
144
142
void shouldEmitPatternMessage () {
145
143
146
- DirectProcessor <Message <ByteBuffer , ByteBuffer >> processor = DirectProcessor . create ();
144
+ Sinks . Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks . many (). unicast (). onBackpressureBuffer ();
147
145
148
- when (subscriptionMock .receive ()).thenReturn (processor );
146
+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux () );
149
147
container = createContainer ();
150
148
151
149
Flux <PatternMessage <String , String , String >> messageStream = container .receive (PatternTopic .of ("foo*" ));
152
150
153
151
messageStream .as (StepVerifier ::create ).then (() -> {
154
- processor . onNext (createPatternMessage ("foo*" , "foo" , "message" ));
152
+ sink . tryEmitNext (createPatternMessage ("foo*" , "foo" , "message" ));
155
153
}).assertNext (msg -> {
156
154
157
155
assertThat (msg .getPattern ()).isEqualTo ("foo*" );
@@ -163,20 +161,22 @@ void shouldEmitPatternMessage() {
163
161
@ Test // DATAREDIS-612
164
162
void shouldRegisterSubscription () {
165
163
166
- MonoProcessor <Void > subscribeMono = MonoProcessor .create ();
164
+ Sinks .Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks .many ().multicast ().onBackpressureBuffer ();
165
+
166
+ Sinks .One <Void > subscribeMono = Sinks .one ();
167
167
168
168
reset (subscriptionMock );
169
- when (subscriptionMock .subscribe (any ())).thenReturn (subscribeMono );
169
+ when (subscriptionMock .subscribe (any ())).thenReturn (subscribeMono . asMono () );
170
170
when (subscriptionMock .unsubscribe ()).thenReturn (Mono .empty ());
171
- when (subscriptionMock .receive ()).thenReturn (DirectProcessor . create ());
171
+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux ());
172
172
container = createContainer ();
173
173
174
174
Flux <Message <String , String >> messageStream = container .receive (ChannelTopic .of ("foo*" ));
175
175
176
176
Disposable subscription = messageStream .subscribe ();
177
177
178
178
assertThat (container .getActiveSubscriptions ()).isEmpty ();
179
- subscribeMono .onComplete ();
179
+ subscribeMono .tryEmitEmpty ();
180
180
assertThat (container .getActiveSubscriptions ()).isNotEmpty ();
181
181
subscription .dispose ();
182
182
assertThat (container .getActiveSubscriptions ()).isEmpty ();
@@ -185,10 +185,12 @@ void shouldRegisterSubscription() {
185
185
@ Test // DATAREDIS-612, GH-1622
186
186
void shouldRegisterSubscriptionMultipleSubscribers () {
187
187
188
+ Sinks .Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks .many ().multicast ().onBackpressureBuffer ();
189
+
188
190
reset (subscriptionMock );
189
191
when (subscriptionMock .subscribe (any ())).thenReturn (Mono .empty ());
190
192
when (subscriptionMock .unsubscribe ()).thenReturn (Mono .empty ());
191
- when (subscriptionMock .receive ()).thenReturn (DirectProcessor . create ());
193
+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux ());
192
194
container = createContainer ();
193
195
194
196
Flux <Message <String , String >> messageStream = container .receive (new ChannelTopic ("foo*" ));
@@ -210,7 +212,8 @@ void shouldRegisterSubscriptionMultipleSubscribers() {
210
212
@ Test // DATAREDIS-612, GH-1622
211
213
void shouldUnsubscribeOnCancel () {
212
214
213
- when (subscriptionMock .receive ()).thenReturn (DirectProcessor .create ());
215
+ Sinks .Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks .many ().unicast ().onBackpressureBuffer ();
216
+ when (subscriptionMock .receive ()).thenReturn (sink .asFlux ());
214
217
container = createContainer ();
215
218
216
219
Flux <PatternMessage <String , String , String >> messageStream = container .receive (PatternTopic .of ("foo*" ));
@@ -227,12 +230,12 @@ void shouldUnsubscribeOnCancel() {
227
230
@ Test // DATAREDIS-612
228
231
void shouldTerminateSubscriptionsOnShutdown () {
229
232
230
- DirectProcessor <Message <ByteBuffer , ByteBuffer >> processor = DirectProcessor . create ();
233
+ Sinks . Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks . many (). unicast (). onBackpressureBuffer ();
231
234
232
- when (subscriptionMock .receive ()).thenReturn (processor );
235
+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux () );
233
236
when (subscriptionMock .cancel ()).thenReturn (Mono .defer (() -> {
234
237
235
- processor . onError (new CancellationException ());
238
+ sink . tryEmitError (new CancellationException ());
236
239
return Mono .empty ();
237
240
}));
238
241
container = createContainer ();
@@ -247,19 +250,19 @@ void shouldTerminateSubscriptionsOnShutdown() {
247
250
@ Test // DATAREDIS-612
248
251
void shouldCleanupDownstream () {
249
252
250
- DirectProcessor <Message <ByteBuffer , ByteBuffer >> processor = DirectProcessor . create ();
253
+ Sinks . Many <Message <ByteBuffer , ByteBuffer >> sink = Sinks . many (). unicast (). onBackpressureBuffer ();
251
254
252
- when (subscriptionMock .receive ()).thenReturn (processor );
255
+ when (subscriptionMock .receive ()).thenReturn (sink . asFlux () );
253
256
container = createContainer ();
254
257
255
258
Flux <PatternMessage <String , String , String >> messageStream = container .receive (PatternTopic .of ("foo*" ));
256
259
257
260
messageStream .as (StepVerifier ::create ).then (() -> {
258
- assertThat (processor . hasDownstreams ()).isTrue ( );
259
- processor . onNext (createPatternMessage ("foo*" , "foo" , "message" ));
261
+ assertThat (sink . currentSubscriberCount ()).isGreaterThan ( 0 );
262
+ sink . tryEmitNext (createPatternMessage ("foo*" , "foo" , "message" ));
260
263
}).expectNextCount (1 ).thenCancel ().verify ();
261
264
262
- assertThat (processor . hasDownstreams ()).isFalse ( );
265
+ assertThat (sink . currentSubscriberCount ()).isEqualTo ( 0 );
263
266
}
264
267
265
268
private ReactiveRedisMessageListenerContainer createContainer () {
0 commit comments