Skip to content

Commit 681da40

Browse files
committed
improve(SpokePoolClient): Support restart of failed listener
In the event that a child process terminates improperly, support restarting it with its initial lookback.
1 parent 45a5c9e commit 681da40

File tree

1 file changed

+46
-0
lines changed

1 file changed

+46
-0
lines changed

src/clients/SpokePoolClient.ts

+46
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient {
7979
* @returns void
8080
*/
8181
protected startWorker(): void {
82+
// fromBlock is retained for the life of the SpokePoolClient and is reused in case of listener crash.
8283
const {
8384
finality,
8485
eventSearchConfig: { fromBlock, maxBlockLookBack: blockRange },
@@ -92,6 +93,7 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient {
9293
stdio: ["ignore", "inherit", "inherit", "ipc"],
9394
});
9495

96+
this.worker.on("exit", (code, signal) => this.childExit(code, signal));
9597
this.worker.on("message", (message) => this.indexerUpdate(message));
9698
this.logger.debug({
9799
at: "SpokePoolClient#startWorker",
@@ -100,6 +102,50 @@ export class IndexedSpokePoolClient extends clients.SpokePoolClient {
100102
});
101103
}
102104

105+
/**
106+
* The worker process has exited. Optionally restart it based on the exit code.
107+
* See also: https://nodejs.org/api/child_process.html#event-exit
108+
* @param code Optional exit code.
109+
* @param signal Optional signal resulting in termination.
110+
* @returns void
111+
*/
112+
protected childExit(code?: number, signal?: string): void {
113+
if (code === 0) {
114+
return;
115+
}
116+
117+
this.logger.warn({
118+
at: "SpokePoolClient#childExit",
119+
message: `${this.chain} SpokePool listener exited.`,
120+
code,
121+
signal
122+
});
123+
124+
// Flush all ingested deposits to protect against re-org.
125+
// xxx this probably belongs upstream in a `protected SpokePoolClient.init()` method
126+
// that is called in the constructor, such that it can be re-called by this method to
127+
// re-initialise the same defaults.
128+
this.currentTime = 0;
129+
this.oldestTime = 0;
130+
this.depositHashes = {};
131+
this.depositHashesToFills = {};
132+
this.speedUps = {};
133+
this.slowFillRequests = {};
134+
this.fills = {};
135+
this.depositRoutes = {};
136+
137+
this.tokensBridged = [];
138+
this.rootBundleRelays = [];
139+
this.relayerRefundExecutions = [];
140+
this.earliestDepositIdQueried = Number.MAX_SAFE_INTEGER;
141+
this.latestDepositIdQueried = 0;
142+
this.firstDepositIdForSpokePool = Number.MAX_SAFE_INTEGER;
143+
this.lastDepositIdForSpokePool = Number.MAX_SAFE_INTEGER;
144+
145+
// Restart the listener process from the initial `fromBlock`.
146+
this.startWorker();
147+
}
148+
103149
/**
104150
* Receive an update from the external indexer process.
105151
* @param rawMessage Message to be parsed.

0 commit comments

Comments
 (0)