Skip to content

Commit

Permalink
feat: send end ACK (#762)
Browse files Browse the repository at this point in the history
  • Loading branch information
r4mmer authored Sep 6, 2024
1 parent 21a33bf commit ef71959
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions src/sync/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const QUEUE_GRACEFUL_SHUTDOWN_LIMIT = 10000;
interface IStreamSyncHistoryBegin {
type: 'stream:history:begin';
id: string;
seq: number;
}

interface IStreamSyncHistoryVertex {
Expand All @@ -37,6 +38,7 @@ interface IStreamSyncHistoryAddress {
interface IStreamSyncHistoryEnd {
type: 'stream:history:end';
id: string;
seq: number;
}

interface IStreamSyncHistoryError {
Expand Down Expand Up @@ -645,9 +647,10 @@ export class StreamManager extends AbortController {
}
}

endStream() {
endStream(seq: number) {
this.logger.debug('Received end-of-stream event.');
this.hasReceivedEndStream = true;
this.connection.sendStreamHistoryAck(this.streamId, seq);
}

async shutdown() {
Expand Down Expand Up @@ -713,7 +716,7 @@ function buildListener(manager: StreamManager, resolve: () => void) {
manager.generateNextBatch();
} else if (isStreamSyncHistoryEnd(wsData)) {
// cleanup and stop the method.
manager.endStream();
manager.endStream(wsData.seq);
resolve();
} else if (isStreamSyncHistoryError(wsData)) {
// An error happened on the fullnode, we should stop the stream
Expand Down

0 comments on commit ef71959

Please sign in to comment.