-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.go
168 lines (154 loc) · 6.85 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"math/big"
"regexp"
"strings"
log "github.com/inconshreveable/log15"
streamsTransports "github.com/openrelayxyz/cardinal-streams/transports"
"github.com/openrelayxyz/cardinal-types"
"github.com/openrelayxyz/cardinal-rpc"
"github.com/openrelayxyz/cardinal-types/hexutil"
"github.com/openrelayxyz/cardinal-flume/config"
"github.com/openrelayxyz/cardinal-flume/heavy"
"github.com/openrelayxyz/cardinal-flume/plugins"
)
// trackedPrefixes enumerates the Cardinal stream key patterns this consumer
// subscribes to by default. Each pattern matches keys shaped like
// c/<chainid>/b/<block>/<suffix>. (The suffix semantics — presumably headers,
// receipts, logs, etc. — are defined by cardinal-streams; not asserted here.)
var trackedPrefixes = func() []*regexp.Regexp {
	suffixes := []string{"h", "d", "u", "t/", "r/", "l/", "w"}
	patterns := make([]*regexp.Regexp, 0, len(suffixes))
	for _, suffix := range suffixes {
		patterns = append(patterns, regexp.MustCompile("c/[0-9a-z]+/b/[0-9a-z]+/"+suffix))
	}
	return patterns
}()
// deliverConsumer constructs a mux consumer over the configured brokers.
// When resumptionTime is positive, a timestamp-derived resumption token is
// preferred over the offset-based resumption string; if that lookup fails,
// the failure is logged and the offset-based token is used as a fallback.
func deliverConsumer(brokerParams []streamsTransports.BrokerParams, resumption string, reorgThreshold, resumptionTime, lastNumber int64, lastHash, lastWeight []byte, tp []*regexp.Regexp, wl map[uint64]types.Hash) (streamsTransports.Consumer, error) {
	resumptionToken := []byte(resumption)
	if resumptionTime > 0 {
		if fromTimestamp, err := streamsTransports.ResumptionForTimestamp(brokerParams, resumptionTime); err != nil {
			log.Warn("Could not load resumption from timestamp:", "error", err.Error())
		} else {
			resumptionToken = fromTimestamp
		}
	}
	hash := types.BytesToHash(lastHash)
	weight := new(big.Int).SetBytes(lastWeight)
	return streamsTransports.ResolveMuxConsumer(brokerParams, resumptionToken, lastNumber, hash, weight, reorgThreshold, tp, wl)
}
func AcquireConsumer(db *sql.DB, cfg *config.Config, resumptionTime int64, useBlockTime bool, pl *plugins.PluginLoader) (streamsTransports.Consumer, error) {
brokerParams := cfg.BrokerParams
reorgThreshold := cfg.ReorgThreshold
var err error
var tableName string
ptpi := pl.Lookup("TrackedPrefixes", func(v interface{}) bool {
_, ok := v.(*[]*regexp.Regexp)
return ok
})
ptp := []*regexp.Regexp{}
for _, v := range ptpi {
if prefixes, ok := v.(*[]*regexp.Regexp); ok {
ptp = append(ptp, (*prefixes)...)
}
}
db.QueryRowContext(context.Background(), "SELECT name FROM blocks.sqlite_master WHERE type='table' and name='cardinal_offsets';").Scan(&tableName)
if tableName != "cardinal_offsets" {
if _, err = db.Exec("CREATE TABLE blocks.cardinal_offsets (partition INT, offset BIGINT, topic STRING, PRIMARY KEY (topic, partition));"); err != nil {
return nil, err
}
}
startOffsets := []string{}
for _, broker := range brokerParams {
for _, topic := range broker.Topics {
var partition int32
var offset int64
rows, err := db.QueryContext(context.Background(), "SELECT partition, offset FROM cardinal_offsets WHERE topic = ?;", topic)
if err != nil {
return nil, err
}
for rows.Next() {
if err := rows.Scan(&partition, &offset); err != nil {
return nil, err
}
startOffsets = append(startOffsets, fmt.Sprintf("%v:%v=%v", topic, partition, offset))
}
}
}
resumption := strings.Join(startOffsets, ";")
var lastHash, lastWeight []byte
var lastNumber, timestamp int64
db.QueryRowContext(context.Background(), "SELECT max(number), hash, td, time FROM blocks;").Scan(&lastNumber, &lastHash, &lastWeight, ×tamp)
if cfg.LightSeed != 0 {
lastNumber = cfg.LightSeed
log.Info("flume light service initiated from lightSeed value", "block", cfg.LightSeed)
consumer, err := deliverConsumer(brokerParams, resumption, reorgThreshold, resumptionTime, lastNumber, lastHash, lastWeight, append(trackedPrefixes, ptp...), cfg.WhitelistExternal)
if err != nil {
log.Error("Error constructing consumer from stand alone light instance with seeded inital block", "err", err.Error())
return nil, err
}
return consumer, nil
}
if len(cfg.HeavyServer) > 0 && lastNumber == 0 {
highestBlock, err := heavy.CallHeavy[rpc.BlockNumber](context.Background(), cfg.HeavyServer, "eth_blockNumber")
if err != nil {
log.Info("Failed to connect with heavy server, flume light service initiated from most recent block")
consumer, err := deliverConsumer(brokerParams, resumption, reorgThreshold, resumptionTime, lastNumber, lastHash, lastWeight, append(trackedPrefixes, ptp...), cfg.WhitelistExternal)
if err != nil {
log.Error("Error constructing consumer from stand alone light instance", "err", err.Error())
return nil, err
}
return consumer, nil
}
log.Debug("Current block aquired from heavy", "block", int64(*highestBlock))
// below both an archive configuration of flume as well as cardinal streams are relying on resumptionBlockNumber
// archive flume has tests which require a minimum of 128 block overlap between databases, and so we do a comparison
// to catch this contingency while defaulting to the reorgThreshold so as to maintain consistency in all other cases.
var resumptionBlockNumber int64
a := int64(*highestBlock) - reorgThreshold
b := int64(*highestBlock) - 129
if a < b {
resumptionBlockNumber = a
} else {
resumptionBlockNumber = b
}
resumptionBlock, err := heavy.CallHeavy[map[string]json.RawMessage](context.Background(), cfg.HeavyServer, "eth_getBlockByNumber", hexutil.Uint64(resumptionBlockNumber), false)
if err != nil {
return nil, err
}
var rb map[string]json.RawMessage = *resumptionBlock
var rT hexutil.Uint64
var lH types.Hash
var lW hexutil.Bytes
if err := json.Unmarshal(rb["totalDifficulty"], &lW); err != nil {
log.Warn("Json unmarshalling error AcquireConsumer, lightserver condition totoal difficulty", "err", err)
}
if err := json.Unmarshal(rb["hash"], &lH); err != nil {
log.Warn("Json unmarshalling error AcquireConsumer, lightserver condition hash", "err", err)
}
if err := json.Unmarshal(rb["timestamp"], &rT); err != nil {
log.Warn("Json unmarshalling error AcquireConsumer, lightserver condition timestamp", "err", err)
}
lastWeight = lW
lastNumber = resumptionBlockNumber
lastHash = lH.Bytes()
resumptionTime = int64(rT) * 1000
consumer, err := deliverConsumer(brokerParams, resumption, reorgThreshold, resumptionTime, lastNumber, lastHash, lastWeight, append(trackedPrefixes, ptp...), cfg.WhitelistExternal)
if err != nil {
log.Error("Error constructing consumer from heavy connected flume light instance", "err", err.Error())
return nil, err
}
log.Info("Flume light service initiated, beginning from block:", "number", lastNumber)
return consumer, nil
}
if resumptionTime < 0 && timestamp > 0 && useBlockTime {
resumptionTime = timestamp * 1000
}
consumer, err := deliverConsumer(brokerParams, resumption, reorgThreshold, resumptionTime, lastNumber, lastHash, lastWeight, append(trackedPrefixes, ptp...), cfg.WhitelistExternal)
if err != nil {
log.Error("Error constructing consumer from flume heavy instance", "err", err.Error())
return nil, err
}
log.Info("Flume heavey service initiated, Resuming to block", "number", lastNumber)
return consumer, nil
}