Skip to content

Commit

Permalink
Start to track events from Leo, and test (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
af-afk committed Aug 16, 2024
1 parent 5faa640 commit 41848fe
Show file tree
Hide file tree
Showing 23 changed files with 824 additions and 194 deletions.
85 changes: 57 additions & 28 deletions cmd/ingestor.logs.ethereum/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
"math/big"
"time"

"github.com/fluidity-money/long.so/lib/events/erc20"
"github.com/fluidity-money/long.so/lib/events/seawater"
"github.com/fluidity-money/long.so/lib/config"
"github.com/fluidity-money/long.so/lib/setup"
"github.com/fluidity-money/long.so/lib/events/thirdweb"
"github.com/fluidity-money/long.so/lib/features"
"github.com/fluidity-money/long.so/lib/heartbeat"
"github.com/fluidity-money/long.so/lib/setup"
"github.com/fluidity-money/long.so/lib/types"
"github.com/fluidity-money/long.so/lib/events/thirdweb"

"github.com/fluidity-money/long.so/lib/events/erc20"
"github.com/fluidity-money/long.so/lib/events/leo"
"github.com/fluidity-money/long.so/lib/events/seawater"

"gorm.io/gorm"

Expand All @@ -30,6 +32,8 @@ import (
var FilterTopics = [][]ethCommon.Hash{{ // Matches any of these in the first topic position.
erc20.TopicTransfer,
thirdweb.TopicAccountCreated,
leo.TopicCampaignBalanceUpdated,
leo.TopicCampaignCreated,
seawater.TopicMintPosition,
seawater.TopicBurnPosition,
seawater.TopicTransferPosition,
Expand All @@ -44,19 +48,20 @@ var FilterTopics = [][]ethCommon.Hash{{ // Matches any of these in the first top
// Entry function, using the database to determine if polling should be
// used exclusively to receive logs, polling only for catchup, or
// exclusively websockets.
func Entry(f features.F, config config.C, thirdwebFactoryAddr types.Address, ingestorPagination int, pollWait int, c *ethclient.Client, db *gorm.DB) {
func Entry(f features.F, config config.C, thirdwebFactoryAddr, leoAddr types.Address, ingestorPagination int, pollWait int, c *ethclient.Client, db *gorm.DB) {
var (
seawaterAddr = ethCommon.HexToAddress(config.SeawaterAddr.String())
thirdwebAddr = ethCommon.HexToAddress(thirdwebFactoryAddr.String())
leoAddr_ =ethCommon.HexToAddress(leoAddr.String())
)
IngestPolling(f, c, db,ingestorPagination, pollWait, seawaterAddr, thirdwebAddr)
IngestPolling(f, c, db, ingestorPagination, pollWait, seawaterAddr, thirdwebAddr, leoAddr_)
}

// IngestPolling by repeatedly polling the Geth RPC for changes to
// receive log updates. Checks the database first to determine where the
// last point is before continuing. Assumes ethclient is HTTP.
// Uses the IngestBlockRange function to do all the lifting.
func IngestPolling(f features.F, c *ethclient.Client, db *gorm.DB, ingestorPagination, ingestorPollWait int, seawaterAddr, thirdwebAddr ethCommon.Address) {
func IngestPolling(f features.F, c *ethclient.Client, db *gorm.DB, ingestorPagination, ingestorPollWait int, seawaterAddr, thirdwebAddr, leoAddr ethCommon.Address) {
if ingestorPagination <= 0 {
panic("bad ingestor pagination")
}
Expand All @@ -72,7 +77,7 @@ func IngestPolling(f features.F, c *ethclient.Client, db *gorm.DB, ingestorPagin
"from", from,
"collecting until", to,
)
IngestBlockRange(f, c, db, seawaterAddr, thirdwebAddr, from, to)
IngestBlockRange(f, c, db, seawaterAddr, thirdwebAddr, leoAddr, from, to)
slog.Info("about to sleep before polling again",
"poll seconds", ingestorPollWait,
)
Expand All @@ -85,7 +90,7 @@ func IngestPolling(f features.F, c *ethclient.Client, db *gorm.DB, ingestorPagin
// funciton to write records found to the database. Assumes the ethclient
// provided is a HTTP client. Also updates the underlying last block it
// saw into the database checkpoints. Fatals if something goes wrong.
func IngestBlockRange(f features.F, c *ethclient.Client, db *gorm.DB, seawaterAddr, thirdwebAddr ethCommon.Address, from, to uint64) {
func IngestBlockRange(f features.F, c *ethclient.Client, db *gorm.DB, seawaterAddr, thirdwebAddr, leoAddr ethCommon.Address, from, to uint64) {
logs, err := c.FilterLogs(context.Background(), ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(from),
ToBlock: new(big.Int).SetUint64(to),
Expand All @@ -98,7 +103,7 @@ func IngestBlockRange(f features.F, c *ethclient.Client, db *gorm.DB, seawaterAd
wasChanged := false
biggestBlockNo := from
for _, l := range logs {
if err := handleLog(db, seawaterAddr, thirdwebAddr, l); err != nil {
if err := handleLog(db, seawaterAddr, thirdwebAddr, leoAddr, l); err != nil {
return fmt.Errorf("failed to unpack log: %v", err)
}
isBiggerOrEqual := biggestBlockNo <= l.BlockNumber
Expand All @@ -120,13 +125,13 @@ func IngestBlockRange(f features.F, c *ethclient.Client, db *gorm.DB, seawaterAd
}
}

func handleLog(db *gorm.DB, seawaterAddr, thirdwebAddr ethCommon.Address, l ethTypes.Log) error {
return handleLogCallback(seawaterAddr, thirdwebAddr, l, func(t string, a any) error {
func handleLog(db *gorm.DB, seawaterAddr, thirdwebAddr, leoAddr ethCommon.Address, l ethTypes.Log) error {
return handleLogCallback(seawaterAddr, thirdwebAddr, leoAddr, l, func(t string, a any) error {
// Use the database connection as the callback to insert this log.
return databaseInsertLog(db, t, a)
})
}
func handleLogCallback(seawaterAddr, thirdwebAddr ethCommon.Address, l ethTypes.Log, cb func(table string, l any) error) error {
func handleLogCallback(seawaterAddr, thirdwebAddr, leoAddr ethCommon.Address, l ethTypes.Log, cb func(table string, l any) error) error {
var topic1, topic2, topic3 ethCommon.Hash
topic0 := l.Topics[0]
if len(l.Topics) > 1 {
Expand Down Expand Up @@ -176,10 +181,8 @@ func handleLogCallback(seawaterAddr, thirdwebAddr ethCommon.Address, l ethTypes.
// If the event was made by Seawater or Thirdweb. Assumed to be the case, so
// non-Seawater events should set this to false in this switch. Used to
// check the event emitter.
var (
isSeawater = true
isThirdweb = false
)
isSeawater := true // This should be invalidated by other contracts!
var isThirdweb, isLeo bool
switch topic0 {
case erc20.TopicTransfer:
a, err = erc20.UnpackTransfer(topic1, topic2, data)
Expand All @@ -194,6 +197,20 @@ func handleLogCallback(seawaterAddr, thirdwebAddr ethCommon.Address, l ethTypes.
isSeawater = false
isThirdweb = true

case leo.TopicCampaignBalanceUpdated:
a, err = leo.UnpackCampaignBalanceUpdated(topic1, topic2, topic3, data)
logEvent("CampaignBalanceCreated")
table = "events_leo_campaignbalancecreated"
isSeawater = false
isLeo = true

case leo.TopicCampaignCreated:
a, err = leo.UnpackCampaignCreated(topic1, topic2, topic3, data)
logEvent("CampaignCreated")
table = "events_leo_campaigncreated"
isSeawater = false
isLeo = true

case seawater.TopicMintPosition:
a, err = seawater.UnpackMintPosition(topic1, topic2, topic3, data)
logEvent("MintPosition")
Expand Down Expand Up @@ -245,17 +262,29 @@ func handleLogCallback(seawaterAddr, thirdwebAddr ethCommon.Address, l ethTypes.
if err != nil {
return fmt.Errorf("failed to process topic for table %#v: %v", table, err)
}
if isThirdweb {
if thirdwebAddr != emitterAddr {
slog.Warn("ignoring a Thirdweb log from a sender that wasn't thirdweb",
"seawater address", seawaterAddr,
"thirdweb address", thirdwebAddr,
"emitter address", emitterAddr,
"topic0", topic0,
"transaction hash", transactionHash,
)
return nil
}
// Skip Thirdweb if the defacto contract didn't create the event.
if isThirdweb && thirdwebAddr != emitterAddr {
slog.Warn("ignoring a Thirdweb log from a sender that wasn't Leo",
"seawater address", seawaterAddr,
"thirdweb address", thirdwebAddr,
"leo address", leoAddr,
"emitter address", emitterAddr,
"topic0", topic0,
"transaction hash", transactionHash,
)
return nil
}
// Skip Leo if the defacto contract didn't create the event.
if isLeo && leoAddr != emitterAddr {
slog.Warn("ignoring a Leo log from a sender that wasn't Leo",
"seawater address", seawaterAddr,
"thirdweb address", thirdwebAddr,
"leo address", leoAddr,
"emitter address", emitterAddr,
"topic0", topic0,
"transaction hash", transactionHash,
)
return nil
}
if isSeawater {
// Make sure that the log came from the Seawater contract.
Expand Down
15 changes: 9 additions & 6 deletions cmd/ingestor.logs.ethereum/func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/stretchr/testify/assert"
)

// EmptyAddr for testing reasons
var EmptyAddr ethCommon.Address

func TestHandleLogCallbackNewPool(t *testing.T) {
// Test the new pool handling code.
seawaterAddr := ethCommon.HexToAddress("0x0fFC26C47FeD8C54AF2f0872cc51d79D173730a8")
Expand All @@ -37,7 +40,7 @@ func TestHandleLogCallbackNewPool(t *testing.T) {
var l ethTypes.Log
assert.Nilf(t, json.NewDecoder(s).Decode(&l), "failed to decode log")
wasRun := false
handleLogCallback(seawaterAddr, seawaterAddr, l, func(table string, a any) error {
handleLogCallback(seawaterAddr, EmptyAddr, EmptyAddr, l, func(table string, a any) error {
assert.Equalf(t, "events_seawater_newpool", table, "table not equal")
// This test is captured in a unit test, so we can focus on just testing
// this one field.
Expand Down Expand Up @@ -74,7 +77,7 @@ func TestHandleLogCallbackUpdatePositionLiquidity(t *testing.T) {
var l ethTypes.Log
assert.Nilf(t, json.NewDecoder(s).Decode(&l), "failed to decode log")
wasRun := false
handleLogCallback(seawaterAddr, seawaterAddr, l, func(table string, a any) error {
handleLogCallback(seawaterAddr, EmptyAddr, EmptyAddr, l, func(table string, a any) error {
assert.Equalf(t, "events_seawater_updatepositionliquidity", table, "table not equal")
updatePositionLiq, ok := a.(*seawater.UpdatePositionLiquidity)
assert.Truef(t, ok, "UpdatePositionLiquidity type coercion not true")
Expand Down Expand Up @@ -121,7 +124,7 @@ func TestHandleLogCallbackMintPosition(t *testing.T) {
var l ethTypes.Log
assert.Nilf(t, json.NewDecoder(s).Decode(&l), "failed to decode log")
wasRun := false
handleLogCallback(seawaterAddr, seawaterAddr, l, func(table string, a any) error {
handleLogCallback(seawaterAddr, EmptyAddr, EmptyAddr, l, func(table string, a any) error {
assert.Equalf(t, "events_seawater_mintposition", table, "table not equal")
newPool, ok := a.(*seawater.MintPosition)
assert.Truef(t, ok, "MintPosition type coercion not true")
Expand Down Expand Up @@ -157,7 +160,7 @@ func TestHandleLogCallbackSwap1(t *testing.T) {
var l ethTypes.Log
assert.Nilf(t, json.NewDecoder(s).Decode(&l), "failed to decode log")
wasRun := false
handleLogCallback(seawaterAddr, seawaterAddr, l, func(table string, a any) error {
handleLogCallback(seawaterAddr, EmptyAddr, EmptyAddr, l, func(table string, a any) error {
assert.Equalf(t, "events_seawater_swap1", table, "table not equal")
// This test is captured in a unit test, so we can focus on just testing
// this one field.
Expand Down Expand Up @@ -196,7 +199,7 @@ func TestHandleLogCallbackSwap2(t *testing.T) {
var l ethTypes.Log
assert.Nilf(t, json.NewDecoder(s).Decode(&l), "failed to decode log")
wasRun := false
err := handleLogCallback(seawaterAddr, seawaterAddr, l, func(table string, a any) error {
err := handleLogCallback(seawaterAddr, EmptyAddr, EmptyAddr, l, func(table string, a any) error {
assert.Equalf(t, "events_seawater_swap2", table, "table not equal")
_, ok := a.(*seawater.Swap2)
assert.Truef(t, ok, "Swap2 type coercion not true")
Expand Down Expand Up @@ -228,7 +231,7 @@ func TestHandleLogCallbackAccountCreated(t *testing.T) {
var l ethTypes.Log
wasRun := false
assert.Nilf(t, json.NewDecoder(s).Decode(&l), "failed to decode log")
handleLogCallback(thirdwebAddr, thirdwebAddr, l, func(table string, a any) error {
handleLogCallback(EmptyAddr, thirdwebAddr, EmptyAddr, l, func(table string, a any) error {
assert.Equalf(t, "events_thirdweb_accountcreated", table, "table not equal")
accountCreated, ok := a.(*thirdweb.AccountCreated)
assert.Truef(t, ok, "AccountCreated type coercion not true")
Expand Down
11 changes: 10 additions & 1 deletion cmd/ingestor.logs.ethereum/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
)

const EnvThirdwebAddr = "SPN_THIRDWEB_ACCOUNT_FACTORY_ADDR"
const (
EnvThirdwebAddr = "SPN_THIRDWEB_ACCOUNT_FACTORY_ADDR"
EnvLeoAddr = "SPN_LEO_ADDR"
)

const (
// DefaultPaginationBlockCountMin to use as the minimum number of blocks
Expand Down Expand Up @@ -62,10 +65,16 @@ func main() {
setup.Exitf("%v not set", EnvThirdwebAddr)
}
thirdwebFactoryAddr := types.AddressFromString(thirdwebFactoryAddr_)
leoAddr_ := os.Getenv(EnvLeoAddr)
if leoAddr_ == "" {
setup.Exitf("%v not set", EnvLeoAddr)
}
leoAddr := types.AddressFromString(leoAddr_)
Entry(
features.Get(),
config,
thirdwebFactoryAddr,
leoAddr,
ingestorPagination,
DefaultPaginationPollWait,
c,
Expand Down
35 changes: 35 additions & 0 deletions db/migrations/1723693657-events_leo.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- migrate:up

CREATE TABLE events_leo_campaignbalanceupdated (
id SERIAL PRIMARY KEY,
created_by TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
block_hash HASH NOT NULL,
transaction_hash HASH NOT NULL,
block_number INTEGER NOT NULL,
emitter_addr ADDRESS NOT NULL,

identifier VARCHAR NOT NULL,
new_maximum HUGEINT NOT NULL
);

CREATE TABLE events_leo_campaigncreated (
id SERIAL PRIMARY KEY,
created_by TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
block_hash HASH NOT NULL,
transaction_hash HASH NOT NULL,
block_number INTEGER NOT NULL,
emitter_addr ADDRESS NOT NULL,

identifier VARCHAR NOT NULL,
pool ADDRESS NOT NULL,
token ADDRESS NOT NULL,
tick_lower INTEGER NOT NULL,
tick_upper INTEGER NOT NULL,
owner ADDRESS NOT NULL,
starting TIMESTAMP WITHOUT TIME ZONE,
ending TIMESTAMP WITHOUT TIME ZONE
);

CREATE UNIQUE INDEX ON events_leo_campaigncreated (identifier, pool);

-- migrate:down
52 changes: 28 additions & 24 deletions lib/events/leo/leo.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@ var abiBytes []byte

var abi, _ = ethAbi.JSON(bytes.NewReader(abiBytes))

func hashToNumber(h ethCommon.Hash) types.Number {
func hashToUnscaledNumber(h ethCommon.Hash) types.UnscaledNumber {
// Always assumes the hash is well-formed.
return types.NumberFromBig(h.Big())
return types.UnscaledNumberFromBig(h.Big())
}

func UnpackCampaignBalanceUpdated(topic1, topic2, topic3 ethCommon.Hash, d []byte) (*CampaignBalanceUpdated, error) {
return &CampaignBalanceUpdated{
Identifier: hashToBytes8Data(topic1),
NewMaximum: hashToNumber(topic2),
NewMaximum: hashToUnscaledNumber(topic2),
}, nil
}

// UnpackCampaignCreated events, also unpacking the packed "details"
// field to some of the extra bits of information we track
func UnpackCampaignCreated(topic1, topic2, topic3 ethCommon.Hash, d []byte) (*CampaignBalanceUpdated, error) {
func UnpackCampaignCreated(topic1, topic2, topic3 ethCommon.Hash, d []byte) (*CampaignCreated, error) {
i, err := abi.Unpack("CampaignCreated", d)
if err != nil {
return nil, err
Expand All @@ -49,35 +49,39 @@ func UnpackCampaignCreated(topic1, topic2, topic3 ethCommon.Hash, d []byte) (*Ca
if !ok {
return nil, fmt.Errorf("bad times: %T", i[1])
}
tickLower, tickUpper, owner, err := unpackDetails(details)
if err != nil {
return nil, err
}
starting, ending, err := unpackTimes(times)
if err != nil {
return nil, err
}
tickLower, tickUpper, owner := unpackDetails(details)
starting, ending := unpackTimes(times)
return &CampaignCreated{
Identifier: hashToBytes8Data(topic1),
Pool: hashToAddress(topic2),
Token: hashToAddress(topic3),
TickLower: tickLower,
TickUpper: tickUpper,
Owner: owner,
Starting: starting,
Ending: ending,
Pool: hashToAddr(topic2),
Token: hashToAddr(topic3),
TickLower: tickLower,
TickUpper: tickUpper,
Owner: owner,
Starting: starting,
Ending: ending,
}, nil
}

func hashToBytes8Data(t ethCommon.Hash) types.Data {
b := t.Bytes()[:8]
b := t.Bytes()[:5]
return types.DataFromBytes(b)
}

func unpackDetails(i *big.Int) (tickLower int32, tickUpper int32, owner types.Address, err error) {

func hashToAddr(h ethCommon.Hash) types.Address {
v := ethCommon.BytesToAddress(h.Bytes())
return types.AddressFromString(v.String())
}

func unpackTimes(i int128) (starting uint64, ending uint64, err error) {
func unpackDetails(i *big.Int) (tickLower int32, tickUpper int32, owner types.Address) {
tickLower = int32(new(big.Int).Rsh(i, 32 + 160).Int64())
tickUpper = int32(new(big.Int).Rsh(i, 160).Int64())
owner = types.AddressFromString(ethCommon.BigToAddress(i).String())
return
}

}
func unpackTimes(i *big.Int) (starting uint64, ending uint64) {
starting = new(big.Int).Rsh(i, 64).Uint64()
ending = i.Uint64()
return
}
Loading

0 comments on commit 41848fe

Please sign in to comment.