diff --git a/CHANGELOG_UNRELEASED.md b/CHANGELOG_UNRELEASED.md index 78807318de..2c1d52bcd6 100644 --- a/CHANGELOG_UNRELEASED.md +++ b/CHANGELOG_UNRELEASED.md @@ -14,6 +14,8 @@ ### Yoda +- (impv) [\#2569](https://github.com/bandprotocol/bandchain/pull/2569) Queue and combine report msgs into one transaction + Remove hard-coded gas used on report and add retry logic when a transaction runs out of gas. + ### Emitter & Flusher - (bugs) [\#2565](https://github.com/bandprotocol/bandchain/pull/2565) cdb: Fix bug reporters table diff --git a/chain/scripts/start_yoda.sh b/chain/scripts/start_yoda.sh index 2a19c412a6..fd491fd08c 100755 --- a/chain/scripts/start_yoda.sh +++ b/chain/scripts/start_yoda.sh @@ -12,7 +12,7 @@ yoda config validator $(bandcli keys show $1 -a --bech val --keyring-backend tes yoda config executor "rest:https://iv3lgtv11a.execute-api.ap-southeast-1.amazonaws.com/live/master?timeout=10s" # setup broadcast-timeout to yoda config -yoda config broadcast-timeout "30s" +yoda config broadcast-timeout "5m" # setup rpc-poll-interval to yoda config yoda config rpc-poll-interval "1s" diff --git a/chain/yoda/context.go b/chain/yoda/context.go index c13f28be5c..21a4fcbd72 100644 --- a/chain/yoda/context.go +++ b/chain/yoda/context.go @@ -1,6 +1,7 @@ package yoda import ( + "sync/atomic" "time" "github.com/cosmos/cosmos-sdk/crypto/keys" @@ -8,17 +9,34 @@ import ( rpcclient "github.com/tendermint/tendermint/rpc/client" "github.com/bandprotocol/bandchain/chain/pkg/filecache" + "github.com/bandprotocol/bandchain/chain/x/oracle/types" "github.com/bandprotocol/bandchain/chain/yoda/executor" ) +type ReportMsgWithKey struct { + msg types.MsgReportData + execVersion []string + keyIndex int64 +} + type Context struct { client rpcclient.Client validator sdk.ValAddress gasPrices sdk.DecCoins - keys chan keys.Info + keys []keys.Info executor executor.Executor fileCache filecache.Cache broadcastTimeout time.Duration maxTry uint64 rpcPollInterval time.Duration + maxReport uint64 + + 
pendingMsgs chan ReportMsgWithKey + freeKeys chan int64 + keyRoundRobinIndex int64 // Must use in conjunction with sync/atomic +} + +func (c *Context) nextKeyIndex() int64 { + keyIndex := atomic.AddInt64(&c.keyRoundRobinIndex, 1) % int64(len(c.keys)) + return keyIndex } diff --git a/chain/yoda/event.go b/chain/yoda/event.go index 8a7eedc184..dc34cea2c3 100644 --- a/chain/yoda/event.go +++ b/chain/yoda/event.go @@ -4,23 +4,24 @@ import ( "fmt" "strconv" - otypes "github.com/bandprotocol/bandchain/chain/x/oracle/types" sdk "github.com/cosmos/cosmos-sdk/types" + + "github.com/bandprotocol/bandchain/chain/x/oracle/types" ) type rawRequest struct { - dataSourceID otypes.DataSourceID + dataSourceID types.DataSourceID dataSourceHash string - externalID otypes.ExternalID + externalID types.ExternalID calldata string } // GetRawRequests returns the list of all raw data requests in the given log. func GetRawRequests(log sdk.ABCIMessageLog) ([]rawRequest, error) { - dataSourceIDs := GetEventValues(log, otypes.EventTypeRawRequest, otypes.AttributeKeyDataSourceID) - dataSourceHashList := GetEventValues(log, otypes.EventTypeRawRequest, otypes.AttributeKeyDataSourceHash) - externalIDs := GetEventValues(log, otypes.EventTypeRawRequest, otypes.AttributeKeyExternalID) - calldataList := GetEventValues(log, otypes.EventTypeRawRequest, otypes.AttributeKeyCalldata) + dataSourceIDs := GetEventValues(log, types.EventTypeRawRequest, types.AttributeKeyDataSourceID) + dataSourceHashList := GetEventValues(log, types.EventTypeRawRequest, types.AttributeKeyDataSourceHash) + externalIDs := GetEventValues(log, types.EventTypeRawRequest, types.AttributeKeyExternalID) + calldataList := GetEventValues(log, types.EventTypeRawRequest, types.AttributeKeyCalldata) if len(dataSourceIDs) != len(externalIDs) { return nil, fmt.Errorf("Inconsistent data source count and external ID count") @@ -42,9 +43,9 @@ func GetRawRequests(log sdk.ABCIMessageLog) ([]rawRequest, error) { } reqs = append(reqs, rawRequest{ 
- dataSourceID: otypes.DataSourceID(dataSourceID), + dataSourceID: types.DataSourceID(dataSourceID), dataSourceHash: dataSourceHashList[idx], - externalID: otypes.ExternalID(externalID), + externalID: types.ExternalID(externalID), calldata: calldataList[idx], }) } diff --git a/chain/yoda/execute.go b/chain/yoda/execute.go index 04457e450e..180e8847a0 100644 --- a/chain/yoda/execute.go +++ b/chain/yoda/execute.go @@ -2,18 +2,20 @@ package yoda import ( "fmt" + "strings" "time" sdkCtx "github.com/cosmos/cosmos-sdk/client/context" ckeys "github.com/cosmos/cosmos-sdk/client/keys" "github.com/cosmos/cosmos-sdk/crypto/keys" sdk "github.com/cosmos/cosmos-sdk/types" + sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" "github.com/cosmos/cosmos-sdk/version" "github.com/cosmos/cosmos-sdk/x/auth" "github.com/cosmos/cosmos-sdk/x/auth/client/utils" "github.com/bandprotocol/bandchain/chain/app" - otypes "github.com/bandprotocol/bandchain/chain/x/oracle/types" + "github.com/bandprotocol/bandchain/chain/x/oracle/types" rpcclient "github.com/tendermint/tendermint/rpc/client" ) @@ -21,65 +23,157 @@ var ( cdc = app.MakeCodec() ) -func SubmitReport(c *Context, l *Logger, key keys.Info, id otypes.RequestID, reps []otypes.RawReport, execVersion string) { - msg := otypes.NewMsgReportData(otypes.RequestID(id), reps, c.validator, key.GetAddress()) - if err := msg.ValidateBasic(); err != nil { - l.Error(":exploding_head: Failed to validate basic with error: %s", err.Error()) - return +// TODO: Improve precision of equation. +// const ( +// rawReportMultiplier = uint64(10) +// dataSizeMultiplier = uint64(10) +// msgReportDataConstant = uint64(10) +// txSizeConstant = uint64(10) +// baseTransaction = uint64(30000) +// ) + +// Constants used to estimate the gas limit of a reports transaction. 
+const ( + dataSizeMultiplier = uint64(50) + msgReportDataConstant = uint64(16000) + txSizeConstant = uint64(5) // Using DefaultTxSizeCostPerByte of BandChain + baseTransaction = uint64(40000) + pendingRequests = uint64(4000) +) + +func estimatedReportsGas(msgs []sdk.Msg) uint64 { + est := baseTransaction + txSize := uint64(0) + for _, msg := range msgs { + msg, ok := msg.(types.MsgReportData) + if !ok { + panic("Don't support non-report data message") + } + calldataSize := uint64(0) + for _, c := range msg.RawReports { + calldataSize += uint64(len(c.Data)) + } + est += dataSizeMultiplier*calldataSize + msgReportDataConstant + txSize += uint64(len(msg.GetSignBytes())) } + return est + txSize*txSizeConstant + pendingRequests +} + +func signAndBroadcast( + c *Context, key keys.Info, msgs []sdk.Msg, gasLimit uint64, memo string, +) (string, error) { cliCtx := sdkCtx.CLIContext{Client: c.client, TrustNode: true, Codec: cdc} - txHash := "" - for try := uint64(1); try <= c.maxTry; try++ { - l.Info(":e-mail: Try to broadcast report transaction(%d/%d)", try, c.maxTry) - acc, err := auth.NewAccountRetriever(cliCtx).GetAccount(key.GetAddress()) - if err != nil { - l.Info(":warning: Failed to retreive account with error: %s", err.Error()) - time.Sleep(c.rpcPollInterval) - continue - } + acc, err := auth.NewAccountRetriever(cliCtx).GetAccount(key.GetAddress()) + if err != nil { + return "", fmt.Errorf("Failed to retreive account with error: %s", err.Error()) + } - txBldr := auth.NewTxBuilder( - auth.DefaultTxEncoder(cdc), acc.GetAccountNumber(), acc.GetSequence(), - 200000, 1, false, cfg.ChainID, fmt.Sprintf("yoda:%s/exec:%s", version.Version, execVersion), sdk.NewCoins(), c.gasPrices, - ) - // txBldr, err = authclient.EnrichWithGas(txBldr, cliCtx, []sdk.Msg{msg}) - // if err != nil { - // l.Error(":exploding_head: Failed to enrich with gas with error: %s", err.Error()) - // return - // } - out, err := txBldr.WithKeybase(keybase).BuildAndSign(key.GetName(), 
ckeys.DefaultKeyPass, []sdk.Msg{msg}) - if err != nil { - l.Info(":warning: Failed to build tx with error: %s", err.Error()) - time.Sleep(c.rpcPollInterval) - continue + txBldr := auth.NewTxBuilder( + auth.DefaultTxEncoder(cdc), acc.GetAccountNumber(), acc.GetSequence(), + gasLimit, 1, false, cfg.ChainID, memo, sdk.NewCoins(), c.gasPrices, + ) + // txBldr, err = authclient.EnrichWithGas(txBldr, cliCtx, []sdk.Msg{msg}) + // if err != nil { + // l.Error(":exploding_head: Failed to enrich with gas with error: %s", err.Error()) + // return + // } + + out, err := txBldr.WithKeybase(keybase).BuildAndSign(key.GetName(), ckeys.DefaultKeyPass, msgs) + if err != nil { + return "", fmt.Errorf("Failed to build tx with error: %s", err.Error()) + } + + res, err := cliCtx.BroadcastTxSync(out) + if err != nil { + return "", fmt.Errorf("Failed to broadcast tx with error: %s", err.Error()) + } + return res.TxHash, nil +} + +func SubmitReport(c *Context, l *Logger, keyIndex int64, reports []ReportMsgWithKey) { + // Return key when done with SubmitReport whether successfully or not. 
+ defer func() { + c.freeKeys <- keyIndex + }() + + // Summarize execute version + versionMap := make(map[string]bool) + msgs := make([]sdk.Msg, len(reports)) + ids := make([]types.RequestID, len(reports)) + + for i, report := range reports { + if err := report.msg.ValidateBasic(); err != nil { + l.Error(":exploding_head: Failed to validate basic with error: %s", err.Error()) + return } - res, err := cliCtx.BroadcastTxSync(out) - if err == nil { - txHash = res.TxHash - break + msgs[i] = report.msg + ids[i] = report.msg.RequestID + for _, exec := range report.execVersion { + versionMap[exec] = true } - l.Info(":warning: Failed to broadcast tx with error: %s", err.Error()) - time.Sleep(c.rpcPollInterval) } - if txHash == "" { - l.Error(":exploding_head: Cannot try to broadcast more than %d try", c.maxTry) - return + l = l.With("rids", ids) + + versions := make([]string, 0, len(versionMap)) + for exec := range versionMap { + versions = append(versions, exec) } - for start := time.Now(); time.Since(start) < c.broadcastTimeout; { - time.Sleep(c.rpcPollInterval) - txRes, err := utils.QueryTx(cliCtx, txHash) - if err != nil { - l.Debug(":warning: Failed to query tx with error: %s", err.Error()) - continue + memo := fmt.Sprintf("yoda:%s/exec:%s", version.Version, strings.Join(versions, ",")) + key := c.keys[keyIndex] + cliCtx := sdkCtx.CLIContext{Client: c.client, TrustNode: true, Codec: cdc} + gasLimit := estimatedReportsGas(msgs) + // We want to resend transaction only if tx returns Out of gas error. 
+ for sendAttempt := uint64(1); sendAttempt <= c.maxTry; sendAttempt++ { + var txHash string + l.Info(":e-mail: Sending report transaction attempt: (%d/%d)", sendAttempt, c.maxTry) + for broadcastTry := uint64(1); broadcastTry <= c.maxTry; broadcastTry++ { + l.Info(":writing_hand: Try to sign and broadcast report transaction(%d/%d)", broadcastTry, c.maxTry) + hash, err := signAndBroadcast(c, key, msgs, gasLimit, memo) + if err != nil { + // Use info level because this error can happen and retry process can solve this error. + l.Info(":warning: %s", err.Error()) + time.Sleep(c.rpcPollInterval) + continue + } + // Transaction passed CheckTx process and wait to include in block. + txHash = hash + break + } + if txHash == "" { + l.Error(":exploding_head: Cannot try to broadcast more than %d try", c.maxTry) + return + } + txFound := false + FindTx: + for start := time.Now(); time.Since(start) < c.broadcastTimeout; { + time.Sleep(c.rpcPollInterval) + txRes, err := utils.QueryTx(cliCtx, txHash) + if err != nil { + l.Debug(":warning: Failed to query tx with error: %s", err.Error()) + continue + } + switch txRes.Code { + case 0: + l.Info(":smiling_face_with_sunglasses: Successfully broadcast tx with hash: %s", txHash) + return + case sdkerrors.ErrOutOfGas.ABCICode(): + // Increase gas limit and try to broadcast again + gasLimit = gasLimit * 110 / 100 + l.Info(":fuel_pump: Tx(%s) is out of gas and will be rebroadcasted with %d gas", txHash, gasLimit) + txFound = true + break FindTx + default: + l.Error(":exploding_head: Tx returned nonzero code %d with log %s, tx hash: %s", txRes.Code, txRes.RawLog, txRes.TxHash) + return + } } - if txRes.Code != 0 { - l.Error(":exploding_head: Tx returned nonzero code %d with log %s, tx hash: %s", txRes.Code, txRes.RawLog, txRes.TxHash) + if !txFound { + l.Error(":question_mark: Cannot get transaction response from hash: %s transaction might be included in the next few blocks or check your node's health.", txHash) return } - 
l.Info(":smiling_face_with_sunglasses: Successfully broadcast tx with hash: %s", txHash) - return } - l.Info(":question_mark: Cannot get transaction response from hash: %s transaction might be included in the next few blocks or check your node's health.", txHash) + l.Error(":anxious_face_with_sweat: Cannot send reports with adjusted gas: %d", gasLimit) + return } // GetExecutable fetches data source executable using the provided client. @@ -87,7 +181,7 @@ func GetExecutable(c *Context, l *Logger, hash string) ([]byte, error) { resValue, err := c.fileCache.GetFile(hash) if err != nil { l.Debug(":magnifying_glass_tilted_left: Fetching data source hash: %s from bandchain querier", hash) - res, err := c.client.ABCIQueryWithOptions(fmt.Sprintf("custom/%s/%s/%s", otypes.StoreKey, otypes.QueryData, hash), nil, rpcclient.ABCIQueryOptions{}) + res, err := c.client.ABCIQueryWithOptions(fmt.Sprintf("custom/%s/%s/%s", types.StoreKey, types.QueryData, hash), nil, rpcclient.ABCIQueryOptions{}) if err != nil { l.Error(":exploding_head: Failed to get data source with error: %s", err.Error()) return nil, err diff --git a/chain/yoda/handler.go b/chain/yoda/handler.go index 17c1543563..d9ad82531c 100644 --- a/chain/yoda/handler.go +++ b/chain/yoda/handler.go @@ -2,7 +2,6 @@ package yoda import ( "strconv" - "strings" "sync" ckeys "github.com/cosmos/cosmos-sdk/client/keys" @@ -86,10 +85,8 @@ func handleRequestLog(c *Context, l *Logger, log sdk.ABCIMessageLog) { l.Error(":skull: Failed to parse raw requests with error: %s", err.Error()) } - key := <-c.keys - defer func() { - c.keys <- key - }() + keyIndex := c.nextKeyIndex() + key := c.keys[keyIndex] reportsChan := make(chan types.RawReport, len(reqs)) var version sync.Map @@ -143,5 +140,10 @@ func handleRequestLog(c *Context, l *Logger, log sdk.ABCIMessageLog) { execVersions = append(execVersions, key.(string)) return true }) - SubmitReport(c, l, key, types.RequestID(id), reports, strings.Join(execVersions, "+")) + + c.pendingMsgs <- 
ReportMsgWithKey{ + msg: types.NewMsgReportData(types.RequestID(id), reports, c.validator, key.GetAddress()), + execVersion: execVersions, + keyIndex: keyIndex, + } } diff --git a/chain/yoda/main.go b/chain/yoda/main.go index a81059f24b..38b323eea3 100644 --- a/chain/yoda/main.go +++ b/chain/yoda/main.go @@ -22,6 +22,7 @@ const ( flagBroadcastTimeout = "broadcast-timeout" flagRPCPollInterval = "rpc-poll-interval" flagMaxTry = "max-try" + flagMaxReport = "max-report" ) // Config data structure for yoda daemon. @@ -35,6 +36,7 @@ type Config struct { BroadcastTimeout string `mapstructure:"broadcast-timeout"` // The time that Yoda will wait for tx commit RPCPollInterval string `mapstructure:"rpc-poll-interval"` // The duration of rpc poll interval MaxTry uint64 `mapstructure:"max-try"` // The maximum number of tries to submit a report transaction + MaxReport uint64 `mapstructure:"max-report"` // The maximum number of reports in one transaction } // Global instances. diff --git a/chain/yoda/run.go b/chain/yoda/run.go index 20a6b62f31..58a5a815d3 100644 --- a/chain/yoda/run.go +++ b/chain/yoda/run.go @@ -7,7 +7,6 @@ import ( "time" "github.com/cosmos/cosmos-sdk/client/flags" - keyring "github.com/cosmos/cosmos-sdk/crypto/keys" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -42,10 +41,36 @@ func runImpl(c *Context, l *Logger) error { return err } + availiableKeys := make([]bool, len(c.keys)) + waitingMsgs := make([][]ReportMsgWithKey, len(c.keys)) + for i := range availiableKeys { + availiableKeys[i] = true + waitingMsgs[i] = []ReportMsgWithKey{} + } + for { select { case ev := <-eventChan: go handleTransaction(c, l, ev.Data.(tmtypes.EventDataTx).TxResult) + case keyIndex := <-c.freeKeys: + if len(waitingMsgs[keyIndex]) != 0 { + if uint64(len(waitingMsgs[keyIndex])) > c.maxReport { + go SubmitReport(c, l, keyIndex, waitingMsgs[keyIndex][:c.maxReport]) + waitingMsgs[keyIndex] = waitingMsgs[keyIndex][c.maxReport:] + } else { + 
go SubmitReport(c, l, keyIndex, waitingMsgs[keyIndex]) + waitingMsgs[keyIndex] = []ReportMsgWithKey{} + } + } else { + availiableKeys[keyIndex] = true + } + case pm := <-c.pendingMsgs: + if availiableKeys[pm.keyIndex] { + availiableKeys[pm.keyIndex] = false + go SubmitReport(c, l, pm.keyIndex, []ReportMsgWithKey{pm}) + } else { + waitingMsgs[pm.keyIndex] = append(waitingMsgs[pm.keyIndex], pm) + } } } } @@ -67,10 +92,7 @@ func runCmd(c *Context) *cobra.Command { if len(keys) == 0 { return errors.New("No key available") } - c.keys = make(chan keyring.Info, len(keys)) - for _, key := range keys { - c.keys <- key - } + c.keys = keys c.validator, err = sdk.ValAddressFromBech32(cfg.Validator) if err != nil { return err @@ -103,10 +125,14 @@ func runCmd(c *Context) *cobra.Command { return err } c.maxTry = cfg.MaxTry + c.maxReport = cfg.MaxReport c.rpcPollInterval, err = time.ParseDuration(cfg.RPCPollInterval) if err != nil { return err } + c.pendingMsgs = make(chan ReportMsgWithKey) + c.freeKeys = make(chan int64, len(keys)) + c.keyRoundRobinIndex = -1 return runImpl(c, l) }, } @@ -119,6 +145,7 @@ func runCmd(c *Context) *cobra.Command { cmd.Flags().String(flagBroadcastTimeout, "5m", "The time that Yoda will wait for tx commit") cmd.Flags().String(flagRPCPollInterval, "1s", "The duration of rpc poll interval") cmd.Flags().Uint64(flagMaxTry, 5, "The maximum number of tries to submit a report transaction") + cmd.Flags().Uint64(flagMaxReport, 10, "The maximum number of reports in one transaction") viper.BindPFlag(flags.FlagChainID, cmd.Flags().Lookup(flags.FlagChainID)) viper.BindPFlag(flags.FlagNode, cmd.Flags().Lookup(flags.FlagNode)) viper.BindPFlag(flagValidator, cmd.Flags().Lookup(flagValidator)) @@ -128,5 +155,6 @@ func runCmd(c *Context) *cobra.Command { viper.BindPFlag(flagBroadcastTimeout, cmd.Flags().Lookup(flagBroadcastTimeout)) viper.BindPFlag(flagRPCPollInterval, cmd.Flags().Lookup(flagRPCPollInterval)) viper.BindPFlag(flagMaxTry, cmd.Flags().Lookup(flagMaxTry)) 
+ viper.BindPFlag(flagMaxReport, cmd.Flags().Lookup(flagMaxReport)) return cmd }