Skip to content

Commit

Permalink
Start to track errors properly, include swap2 properly
Browse files Browse the repository at this point in the history
  • Loading branch information
af-afk committed Aug 8, 2024
1 parent 925f0f6 commit c3fba97
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 70 deletions.
54 changes: 3 additions & 51 deletions cmd/ingestor.logs.ethereum/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,12 @@ var FilterTopics = [][]ethCommon.Hash{{ // Matches any of these in the first top
// Entry function, using the database to determine if polling should be
// used exclusively to receive logs, polling only for catchup, or
// exclusively websockets.
func Entry(f features.F, config config.C, thirdwebFactoryAddr types.Address, shouldPoll bool, ingestorPagination int, pollWait int, c *ethclient.Client, db *gorm.DB) {
func Entry(f features.F, config config.C, thirdwebFactoryAddr types.Address, ingestorPagination int, pollWait int, c *ethclient.Client, db *gorm.DB) {
var (
seawaterAddr = ethCommon.HexToAddress(config.SeawaterAddr.String())
thirdwebAddr = ethCommon.HexToAddress(thirdwebFactoryAddr.String())
)
if shouldPoll {
IngestPolling(f, c, db,ingestorPagination, pollWait, seawaterAddr, thirdwebAddr)
} else {
IngestWebsocket(f, c, db, seawaterAddr, thirdwebAddr)
}
IngestPolling(f, c, db,ingestorPagination, pollWait, seawaterAddr, thirdwebAddr)
}

// IngestPolling by repeatedly polling the Geth RPC for changes to
Expand Down Expand Up @@ -124,55 +120,11 @@ func IngestBlockRange(f features.F, c *ethclient.Client, db *gorm.DB, seawaterAd
}
}

// IngestWebsocket from the websocket provided, using the handleLog function
// to write records found to the database. Assumes that the ethclient
// provided is a websocket. Also updates the checkpoints to track the latest block.
func IngestWebsocket(f features.F, c *ethclient.Client, db *gorm.DB, seawaterAddr, thirdwebAddr ethCommon.Address) {
filter := ethereum.FilterQuery{
Topics: FilterTopics,
}
var (
logs = make(chan ethTypes.Log)
errors = make(chan error)
)
go func() {
subscription, err := c.SubscribeFilterLogs(context.Background(), filter, logs)
if err != nil {
setup.Exitf("eth log subscription: %v", err)
}
err = <-subscription.Err()
errors <- err
}()
for {
select {
case err := <-errors:
setup.Exitf("subscription error: %v", err)
case l := <-logs:
// Figure out what kind of log this is, and then insert it into the database.
err := db.Transaction(func(db *gorm.DB) error {
if err := handleLog(db, seawaterAddr, thirdwebAddr, l); err != nil {
return fmt.Errorf("failed to handle a database log: %v", err)
}
// Update the checkpoint here. Assuming the log here's block number is the latest.
if err := updateCheckpoint(db, l.BlockNumber); err != nil {
return fmt.Errorf("failed to update a checkpoint: %v", err)
}
return nil
})
if err != nil {
setup.Exitf("failed to handle a database log: %v", err)
}
heartbeat.Pulse() // Report that we're alive.
}
}
}

func handleLog(db *gorm.DB, seawaterAddr, thirdwebAddr ethCommon.Address, l ethTypes.Log) error {
handleLogCallback(seawaterAddr, thirdwebAddr, l, func(t string, a any) error {
return handleLogCallback(seawaterAddr, thirdwebAddr, l, func(t string, a any) error {
// Use the database connection as the callback to insert this log.
return databaseInsertLog(db, t, a)
})
return nil
}
func handleLogCallback(seawaterAddr, thirdwebAddr ethCommon.Address, l ethTypes.Log, cb func(table string, l any) error) error {
var topic1, topic2, topic3 ethCommon.Hash
Expand Down
37 changes: 35 additions & 2 deletions cmd/ingestor.logs.ethereum/func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ func TestHandleLogCallbackSwap1(t *testing.T) {
assert.Equalf(t, "events_seawater_swap1", table, "table not equal")
// This test is captured in a unit test, so we can focus on just testing
// this one field.
newPool, ok := a.(*seawater.Swap1)
swap1, ok := a.(*seawater.Swap1)
assert.Truef(t, ok, "Swap1 type coercion not true")
assert.Equalf(t,
types.AddressFromString("0xFEb6034FC7dF27dF18a3a6baD5Fb94C0D3dCb6d5"),
newPool.User,
swap1.User,
"token not equal",
)
wasRun = true
Expand All @@ -181,6 +181,39 @@ func TestHandleLogCallbackSwap1(t *testing.T) {
assert.True(t, wasRun)
}

func TestHandleLogCallbackSwap2(t *testing.T) {
seawaterAddr := ethCommon.HexToAddress("0xE13Fec14aBFbAa5b185cFb46670A56BF072E13b1")
s := strings.NewReader(`
{
"address": "0xe13fec14abfbaa5b185cfb46670a56bf072e13b1",
"blockHash": "0xfa2557048aba87af6b0ae1a3ddd87b665cf03b208544c4b57f9cd30c06482f39",
"blockNumber": "0x760c3d",
"data": "0x00000000000000000000000000000000000000000000000ad78ebc5ac6200000000000000000000000000000000000000000000000000000000000001009539600000000000000000000000000000000000000000000000000000002f06f4a04fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffc6a9d0000000000000000000000000000000000000000000000000000000000009656",
"logIndex": "0x2",
"removed": false,
"topics": [
"0xd3593b1fa4a2b80431faf29b3fb80cd1ef82a2b65128a650c625c4ed8d1b4d92",
"0x000000000000000000000000feb6034fc7df27df18a3a6bad5fb94c0d3dcb6d5",
"0x00000000000000000000000022b9fa698b68bba071b513959794e9a47d19214c",
"0x0000000000000000000000006437fdc89ced41941b97a9f1f8992d88718c81c5"
],
"transactionHash": "0x5fedbc94388f657cbb527989af55f596665d06d68f71643c4fe41a83cfdbe643",
"transactionIndex": "0x1"
}`)
var l ethTypes.Log
assert.Nilf(t, json.NewDecoder(s).Decode(&l), "failed to decode log")
wasRun := false
err := handleLogCallback(seawaterAddr, seawaterAddr, l, func(table string, a any) error {
assert.Equalf(t, "events_seawater_swap2", table, "table not equal")
_, ok := a.(*seawater.Swap2)
assert.Truef(t, ok, "Swap2 type coercion not true")
wasRun = true
return nil
})
assert.Nil(t, err)
assert.True(t, wasRun)
}

func TestHandleLogCallbackAccountCreated(t *testing.T) {
// Test the new pool handling code.
thirdwebAddr := ethCommon.HexToAddress("0x85e23b94e7F5E9cC1fF78BCe78cfb15B81f0DF00")
Expand Down
11 changes: 1 addition & 10 deletions cmd/ingestor.logs.ethereum/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
)

const (
EnvIngestorShouldPoll = "SPN_INGESTOR_SHOULD_POLL"
EnvPaginationBlockCount = "SPN_INGESTOR_PAGINATION_BLOCK_COUNT"
EnvPaginationPollWait = "SPN_INGESTOR_PAGINATION_POLL_WAIT"
EnvThirdwebAddr = "SPN_THIRDWEB_ACCOUNT_FACTORY_ADDR"
)
const EnvThirdwebAddr = "SPN_THIRDWEB_ACCOUNT_FACTORY_ADDR"

const (
// DefaultPaginationBlockCountMin to use as the minimum number of blocks
Expand Down Expand Up @@ -55,9 +50,6 @@ func main() {
setup.Exitf("websocket dial: %v", err)
}
defer c.Close()
/* Ingestor-specific configuration. */
ingestorShouldPoll := os.Getenv(EnvIngestorShouldPoll) != ""
slog.Info("ingestor should poll?", "status", ingestorShouldPoll)
ingestorPagination := rand.Intn(DefaultPaginationBlockCountMax-DefaultPaginationBlockCountMin) + DefaultPaginationBlockCountMin
slog.Info("polling configuration",
"poll wait time amount", DefaultPaginationPollWait,
Expand All @@ -70,7 +62,6 @@ func main() {
features.Get(),
config,
thirdwebFactoryAddr,
ingestorShouldPoll,
ingestorPagination,
DefaultPaginationPollWait,
c,
Expand Down
8 changes: 4 additions & 4 deletions lib/events/seawater/seawater.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ func UnpackSwap2(topic1, topic2, topic3 ethCommon.Hash, d []byte) (*Swap2, error
if !ok {
return nil, fmt.Errorf("bad fluidVolume: %T", i[3])
}
finalTick0, ok := i[3].(*big.Int)
finalTick0, ok := i[3].(int32)
if !ok {
return nil, fmt.Errorf("bad finalTick0: %T", i[4])
}
finalTick1, ok := i[4].(*big.Int)
finalTick1, ok := i[4].(int32)
if !ok {
return nil, fmt.Errorf("bad finalTick1: %T", i[5])
}
Expand All @@ -203,8 +203,8 @@ func UnpackSwap2(topic1, topic2, topic3 ethCommon.Hash, d []byte) (*Swap2, error
AmountIn: types.UnscaledNumberFromBig(amountIn),
AmountOut: types.UnscaledNumberFromBig(amountOut),
FluidVolume: types.UnscaledNumberFromBig(fluidVolume),
FinalTick0: types.NumberFromBig(finalTick0),
FinalTick1: types.NumberFromBig(finalTick1),
FinalTick0: types.NumberFromInt64(int64(finalTick0)),
FinalTick1: types.NumberFromInt64(int64(finalTick1)),
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions lib/events/seawater/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ type (
Swap2 struct {
events.Event

User types.Address `json:"user_"`
From types.Address `json:"from_"`
To types.Address `json:"to_"`
User types.Address `gorm:"column:user_"`
From types.Address `gorm:"column:from_"`
To types.Address `gorm:"column:to_"`
AmountIn types.UnscaledNumber `json:"amountIn"`
AmountOut types.UnscaledNumber `json:"amountOut"`
FluidVolume types.UnscaledNumber `json:"fluidVolume"`
Expand Down

0 comments on commit c3fba97

Please sign in to comment.