From c3fba978042a3499ba746b5c66c8cef7f6ecacde Mon Sep 17 00:00:00 2001 From: bayge Date: Fri, 9 Aug 2024 00:08:02 +0930 Subject: [PATCH] Start to track errors properly, include swap2 properly --- cmd/ingestor.logs.ethereum/func.go | 54 ++----------------------- cmd/ingestor.logs.ethereum/func_test.go | 37 ++++++++++++++++- cmd/ingestor.logs.ethereum/main.go | 11 +---- lib/events/seawater/seawater.go | 8 ++-- lib/events/seawater/types.go | 6 +-- 5 files changed, 46 insertions(+), 70 deletions(-) diff --git a/cmd/ingestor.logs.ethereum/func.go b/cmd/ingestor.logs.ethereum/func.go index a92b8711..2aef4b6c 100644 --- a/cmd/ingestor.logs.ethereum/func.go +++ b/cmd/ingestor.logs.ethereum/func.go @@ -44,16 +44,12 @@ var FilterTopics = [][]ethCommon.Hash{{ // Matches any of these in the first top // Entry function, using the database to determine if polling should be // used exclusively to receive logs, polling only for catchup, or // exclusively websockets. -func Entry(f features.F, config config.C, thirdwebFactoryAddr types.Address, shouldPoll bool, ingestorPagination int, pollWait int, c *ethclient.Client, db *gorm.DB) { +func Entry(f features.F, config config.C, thirdwebFactoryAddr types.Address, ingestorPagination int, pollWait int, c *ethclient.Client, db *gorm.DB) { var ( seawaterAddr = ethCommon.HexToAddress(config.SeawaterAddr.String()) thirdwebAddr = ethCommon.HexToAddress(thirdwebFactoryAddr.String()) ) - if shouldPoll { - IngestPolling(f, c, db,ingestorPagination, pollWait, seawaterAddr, thirdwebAddr) - } else { - IngestWebsocket(f, c, db, seawaterAddr, thirdwebAddr) - } + IngestPolling(f, c, db,ingestorPagination, pollWait, seawaterAddr, thirdwebAddr) } // IngestPolling by repeatedly polling the Geth RPC for changes to @@ -124,55 +120,11 @@ func IngestBlockRange(f features.F, c *ethclient.Client, db *gorm.DB, seawaterAd } } -// IngestWebsocket from the websocket provided, using the handleLog function -// to write records found to the database. Assumes that the ethclient -// provided is a websocket. Also updates the checkpoints to track the latest block. -func IngestWebsocket(f features.F, c *ethclient.Client, db *gorm.DB, seawaterAddr, thirdwebAddr ethCommon.Address) { - filter := ethereum.FilterQuery{ - Topics: FilterTopics, - } - var ( - logs = make(chan ethTypes.Log) - errors = make(chan error) - ) - go func() { - subscription, err := c.SubscribeFilterLogs(context.Background(), filter, logs) - if err != nil { - setup.Exitf("eth log subscription: %v", err) - } - err = <-subscription.Err() - errors <- err - }() - for { - select { - case err := <-errors: - setup.Exitf("subscription error: %v", err) - case l := <-logs: - // Figure out what kind of log this is, and then insert it into the database. - err := db.Transaction(func(db *gorm.DB) error { - if err := handleLog(db, seawaterAddr, thirdwebAddr, l); err != nil { - return fmt.Errorf("failed to handle a database log: %v", err) - } - // Update the checkpoint here. Assuming the log here's block number is the latest. - if err := updateCheckpoint(db, l.BlockNumber); err != nil { - return fmt.Errorf("failed to update a checkpoint: %v", err) - } - return nil - }) - if err != nil { - setup.Exitf("failed to handle a database log: %v", err) - } - heartbeat.Pulse() // Report that we're alive. - } - } -} - func handleLog(db *gorm.DB, seawaterAddr, thirdwebAddr ethCommon.Address, l ethTypes.Log) error { - handleLogCallback(seawaterAddr, thirdwebAddr, l, func(t string, a any) error { + return handleLogCallback(seawaterAddr, thirdwebAddr, l, func(t string, a any) error { // Use the database connection as the callback to insert this log. return databaseInsertLog(db, t, a) }) - return nil } func handleLogCallback(seawaterAddr, thirdwebAddr ethCommon.Address, l ethTypes.Log, cb func(table string, l any) error) error { var topic1, topic2, topic3 ethCommon.Hash diff --git a/cmd/ingestor.logs.ethereum/func_test.go b/cmd/ingestor.logs.ethereum/func_test.go index 9b2e0e16..13ccdf9c 100644 --- a/cmd/ingestor.logs.ethereum/func_test.go +++ b/cmd/ingestor.logs.ethereum/func_test.go @@ -168,11 +168,11 @@ func TestHandleLogCallbackSwap1(t *testing.T) { assert.Equalf(t, "events_seawater_swap1", table, "table not equal") // This test is captured in a unit test, so we can focus on just testing // this one field. - newPool, ok := a.(*seawater.Swap1) + swap1, ok := a.(*seawater.Swap1) assert.Truef(t, ok, "Swap1 type coercion not true") assert.Equalf(t, types.AddressFromString("0xFEb6034FC7dF27dF18a3a6baD5Fb94C0D3dCb6d5"), - newPool.User, + swap1.User, "token not equal", ) wasRun = true @@ -181,6 +181,39 @@ func TestHandleLogCallbackSwap1(t *testing.T) { assert.True(t, wasRun) } +func TestHandleLogCallbackSwap2(t *testing.T) { + seawaterAddr := ethCommon.HexToAddress("0xE13Fec14aBFbAa5b185cFb46670A56BF072E13b1") + s := strings.NewReader(` +{ + "address": "0xe13fec14abfbaa5b185cfb46670a56bf072e13b1", + "blockHash": "0xfa2557048aba87af6b0ae1a3ddd87b665cf03b208544c4b57f9cd30c06482f39", + "blockNumber": "0x760c3d", + "data": "0x00000000000000000000000000000000000000000000000ad78ebc5ac6200000000000000000000000000000000000000000000000000000000000001009539600000000000000000000000000000000000000000000000000000002f06f4a04fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffc6a9d0000000000000000000000000000000000000000000000000000000000009656", + "logIndex": "0x2", + "removed": false, + "topics": [ + "0xd3593b1fa4a2b80431faf29b3fb80cd1ef82a2b65128a650c625c4ed8d1b4d92", + "0x000000000000000000000000feb6034fc7df27df18a3a6bad5fb94c0d3dcb6d5", + "0x00000000000000000000000022b9fa698b68bba071b513959794e9a47d19214c", + "0x0000000000000000000000006437fdc89ced41941b97a9f1f8992d88718c81c5" + ], + "transactionHash": "0x5fedbc94388f657cbb527989af55f596665d06d68f71643c4fe41a83cfdbe643", + "transactionIndex": "0x1" +}`) + var l ethTypes.Log + assert.Nilf(t, json.NewDecoder(s).Decode(&l), "failed to decode log") + wasRun := false + err := handleLogCallback(seawaterAddr, seawaterAddr, l, func(table string, a any) error { + assert.Equalf(t, "events_seawater_swap2", table, "table not equal") + _, ok := a.(*seawater.Swap2) + assert.Truef(t, ok, "Swap2 type coercion not true") + wasRun = true + return nil + }) + assert.Nil(t, err) + assert.True(t, wasRun) +} + func TestHandleLogCallbackAccountCreated(t *testing.T) { // Test the new pool handling code. thirdwebAddr := ethCommon.HexToAddress("0x85e23b94e7F5E9cC1fF78BCe78cfb15B81f0DF00") diff --git a/cmd/ingestor.logs.ethereum/main.go b/cmd/ingestor.logs.ethereum/main.go index d721ec31..f1ae3118 100644 --- a/cmd/ingestor.logs.ethereum/main.go +++ b/cmd/ingestor.logs.ethereum/main.go @@ -20,12 +20,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" ) -const ( - EnvIngestorShouldPoll = "SPN_INGESTOR_SHOULD_POLL" - EnvPaginationBlockCount = "SPN_INGESTOR_PAGINATION_BLOCK_COUNT" - EnvPaginationPollWait = "SPN_INGESTOR_PAGINATION_POLL_WAIT" - EnvThirdwebAddr = "SPN_THIRDWEB_ACCOUNT_FACTORY_ADDR" -) +const EnvThirdwebAddr = "SPN_THIRDWEB_ACCOUNT_FACTORY_ADDR" const ( // DefaultPaginationBlockCountMin to use as the minimum number of blocks @@ -55,9 +50,6 @@ func main() { setup.Exitf("websocket dial: %v", err) } defer c.Close() - /* Ingestor-specific configuration. */ - ingestorShouldPoll := os.Getenv(EnvIngestorShouldPoll) != "" - slog.Info("ingestor should poll?", "status", ingestorShouldPoll) ingestorPagination := rand.Intn(DefaultPaginationBlockCountMax-DefaultPaginationBlockCountMin) + DefaultPaginationBlockCountMin slog.Info("polling configuration", "poll wait time amount", DefaultPaginationPollWait, @@ -70,7 +62,6 @@ func main() { features.Get(), config, thirdwebFactoryAddr, - ingestorShouldPoll, ingestorPagination, DefaultPaginationPollWait, c, diff --git a/lib/events/seawater/seawater.go b/lib/events/seawater/seawater.go index 9a5141e2..3d842fb0 100644 --- a/lib/events/seawater/seawater.go +++ b/lib/events/seawater/seawater.go @@ -188,11 +188,11 @@ func UnpackSwap2(topic1, topic2, topic3 ethCommon.Hash, d []byte) (*Swap2, error if !ok { return nil, fmt.Errorf("bad fluidVolume: %T", i[3]) } - finalTick0, ok := i[3].(*big.Int) + finalTick0, ok := i[3].(int32) if !ok { return nil, fmt.Errorf("bad finalTick0: %T", i[4]) } - finalTick1, ok := i[4].(*big.Int) + finalTick1, ok := i[4].(int32) if !ok { return nil, fmt.Errorf("bad finalTick1: %T", i[5]) } @@ -203,8 +203,8 @@ func UnpackSwap2(topic1, topic2, topic3 ethCommon.Hash, d []byte) (*Swap2, error AmountIn: types.UnscaledNumberFromBig(amountIn), AmountOut: types.UnscaledNumberFromBig(amountOut), FluidVolume: types.UnscaledNumberFromBig(fluidVolume), - FinalTick0: types.NumberFromBig(finalTick0), - FinalTick1: types.NumberFromBig(finalTick1), + FinalTick0: types.NumberFromInt64(int64(finalTick0)), + FinalTick1: types.NumberFromInt64(int64(finalTick1)), }, nil } diff --git a/lib/events/seawater/types.go b/lib/events/seawater/types.go index 3384a26f..1db8ee16 100644 --- a/lib/events/seawater/types.go +++ b/lib/events/seawater/types.go @@ -82,9 +82,9 @@ type ( Swap2 struct { events.Event - User types.Address `json:"user_"` - From types.Address `json:"from_"` - To types.Address `json:"to_"` + User types.Address `gorm:"column:user_"` + From types.Address `gorm:"column:from_"` + To types.Address `gorm:"column:to_"` AmountIn types.UnscaledNumber `json:"amountIn"` AmountOut types.UnscaledNumber `json:"amountOut"` FluidVolume types.UnscaledNumber `json:"fluidVolume"`