Skip to content

Commit dbfa5f9

Browse files
committed
add basic support in sdk for block-stm
1 parent f18c6ea commit dbfa5f9

File tree

8 files changed

+158
-44
lines changed

8 files changed

+158
-44
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ Ref: https://keepachangelog.com/en/1.0.0/
6060
* x/distribution can now utilize an externally managed community pool. NOTE: this will make the message handlers for FundCommunityPool and CommunityPoolSpend error, as well as the query handler for CommunityPool.
6161
* (client) [#18101](https://github.com/cosmos/cosmos-sdk/pull/18101) Add a `keyring-default-keyname` in `client.toml` for specifying a default key name, and skip the need to use the `--from` flag when signing transactions.
6262
* (x/gov) [#24355](https://github.com/cosmos/cosmos-sdk/pull/24355) Allow users to set a custom CalculateVoteResultsAndVotingPower function to be used in govkeeper.Tally.
63+
<<<<<<< HEAD
6364
* (api) [#24428](https://github.com/cosmos/cosmos-sdk/pull/24428) Add block height to response headers
65+
* (baseapp) [#24458](https://github.com/cosmos/cosmos-sdk/pull/24458) Add `TxExecutor` baseapp option, add `TxIndex`/`TxCount`/`MsgIndex`/`BlockGasUsed` fields to `Context, to support parallel execution, introduce incarnation cache for performance optimisation
66+
=======
67+
* (baseapp) [#24458](https://github.com/cosmos/cosmos-sdk/pull/24458) Add `TxExecutor` to support custom execution logic and incarnation cache for performance optimisation
68+
>>>>>>> 1e1fe257d1 (remove change in context for tx and msg index)
6469
6570
### Improvements
6671

baseapp/abci.go

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -798,34 +798,10 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
798798
//
799799
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
800800
// vote extensions, so skip those.
801-
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
802-
for _, rawTx := range req.Txs {
803-
var response *abci.ExecTxResult
804-
805-
if _, err := app.txDecoder(rawTx); err == nil {
806-
response = app.deliverTx(rawTx)
807-
} else {
808-
// In the case where a transaction included in a block proposal is malformed,
809-
// we still want to return a default response to comet. This is because comet
810-
// expects a response for each transaction included in a block proposal.
811-
response = sdkerrors.ResponseExecTxResultWithEvents(
812-
sdkerrors.ErrTxDecode,
813-
0,
814-
0,
815-
nil,
816-
false,
817-
)
818-
}
819-
820-
// check after every tx if we should abort
821-
select {
822-
case <-ctx.Done():
823-
return nil, ctx.Err()
824-
default:
825-
// continue
826-
}
827-
828-
txResults = append(txResults, response)
801+
txResults, err := app.executeTxsWithExecutor(ctx, req.Txs)
802+
if err != nil {
803+
// usually due to canceled
804+
return nil, err
829805
}
830806

831807
if app.finalizeBlockState.ms.TracingEnabled() {
@@ -856,6 +832,46 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
856832
}, nil
857833
}
858834

835+
func (app *BaseApp) executeTxsWithExecutor(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
836+
if app.txExecutor != nil {
837+
return app.txExecutor(ctx, txs, app.finalizeBlockState.ms, func(i int, memTx sdk.Tx, ms storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
838+
return app.deliverTxWithMultiStore(txs[i], ms, incarnationCache)
839+
})
840+
}
841+
842+
// Fallback to the default execution logic
843+
txResults := make([]*abci.ExecTxResult, 0, len(txs))
844+
for _, rawTx := range txs {
845+
var response *abci.ExecTxResult
846+
847+
if _, err := app.txDecoder(rawTx); err == nil {
848+
response = app.deliverTx(rawTx)
849+
} else {
850+
// In the case where a transaction included in a block proposal is malformed,
851+
// we still want to return a default response to comet. This is because comet
852+
// expects a response for each transaction included in a block proposal.
853+
response = sdkerrors.ResponseExecTxResultWithEvents(
854+
sdkerrors.ErrTxDecode,
855+
0,
856+
0,
857+
nil,
858+
false,
859+
)
860+
}
861+
862+
// check after every tx if we should abort
863+
select {
864+
case <-ctx.Done():
865+
return nil, ctx.Err()
866+
default:
867+
// continue
868+
}
869+
870+
txResults = append(txResults, response)
871+
}
872+
return txResults, nil
873+
}
874+
859875
// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
860876
// Specifically, it will execute an application's BeginBlock (if defined), followed
861877
// by the transactions in the proposal, finally followed by the application's

baseapp/baseapp.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ type BaseApp struct {
198198
//
199199
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
200200
disableBlockGasMeter bool
201+
202+
// Optional alternative tx executor, used for block-stm parallel transaction execution. If nil, default executor is used.
203+
txExecutor TxExecutor
201204
}
202205

203206
// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
@@ -770,6 +773,10 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er
770773
}
771774

772775
func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
776+
return app.deliverTxWithMultiStore(tx, nil, nil)
777+
}
778+
779+
func (app *BaseApp) deliverTxWithMultiStore(tx []byte, txMultiStore storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
773780
gInfo := sdk.GasInfo{}
774781
resultStr := "successful"
775782

@@ -782,7 +789,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
782789
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
783790
}()
784791

785-
gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx, nil)
792+
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, nil, txMultiStore, incarnationCache)
786793
if err != nil {
787794
resultStr = "failed"
788795
resp = sdkerrors.ResponseExecTxResultWithEvents(
@@ -842,12 +849,22 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
842849
// both txbytes and the decoded tx are passed to runTx to avoid the state machine encoding the tx and decoding the transaction twice
843850
// passing the decoded tx to runTX is optional, it will be decoded if the tx is nil
844851
func (app *BaseApp) runTx(mode execMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
852+
return app.runTxWithMultiStore(mode, txBytes, tx, nil, nil)
853+
}
854+
855+
func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, tx sdk.Tx, txMultiStore storetypes.MultiStore, incarnationCache map[string]any) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
845856
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
846857
// determined by the GasMeter. We need access to the context to get the gas
847858
// meter, so we initialize upfront.
848859
var gasWanted uint64
849860

850861
ctx := app.getContextForTx(mode, txBytes)
862+
if incarnationCache != nil {
863+
ctx = ctx.WithIncarnationCache(incarnationCache)
864+
}
865+
if txMultiStore != nil {
866+
ctx = ctx.WithMultiStore(txMultiStore)
867+
}
851868
ms := ctx.MultiStore()
852869

853870
// only run the tx if there is block gas remaining

baseapp/options.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ func SetOptimisticExecution(opts ...func(*oe.OptimisticExecution)) func(*BaseApp
124124
}
125125
}
126126

127+
// SetTxExecutor sets a custom tx executor for the BaseApp (e.g for parallel execution).
128+
func SetTxExecutor(executor TxExecutor) func(*BaseApp) {
129+
return func(app *BaseApp) { app.txExecutor = executor }
130+
}
131+
127132
// DisableBlockGasMeter disables the block gas meter.
128133
func DisableBlockGasMeter() func(*BaseApp) {
129134
return func(app *BaseApp) { app.SetDisableBlockGasMeter(true) }
@@ -403,3 +408,8 @@ func (app *BaseApp) SetMsgServiceRouter(msgServiceRouter *MsgServiceRouter) {
403408
func (app *BaseApp) SetGRPCQueryRouter(grpcQueryRouter *GRPCQueryRouter) {
404409
app.grpcQueryRouter = grpcQueryRouter
405410
}
411+
412+
// SetTxExecutor sets a custom tx executor for the BaseApp (e.g for parallel execution).
413+
func (app *BaseApp) SetTxExecutor(executor TxExecutor) {
414+
app.txExecutor = executor
415+
}

baseapp/txexecutor.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package baseapp
2+
3+
import (
4+
"context"
5+
6+
abci "github.com/cometbft/cometbft/abci/types"
7+
8+
"cosmossdk.io/store/types"
9+
10+
sdk "github.com/cosmos/cosmos-sdk/types"
11+
)
12+
13+
// TxExecutor function type for implementing custom execution logic, such as block-stm
14+
type TxExecutor func(
15+
ctx context.Context,
16+
block [][]byte,
17+
cms types.MultiStore,
18+
deliverTxWithMultiStore func(int, sdk.Tx, types.MultiStore, map[string]any) *abci.ExecTxResult,
19+
) ([]*abci.ExecTxResult, error)

simapp/app.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ package simapp
55
import (
66
"encoding/json"
77
"fmt"
8+
"io"
9+
"maps"
10+
811
abci "github.com/cometbft/cometbft/abci/types"
912
dbm "github.com/cosmos/cosmos-db"
1013
"github.com/cosmos/gogoproto/proto"
1114
"github.com/spf13/cast"
12-
"io"
13-
"maps"
1415

1516
autocliv1 "cosmossdk.io/api/cosmos/autocli/v1"
1617
reflectionv1 "cosmossdk.io/api/cosmos/reflection/v1"
@@ -123,7 +124,8 @@ var (
123124
govtypes.ModuleName: {authtypes.Burner},
124125
nft.ModuleName: nil,
125126
protocolpooltypes.ModuleName: nil,
126-
protocolpooltypes.ProtocolPoolEscrowAccount: nil}
127+
protocolpooltypes.ProtocolPoolEscrowAccount: nil,
128+
}
127129
)
128130

129131
var (
@@ -466,7 +468,7 @@ func NewSimApp(
466468

467469
app.GovKeeper = *govKeeper.SetHooks(
468470
govtypes.NewMultiGovHooks(
469-
// register the governance hooks
471+
// register the governance hooks
470472
),
471473
)
472474

@@ -496,7 +498,7 @@ func NewSimApp(
496498

497499
app.EpochsKeeper.SetHooks(
498500
epochstypes.NewMultiEpochHooks(
499-
// insert epoch hooks receivers here
501+
// insert epoch hooks receivers here
500502
),
501503
)
502504

types/context.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ type Context struct {
6464
streamingManager storetypes.StreamingManager
6565
cometInfo comet.BlockInfo
6666
headerInfo header.Info
67+
68+
// For block-stm
69+
incarnationCache map[string]any // incarnationCache is shared between multiple incarnations of the same transaction, it must only cache stateless computation results that only depends on tx body and block level information that don't change during block execution, like the result of tx signature verification.
6770
}
6871

6972
// Proposed rename, not done to avoid API breakage
@@ -92,6 +95,7 @@ func (c Context) TransientKVGasConfig() storetypes.GasConfig { return c.trans
9295
func (c Context) StreamingManager() storetypes.StreamingManager { return c.streamingManager }
9396
func (c Context) CometInfo() comet.BlockInfo { return c.cometInfo }
9497
func (c Context) HeaderInfo() header.Info { return c.headerInfo }
98+
func (c Context) IncarnationCache() map[string]any { return c.incarnationCache }
9599

96100
// BlockHeader returns the header by value.
97101
func (c Context) BlockHeader() cmtproto.Header {
@@ -365,6 +369,27 @@ func (c Context) CacheContext() (cc Context, writeCache func()) {
365369
return cc, writeCache
366370
}
367371

372+
func (c Context) GetIncarnationCache(key string) (any, bool) {
373+
if c.incarnationCache == nil {
374+
return nil, false
375+
}
376+
val, ok := c.incarnationCache[key]
377+
return val, ok
378+
}
379+
380+
func (c Context) SetIncarnationCache(key string, value any) {
381+
if c.incarnationCache == nil {
382+
// noop if cache is not initialized
383+
return
384+
}
385+
c.incarnationCache[key] = value
386+
}
387+
388+
func (c Context) WithIncarnationCache(cache map[string]any) Context {
389+
c.incarnationCache = cache
390+
return c
391+
}
392+
368393
var (
369394
_ context.Context = Context{}
370395
_ storetypes.Context = Context{}

x/auth/ante/sigverify.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ var (
3131
key = make([]byte, secp256k1.PubKeySize)
3232
simSecp256k1Pubkey = &secp256k1.PubKey{Key: key}
3333
simSecp256k1Sig [64]byte
34+
35+
// SigVerificationResultCacheKey default key for incarnation cache
36+
SigVerificationResultCacheKey = "ante:SigVerificationResult"
3437
)
3538

3639
func init() {
@@ -251,9 +254,26 @@ func OnlyLegacyAminoSigners(sigData signing.SignatureData) bool {
251254
}
252255

253256
func (svd SigVerificationDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simulate bool, next sdk.AnteHandler) (newCtx sdk.Context, err error) {
257+
if v, ok := ctx.GetIncarnationCache(SigVerificationResultCacheKey); ok {
258+
// can't convert `nil` to interface
259+
if v != nil {
260+
err = v.(error)
261+
}
262+
} else {
263+
err = svd.verify(ctx, tx, simulate)
264+
// if enabled, use incarnation cache to store the result to improve performance for block-stm
265+
ctx.SetIncarnationCache(SigVerificationResultCacheKey, err)
266+
}
267+
if err != nil {
268+
return ctx, err
269+
}
270+
return next(ctx, tx, simulate)
271+
}
272+
273+
func (svd SigVerificationDecorator) verify(ctx sdk.Context, tx sdk.Tx, simulate bool) error {
254274
sigTx, ok := tx.(authsigning.Tx)
255275
if !ok {
256-
return ctx, errorsmod.Wrap(sdkerrors.ErrTxDecode, "invalid transaction type")
276+
return errorsmod.Wrap(sdkerrors.ErrTxDecode, "invalid transaction type")
257277
}
258278

259279
utx, ok := tx.(sdk.TxWithUnordered)
@@ -263,35 +283,35 @@ func (svd SigVerificationDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simul
263283
// When simulating, this would just be a 0-length slice.
264284
sigs, err := sigTx.GetSignaturesV2()
265285
if err != nil {
266-
return ctx, err
286+
return err
267287
}
268288

269289
signers, err := sigTx.GetSigners()
270290
if err != nil {
271-
return ctx, err
291+
return err
272292
}
273293

274294
// check that signer length and signature length are the same
275295
if len(sigs) != len(signers) {
276-
return ctx, errorsmod.Wrapf(sdkerrors.ErrUnauthorized, "invalid number of signer; expected: %d, got %d", len(signers), len(sigs))
296+
return errorsmod.Wrapf(sdkerrors.ErrUnauthorized, "invalid number of signer; expected: %d, got %d", len(signers), len(sigs))
277297
}
278298

279299
for i, sig := range sigs {
280300
acc, err := GetSignerAcc(ctx, svd.ak, signers[i])
281301
if err != nil {
282-
return ctx, err
302+
return err
283303
}
284304

285305
// retrieve pubkey
286306
pubKey := acc.GetPubKey()
287307
if !simulate && pubKey == nil {
288-
return ctx, errorsmod.Wrap(sdkerrors.ErrInvalidPubKey, "pubkey on account is not set")
308+
return errorsmod.Wrap(sdkerrors.ErrInvalidPubKey, "pubkey on account is not set")
289309
}
290310

291311
// Check account sequence number.
292312
if !isUnordered {
293313
if sig.Sequence != acc.GetSequence() {
294-
return ctx, errorsmod.Wrapf(
314+
return errorsmod.Wrapf(
295315
sdkerrors.ErrWrongSequence,
296316
"account sequence mismatch, expected %d, got %d", acc.GetSequence(), sig.Sequence,
297317
)
@@ -322,7 +342,7 @@ func (svd SigVerificationDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simul
322342
}
323343
adaptableTx, ok := tx.(authsigning.V2AdaptableTx)
324344
if !ok {
325-
return ctx, fmt.Errorf("expected tx to implement V2AdaptableTx, got %T", tx)
345+
return fmt.Errorf("expected tx to implement V2AdaptableTx, got %T", tx)
326346
}
327347
txData := adaptableTx.GetSigningTxData()
328348
err = authsigning.VerifySignature(ctx, pubKey, signerData, sig.Data, svd.signModeHandler, txData)
@@ -335,13 +355,13 @@ func (svd SigVerificationDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, simul
335355
} else {
336356
errMsg = fmt.Sprintf("signature verification failed; please verify account number (%d) and chain-id (%s): (%s)", accNum, chainID, err.Error())
337357
}
338-
return ctx, errorsmod.Wrap(sdkerrors.ErrUnauthorized, errMsg)
358+
return errorsmod.Wrap(sdkerrors.ErrUnauthorized, errMsg)
339359

340360
}
341361
}
342362
}
343363

344-
return next(ctx, tx, simulate)
364+
return nil
345365
}
346366

347367
// IncrementSequenceDecorator handles incrementing sequences of all signers.

0 commit comments

Comments
 (0)