Skip to content

Commit 1b99d3d

Browse files
Antoine de Chevigné and claude committed
Add auto-disable for explorers with unreachable RPCs
When an explorer's RPC becomes unreachable, the explorerSyncCheck job keeps trying to start PM2 processes that immediately fail, which creates unnecessary load on the PM2 server. This change adds automatic sync deactivation after 3 consecutive failures:

- Track sync failures on the Explorer model (syncFailedAttempts, syncDisabledAt, syncDisabledReason, nextRecoveryCheckAt)
- Auto-disable shouldSync after 3 consecutive RPC failures or PM2 timeouts
- Add a syncRecoveryCheck job that periodically tests disabled explorers
- Use exponential backoff for recovery checks (5m -> 15m -> 1h -> 6h)
- Reset failure tracking when sync is manually re-enabled

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent e261cdf commit 1b99d3d

9 files changed

Lines changed: 571 additions & 10 deletions

run/jobs/index.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ module.exports = {
4545
rpcHealthCheck: require('./rpcHealthCheck'),
4646
rpcHealthCheckStarter: require('./rpcHealthCheckStarter'),
4747
explorerSyncCheck: require('./explorerSyncCheck'),
48+
syncRecoveryCheck: require('./syncRecoveryCheck'),
4849
workspaceReset: require('./workspaceReset'),
4950
batchBlockDelete: require('./batchBlockDelete'),
5051
batchContractDelete: require('./batchContractDelete'),

run/jobs/syncRecoveryCheck.js

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* @fileoverview Sync recovery check job.
3+
* Periodically checks auto-disabled explorers to see if their RPC has recovered.
4+
* Re-enables sync when RPC becomes reachable, using exponential backoff for retries.
5+
*
6+
* Backoff schedule: 5m -> 15m -> 1h -> 6h (max)
7+
*
8+
* @module jobs/syncRecoveryCheck
9+
*/
10+
11+
const { Explorer, Workspace } = require('../models');
12+
const { ProviderConnector } = require('../lib/rpc');
13+
const { withTimeout } = require('../lib/utils');
14+
const { Op } = require('sequelize');
15+
const logger = require('../lib/logger');
16+
17+
module.exports = async () => {
18+
// Find explorers that are due for a recovery check
19+
const explorers = await Explorer.findAll({
20+
where: {
21+
syncDisabledReason: { [Op.ne]: null },
22+
nextRecoveryCheckAt: { [Op.lte]: new Date() }
23+
},
24+
include: [{
25+
model: Workspace,
26+
as: 'workspace',
27+
attributes: ['id', 'rpcServer']
28+
}]
29+
});
30+
31+
if (explorers.length === 0) {
32+
return 'No explorers due for recovery check';
33+
}
34+
35+
let recovered = 0;
36+
let stillUnreachable = 0;
37+
38+
for (const explorer of explorers) {
39+
try {
40+
const provider = new ProviderConnector(explorer.workspace.rpcServer);
41+
const block = await withTimeout(provider.fetchLatestBlock());
42+
43+
if (block) {
44+
// RPC is reachable - re-enable sync
45+
await explorer.enableSyncAfterRecovery();
46+
recovered++;
47+
48+
logger.info({
49+
message: 'Explorer re-enabled after RPC recovery',
50+
explorerId: explorer.id,
51+
explorerSlug: explorer.slug,
52+
disabledReason: explorer.syncDisabledReason
53+
});
54+
} else {
55+
// Block fetch returned null - still unreachable
56+
await explorer.scheduleNextRecoveryCheck();
57+
stillUnreachable++;
58+
}
59+
} catch (error) {
60+
// RPC check failed - schedule next check with backoff
61+
await explorer.scheduleNextRecoveryCheck();
62+
stillUnreachable++;
63+
64+
logger.debug({
65+
message: 'Explorer recovery check failed',
66+
explorerId: explorer.id,
67+
explorerSlug: explorer.slug,
68+
error: error.message
69+
});
70+
}
71+
}
72+
73+
return `Checked ${explorers.length} explorers: ${recovered} recovered, ${stillUnreachable} still unreachable`;
74+
};

run/jobs/updateExplorerSyncingProcess.js

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
/**
22
* @fileoverview Explorer sync process update job.
33
* Manages PM2 processes for explorer block syncing based on status.
4+
* Auto-disables sync after repeated RPC failures with exponential backoff recovery.
45
* @module jobs/updateExplorerSyncingProcess
56
*/
67

78
const { Explorer, Workspace, RpcHealthCheck, StripeSubscription, StripePlan } = require('../models');
89
const PM2 = require('../lib/pm2');
10+
const logger = require('../lib/logger');
11+
12+
const SYNC_FAILURE_THRESHOLD = 3;
913

1014
module.exports = async job => {
1115
const data = job.data;
@@ -57,7 +61,19 @@ module.exports = async job => {
5761
}
5862
else if (explorer.workspace.rpcHealthCheck && !explorer.workspace.rpcHealthCheck.isReachable && existingProcess) {
5963
await pm2.delete(explorer.slug);
60-
return 'Process deleted: RPC is not reachable.';
64+
// Track RPC failure and potentially auto-disable
65+
const result = await explorer.incrementSyncFailures('rpc_unreachable');
66+
if (result.disabled) {
67+
logger.info({
68+
message: 'Explorer auto-disabled due to RPC failures',
69+
explorerId: explorer.id,
70+
explorerSlug: explorer.slug,
71+
attempts: result.attempts,
72+
reason: 'rpc_unreachable'
73+
});
74+
return `Process deleted and sync auto-disabled after ${result.attempts} RPC failures.`;
75+
}
76+
return `Process deleted: RPC is not reachable (attempt ${result.attempts}/${SYNC_FAILURE_THRESHOLD}).`;
6177
}
6278
else if (!explorer.shouldSync && existingProcess) {
6379
await pm2.delete(explorer.slug);
@@ -69,17 +85,41 @@ module.exports = async job => {
6985
}
7086
else if (explorer.shouldSync && !existingProcess) {
7187
await pm2.start(explorer.slug, explorer.workspaceId);
88+
// Reset failure counter on successful start
89+
if (explorer.syncFailedAttempts > 0) {
90+
await explorer.update({ syncFailedAttempts: 0 });
91+
}
7292
return 'Process started.';
7393
}
7494
else if (explorer.shouldSync && existingProcess && existingProcess.pm2_env.status == 'stopped') {
7595
await pm2.resume(explorer.slug, explorer.workspaceId);
96+
// Reset failure counter on successful resume
97+
if (explorer.syncFailedAttempts > 0) {
98+
await explorer.update({ syncFailedAttempts: 0 });
99+
}
76100
return 'Process resumed.';
77101
}
78102
else
79103
return 'No process change.';
80104
} catch(error) {
81-
if (error.message.startsWith('Timed out after'))
105+
if (error.message.startsWith('Timed out after')) {
106+
// Track timeout as a failure if explorer exists and sync is enabled
107+
if (explorer && explorer.shouldSync) {
108+
const result = await explorer.incrementSyncFailures('pm2_timeout');
109+
if (result.disabled) {
110+
logger.info({
111+
message: 'Explorer auto-disabled due to PM2 timeouts',
112+
explorerId: explorer.id,
113+
explorerSlug: explorer.slug,
114+
attempts: result.attempts,
115+
reason: 'pm2_timeout'
116+
});
117+
return `Timed out and sync auto-disabled after ${result.attempts} failures.`;
118+
}
119+
return `Timed out (attempt ${result.attempts}/${SYNC_FAILURE_THRESHOLD})`;
120+
}
82121
return 'Timed out';
122+
}
83123
else
84124
throw error;
85125
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
'use strict';
2+
3+
module.exports = {
4+
async up (queryInterface, Sequelize) {
5+
const transaction = await queryInterface.sequelize.transaction();
6+
try {
7+
await queryInterface.addColumn('explorers', 'syncFailedAttempts', {
8+
type: Sequelize.DataTypes.INTEGER,
9+
allowNull: false,
10+
defaultValue: 0
11+
}, { transaction });
12+
13+
await queryInterface.addColumn('explorers', 'syncDisabledAt', {
14+
type: Sequelize.DataTypes.DATE,
15+
allowNull: true
16+
}, { transaction });
17+
18+
await queryInterface.addColumn('explorers', 'syncDisabledReason', {
19+
type: Sequelize.DataTypes.STRING,
20+
allowNull: true
21+
}, { transaction });
22+
23+
await queryInterface.addColumn('explorers', 'nextRecoveryCheckAt', {
24+
type: Sequelize.DataTypes.DATE,
25+
allowNull: true
26+
}, { transaction });
27+
28+
// Add index on nextRecoveryCheckAt for efficient recovery job queries
29+
await queryInterface.addIndex('explorers', ['nextRecoveryCheckAt'], {
30+
name: 'explorers_next_recovery_check_at_idx',
31+
where: { nextRecoveryCheckAt: { [Sequelize.Op.ne]: null } },
32+
transaction
33+
});
34+
35+
await transaction.commit();
36+
} catch(error) {
37+
console.log(error);
38+
await transaction.rollback();
39+
throw error;
40+
}
41+
},
42+
43+
async down(queryInterface, Sequelize) {
44+
const transaction = await queryInterface.sequelize.transaction();
45+
try {
46+
await queryInterface.removeIndex('explorers', 'explorers_next_recovery_check_at_idx', { transaction });
47+
await queryInterface.removeColumn('explorers', 'syncFailedAttempts', { transaction });
48+
await queryInterface.removeColumn('explorers', 'syncDisabledAt', { transaction });
49+
await queryInterface.removeColumn('explorers', 'syncDisabledReason', { transaction });
50+
await queryInterface.removeColumn('explorers', 'nextRecoveryCheckAt', { transaction });
51+
52+
await transaction.commit();
53+
} catch(error) {
54+
console.log(error);
55+
await transaction.rollback();
56+
throw error;
57+
}
58+
}
59+
};

run/models/explorer.js

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ const IUniswapV2Router02 = require('../lib/abis/IUniswapV2Router02.json');
3131
const IUniswapV2Factory = require('../lib/abis/IUniswapV2Factory.json');
3232
const MAX_RPC_ATTEMPTS = 3;
3333

34+
// Sync failure auto-disable configuration
35+
const SYNC_FAILURE_THRESHOLD = 3;
36+
const RECOVERY_BACKOFF_SCHEDULE = [
37+
5 * 60 * 1000, // 5 minutes
38+
15 * 60 * 1000, // 15 minutes
39+
60 * 60 * 1000, // 1 hour
40+
6 * 60 * 60 * 1000 // 6 hours (max)
41+
];
42+
3443
module.exports = (sequelize, DataTypes) => {
3544
class Explorer extends Model {
3645
/**
@@ -299,10 +308,18 @@ module.exports = (sequelize, DataTypes) => {
299308

300309
/**
301310
* Starts block synchronization for the explorer.
311+
* Resets all failure tracking when manually enabling sync.
302312
* @returns {Promise<Explorer>} Updated explorer
303313
*/
304-
startSync() {
305-
return this.update({ shouldSync: true });
314+
async startSync() {
315+
await this.update({
316+
shouldSync: true,
317+
syncFailedAttempts: 0,
318+
syncDisabledAt: null,
319+
syncDisabledReason: null,
320+
nextRecoveryCheckAt: null
321+
});
322+
return this;
306323
}
307324

308325
/**
@@ -313,6 +330,101 @@ module.exports = (sequelize, DataTypes) => {
313330
return this.update({ shouldSync: false });
314331
}
315332

333+
/**
334+
* Increments the sync failure counter and auto-disables if threshold reached.
335+
* @param {string} [reason='rpc_unreachable'] - Reason for the failure
336+
* @returns {Promise<{disabled: boolean, attempts: number}>} Result with disable status
337+
*/
338+
async incrementSyncFailures(reason = 'rpc_unreachable') {
339+
const newCount = (this.syncFailedAttempts || 0) + 1;
340+
await this.update({ syncFailedAttempts: newCount });
341+
342+
if (newCount >= SYNC_FAILURE_THRESHOLD) {
343+
await this.autoDisableSync(reason);
344+
return { disabled: true, attempts: newCount };
345+
}
346+
return { disabled: false, attempts: newCount };
347+
}
348+
349+
/**
350+
* Auto-disables sync and schedules first recovery check.
351+
* @param {string} reason - Reason for disabling (e.g., 'rpc_unreachable')
352+
* @returns {Promise<Explorer>} Updated explorer
353+
*/
354+
async autoDisableSync(reason) {
355+
const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[0]);
356+
await this.update({
357+
shouldSync: false,
358+
syncDisabledAt: new Date(),
359+
syncDisabledReason: reason,
360+
nextRecoveryCheckAt: nextCheck
361+
});
362+
return this;
363+
}
364+
365+
/**
366+
* Resets all sync failure tracking state.
367+
* @returns {Promise<Explorer>} Updated explorer
368+
*/
369+
async resetSyncState() {
370+
await this.update({
371+
syncFailedAttempts: 0,
372+
syncDisabledAt: null,
373+
syncDisabledReason: null,
374+
nextRecoveryCheckAt: null
375+
});
376+
return this;
377+
}
378+
379+
/**
380+
* Schedules the next recovery check using exponential backoff.
381+
* Backoff schedule: 5m -> 15m -> 1h -> 6h (max)
382+
* @returns {Promise<Explorer>} Updated explorer
383+
*/
384+
async scheduleNextRecoveryCheck() {
385+
if (!this.syncDisabledAt) {
386+
return this;
387+
}
388+
389+
const timeSinceDisabled = Date.now() - new Date(this.syncDisabledAt).getTime();
390+
let cumulativeTime = 0;
391+
let backoffIndex = 0;
392+
393+
// Find which backoff interval we should use based on time since disabled
394+
for (let i = 0; i < RECOVERY_BACKOFF_SCHEDULE.length; i++) {
395+
cumulativeTime += RECOVERY_BACKOFF_SCHEDULE[i];
396+
if (timeSinceDisabled < cumulativeTime) {
397+
backoffIndex = i;
398+
break;
399+
}
400+
backoffIndex = i;
401+
}
402+
403+
// Cap at max backoff (last element)
404+
if (backoffIndex >= RECOVERY_BACKOFF_SCHEDULE.length) {
405+
backoffIndex = RECOVERY_BACKOFF_SCHEDULE.length - 1;
406+
}
407+
408+
const nextCheck = new Date(Date.now() + RECOVERY_BACKOFF_SCHEDULE[backoffIndex]);
409+
await this.update({ nextRecoveryCheckAt: nextCheck });
410+
return this;
411+
}
412+
413+
/**
414+
* Re-enables sync after successful recovery check.
415+
* @returns {Promise<Explorer>} Updated explorer
416+
*/
417+
async enableSyncAfterRecovery() {
418+
await this.update({
419+
shouldSync: true,
420+
syncFailedAttempts: 0,
421+
syncDisabledAt: null,
422+
syncDisabledReason: null,
423+
nextRecoveryCheckAt: null
424+
});
425+
return this;
426+
}
427+
316428
/**
317429
* Creates a Uniswap V2 compatible DEX for the explorer.
318430
* @param {string} routerAddress - DEX router contract address
@@ -690,7 +802,11 @@ module.exports = (sequelize, DataTypes) => {
690802
shouldEnforceQuota: DataTypes.BOOLEAN,
691803
isDemo: DataTypes.BOOLEAN,
692804
gasAnalyticsEnabled: DataTypes.BOOLEAN,
693-
displayTopAccounts: DataTypes.BOOLEAN
805+
displayTopAccounts: DataTypes.BOOLEAN,
806+
syncFailedAttempts: DataTypes.INTEGER,
807+
syncDisabledAt: DataTypes.DATE,
808+
syncDisabledReason: DataTypes.STRING,
809+
nextRecoveryCheckAt: DataTypes.DATE
694810
}, {
695811
hooks: {
696812
afterCreate(explorer, options) {

run/scheduler.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const { enqueue } = require('./lib/queue');
33
const INTEGRITY_CHECK_INTERVAL = 5 * 60 * 1000;
44
const RPC_HEALTH_CHECK_INTERVAL = 5 * 60 * 1000;
55
const SUBSCRIPTION_CHECK_INTERVAL = 5 * 60 * 1000;
6+
const SYNC_RECOVERY_CHECK_INTERVAL = 5 * 60 * 1000;
67
const QUEUE_MONITORING_INTERVAL = 60 * 1000;
78
const CANCEL_DEMO_INTERVAL = 60 * 60 * 1000;
89
const BLOCK_SYNC_MONITORING_INTERVAL = 60 * 1000;
@@ -48,6 +49,14 @@ const BLOCK_SYNC_MONITORING_INTERVAL = 60 * 1000;
4849
{ every: SUBSCRIPTION_CHECK_INTERVAL }
4950
);
5051

52+
await enqueue(
53+
'syncRecoveryCheck',
54+
'syncRecoveryCheck',
55+
{},
56+
10,
57+
{ every: SYNC_RECOVERY_CHECK_INTERVAL }
58+
);
59+
5160
await enqueue(
5261
'queueMonitoring',
5362
'queueMonitoring',

0 commit comments

Comments
 (0)