diff --git a/.dockerignore b/.dockerignore index e41b6d4..fda475d 100644 --- a/.dockerignore +++ b/.dockerignore @@ -9,3 +9,6 @@ logs *.db ./prisma/db/* ./prisma/generated +prisma/dev.db +*.sqlite +*.sqlite-journal diff --git a/bun.lock b/bun.lock index 6a20668..fec0aff 100644 --- a/bun.lock +++ b/bun.lock @@ -4,7 +4,7 @@ "": { "name": "concero-v2-operators", "dependencies": { - "@concero/operator-utils": "https://github.com/concero/operator-utils.git#feature/add-block-checkpoints", + "@concero/operator-utils": "https://github.com/concero/operator-utils.git#feature/restart-failed-transaction", "@prisma/client": "6.16.2", "@slack/web-api": "7.9.1", "@types/jest": "29.5.14", diff --git a/docker/Dockerfile b/docker/Dockerfile index 69cd0b5..3e67214 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -33,5 +33,6 @@ COPY --from=builder --chown=bunuser:bunuser /app/prisma /app/prisma COPY --chown=bunuser:bunuser ./rpc.extensions.json /app/ COPY --chown=bunuser:bunuser ./rpc.overrides.json /app/ -ENV NODE_ENV=production +ENV NODE_ENV=production \ + DATABASE_URL=file:/app/data/concero.sqlite ENTRYPOINT ["/app/entrypoint.sh"] diff --git a/package.json b/package.json index 4883d6a..081c498 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,7 @@ "typescript": "5.0.0" }, "dependencies": { - "@concero/operator-utils": "https://github.com/concero/operator-utils.git#feature/add-block-checkpoints", + "@concero/operator-utils": "https://github.com/concero/operator-utils.git#feature/restart-failed-transaction", "@prisma/client": "6.16.2", "@slack/web-api": "7.9.1", "@types/jest": "29.5.14", diff --git a/prisma/migrations/20251016181328_init/migration.sql b/prisma/migrations/20251016181328_init/migration.sql new file mode 100644 index 0000000..adbfb68 --- /dev/null +++ b/prisma/migrations/20251016181328_init/migration.sql @@ -0,0 +1,16 @@ +-- CreateTable +CREATE TABLE "RelayerJob" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "jobType" TEXT NOT NULL DEFAULT 'tx-submit', + "txHash" TEXT, + "chainName" TEXT NOT NULL, + "payload" TEXT NOT NULL, + "status" TEXT NOT NULL DEFAULT 'pending', + "attempts" INTEGER NOT NULL DEFAULT 0, + "nextRetryAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" DATETIME NOT NULL +); + +-- CreateIndex +CREATE INDEX "RelayerJob_status_nextRetryAt_idx" ON "RelayerJob"("status", "nextRetryAt"); diff --git a/prisma/migrations/20251020165731_add_compound_unique_relayerjob/migration.sql b/prisma/migrations/20251020165731_add_compound_unique_relayerjob/migration.sql new file mode 100644 index 0000000..1521c4c --- /dev/null +++ b/prisma/migrations/20251020165731_add_compound_unique_relayerjob/migration.sql @@ -0,0 +1,8 @@ +/* + Warnings: + + - A unique constraint covering the columns `[jobType,txHash]` on the table `RelayerJob` will be added. If there are existing duplicate values, this will fail. + +*/ +-- CreateIndex +CREATE UNIQUE INDEX "RelayerJob_jobType_txHash_key" ON "RelayerJob"("jobType", "txHash"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index f7e738f..59e49f1 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -21,3 +21,19 @@ model LogsListenerBlockCheckpoints { @@index([chainSelector, contractAddress]) @@map("logsListener_blockCheckpoints") } + +model RelayerJob { + id Int @id @default(autoincrement()) + jobType String @default("tx-submit") // tx-submit | report-request + chainName String + txHash String? + payload String + status String @default("pending") // pending | processing | done | failed + attempts Int @default(0) + nextRetryAt DateTime @default(now()) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@index([status, nextRetryAt]) + @@unique([jobType, txHash]) +} diff --git a/src/managers/Relayer.ts b/src/managers/Relayer.ts index f6c4f0e..531dfcb 100644 --- a/src/managers/Relayer.ts +++ b/src/managers/Relayer.ts @@ -1,5 +1,6 @@ import { Address, + ByteArray, decodeAbiParameters, encodeAbiParameters, getAbiItem, @@ -18,6 +19,7 @@ import { ViemClientManager, } from '@concero/operator-utils'; import { MessagingDeploymentManager, RelayerSetup } from './index'; +import { PrismaClient } from '@prisma/client'; import { eventEmitter } from '../constants'; import { decodeLogs } from '../eventListener/decodeLogs'; @@ -26,6 +28,7 @@ import { DecodedLog } from '../types/DecodedLog'; import { RelayerConfig } from '../types/ManagerConfigs'; import { decodeCLFReport, decodeMessageReportResult } from '../utils'; import { DecodedMessageReportResult } from '../utils/decoders/types'; +import { RelayerJobQueue } from '../utils/RelayerJobQueue'; export class Relayer extends ManagerBase { private static instance: Relayer | undefined; @@ -39,6 +42,8 @@ export class Relayer extends ManagerBase { private readonly txMonitor: ITxMonitor; private readonly setup: RelayerSetup; private readonly config: RelayerConfig; + private prisma = new PrismaClient(); + private jobQueue = new RelayerJobQueue(this.prisma); private verifierNetwork!: ConceroNetwork; private verifierAddress!: Address; @@ -144,6 +149,56 @@ export class Relayer extends ManagerBase { await this.setup.executeSetup(); await this.setupEventListeners(); + setInterval(async () => { + const jobs = await this.jobQueue.getDue(10); + + for (const job of jobs) { + if (job.jobType === 'report-request') { + try { + const ctx = JSON.parse(job.payload); + this.logger.info( + `[report-request] retry #${job.attempts + 1} for messageId=${job.txHash}`, + ); + await this.requestMessageReport(ctx.decodedLog, ctx.srcChainSelector); + } catch (err) { + this.logger.error(`[report-request] error: ${err}`); + } finally { + await this.jobQueue.rescheduleReportRequest(job.id, job.attempts); + } + continue; + } + + const ctx = JSON.parse(job.payload); + const { chainName, reportSubmission, messages, indexes, results, totalGasLimit } = + ctx; + const dstChain = this.networkManager.getNetworkByName(chainName); + if (!dstChain) { + this.logger.error(`[${chainName}] Retry failed: no network`); + await this.jobQueue.markFailed(job.id, job.attempts); + continue; + } + + try { + this.logger.info( + `[${chainName}] Retrying job ${job.id} (attempt ${job.attempts + 1})`, + ); + const newTxHash = await this.submitBatchToDestination( + dstChain, + reportSubmission, + messages, + indexes, + results, + totalGasLimit, + ); + this.logger.info(`[${chainName}] Retry success: ${newTxHash}`); + await this.jobQueue.markSuccess(job.id); + } catch (err) { + this.logger.error(`[${chainName}] Retry error: ${err}`); + await this.jobQueue.markFailed(job.id, job.attempts + 1); + } + } + }, 15_000); + this.logger.info('initialized'); } @@ -228,11 +283,11 @@ export class Relayer extends ManagerBase { const decodedLogs = decodeLogs(logs, this.config.abi.CONCERO_ROUTER); const immediateProcessLogs = decodedLogs.filter( - log => log.args.shouldFinaliseSrc === false, + log => log.args?.shouldFinaliseSrc === false, ); const finalityRequiredLogs = decodedLogs.filter( - log => log.args.shouldFinaliseSrc === true, + log => log.args?.shouldFinaliseSrc === true, ); finalityRequiredLogs.forEach(decodedLog => { @@ -242,10 +297,11 @@ export class Relayer extends ManagerBase { chainSelector: network.chainSelector, }); - this.txMonitor.ensureTxFinality( + this.txMonitor.trackTxFinality( txHash, network.name, - this.onFinalityCallback.bind(this), + //todo + 'relayer' ); }); @@ -292,6 +348,10 @@ export class Relayer extends ManagerBase { ); return; } + const allMessageIds = messageResults.map(r => r.messageId); + await Promise.allSettled( + allMessageIds.map(id => this.jobQueue.cancelReportRequest(id)), + ); const messagesByDstChain = this.groupMessagesByDestination(messageResults); @@ -324,7 +384,7 @@ export class Relayer extends ManagerBase { ); const validMessages: string[] = []; - const validIndexes: bigint[] = []; + const validIndexes: number[] = []; const validResults: any[] = []; let totalGasLimit = 0n; @@ -369,20 +429,21 @@ export class Relayer extends ManagerBase { const messageIds = validResults.map(r => r.messageId); - // this.destinationChainFinalityMap.set(submissionTxHash, { - // chainName: dstChain.name, - // messageIds, - // reportSubmission, - // messages: validMessages, - // indexes: validIndexes, - // results: validResults, - // totalGasLimit, - // }); - - this.txMonitor.ensureTxFinality( + this.destinationChainFinalityMap.set(submissionTxHash, { + chainName: dstChain.name, + messageIds, + reportSubmission, + messages: validMessages, + indexes: validIndexes, + results: validResults, + totalGasLimit, + }); + + this.txMonitor.trackTxFinality( submissionTxHash, dstChain.name, - this.onFinalityCallback.bind(this), + //todo + 'relayer' ); } catch (err) { this.logger.error( @@ -420,8 +481,11 @@ export class Relayer extends ManagerBase { ): Promise { try { const args = decodedLog.args; - const { messageId, message, sender } = args; - + const { messageId, message, sender } = args as unknown as { + messageId: string; + message: Hash | ByteArray; + sender: Hash; + }; if (!messageId || !message || !sender || !decodedLog.blockNumber) { this.logger.error(`Missing required data in log: ${decodedLog}`); return; @@ -437,12 +501,7 @@ export class Relayer extends ManagerBase { ], }, ], - [ - { - blockNumber: BigInt(decodedLog.blockNumber), - sender, - }, - ], + [{ blockNumber: BigInt(decodedLog.blockNumber), sender }], ); const txHash = await this.txWriter.callContract(this.verifierNetwork, { @@ -453,18 +512,29 @@ export class Relayer extends ManagerBase { chain: this.verifierNetwork.viemChain, }); - if (txHash) { - eventEmitter.emit('requestMessageReport', { - txHash: txHash, - }); - this.logger.info(`Report requested, tx: ${txHash}`); - } else { - this.logger.error(`Failed to submit Report request`); - } + await this.jobQueue.upsertReportRequest( + messageId, + this.verifierNetwork.name, + { decodedLog, srcChainSelector }, + 5, + ); + + eventEmitter.emit('requestMessageReport', { txHash }); + this.logger.info(`Report requested, tx: ${txHash}`); } catch (error) { const messageId = decodedLog.args?.messageId; this.logger.error( - `[${this.verifierNetwork.name}] Error requesting CLF message report for messageId ${messageId || 'unknown'}: ${error}`, + `[${this.verifierNetwork.name}] Error requesting report for messageId ${messageId || 'unknown'}: ${error}`, + ); + + await this.jobQueue.upsertReportRequest( + decodedLog.args?.messageId ?? 'unknown', + this.verifierNetwork.name, + { + decodedLog, + srcChainSelector, + }, + 10, ); } } @@ -560,7 +630,10 @@ export class Relayer extends ManagerBase { return { message: null, gasLimit: 0n }; } - const { message, dstChainData } = conceroMessageSentLog.args; + const { message, dstChainData } = conceroMessageSentLog.args as { + message: string; + dstChainData: Hash | ByteArray; + }; const decodedDstChainData = decodeAbiParameters( [ @@ -582,10 +655,10 @@ export class Relayer extends ManagerBase { dstChain: ConceroNetwork, reportSubmission: any, messages: string[], - indexes: number[], + indexes: number[] | bigint[], results: DecodedMessageReportResult[], totalGasLimit: bigint, - ): Promise { + ): Promise { const dstConceroRouter = await this.deploymentManager.getRouterByChainName(dstChain.name); const txHash = await this.txWriter.callContract( @@ -642,7 +715,7 @@ export class Relayer extends ManagerBase { if (isFinalized) { this.destinationChainFinalityMap.delete(txHash); } else { - this.retryDestinationSubmission(txHash); + this.retryDestinationSubmissionWithQueue(txHash); } return; } @@ -650,61 +723,19 @@ export class Relayer extends ManagerBase { this.logger.error(`No context found for transaction ${txHash} on chain ${chainName}`); } - private async retryDestinationSubmission(originalTxHash: string): Promise { + private async retryDestinationSubmissionWithQueue(originalTxHash: string): Promise { const context = this.destinationChainFinalityMap.get(originalTxHash); if (!context) { - this.logger.error(`Cannot retry: no context found for ${originalTxHash}`); + this.logger.error(`Cannot retry: no context for ${originalTxHash}`); return; } - const { chainName, reportSubmission, messages, indexes, results, totalGasLimit } = context; + const { chainName } = context; - const dstChain = this.networkManager.getNetworkByName(chainName); - if (!dstChain) { - this.logger.error(`Cannot retry: network ${chainName} not found`); - this.destinationChainFinalityMap.delete(originalTxHash); - return; - } - - const maxAttempts = 3; - for (let attempt = 1; attempt <= maxAttempts; attempt++) { - try { - this.logger.info( - `[${chainName}] Retrying report submission (attempt ${attempt}/${maxAttempts}). Message IDs: ${context.messageIds.join(', ')}`, - ); - - const newTxHash = await this.submitBatchToDestination( - dstChain, - reportSubmission, - messages, - indexes, - results, - totalGasLimit, - ); + await this.jobQueue.add('tx-submit', chainName, originalTxHash, context); - this.destinationChainFinalityMap.delete(originalTxHash); - this.destinationChainFinalityMap.set(newTxHash, context); - this.txMonitor.ensureTxFinality( - newTxHash, - dstChain.name, - this.onFinalityCallback.bind(this), - ); - return; - } catch (error) { - this.logger.error( - `Error in destination submission attempt ${attempt}/${maxAttempts} for ${originalTxHash}: ${error}`, - ); - } - - if (attempt < maxAttempts) { - await new Promise(resolve => setTimeout(resolve, 5000)); - } - } - - // All attempts threw exceptions, clean up the original failed tx - this.logger.error( - `[${chainName}] All ${maxAttempts} submission attempts threw exceptions. Giving up. Message IDs: ${context.messageIds.join(', ')}`, - ); this.destinationChainFinalityMap.delete(originalTxHash); + + this.logger.info(`[${chainName}] Queued failed tx ${originalTxHash} for retry`); } } diff --git a/src/types/envPrefixes.ts b/src/types/envPrefixes.ts index faad3b3..f30aaff 100644 --- a/src/types/envPrefixes.ts +++ b/src/types/envPrefixes.ts @@ -1,7 +1,16 @@ export type EnvPrefixes = { + nodeEnv: string; + networkMode: string; + logLevel: string; + operatorAddress: string; + operatorPrivateKey: string; + pollingIntervalMs: string; + dryRun: string; + rpcServiceGitBranch: string; + deploymentsServiceGitBranch: string; router: string; verifier: string; - lpToken: string; - create3Factory: string; - pause: string; + lpToken?: string; + create3Factory?: string; + pause?: string; }; diff --git a/src/utils/RelayerJobQueue.ts b/src/utils/RelayerJobQueue.ts new file mode 100644 index 0000000..40a72ed --- /dev/null +++ b/src/utils/RelayerJobQueue.ts @@ -0,0 +1,83 @@ +import { PrismaClient } from '@prisma/client'; + +const DELAYS = [5, 10, 30, 120, 300, 600, 1200, 3600]; +const nextDelay = (attempts: number) => + attempts < DELAYS.length ? DELAYS[attempts] : DELAYS[DELAYS.length - 1]; + +export class RelayerJobQueue { + constructor(private prisma: PrismaClient) {} + + async getDue(limit = 10) { + return this.prisma.relayerJob.findMany({ + where: { status: 'pending', nextRetryAt: { lte: new Date() } }, + orderBy: { id: 'asc' }, + take: limit, + }); + } + + async markSuccess(id: number) { + await this.prisma.relayerJob.delete({ where: { id } }); + } + + async markFailed(id: number, attempts: number) { + const delay = nextDelay(attempts); + const next = new Date(Date.now() + delay * 1000); + await this.prisma.relayerJob.update({ + where: { id }, + data: { attempts: { increment: 1 }, nextRetryAt: next }, + }); + } + + async add(jobType: 'tx-submit', chainName: string, txHash: string, payload: any) { + await this.prisma.relayerJob.create({ + data: { + jobType, + chainName, + txHash, + payload: JSON.stringify(payload, (_, v) => typeof v === 'bigint' ? v.toString() : v), + }, + }); + } + + async upsertReportRequest( + messageId: string, + chainName: string, + payload: any, + initialDelaySec = 5, + ) { + const next = new Date(Date.now() + initialDelaySec * 1000); + await this.prisma.relayerJob.upsert({ + where: { jobType_txHash: { jobType: 'report-request', txHash: messageId } }, + update: { + chainName, + payload: JSON.stringify(payload, (_, v) => typeof v === 'bigint' ? v.toString() : v), + status: 'pending', + nextRetryAt: next, + }, + create: { + jobType: 'report-request', + chainName, + txHash: messageId, + payload: JSON.stringify(payload, (_, v) => typeof v === 'bigint' ? v.toString() : v), + status: 'pending', + attempts: 0, + nextRetryAt: next, + }, + }); + } + + async rescheduleReportRequest(id: number, attempts: number) { + const delay = nextDelay(attempts); + const next = new Date(Date.now() + delay * 1000); + await this.prisma.relayerJob.update({ + where: { id }, + data: { attempts: { increment: 1 }, nextRetryAt: next }, + }); + } + + async cancelReportRequest(messageId: string) { + await this.prisma.relayerJob.deleteMany({ + where: { jobType: 'report-request', txHash: messageId }, + }); + } +} diff --git a/src/utils/configureDotEnv.ts b/src/utils/configureDotEnv.ts index 58c5fe4..e0930c1 100644 --- a/src/utils/configureDotEnv.ts +++ b/src/utils/configureDotEnv.ts @@ -4,6 +4,40 @@ import * as path from 'path'; export const ENV_FILES: string[] = ['.env']; +function stripJunk(raw: string) { + return raw + .replace(/^\uFEFF/, '') + .replace(/[\u200B-\u200D\uFEFF]/g, '') + .trim() + .replace(/^['"]|['"]$/g, ''); +} + +function sanitizeHexPrivateKey(raw: string | undefined): `0x${string}` { + if (!raw) throw new Error('OPERATOR_PRIVATE_KEY is not set'); + + let v = stripJunk(raw); + + if (/^[0-9a-fA-F]{64}$/.test(v)) v = '0x' + v; + + if (!/^0x[0-9a-fA-F]{64}$/.test(v)) { + const sample = v.slice(0, 12); + throw new Error( + `OPERATOR_PRIVATE_KEY has invalid format. Got "${sample}..." (len=${v.length}). Expected 0x + 64 hex.`, + ); + } + return v as `0x${string}`; +} + +function sanitizeEthAddress(raw: string | undefined): `0x${string}` { + if (!raw) throw new Error('OPERATOR_ADDRESS is not set'); + + const v = stripJunk(raw); + if (!/^0x[0-9a-fA-F]{40}$/.test(v)) { + throw new Error(`OPERATOR_ADDRESS has invalid format: "${v}". Expected 0x + 40 hex.`); + } + return v as `0x${string}`; +} + /** * Configures dotenv loading order: * 1) Base files from ENV_FILES (in order added) @@ -13,22 +47,29 @@ export const ENV_FILES: string[] = ['.env']; * Any other NODE_ENV values are coerced to "development". * * @param {string} [basePath='./'] - Base path where .env files are located. - */ + * */ export function configureDotEnv(basePath = './'): void { const baseDir = basePath.endsWith(path.sep) ? basePath : `${basePath}${path.sep}`; - const nodeEnvNormalized = process.env.NODE_ENV === 'production' ? 'production' : 'development'; for (const file of ENV_FILES) { const p = path.resolve(baseDir, file); - dotenv.config({ path: p, override: false }); + dotenv.config({ path: p, override: true }); } - const localFilePath = path.resolve(baseDir, '.env.local'); - dotenv.config({ path: localFilePath, override: true }); + dotenv.config({ path: path.resolve(baseDir, '.env.local'), override: true }); - const envFilePath = path.resolve(baseDir, `.env.${nodeEnvNormalized}`); - dotenv.config({ path: envFilePath, override: true }); + dotenv.config({ path: path.resolve(baseDir, `.env.${nodeEnvNormalized}`), override: true }); + + try { + process.env.OPERATOR_PRIVATE_KEY = sanitizeHexPrivateKey(process.env.OPERATOR_PRIVATE_KEY); + } catch (e) { + throw e; + } + + if (process.env.OPERATOR_ADDRESS) { + process.env.OPERATOR_ADDRESS = sanitizeEthAddress(process.env.OPERATOR_ADDRESS); + } } configureDotEnv(); diff --git a/src/utils/resolvePrivateKey.ts b/src/utils/resolvePrivateKey.ts new file mode 100644 index 0000000..486f77d --- /dev/null +++ b/src/utils/resolvePrivateKey.ts @@ -0,0 +1,18 @@ +export function resolvePrivateKey(raw: string | undefined): `0x${string}` { + if (!raw) throw new Error('OPERATOR_PRIVATE_KEY is not set'); + + let t = raw + .trim() + .replace(/^['"]|['"]$/g, '') + .replace(/\s+/g, ''); + + if (/^[0-9a-fA-F]{64}$/.test(t)) t = '0x' + t; + + if (!/^0x[0-9a-fA-F]{64}$/.test(t)) { + const sample = t.slice(0, 10); + throw new Error( + `OPERATOR_PRIVATE_KEY has invalid format. Got: "${sample}..." (len=${t.length}). Expected 0x + 64 hex.`, + ); + } + return t as `0x${string}`; +}