@@ -151,6 +151,12 @@ export class RetryingCall implements Call {
151151 private initialMetadata : Metadata | null = null ;
152152 private underlyingCalls : UnderlyingCall [ ] = [ ] ;
153153 private writeBuffer : WriteBufferEntry [ ] = [ ] ;
154+ /**
155+ * The offset of message indices in the writeBuffer. For example, if
156+ * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
157+ * is in writeBuffer[5].
158+ */
159+ private writeBufferOffset = 0 ;
154160 /**
155161 * Tracks whether a read has been started, so that we know whether to start
156162 * reads on new child calls. This only matters for the first read, because
@@ -203,14 +209,8 @@ export class RetryingCall implements Call {
203209 private reportStatus ( statusObject : StatusObject ) {
204210 this . trace ( 'ended with status: code=' + statusObject . code + ' details="' + statusObject . details + '"' ) ;
205211 this . bufferTracker . freeAll ( this . callNumber ) ;
206- for ( let i = 0 ; i < this . writeBuffer . length ; i ++ ) {
207- if ( this . writeBuffer [ i ] . entryType === 'MESSAGE' ) {
208- this . writeBuffer [ i ] = {
209- entryType : 'FREED' ,
210- allocated : false
211- } ;
212- }
213- }
212+ this . writeBufferOffset = this . writeBufferOffset + this . writeBuffer . length ;
213+ this . writeBuffer = [ ] ;
214214 process . nextTick ( ( ) => {
215215 // Explicitly construct status object to remove progress field
216216 this . listener ?. onReceiveStatus ( {
@@ -236,20 +236,27 @@ export class RetryingCall implements Call {
236236 }
237237 }
238238
239- private maybefreeMessageBufferEntry ( messageIndex : number ) {
239+ private getBufferEntry ( messageIndex : number ) : WriteBufferEntry {
240+ return this . writeBuffer [ messageIndex - this . writeBufferOffset ] ?? { entryType : 'FREED' , allocated : false } ;
241+ }
242+
243+ private getNextBufferIndex ( ) {
244+ return this . writeBufferOffset + this . writeBuffer . length ;
245+ }
246+
247+ private clearSentMessages ( ) {
240248 if ( this . state !== 'COMMITTED' ) {
241249 return ;
242250 }
243- const bufferEntry = this . writeBuffer [ messageIndex ] ;
244- if ( bufferEntry . entryType === 'MESSAGE' ) {
251+ const earliestNeededMessageIndex = this . underlyingCalls [ this . committedCallIndex ! ] . nextMessageToSend ;
252+ for ( let messageIndex = this . writeBufferOffset ; messageIndex < earliestNeededMessageIndex ; messageIndex ++ ) {
253+ const bufferEntry = this . getBufferEntry ( messageIndex ) ;
245254 if ( bufferEntry . allocated ) {
246255 this . bufferTracker . free ( bufferEntry . message ! . message . length , this . callNumber ) ;
247256 }
248- this . writeBuffer [ messageIndex ] = {
249- entryType : 'FREED' ,
250- allocated : false
251- } ;
252257 }
258+ this . writeBuffer = this . writeBuffer . slice ( earliestNeededMessageIndex - this . writeBufferOffset ) ;
259+ this . writeBufferOffset = earliestNeededMessageIndex ;
253260 }
254261
255262 private commitCall ( index : number ) {
@@ -272,9 +279,7 @@ export class RetryingCall implements Call {
272279 this . underlyingCalls [ i ] . state = 'COMPLETED' ;
273280 this . underlyingCalls [ i ] . call . cancelWithStatus ( Status . CANCELLED , 'Discarded in favor of other hedged attempt' ) ;
274281 }
275- for ( let messageIndex = 0 ; messageIndex < this . underlyingCalls [ index ] . nextMessageToSend - 1 ; messageIndex += 1 ) {
276- this . maybefreeMessageBufferEntry ( messageIndex ) ;
277- }
282+ this . clearSentMessages ( ) ;
278283 }
279284
280285 private commitCallWithMostMessages ( ) {
@@ -555,8 +560,8 @@ export class RetryingCall implements Call {
555560 private handleChildWriteCompleted ( childIndex : number ) {
556561 const childCall = this . underlyingCalls [ childIndex ] ;
557562 const messageIndex = childCall . nextMessageToSend ;
558- this . writeBuffer [ messageIndex ] . callback ?.( ) ;
559- this . maybefreeMessageBufferEntry ( messageIndex ) ;
563+ this . getBufferEntry ( messageIndex ) . callback ?.( ) ;
564+ this . clearSentMessages ( ) ;
560565 childCall . nextMessageToSend += 1 ;
561566 this . sendNextChildMessage ( childIndex ) ;
562567 }
@@ -566,10 +571,10 @@ export class RetryingCall implements Call {
566571 if ( childCall . state === 'COMPLETED' ) {
567572 return ;
568573 }
569- if ( this . writeBuffer [ childCall . nextMessageToSend ] ) {
570- const bufferEntry = this . writeBuffer [ childCall . nextMessageToSend ] ;
574+ if ( this . getBufferEntry ( childCall . nextMessageToSend ) ) {
575+ const bufferEntry = this . getBufferEntry ( childCall . nextMessageToSend ) ;
571576 switch ( bufferEntry . entryType ) {
572- case 'MESSAGE' :
577+ case 'MESSAGE' :
573578 childCall . call . sendMessageWithContext ( {
574579 callback : ( error ) => {
575580 // Ignore error
@@ -594,13 +599,13 @@ export class RetryingCall implements Call {
594599 message,
595600 flags : context . flags ,
596601 } ;
597- const messageIndex = this . writeBuffer . length ;
602+ const messageIndex = this . getNextBufferIndex ( ) ;
598603 const bufferEntry : WriteBufferEntry = {
599604 entryType : 'MESSAGE' ,
600605 message : writeObj ,
601606 allocated : this . bufferTracker . allocate ( message . length , this . callNumber )
602607 } ;
603- this . writeBuffer [ messageIndex ] = bufferEntry ;
608+ this . writeBuffer . push ( bufferEntry ) ;
604609 if ( bufferEntry . allocated ) {
605610 context . callback ?.( ) ;
606611 for ( const [ callIndex , call ] of this . underlyingCalls . entries ( ) ) {
@@ -642,11 +647,11 @@ export class RetryingCall implements Call {
642647 }
643648 halfClose ( ) : void {
644649 this . trace ( 'halfClose called' ) ;
645- const halfCloseIndex = this . writeBuffer . length ;
646- this . writeBuffer [ halfCloseIndex ] = {
650+ const halfCloseIndex = this . getNextBufferIndex ( ) ;
651+ this . writeBuffer . push ( {
647652 entryType : 'HALF_CLOSE' ,
648653 allocated : false
649- } ;
654+ } ) ;
650655 for ( const call of this . underlyingCalls ) {
651656 if ( call ?. state === 'ACTIVE' && call . nextMessageToSend === halfCloseIndex ) {
652657 call . nextMessageToSend += 1 ;
0 commit comments