Skip to content

Commit

Permalink
use multicall3 to batch snapshotter positionLiquidity calls
Browse files Browse the repository at this point in the history
  • Loading branch information
eli-d committed Sep 3, 2024
1 parent fa12586 commit 148cc8c
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 63 deletions.
2 changes: 1 addition & 1 deletion cmd/ingestor.logs.ethereum/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func Entry(f features.F, config config.C, thirdwebFactoryAddr, leoAddr types.Add
var (
seawaterAddr = ethCommon.HexToAddress(config.SeawaterAddr.String())
thirdwebAddr = ethCommon.HexToAddress(thirdwebFactoryAddr.String())
leoAddr_ =ethCommon.HexToAddress(leoAddr.String())
leoAddr_ = ethCommon.HexToAddress(leoAddr.String())
)
IngestPolling(
f,
Expand Down
8 changes: 5 additions & 3 deletions cmd/snapshot.ethereum/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type PoolDetails struct {

var Zero = new(big.Int)

const multicallAddr = "0xcA11bde05977b3631167028862bE2a173976CA11"

func main() {
defer setup.Flush()
config := config.Get()
Expand All @@ -50,7 +52,7 @@ func main() {
if err != nil {
setup.Exitf("seawater positions scan: %v", err)
}
slog.Debug("positions we're about to scan", "positions", positions)
slog.Debug("number of positions", "n", len(positions))
var poolDetails []PoolDetails
// Get the decimals for each unique pool.
err = db.Table("seawater_final_ticks_decimals_1").
Expand Down Expand Up @@ -80,7 +82,7 @@ func main() {
d := packRpcPosData(config.SeawaterAddr.String(), positionMap)
// Request from the RPC the batched lookup of this data.
// Makes multiple requests if the request size exceeds the current restriction.
resps, err := reqPositions(context.Background(), config.GethUrl, d, httpPost)
resps, err := reqPositions(context.Background(), config.GethUrl, d, len(positions), httpPost)
if err != nil {
setup.Exitf("positions request: %v", err)
}
Expand All @@ -95,7 +97,7 @@ func main() {
pos, ok := positionMap[r.Key]
if !ok {
slog.Info("position doesn't have any liquidity",
"position id", pos.Id,
"position id", r.Key,
)
continue
}
Expand Down
156 changes: 109 additions & 47 deletions cmd/snapshot.ethereum/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,29 @@ import (
"io"
"log/slog"
"math/big"
"math/rand"
"strconv"
"strings"

"github.com/fluidity-money/long.so/lib/events/multicall"
"github.com/fluidity-money/long.so/lib/types"
"github.com/fluidity-money/long.so/lib/types/seawater"

"github.com/ethereum/go-ethereum/common"
ethCommon "github.com/ethereum/go-ethereum/common"
)

const (
// MinBatchLimit to get from the server before using multiple batches.
// This is the minimum that will be used.
MinBatchLimit = 100
// BatchLimit of calls to pack into a single JSON-RPC request.
// External RPC currently has a hard limit at 50/second.
BatchLimit = 18

// MaxBatchLimit to get from the server before using multiple
// batches. Half of the maximum amount since upstream started to
// choke.
MaxBatchLimit = 200
// MulticallBatchLimit for the number of calls to pack into each
// aggregate3 call. Exceeding 500 leads to issues with calldata size.
MulticallBatchLimit = 500

// WorkerCount of simultaneous requests that can be made max.
WorkerCount = 100
// External RPC currently has a hard limit at 50/second, so given that we
// batch 50 requests in each, we don't need additional workers for processing
WorkerCount = 1
)

type (
Expand All @@ -55,27 +56,47 @@ type (
}
)

// packRpcPosData by concatenating the pool address with the position id, so
// we can quickly unpack it later. Assumes poolAddr, and ammAddr, are
// correctly formatted (0x[A-Za-z0-9]{40}).
func packRpcPosData(ammAddr string, positions map[string]seawater.Position) (req []rpcReq) {
req = make([]rpcReq, len(positions))
i := 0
for k, p := range positions {
s := getCalldata(p.Pool, p.Id)
req[i] = rpcReq{
JsonRpc: "2.0",
Id: k,
Method: "eth_call",
Params: []any{
map[string]string{
"to": ammAddr,
"data": s,
},
"latest",
},
// packRpcPosData packs positions into a slice of JSON-RPC requests,
// each wrapping up to MulticallBatchLimit calls inside a single
// multicall aggregate3 invocation. The request Id concatenates the
// fixed-width encodeRpcId record for every position in the batch, so
// each response entry can be related back to its position by offset.
// Assumes ammAddr is correctly formatted (0x[A-Fa-f0-9]{40}).
func packRpcPosData(ammAddr string, positions map[string]seawater.Position) (reqs []rpcReq) {
	target := common.HexToAddress(ammAddr)
	var (
		// pending multicall entries for the batch being assembled
		pending []multicall.AggregateCall3
		// concatenated fixed-width position ids for the batch
		ids string
	)
	// flush packs the pending calls into one aggregate3 request and
	// resets the batch accumulators.
	flush := func() {
		calldata, err := multicall.PackAggregate3(pending)
		if err != nil {
			// Packing static call structures should never fail;
			// treat it as a programmer error.
			panic(err)
		}
		reqs = append(reqs, rpcReq{
			JsonRpc: "2.0",
			Id:      ids,
			Method:  "eth_call",
			Params: []any{
				map[string]string{
					"to":   multicallAddr,
					"data": "0x" + hex.EncodeToString(calldata),
				},
				"latest",
			},
		})
		pending = pending[:0]
		ids = ""
	}
	for _, p := range positions {
		pending = append(pending, multicall.AggregateCall3{
			Target:   target,
			CallData: getCalldata(p.Pool, p.Id),
		})
		ids += encodeRpcId(p)
		if len(pending) == MulticallBatchLimit {
			flush()
		}
	}
	// Send any remainder that didn't fill a whole batch.
	if len(pending) > 0 {
		flush()
	}
	return
}
Expand All @@ -88,7 +109,7 @@ type HttpReqFn func(url, contentType string, r io.Reader) (io.ReadCloser, error)
// request is above the batch limit. If it encounters a situation where
// anything is returned in error, it sends a done message to all the
// Goroutines after attempting to drain them for 5 seconds.
func reqPositions(ctx context.Context, url string, reqs []rpcReq, makeReq HttpReqFn) ([]posResp, error) {
func reqPositions(ctx context.Context, url string, reqs []rpcReq, posCount int, makeReq HttpReqFn) ([]posResp, error) {
var (
chanReqs = make(chan []rpcReq)
chanResps = make(chan posResp)
Expand All @@ -97,13 +118,11 @@ func reqPositions(ctx context.Context, url string, reqs []rpcReq, makeReq HttpRe
)
// Figure out the maximum number of goroutines that we can run to
// make the requests. Scaling up accordingly.
batchLimit := rand.Intn(MaxBatchLimit-MinBatchLimit) + MinBatchLimit
slog.Info("sending requests using a randomly chosen batch limit",
batchLimit := BatchLimit
slog.Info("sending requests using a predefined batch limit",
"batch limit", batchLimit,
)
frames := len(reqs) / batchLimit
workerCount := max(frames, WorkerCount)
for i := 0; i < workerCount; i++ {
for i := 0; i < WorkerCount; i++ {
go func() {
for {
select {
Expand Down Expand Up @@ -137,17 +156,39 @@ func reqPositions(ctx context.Context, url string, reqs []rpcReq, makeReq HttpRe
chanErrs <- fmt.Errorf(`error reported: %v`, err)
return
}
delta, err := types.NumberFromHex(strings.TrimPrefix(p.Result, "0x"))
resultBytes, err := hex.DecodeString(p.Result[2:])
if err != nil {
chanErrs <- fmt.Errorf(`decoding hex: %v`, err)
return
}
unpacked, err := multicall.UnpackAggregate3(resultBytes)
if err != nil {
chanErrs <- fmt.Errorf("unpacking delta: %#v: %v", p, err)
chanErrs <- fmt.Errorf(`unpacking: %v`, err)
return
}
r := posResp{p.Id, *delta}
select {
case <-chanDone:
return // Hopefully this will prevent us from going through the rest.
case chanResps <- r:
// Do nothing. We sent.
for i, result := range unpacked {
if !result.Success {
chanErrs <- fmt.Errorf(`external call failure: %v`, result)
return
}
h := hex.EncodeToString(result.ReturnData)
delta, err := types.NumberFromHex(h)
if err != nil {
chanErrs <- fmt.Errorf("unpacking delta: %#v: %v", p, err)
return
}
pool, id, err := decodeRpcId(p.Id, i)
if err != nil {
chanErrs <- fmt.Errorf("unpacking pos id: %#v: %v", p, err)
return
}
r := posResp{encodeId(types.AddressFromString(pool), id), *delta}
select {
case <-chanDone:
return // Hopefully this will prevent us from going through the rest.
case chanResps <- r:
// Do nothing. We sent.
}
}
}
}
Expand Down Expand Up @@ -188,14 +229,14 @@ func reqPositions(ctx context.Context, url string, reqs []rpcReq, makeReq HttpRe

}
}()
resps := make([]posResp, len(reqs))
resps := make([]posResp, posCount)
sleepRoutines := func() {
for {
chanDone <- true
}
}
// Start to unpack everything/signal the worker group if we have an error.
for i := 0; i < len(reqs); i++ {
for i := 0; i < posCount; i++ {
select {
case resp := <-chanResps:
resps[i] = resp
Expand Down Expand Up @@ -229,7 +270,28 @@ func encodeId(pool types.Address, id int) string {
return fmt.Sprintf("%s%x", pool, id)
}

func getCalldata(pool types.Address, posId int) string {
// encodeRpcId encodes a position as its pool address immediately
// followed by the position id zero-padded to 20 digits
// (pool0id0pool1id1...), so concatenated records can be decoded again
// by a fixed per-record offset.
func encodeRpcId(p seawater.Position) string {
	return fmt.Sprintf("%s%020d", p.Pool.String(), p.Id)
}

func decodeRpcId(p string, offset int) (string, int, error) {
const (
addrWidth = 42
idWidth = 20
)
combinedWidth := addrWidth + idWidth
pool := p[offset*combinedWidth : offset*combinedWidth+addrWidth]
idPadded := p[offset*combinedWidth+addrWidth : offset*combinedWidth+addrWidth+idWidth]
id, err := strconv.Atoi(idPadded)
if err != nil {
return "", 0, err
}
return pool, id, nil
}

func getCalldata(pool types.Address, posId int) []byte {
posIdB := new(big.Int).SetInt64(int64(posId)).Bytes()
x := append(
//positionLiquidity8D11C045(address,uint256)
Expand All @@ -239,5 +301,5 @@ func getCalldata(pool types.Address, posId int) string {
ethCommon.LeftPadBytes(posIdB, 32)...,
)...,
)
return "0x" + hex.EncodeToString(x)
return x
}
27 changes: 15 additions & 12 deletions cmd/snapshot.ethereum/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"io"
"math/big"
Expand All @@ -22,9 +23,10 @@ func TestGetSlot(t *testing.T) {
types.AddressFromString("0xe984f758f362d255bd96601929970cef9ff19dd7"),
0,
)
d := "0x" + hex.EncodeToString(s)
assert.Equal(t,
"0x0000025b000000000000000000000000e984f758f362d255bd96601929970cef9ff19dd70000000000000000000000000000000000000000000000000000000000000000",
s,
d,
"calldata not equal",
)
}
Expand All @@ -35,7 +37,7 @@ func TestReqPositionsErr(t *testing.T) {
"": {},
})
ctx := context.TODO()
_, err := reqPositions(ctx, "", d, func(url string, contentType string, r io.Reader) (io.ReadCloser, error) {
_, err := reqPositions(ctx, "", d, 1, func(url string, contentType string, r io.Reader) (io.ReadCloser, error) {
var buf bytes.Buffer
_ = json.NewEncoder(&buf).Encode(rpcResp{
Id: "",
Expand All @@ -55,23 +57,24 @@ func TestReqPositionsSinglePosition(t *testing.T) {
Pool: types.AddressFromString("0xe984f758f362d255bd96601929970cef9ff19dd7"),
}
id := encodeId(p.Pool, p.Id)
rpcId := encodeRpcId(p)
d := packRpcPosData("", map[string]seawater.Position{id: p})
pool, posId, ok := decodeId(id)
assert.Equalf(t, p.Pool, pool, "pool not decoded")
assert.EqualValuesf(t, p.Pool, pool, "pool not decoded")
assert.Equalf(t, p.Id, posId, "id not decoded")
assert.Truef(t, ok, "decode id function not working")
r, err := reqPositions(ctx, "", d, func(url string, contentType string, r io.Reader) (io.ReadCloser, error) {
r, err := reqPositions(ctx, "", d, 1, func(url string, contentType string, r io.Reader) (io.ReadCloser, error) {
var buf bytes.Buffer
_ = json.NewEncoder(&buf).Encode([]rpcResp{{
Id: id,
Result: "0x00000000000000000000000000000000000000000000000000000000091c2e55",
Id: rpcId,
Result: "0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000091c2e55",
Error: nil,
}})
return io.NopCloser(&buf), nil
})
assert.Nilf(t, err, "req positions errored")
expected := []posResp{{
Key: id,
Key: id,
Delta: types.NumberFromInt64(152841813),
}}
assert.Equal(t, expected, r)
Expand All @@ -97,12 +100,12 @@ func TestReqPositionsHundredThousandPositions(t *testing.T) {
resps := make(map[int]rpcResp, 100_000)
for _, p := range positions {
resps[p.Id] = rpcResp{
Id: encodeId(p.Pool, p.Id),
Result: "0x00000000000000000000000000000000000000000000000000000000091c2e55",
Id: encodeRpcId(p),
Result: "0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000091c2e55",
Error: nil,
}
}
posResps, err := reqPositions(ctx, "", d, func(url string, contentType string, r io.Reader) (io.ReadCloser, error) {
posResps, err := reqPositions(ctx, "", d, 1, func(url string, contentType string, r io.Reader) (io.ReadCloser, error) {
// Decode the data to see which transactions are being sent.
var buf bytes.Buffer
var reqs []rpcReq
Expand All @@ -111,7 +114,7 @@ func TestReqPositionsHundredThousandPositions(t *testing.T) {
for i, r := range reqs {
resps[i] = rpcResp{
Id: r.Id,
Result: "0x00000000000000000000000000000000000000000000000000000000091c2e55",
Result: "0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000091c2e55",
Error: nil,
}
}
Expand Down Expand Up @@ -151,7 +154,7 @@ func TestReqPositionsHundredThousandErrors(t *testing.T) {
Error: nil,
}
}
posResps, err := reqPositions(ctx, "", d, func(url string, contentType string, r io.Reader) (io.ReadCloser, error) {
posResps, err := reqPositions(ctx, "", d, 1, func(url string, contentType string, r io.Reader) (io.ReadCloser, error) {
// Decode the data to see which transactions are being sent.
var buf bytes.Buffer
_ = json.NewEncoder(&buf).Encode([]rpcResp{{
Expand Down
Loading

0 comments on commit 148cc8c

Please sign in to comment.