1717 */
1818package org .apache .ratis .util ;
1919
20+ import org .apache .ratis .protocol .RaftClientRequest ;
2021import org .apache .ratis .protocol .exceptions .AlreadyClosedException ;
2122import org .slf4j .Logger ;
2223import org .slf4j .LoggerFactory ;
2526import java .util .ArrayList ;
2627import java .util .Iterator ;
2728import java .util .List ;
29+ import java .util .Map ;
2830import java .util .SortedMap ;
2931import java .util .TreeMap ;
32+ import java .util .concurrent .ConcurrentHashMap ;
3033import java .util .concurrent .ConcurrentSkipListMap ;
3134import java .util .concurrent .TimeUnit ;
3235import java .util .function .Consumer ;
@@ -56,6 +59,7 @@ interface Request<REPLY> {
5659
5760 interface ClientSideRequest <REPLY > extends Request <REPLY > {
5861 void setFirstRequest ();
62+ long getCallId ();
5963 }
6064
6165 interface ServerSideRequest <REPLY > extends Request <REPLY > {
@@ -228,13 +232,14 @@ class Client<REQUEST extends ClientSideRequest<REPLY>, REPLY> {
228232 private final RequestMap <REQUEST , REPLY > requests ;
229233 /** Delayed requests. */
230234 private final DelayedRequests delayedRequests = new DelayedRequests ();
235+ private final ConcurrentHashMap <Long , Long > map = new ConcurrentHashMap <>();
231236
232237 /** The seqNum for the next new request. */
233238 private long nextSeqNum = 1 ;
234239 /** The seqNum of the first request. */
235- private long firstSeqNum = -1 ;
240+ private volatile long firstSeqNum = -1 ;
236241 /** Is the first request replied? */
237- private boolean firstReplied ;
242+ private volatile boolean firstReplied ;
238243 /** The exception, if there is any. */
239244 private Throwable exception ;
240245
@@ -300,6 +305,7 @@ private boolean sendOrDelayRequest(REQUEST request, Consumer<REQUEST> sendMethod
300305
301306 if (firstReplied ) {
302307 // already received the reply for the first request, submit any request.
308+ map .put (request .getCallId (), getFirstSeqNum ());
303309 sendMethod .accept (request );
304310 return true ;
305311 }
@@ -309,6 +315,7 @@ private boolean sendOrDelayRequest(REQUEST request, Consumer<REQUEST> sendMethod
309315 LOG .debug ("{}: detect firstSubmitted {} in {}" , requests .getName (), request , this );
310316 firstSeqNum = seqNum ;
311317 request .setFirstRequest ();
318+ map .put (request .getCallId (), getFirstSeqNum ());
312319 sendMethod .accept (request );
313320 return true ;
314321 }
@@ -333,7 +340,9 @@ public synchronized void retry(REQUEST request, Consumer<REQUEST> sendMethod) {
333340 private void removeRepliedFromHead () {
334341 for (final Iterator <REQUEST > i = requests .iterator (); i .hasNext (); i .remove ()) {
335342 final REQUEST r = i .next ();
336- if (!r .hasReply ()) {
343+ if (r .hasReply ()) {
344+ map .remove (r .getCallId ());
345+ } else {
337346 return ;
338347 }
339348 }
@@ -360,24 +369,16 @@ private void trySendDelayed(Consumer<REQUEST> sendMethod) {
360369 // after first received, all other requests can be submitted (out-of-order)
361370 delayedRequests .getAllAndClear ().forEach (
362371 seqNum -> sendMethod .accept (requests .getNonRepliedRequest (seqNum , "trySendDelayed" )));
363- } else {
364- // Otherwise, submit the first only if it is a delayed request
365- final Iterator <REQUEST > i = requests .iterator ();
366- if (i .hasNext ()) {
367- final REQUEST r = i .next ();
368- final Long delayed = delayedRequests .remove (r .getSeqNum ());
369- if (delayed != null ) {
370- sendOrDelayRequest (r , sendMethod );
371- }
372- }
373372 }
374373 }
375374
376375 /** Reset the {@link #firstSeqNum} The stream has an error. */
377- public synchronized void resetFirstSeqNum () {
378- firstSeqNum = -1 ;
379- firstReplied = false ;
380- LOG .debug ("After resetFirstSeqNum: {}" , this );
376+ public synchronized void resetFirstSeqNum (long callId ) {
377+ if (callId == -1 || getFirstSeqNum () == map .get (callId )) {
378+ firstSeqNum = -1 ;
379+ firstReplied = false ;
380+ LOG .debug ("After resetFirstSeqNum: {}" , this );
381+ }
381382 }
382383
383384 /** Fail all requests starting from the given seqNum. */
@@ -409,6 +410,10 @@ private void alreadyClosed(REQUEST request, Throwable e) {
409410 public synchronized boolean isFirst (long seqNum ) {
410411 return seqNum == (firstSeqNum != -1 ? firstSeqNum : requests .firstSeqNum ());
411412 }
413+
414+ public long getFirstSeqNum () {
415+ return firstSeqNum != -1 ? firstSeqNum : requests .firstSeqNum ();
416+ }
412417 }
413418
414419 /**
0 commit comments