37
37
import java .util .concurrent .atomic .AtomicReference ;
38
38
39
39
import static com .mongodb .assertions .Assertions .assertTrue ;
40
+ import static com .mongodb .internal .async .AsyncRunnable .beginAsync ;
40
41
import static com .mongodb .internal .thread .InterruptionUtil .interruptAndCreateMongoInterruptedException ;
41
42
import static java .util .concurrent .TimeUnit .MILLISECONDS ;
42
43
@@ -88,7 +89,7 @@ protected void setChannel(final ExtendedAsynchronousByteChannel channel) {
88
89
89
90
@ Override
90
91
public void writeAsync (final List <ByteBuf > buffers , final OperationContext operationContext ,
91
- final AsyncCompletionHandler <Void > handler ) {
92
+ final AsyncCompletionHandler <Void > handler ) {
92
93
AsyncWritableByteChannelAdapter byteChannel = new AsyncWritableByteChannelAdapter ();
93
94
Iterator <ByteBuf > iter = buffers .iterator ();
94
95
pipeOneBuffer (byteChannel , iter .next (), operationContext , new AsyncCompletionHandler <Void >() {
@@ -189,8 +190,11 @@ public void failed(final Throwable t) {
189
190
190
191
private class AsyncWritableByteChannelAdapter {
191
192
void write (final ByteBuffer src , final OperationContext operationContext , final AsyncCompletionHandler <Void > handler ) {
192
- getChannel ().write (src , operationContext .getTimeoutContext ().getWriteTimeoutMS (), MILLISECONDS , null ,
193
- new AsyncWritableByteChannelAdapter .WriteCompletionHandler (handler ));
193
+ beginAsync ().thenRun ((c ) -> {
194
+ long writeTimeoutMS = operationContext .getTimeoutContext ().getWriteTimeoutMS ();
195
+ getChannel ().write (src , writeTimeoutMS , MILLISECONDS , null ,
196
+ new AsyncWritableByteChannelAdapter .WriteCompletionHandler (c .asHandler ()));
197
+ }).finish (handler .asCallback ());
194
198
}
195
199
196
200
private class WriteCompletionHandler extends BaseCompletionHandler <Void , Integer , Object > {
@@ -222,7 +226,7 @@ private final class BasicCompletionHandler extends BaseCompletionHandler<ByteBuf
222
226
private final OperationContext operationContext ;
223
227
224
228
private BasicCompletionHandler (final ByteBuf dst , final OperationContext operationContext ,
225
- final AsyncCompletionHandler <ByteBuf > handler ) {
229
+ final AsyncCompletionHandler <ByteBuf > handler ) {
226
230
super (handler );
227
231
this .byteBufReference = new AtomicReference <>(dst );
228
232
this .operationContext = operationContext ;
@@ -231,17 +235,20 @@ private BasicCompletionHandler(final ByteBuf dst, final OperationContext operati
231
235
@ Override
232
236
public void completed (final Integer result , final Void attachment ) {
233
237
AsyncCompletionHandler <ByteBuf > localHandler = getHandlerAndClear ();
234
- ByteBuf localByteBuf = byteBufReference .getAndSet (null );
235
- if (result == -1 ) {
236
- localByteBuf .release ();
237
- localHandler .failed (new MongoSocketReadException ("Prematurely reached end of stream" , serverAddress ));
238
- } else if (!localByteBuf .hasRemaining ()) {
239
- localByteBuf .flip ();
240
- localHandler .completed (localByteBuf );
241
- } else {
242
- getChannel ().read (localByteBuf .asNIO (), operationContext .getTimeoutContext ().getReadTimeoutMS (), MILLISECONDS , null ,
243
- new BasicCompletionHandler (localByteBuf , operationContext , localHandler ));
244
- }
238
+ beginAsync ().<ByteBuf >thenSupply ((c ) -> {
239
+ ByteBuf localByteBuf = byteBufReference .getAndSet (null );
240
+ if (result == -1 ) {
241
+ localByteBuf .release ();
242
+ throw new MongoSocketReadException ("Prematurely reached end of stream" , serverAddress );
243
+ } else if (!localByteBuf .hasRemaining ()) {
244
+ localByteBuf .flip ();
245
+ c .complete (localByteBuf );
246
+ } else {
247
+ long readTimeoutMS = operationContext .getTimeoutContext ().getReadTimeoutMS ();
248
+ getChannel ().read (localByteBuf .asNIO (), readTimeoutMS , MILLISECONDS , null ,
249
+ new BasicCompletionHandler (localByteBuf , operationContext , c .asHandler ()));
250
+ }
251
+ }).finish (localHandler .asCallback ());
245
252
}
246
253
247
254
@ Override
0 commit comments