Skip to content
This repository has been archived by the owner on Nov 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #2569 from bandprotocol/yoda-multi-report
Browse files Browse the repository at this point in the history
yoda: Queued and combine report msgs to one transaction + Gas estimation on multi-report transaction
  • Loading branch information
taobun authored Aug 27, 2020
2 parents 0455390 + 0fdf288 commit 4a05630
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 72 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_UNRELEASED.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

### Yoda

- (impv) [\#2569](https://github.com/bandprotocol/bandchain/pull/2569) Queued and combine report msgs to one transaction + Remove hard code gas used on report and add retry logic when transaction out of gas.

### Emitter & Flusher

- (bugs) [\#2565](https://github.com/bandprotocol/bandchain/pull/2565) cdb: Fix bug reporters table
Expand Down
2 changes: 1 addition & 1 deletion chain/scripts/start_yoda.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ yoda config validator $(bandcli keys show $1 -a --bech val --keyring-backend tes
yoda config executor "rest:https://iv3lgtv11a.execute-api.ap-southeast-1.amazonaws.com/live/master?timeout=10s"

# setup broadcast-timeout to yoda config
yoda config broadcast-timeout "30s"
yoda config broadcast-timeout "5m"

# setup rpc-poll-interval to yoda config
yoda config rpc-poll-interval "1s"
Expand Down
20 changes: 19 additions & 1 deletion chain/yoda/context.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,42 @@
package yoda

import (
"sync/atomic"
"time"

"github.com/cosmos/cosmos-sdk/crypto/keys"
sdk "github.com/cosmos/cosmos-sdk/types"
rpcclient "github.com/tendermint/tendermint/rpc/client"

"github.com/bandprotocol/bandchain/chain/pkg/filecache"
"github.com/bandprotocol/bandchain/chain/x/oracle/types"
"github.com/bandprotocol/bandchain/chain/yoda/executor"
)

type ReportMsgWithKey struct {
msg types.MsgReportData
execVersion []string
keyIndex int64
}

type Context struct {
client rpcclient.Client
validator sdk.ValAddress
gasPrices sdk.DecCoins
keys chan keys.Info
keys []keys.Info
executor executor.Executor
fileCache filecache.Cache
broadcastTimeout time.Duration
maxTry uint64
rpcPollInterval time.Duration
maxReport uint64

pendingMsgs chan ReportMsgWithKey
freeKeys chan int64
keyRoundRobinIndex int64 // Must use in conjunction with sync/atomic
}

func (c *Context) nextKeyIndex() int64 {
keyIndex := atomic.AddInt64(&c.keyRoundRobinIndex, 1) % int64(len(c.keys))
return keyIndex
}
19 changes: 10 additions & 9 deletions chain/yoda/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@ import (
"fmt"
"strconv"

otypes "github.com/bandprotocol/bandchain/chain/x/oracle/types"
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/bandprotocol/bandchain/chain/x/oracle/types"
)

type rawRequest struct {
dataSourceID otypes.DataSourceID
dataSourceID types.DataSourceID
dataSourceHash string
externalID otypes.ExternalID
externalID types.ExternalID
calldata string
}

// GetRawRequests returns the list of all raw data requests in the given log.
func GetRawRequests(log sdk.ABCIMessageLog) ([]rawRequest, error) {
dataSourceIDs := GetEventValues(log, otypes.EventTypeRawRequest, otypes.AttributeKeyDataSourceID)
dataSourceHashList := GetEventValues(log, otypes.EventTypeRawRequest, otypes.AttributeKeyDataSourceHash)
externalIDs := GetEventValues(log, otypes.EventTypeRawRequest, otypes.AttributeKeyExternalID)
calldataList := GetEventValues(log, otypes.EventTypeRawRequest, otypes.AttributeKeyCalldata)
dataSourceIDs := GetEventValues(log, types.EventTypeRawRequest, types.AttributeKeyDataSourceID)
dataSourceHashList := GetEventValues(log, types.EventTypeRawRequest, types.AttributeKeyDataSourceHash)
externalIDs := GetEventValues(log, types.EventTypeRawRequest, types.AttributeKeyExternalID)
calldataList := GetEventValues(log, types.EventTypeRawRequest, types.AttributeKeyCalldata)

if len(dataSourceIDs) != len(externalIDs) {
return nil, fmt.Errorf("Inconsistent data source count and external ID count")
Expand All @@ -42,9 +43,9 @@ func GetRawRequests(log sdk.ABCIMessageLog) ([]rawRequest, error) {
}

reqs = append(reqs, rawRequest{
dataSourceID: otypes.DataSourceID(dataSourceID),
dataSourceID: types.DataSourceID(dataSourceID),
dataSourceHash: dataSourceHashList[idx],
externalID: otypes.ExternalID(externalID),
externalID: types.ExternalID(externalID),
calldata: calldataList[idx],
})
}
Expand Down
194 changes: 144 additions & 50 deletions chain/yoda/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,92 +2,186 @@ package yoda

import (
"fmt"
"strings"
"time"

sdkCtx "github.com/cosmos/cosmos-sdk/client/context"
ckeys "github.com/cosmos/cosmos-sdk/client/keys"
"github.com/cosmos/cosmos-sdk/crypto/keys"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/version"
"github.com/cosmos/cosmos-sdk/x/auth"
"github.com/cosmos/cosmos-sdk/x/auth/client/utils"

"github.com/bandprotocol/bandchain/chain/app"
otypes "github.com/bandprotocol/bandchain/chain/x/oracle/types"
"github.com/bandprotocol/bandchain/chain/x/oracle/types"
rpcclient "github.com/tendermint/tendermint/rpc/client"
)

var (
cdc = app.MakeCodec()
)

func SubmitReport(c *Context, l *Logger, key keys.Info, id otypes.RequestID, reps []otypes.RawReport, execVersion string) {
msg := otypes.NewMsgReportData(otypes.RequestID(id), reps, c.validator, key.GetAddress())
if err := msg.ValidateBasic(); err != nil {
l.Error(":exploding_head: Failed to validate basic with error: %s", err.Error())
return
// TODO: Improve precision of equation.
// const (
// rawReportMultiplier = uint64(10)
// dataSizeMultiplier = uint64(10)
// msgReportDataConstant = uint64(10)
// txSizeConstant = uint64(10)
// baseTransaction = uint64(30000)
// )

// Constant used to estimate gas price of reports transaction.
const (
dataSizeMultiplier = uint64(50)
msgReportDataConstant = uint64(16000)
txSizeConstant = uint64(5) // Using DefaultTxSizeCostPerByte of BandChain
baseTransaction = uint64(40000)
pendingRequests = uint64(4000)
)

func estimatedReportsGas(msgs []sdk.Msg) uint64 {
est := baseTransaction
txSize := uint64(0)
for _, msg := range msgs {
msg, ok := msg.(types.MsgReportData)
if !ok {
panic("Don't support non-report data message")
}
calldataSize := uint64(0)
for _, c := range msg.RawReports {
calldataSize += uint64(len(c.Data))
}
est += dataSizeMultiplier*calldataSize + msgReportDataConstant
txSize += uint64(len(msg.GetSignBytes()))
}
return est + txSize*txSizeConstant + pendingRequests
}

func signAndBroadcast(
c *Context, key keys.Info, msgs []sdk.Msg, gasLimit uint64, memo string,
) (string, error) {
cliCtx := sdkCtx.CLIContext{Client: c.client, TrustNode: true, Codec: cdc}
txHash := ""
for try := uint64(1); try <= c.maxTry; try++ {
l.Info(":e-mail: Try to broadcast report transaction(%d/%d)", try, c.maxTry)
acc, err := auth.NewAccountRetriever(cliCtx).GetAccount(key.GetAddress())
if err != nil {
l.Info(":warning: Failed to retreive account with error: %s", err.Error())
time.Sleep(c.rpcPollInterval)
continue
}
acc, err := auth.NewAccountRetriever(cliCtx).GetAccount(key.GetAddress())
if err != nil {
return "", fmt.Errorf("Failed to retreive account with error: %s", err.Error())
}

txBldr := auth.NewTxBuilder(
auth.DefaultTxEncoder(cdc), acc.GetAccountNumber(), acc.GetSequence(),
200000, 1, false, cfg.ChainID, fmt.Sprintf("yoda:%s/exec:%s", version.Version, execVersion), sdk.NewCoins(), c.gasPrices,
)
// txBldr, err = authclient.EnrichWithGas(txBldr, cliCtx, []sdk.Msg{msg})
// if err != nil {
// l.Error(":exploding_head: Failed to enrich with gas with error: %s", err.Error())
// return
// }
out, err := txBldr.WithKeybase(keybase).BuildAndSign(key.GetName(), ckeys.DefaultKeyPass, []sdk.Msg{msg})
if err != nil {
l.Info(":warning: Failed to build tx with error: %s", err.Error())
time.Sleep(c.rpcPollInterval)
continue
txBldr := auth.NewTxBuilder(
auth.DefaultTxEncoder(cdc), acc.GetAccountNumber(), acc.GetSequence(),
gasLimit, 1, false, cfg.ChainID, memo, sdk.NewCoins(), c.gasPrices,
)
// txBldr, err = authclient.EnrichWithGas(txBldr, cliCtx, []sdk.Msg{msg})
// if err != nil {
// l.Error(":exploding_head: Failed to enrich with gas with error: %s", err.Error())
// return
// }

out, err := txBldr.WithKeybase(keybase).BuildAndSign(key.GetName(), ckeys.DefaultKeyPass, msgs)
if err != nil {
return "", fmt.Errorf("Failed to build tx with error: %s", err.Error())
}

res, err := cliCtx.BroadcastTxSync(out)
if err != nil {
return "", fmt.Errorf("Failed to broadcast tx with error: %s", err.Error())
}
return res.TxHash, nil
}

func SubmitReport(c *Context, l *Logger, keyIndex int64, reports []ReportMsgWithKey) {
// Return key when done with SubmitReport whether successfully or not.
defer func() {
c.freeKeys <- keyIndex
}()

// Summarize execute version
versionMap := make(map[string]bool)
msgs := make([]sdk.Msg, len(reports))
ids := make([]types.RequestID, len(reports))

for i, report := range reports {
if err := report.msg.ValidateBasic(); err != nil {
l.Error(":exploding_head: Failed to validate basic with error: %s", err.Error())
return
}
res, err := cliCtx.BroadcastTxSync(out)
if err == nil {
txHash = res.TxHash
break
msgs[i] = report.msg
ids[i] = report.msg.RequestID
for _, exec := range report.execVersion {
versionMap[exec] = true
}
l.Info(":warning: Failed to broadcast tx with error: %s", err.Error())
time.Sleep(c.rpcPollInterval)
}
if txHash == "" {
l.Error(":exploding_head: Cannot try to broadcast more than %d try", c.maxTry)
return
l = l.With("rids", ids)

versions := make([]string, 0, len(versionMap))
for exec := range versionMap {
versions = append(versions, exec)
}
for start := time.Now(); time.Since(start) < c.broadcastTimeout; {
time.Sleep(c.rpcPollInterval)
txRes, err := utils.QueryTx(cliCtx, txHash)
if err != nil {
l.Debug(":warning: Failed to query tx with error: %s", err.Error())
continue
memo := fmt.Sprintf("yoda:%s/exec:%s", version.Version, strings.Join(versions, ","))
key := c.keys[keyIndex]
cliCtx := sdkCtx.CLIContext{Client: c.client, TrustNode: true, Codec: cdc}
gasLimit := estimatedReportsGas(msgs)
// We want to resend transaction only if tx returns Out of gas error.
for sendAttempt := uint64(1); sendAttempt <= c.maxTry; sendAttempt++ {
var txHash string
l.Info(":e-mail: Sending report transaction attempt: (%d/%d)", sendAttempt, c.maxTry)
for broadcastTry := uint64(1); broadcastTry <= c.maxTry; broadcastTry++ {
l.Info(":writing_hand: Try to sign and broadcast report transaction(%d/%d)", broadcastTry, c.maxTry)
hash, err := signAndBroadcast(c, key, msgs, gasLimit, memo)
if err != nil {
// Use info level because this error can happen and retry process can solve this error.
l.Info(":warning: %s", err.Error())
time.Sleep(c.rpcPollInterval)
continue
}
// Transaction passed CheckTx process and wait to include in block.
txHash = hash
break
}
if txHash == "" {
l.Error(":exploding_head: Cannot try to broadcast more than %d try", c.maxTry)
return
}
txFound := false
FindTx:
for start := time.Now(); time.Since(start) < c.broadcastTimeout; {
time.Sleep(c.rpcPollInterval)
txRes, err := utils.QueryTx(cliCtx, txHash)
if err != nil {
l.Debug(":warning: Failed to query tx with error: %s", err.Error())
continue
}
switch txRes.Code {
case 0:
l.Info(":smiling_face_with_sunglasses: Successfully broadcast tx with hash: %s", txHash)
return
case sdkerrors.ErrOutOfGas.ABCICode():
// Increase gas limit and try to broadcast again
gasLimit = gasLimit * 110 / 100
l.Info(":fuel_pump: Tx(%s) is out of gas and will be rebroadcasted with %d gas", txHash, gasLimit)
txFound = true
break FindTx
default:
l.Error(":exploding_head: Tx returned nonzero code %d with log %s, tx hash: %s", txRes.Code, txRes.RawLog, txRes.TxHash)
return
}
}
if txRes.Code != 0 {
l.Error(":exploding_head: Tx returned nonzero code %d with log %s, tx hash: %s", txRes.Code, txRes.RawLog, txRes.TxHash)
if !txFound {
l.Error(":question_mark: Cannot get transaction response from hash: %s transaction might be included in the next few blocks or check your node's health.", txHash)
return
}
l.Info(":smiling_face_with_sunglasses: Successfully broadcast tx with hash: %s", txHash)
return
}
l.Info(":question_mark: Cannot get transaction response from hash: %s transaction might be included in the next few blocks or check your node's health.", txHash)
l.Error(":anxious_face_with_sweat: Cannot send reports with adjusted gas: %d", gasLimit)
return
}

// GetExecutable fetches data source executable using the provided client.
func GetExecutable(c *Context, l *Logger, hash string) ([]byte, error) {
resValue, err := c.fileCache.GetFile(hash)
if err != nil {
l.Debug(":magnifying_glass_tilted_left: Fetching data source hash: %s from bandchain querier", hash)
res, err := c.client.ABCIQueryWithOptions(fmt.Sprintf("custom/%s/%s/%s", otypes.StoreKey, otypes.QueryData, hash), nil, rpcclient.ABCIQueryOptions{})
res, err := c.client.ABCIQueryWithOptions(fmt.Sprintf("custom/%s/%s/%s", types.StoreKey, types.QueryData, hash), nil, rpcclient.ABCIQueryOptions{})
if err != nil {
l.Error(":exploding_head: Failed to get data source with error: %s", err.Error())
return nil, err
Expand Down
14 changes: 8 additions & 6 deletions chain/yoda/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package yoda

import (
"strconv"
"strings"
"sync"

ckeys "github.com/cosmos/cosmos-sdk/client/keys"
Expand Down Expand Up @@ -86,10 +85,8 @@ func handleRequestLog(c *Context, l *Logger, log sdk.ABCIMessageLog) {
l.Error(":skull: Failed to parse raw requests with error: %s", err.Error())
}

key := <-c.keys
defer func() {
c.keys <- key
}()
keyIndex := c.nextKeyIndex()
key := c.keys[keyIndex]

reportsChan := make(chan types.RawReport, len(reqs))
var version sync.Map
Expand Down Expand Up @@ -143,5 +140,10 @@ func handleRequestLog(c *Context, l *Logger, log sdk.ABCIMessageLog) {
execVersions = append(execVersions, key.(string))
return true
})
SubmitReport(c, l, key, types.RequestID(id), reports, strings.Join(execVersions, "+"))

c.pendingMsgs <- ReportMsgWithKey{
msg: types.NewMsgReportData(types.RequestID(id), reports, c.validator, key.GetAddress()),
execVersion: execVersions,
keyIndex: keyIndex,
}
}
2 changes: 2 additions & 0 deletions chain/yoda/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
flagBroadcastTimeout = "broadcast-timeout"
flagRPCPollInterval = "rpc-poll-interval"
flagMaxTry = "max-try"
flagMaxReport = "max-report"
)

// Config data structure for yoda daemon.
Expand All @@ -35,6 +36,7 @@ type Config struct {
BroadcastTimeout string `mapstructure:"broadcast-timeout"` // The time that Yoda will wait for tx commit
RPCPollInterval string `mapstructure:"rpc-poll-interval"` // The duration of rpc poll interval
MaxTry uint64 `mapstructure:"max-try"` // The maximum number of tries to submit a report transaction
MaxReport uint64 `mapstructure:"max-report"` // The maximum number of reports in one transaction
}

// Global instances.
Expand Down
Loading

0 comments on commit 4a05630

Please sign in to comment.