Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 260 additions & 33 deletions src/modules/scraper/adapter/messaging/BlocksEventsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,30 @@ import {
RequestedSpeedUpV3DepositEvent,
} from "../../../web3/model";
import SwapAndBridgeAbi from "../../../web3/services/abi/SwapAndBridge.json";
import WethMainnetAbi from "../../../web3/services/abi/WETH_1.json";
import WethOptimismAbi from "../../../web3/services/abi/WETH_10.json";
import WethArbitrumAbi from "../../../web3/services/abi/WETH_42161.json";
import WethLineaAbi from "../../../web3/services/abi/WETH_59144.json";
import WethBaseAbi from "../../../web3/services/abi/WETH_8453.json";
import { SwapBeforeBridgeEvent } from "../../../web3/model/swap-and-bridge-events";
import { ChainIds } from "../../../web3/model/ChainId";
import {
WethDepositEvent,
WethDepositEventBase,
WethDepositEventLinea,
WethDepositEventOptimism,
WethTransferEventArbitrum,
} from "../../../web3/model/weth-events";
import { AppConfig } from "../../../configuration/configuration.service";
import { splitBlockRanges } from "../../utils";
import { AcrossContractsVersion } from "../../../web3/model/across-version";

const SPOKE_POOL_VERIFIER_CONTRACT_ADDRESS = "0x269727F088F16E1Aea52Cf5a97B1CD41DAA3f02D";
type EnrichedDepositEvent = {
depositEvent: FundsDepositedV3Event;
swapEvent?: SwapBeforeBridgeEvent;
wethEvent?: any;
};

@Processor(ScraperQueue.BlocksEvents)
export class BlocksEventsConsumer {
Expand Down Expand Up @@ -204,52 +222,210 @@ export class BlocksEventsConsumer {
const depositEventsByTxHash = typedEvents.reduce((acc, event) => {
acc[event.transactionHash] = [...(acc[event.transactionHash] || []), event];
return acc;
}, {} as Record<string, FundsDepositedV3Event[]>);
const swapEventsByTxHash: Record<string, SwapBeforeBridgeEvent[]> = {};
}, {} as { [txHash: string]: FundsDepositedV3Event[] });
const swapEventsByTxHash = await this.extractSwapEvents(depositEventsByTxHash, chainId);
const wethEventsByTxHash = await this.extractWethEvents(depositEventsByTxHash, chainId);
const matchedEvents = this.matchDepositEventsWithSwapEvents(
depositEventsByTxHash,
swapEventsByTxHash,
wethEventsByTxHash,
chainId,
);
await this.saveDepositEvents(chainId, matchedEvents);
}

for (const transactionHash of Object.keys(depositEventsByTxHash)) {
const txReceipt = await this.providers.getCachedTransactionReceipt(chainId, transactionHash);
const swapBeforeBridgeEvents = this.providers.parseTransactionReceiptLogs(
txReceipt,
"SwapBeforeBridge",
SwapAndBridgeAbi,
) as unknown as SwapBeforeBridgeEvent[];
swapEventsByTxHash[transactionHash] = swapBeforeBridgeEvents;
async saveDepositEvents(chainId: number, events: { [txHash: string]: EnrichedDepositEvent[] }) {
for (const txHash of Object.keys(events)) {
for (const matchedEvent of events[txHash]) {
try {
const { depositEvent, swapEvent, wethEvent } = matchedEvent;
const deposit = await this.fromFundsDepositedV3EventToDeposit(chainId, depositEvent, swapEvent, wethEvent);
const result = await this.depositRepository.insert(deposit);
await this.scraperQueuesService.publishMessage<BlockNumberQueueMessage>(ScraperQueue.BlockNumber, {
depositId: result.identifiers[0].id,
});
} catch (error) {
if (error instanceof QueryFailedError && error.driverError?.code === "23505") {
// Ignore duplicate key value violates unique constraint error.
this.logger.warn(error);
} else {
throw error;
}
}
}
}
}

public matchDepositEventsWithSwapEvents(
depositEventsByTxHash: { [txHash: string]: FundsDepositedV3Event[] },
swapEventsByTxHash: { [txHash: string]: SwapBeforeBridgeEvent[] },
wethEventsByTxHash: { [txHash: string]: any[] },
chainId: number,
) {
const events: { [txHash: string]: EnrichedDepositEvent[] } = {};

for (const transactionHash of Object.keys(depositEventsByTxHash)) {
const pairs: [FundsDepositedV3Event, SwapBeforeBridgeEvent | undefined][] = [];
const depositEvents = depositEventsByTxHash[transactionHash].sort((d1, d2) => d1.logIndex - d2.logIndex);
const swapEvents = swapEventsByTxHash[transactionHash].sort((s1, s2) => s1.logIndex - s2.logIndex);
const wethEvents = wethEventsByTxHash[transactionHash].sort((s1, s2) => s1.logIndex - s2.logIndex);

for (const depositEvent of depositEvents) {
const spokePoolAddresses = this.appConfig.values.web3.spokePoolContracts[chainId].map((sp) => sp.address);
const wethEventIndex = wethEvents.findIndex((e) => {
if (chainId === ChainIds.mainnet) {
const typedEvent = e as WethDepositEvent;
return (
e.logIndex < depositEvent.logIndex &&
typedEvent.args.wad.eq(depositEvent.args.inputAmount) &&
spokePoolAddresses.includes(typedEvent.args.dst)
);
} else if (chainId === ChainIds.optimism) {
const typedEvent = e as WethDepositEventOptimism;
return (
e.logIndex < depositEvent.logIndex &&
typedEvent.args.wad.eq(depositEvent.args.inputAmount) &&
spokePoolAddresses.includes(typedEvent.args.dst)
);
} else if (chainId === ChainIds.arbitrum) {
const typedEvent = e as WethTransferEventArbitrum;
return (
e.logIndex < depositEvent.logIndex &&
typedEvent.args.value.eq(depositEvent.args.inputAmount) &&
spokePoolAddresses.includes(typedEvent.args.to)
);
Comment on lines +291 to +295
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to also add the check

typedEvent.args.from === ZERO_ADDRESS

? So that we don't consider a normal WETH transfer but rather a "deposit", i.e. mint.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spokePoolAddresses.includes(typedEvent.args.to) checks if the recipient of WETH is the SpokePool contract. Do you think it's necessary to also check from address? I cannot see a case when someone sends WETH intentionally to the SpokePool followed by a deposit

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, you are right. Should be very unlikely

} else if (chainId === ChainIds.linea) {
const typedEvent = e as WethDepositEventLinea;
return (
e.logIndex < depositEvent.logIndex &&
typedEvent.args.wad.eq(depositEvent.args.inputAmount) &&
spokePoolAddresses.includes(typedEvent.args.dst)
);
} else if (chainId === ChainIds.base) {
const typedEvent = e as WethDepositEventBase;
return (
e.logIndex < depositEvent.logIndex &&
typedEvent.args.wad.eq(depositEvent.args.inputAmount) &&
spokePoolAddresses.includes(typedEvent.args.dst)
);
} else {
throw new Error(`Unkown match criteria for weth event on chainId ${chainId}`);
}
});

const swapEventIndex = swapEvents.findIndex((e) => e.logIndex < depositEvent.logIndex);
events[transactionHash] = [
...(events[transactionHash] || []),
{
depositEvent,
swapEvent: swapEventIndex >= 0 ? swapEvents[swapEventIndex] : undefined,
wethEvent: wethEventIndex >= 0 ? wethEvents[wethEventIndex] : undefined,
},
];

if (wethEventIndex >= 0) {
wethEvents.splice(wethEventIndex, 1);
}

if (swapEventIndex >= 0) {
pairs.push([depositEvent, swapEvents[swapEventIndex]]);
swapEvents.splice(swapEventIndex, 1);
} else {
pairs.push([depositEvent, undefined]);
}
}
}

for (const [depositEvent, swapEvent] of pairs) {
try {
const deposit = await this.fromFundsDepositedV3EventToDeposit(chainId, depositEvent, swapEvent);
const result = await this.depositRepository.insert(deposit);
await this.scraperQueuesService.publishMessage<BlockNumberQueueMessage>(ScraperQueue.BlockNumber, {
depositId: result.identifiers[0].id,
});
} catch (error) {
if (error instanceof QueryFailedError && error.driverError?.code === "23505") {
// Ignore duplicate key value violates unique constraint error.
this.logger.warn(error);
} else {
throw error;
}
}
return events;
}

private async extractSwapEvents(
depositEventsByTxHash: { [txHash: string]: FundsDepositedV3Event[] },
chainId: number,
) {
const swapEventsByTxHash: { [txHash: string]: SwapBeforeBridgeEvent[] } = {};

for (const transactionHash of Object.keys(depositEventsByTxHash)) {
const txReceipt = await this.providers.getCachedTransactionReceipt(chainId, transactionHash);
const swapBeforeBridgeEvents = this.providers.parseTransactionReceiptLogs(
txReceipt,
"SwapBeforeBridge",
SwapAndBridgeAbi,
) as unknown as SwapBeforeBridgeEvent[];
swapEventsByTxHash[transactionHash] = swapBeforeBridgeEvents;
}
return swapEventsByTxHash;
}

private async extractWethEvents(
depositEventsByTxHash: { [txHash: string]: FundsDepositedV3Event[] },
chainId: number,
) {
const wethEventsByTxHash: { [txHash: string]: any[] } = {};
const supportedChainIds = [ChainIds.mainnet, ChainIds.optimism, ChainIds.arbitrum, ChainIds.linea, ChainIds.base];

if (!supportedChainIds.includes(chainId)) {
for (const transactionHash of Object.keys(depositEventsByTxHash)) {
wethEventsByTxHash[transactionHash] = [];
}
return wethEventsByTxHash;
}

const eventName = this.getWethEventNameByChainId(chainId);
const abi = this.getWethContractAbiByChainId(chainId);

for (const transactionHash of Object.keys(depositEventsByTxHash)) {
const txReceipt = await this.providers.getCachedTransactionReceipt(chainId, transactionHash);
const wethEvents = this.providers.parseTransactionReceiptLogs(txReceipt, eventName, abi) as any;
wethEventsByTxHash[transactionHash] = wethEvents;
}

return wethEventsByTxHash;
}

private getWethContractAbiByChainId(chainId: number) {
let abi = undefined;
switch (chainId) {
case ChainIds.mainnet:
abi = WethMainnetAbi;
break;
case ChainIds.optimism:
abi = WethOptimismAbi;
break;
case ChainIds.arbitrum:
abi = WethArbitrumAbi;
break;
case ChainIds.linea:
abi = WethLineaAbi;
break;
case ChainIds.base:
abi = WethBaseAbi;
break;
default:
throw new Error(`Unkown weth event name for chainId ${chainId}`);
}
return abi;
}

private getWethEventNameByChainId(chainId: number) {
let eventName = "";

switch (chainId) {
case ChainIds.mainnet:
eventName = "Deposit";
break;
case ChainIds.optimism:
eventName = "Deposit";
break;
case ChainIds.arbitrum:
eventName = "Transfer";
break;
case ChainIds.linea:
eventName = "Deposit";
break;
case ChainIds.base:
eventName = "Deposit";
break;
default:
throw new Error(`Unkown weth event name for chainId ${chainId}`);
}
return eventName;
}

private async processFillEvents(chainId: number, events: Event[]) {
Expand Down Expand Up @@ -400,6 +576,7 @@ export class BlocksEventsConsumer {
chainId: number,
depositEvent: FundsDepositedV3Event,
swapEvent?: SwapBeforeBridgeEvent,
wethEvent?: any,
) {
const { transactionHash, blockNumber } = depositEvent;
const {
Expand All @@ -419,14 +596,14 @@ export class BlocksEventsConsumer {
const wei = BigNumber.from(10).pow(18);
const feePct = inputAmount.eq(0) ? BigNumber.from(0) : wei.sub(outputAmount.mul(wei).div(inputAmount));
const txReceipt = await this.providers.getCachedTransactionReceipt(chainId, transactionHash);
const swapToken = swapEvent ? await this.providers.getCachedToken(chainId, swapEvent.args.swapToken) : undefined;
let trueDepositor = depositor;
let exclusivityDeadlineDate = undefined;

if (exclusivityDeadline > 0) exclusivityDeadlineDate = new Date(exclusivityDeadline * 1000);
if (depositor === SPOKE_POOL_VERIFIER_CONTRACT_ADDRESS) {
trueDepositor = txReceipt.from;
}
const swapTokenValues = await this.extractSwapTokenValues(chainId, swapEvent, wethEvent);

return this.depositRepository.create({
depositId,
Expand All @@ -452,12 +629,62 @@ export class BlocksEventsConsumer {
relayer,
message,
// swap event properties
swapTokenId: swapToken?.id,
swapTokenAmount: swapEvent?.args.swapTokenAmount.toString(),
swapTokenAddress: swapEvent?.args.swapToken,
...swapTokenValues,
});
}

private async extractSwapTokenValues(chainId: number, swapEvent: SwapBeforeBridgeEvent, wethEvent: any) {
let swapTokenValues = {
swapTokenId: undefined,
swapTokenAmount: undefined,
swapTokenAddress: undefined,
};

if (swapEvent) {
const swapToken = await this.providers.getCachedToken(chainId, swapEvent.args.swapToken);
swapTokenValues = {
swapTokenId: swapToken.id,
swapTokenAmount: swapEvent.args.swapTokenAmount.toString(),
swapTokenAddress: swapEvent.args.swapToken,
};
} else if (wethEvent) {
if (chainId === ChainIds.mainnet) {
swapTokenValues = {
swapTokenId: undefined,
swapTokenAmount: (wethEvent as WethDepositEvent).args.wad.toString(),
swapTokenAddress: "native",
};
} else if (chainId === ChainIds.optimism) {
swapTokenValues = {
swapTokenId: undefined,
swapTokenAmount: (wethEvent as WethDepositEventOptimism).args.wad.toString(),
swapTokenAddress: "native",
};
} else if (chainId === ChainIds.arbitrum) {
swapTokenValues = {
swapTokenId: undefined,
swapTokenAmount: (wethEvent as WethTransferEventArbitrum).args.value.toString(),
swapTokenAddress: "native",
};
} else if (chainId === ChainIds.linea) {
swapTokenValues = {
swapTokenId: undefined,
swapTokenAmount: (wethEvent as WethDepositEventLinea).args.wad.toString(),
swapTokenAddress: "native",
};
} else if (chainId === ChainIds.base) {
swapTokenValues = {
swapTokenId: undefined,
swapTokenAmount: (wethEvent as WethDepositEventBase).args.wad.toString(),
swapTokenAddress: "native",
};
} else {
throw new Error(`Unkown swap token values for chainId ${chainId}`);
}
}
return swapTokenValues;
}

// private async insertRawDepositEvent(chainId: number, event: Event) {
// const typedEvent = event as FundsDepositedEvent2 | FundsDepositedEvent2_5;
// const { blockNumber, blockHash, transactionIndex, address, transactionHash, logIndex, args } = typedEvent;
Expand Down
37 changes: 37 additions & 0 deletions src/modules/web3/model/weth-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { BigNumber, Event } from "ethers";

export interface WethDepositEvent extends Event {
args: [string, BigNumber] & {
dst: string;
wad: BigNumber;
};
}

export interface WethDepositEventOptimism extends Event {
args: [string, BigNumber] & {
dst: string;
wad: BigNumber;
};
}

export interface WethDepositEventLinea extends Event {
args: [string, BigNumber] & {
dst: string;
wad: BigNumber;
};
}

export interface WethDepositEventBase extends Event {
args: [string, BigNumber] & {
dst: string;
wad: BigNumber;
};
}

export interface WethTransferEventArbitrum extends Event {
args: [string, string, BigNumber] & {
from: string;
to: string;
value: BigNumber;
};
}
Loading