Skip to content

Commit b323b67

Browse files
committed
add STM executor
1 parent 949bbe8 commit b323b67

File tree

4 files changed

+294
-55
lines changed

4 files changed

+294
-55
lines changed

baseapp/executor.go

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
package baseapp
2+
3+
import (
4+
"context"
5+
"cosmossdk.io/collections"
6+
"cosmossdk.io/store/cachemulti"
7+
storetypes "cosmossdk.io/store/types"
8+
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
9+
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
10+
"io"
11+
"sync"
12+
"sync/atomic"
13+
14+
abci "github.com/cometbft/cometbft/abci/types"
15+
16+
sdk "github.com/cosmos/cosmos-sdk/types"
17+
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
18+
19+
blockstm "github.com/crypto-org-chain/go-block-stm"
20+
)
21+
22+
// TxExecutor the interface for implementing custom execution logic, such as block-stm
23+
type TxExecutor interface {
24+
run(txs [][]byte) ([]*abci.ExecTxResult, error)
25+
}
26+
27+
// DefaultExecutor default executor without parallelism
28+
type DefaultExecutor struct {
29+
ctx context.Context
30+
txDecoder sdk.TxDecoder
31+
deliverTx func(tx []byte) *abci.ExecTxResult
32+
}
33+
34+
func (d DefaultExecutor) run(txs [][]byte) ([]*abci.ExecTxResult, error) {
35+
// Fallback to the default execution logic
36+
txResults := make([]*abci.ExecTxResult, 0, len(txs))
37+
for _, rawTx := range txs {
38+
var response *abci.ExecTxResult
39+
40+
if _, err := d.txDecoder(rawTx); err == nil {
41+
response = d.deliverTx(rawTx)
42+
} else {
43+
// In the case where a transaction included in a block proposal is malformed,
44+
// we still want to return a default response to comet. This is because comet
45+
// expects a response for each transaction included in a block proposal.
46+
response = sdkerrors.ResponseExecTxResultWithEvents(
47+
sdkerrors.ErrTxDecode,
48+
0,
49+
0,
50+
nil,
51+
false,
52+
)
53+
}
54+
55+
// check after every tx if we should abort
56+
select {
57+
case <-d.ctx.Done():
58+
return nil, d.ctx.Err()
59+
default:
60+
// continue
61+
}
62+
63+
txResults = append(txResults, response)
64+
}
65+
return txResults, nil
66+
}
67+
68+
// STMExecutor simple implementation of block-stm
69+
type STMExecutor struct {
70+
ctx context.Context
71+
txDecoder sdk.TxDecoder
72+
stores []storetypes.StoreKey
73+
ms storetypes.MultiStore
74+
workers int
75+
estimate bool
76+
coinDenom string
77+
deliverTx func(int, sdk.Tx, storetypes.MultiStore, map[string]any) *abci.ExecTxResult
78+
}
79+
80+
func (e STMExecutor) run(txs [][]byte) ([]*abci.ExecTxResult, error) {
81+
var authStore, bankStore int
82+
index := make(map[storetypes.StoreKey]int, len(e.stores))
83+
for i, k := range e.stores {
84+
switch k.Name() {
85+
case authtypes.StoreKey:
86+
authStore = i
87+
case banktypes.StoreKey:
88+
bankStore = i
89+
}
90+
index[k] = i
91+
}
92+
93+
blockSize := len(txs)
94+
if blockSize == 0 {
95+
return nil, nil
96+
}
97+
results := make([]*abci.ExecTxResult, blockSize)
98+
incarnationCache := make([]atomic.Pointer[map[string]any], blockSize)
99+
for i := 0; i < blockSize; i++ {
100+
m := make(map[string]any)
101+
incarnationCache[i].Store(&m)
102+
}
103+
104+
var (
105+
estimates []blockstm.MultiLocations
106+
memTxs []sdk.Tx
107+
)
108+
109+
if e.estimate {
110+
memTxs, estimates = preEstimates(txs, e.workers, authStore, bankStore, e.coinDenom, e.txDecoder)
111+
}
112+
113+
if err := blockstm.ExecuteBlockWithEstimates(
114+
e.ctx,
115+
blockSize,
116+
index,
117+
stmMultiStoreWrapper{e.ms},
118+
e.workers,
119+
estimates,
120+
func(txn blockstm.TxnIndex, ms blockstm.MultiStore) {
121+
var cache map[string]any
122+
123+
// only one of the concurrent incarnations gets the cache if there are any, otherwise execute without
124+
// cache, concurrent incarnations should be rare.
125+
v := incarnationCache[txn].Swap(nil)
126+
if v != nil {
127+
cache = *v
128+
}
129+
130+
var memTx sdk.Tx
131+
if memTxs != nil {
132+
memTx = memTxs[txn]
133+
}
134+
results[txn] = e.deliverTx(int(txn), memTx, msWrapper{ms}, cache)
135+
136+
if v != nil {
137+
incarnationCache[txn].Store(v)
138+
}
139+
},
140+
); err != nil {
141+
return nil, err
142+
}
143+
144+
return results, nil
145+
146+
}
147+
148+
type msWrapper struct {
149+
blockstm.MultiStore
150+
}
151+
152+
func (ms msWrapper) CacheWrapWithTrace(w io.Writer, tc storetypes.TraceContext) storetypes.CacheWrap {
153+
//TODO implement me
154+
panic("implement me")
155+
}
156+
157+
func (ms msWrapper) CacheMultiStoreWithVersion(version int64) (storetypes.CacheMultiStore, error) {
158+
//TODO implement me
159+
panic("implement me")
160+
}
161+
162+
func (ms msWrapper) LatestVersion() int64 {
163+
//TODO implement me
164+
panic("implement me")
165+
}
166+
167+
var _ storetypes.MultiStore = msWrapper{}
168+
169+
func (ms msWrapper) getCacheWrapper(key storetypes.StoreKey) storetypes.CacheWrapper {
170+
return ms.GetStore(key)
171+
}
172+
173+
func (ms msWrapper) GetStore(key storetypes.StoreKey) storetypes.Store {
174+
return ms.MultiStore.GetStore(key)
175+
}
176+
177+
func (ms msWrapper) GetKVStore(key storetypes.StoreKey) storetypes.KVStore {
178+
return ms.MultiStore.GetKVStore(key)
179+
}
180+
181+
func (ms msWrapper) GetObjKVStore(key storetypes.StoreKey) storetypes.ObjKVStore {
182+
return ms.MultiStore.GetObjKVStore(key)
183+
}
184+
185+
func (ms msWrapper) CacheMultiStore() storetypes.CacheMultiStore {
186+
return cachemulti.NewFromParent(ms.getCacheWrapper, nil, nil)
187+
}
188+
189+
// CacheWrap Implements CacheWrapper.
190+
func (ms msWrapper) CacheWrap() storetypes.CacheWrap {
191+
return ms.CacheMultiStore().(storetypes.CacheWrap)
192+
}
193+
194+
// GetStoreType returns the type of the store.
195+
func (ms msWrapper) GetStoreType() storetypes.StoreType {
196+
return storetypes.StoreTypeMulti
197+
}
198+
199+
// SetTracer Implements interface MultiStore
200+
func (ms msWrapper) SetTracer(io.Writer) storetypes.MultiStore {
201+
return nil
202+
}
203+
204+
// SetTracingContext Implements interface MultiStore
205+
func (ms msWrapper) SetTracingContext(storetypes.TraceContext) storetypes.MultiStore {
206+
return nil
207+
}
208+
209+
// TracingEnabled Implements interface MultiStore
210+
func (ms msWrapper) TracingEnabled() bool {
211+
return false
212+
}
213+
214+
type stmMultiStoreWrapper struct {
215+
storetypes.MultiStore
216+
}
217+
218+
var _ blockstm.MultiStore = stmMultiStoreWrapper{}
219+
220+
func (ms stmMultiStoreWrapper) GetStore(key storetypes.StoreKey) storetypes.Store {
221+
return ms.MultiStore.GetStore(key)
222+
}
223+
224+
func (ms stmMultiStoreWrapper) GetKVStore(key storetypes.StoreKey) storetypes.KVStore {
225+
return ms.MultiStore.GetKVStore(key)
226+
}
227+
228+
// preEstimates returns a static estimation of the written keys for each transaction.
229+
// NOTE: make sure it sync with the latest sdk logic when sdk upgrade.
230+
func preEstimates(txs [][]byte, workers, authStore, bankStore int, coinDenom string, txDecoder sdk.TxDecoder) ([]sdk.Tx, []blockstm.MultiLocations) {
231+
memTxs := make([]sdk.Tx, len(txs))
232+
estimates := make([]blockstm.MultiLocations, len(txs))
233+
234+
job := func(start, end int) {
235+
for i := start; i < end; i++ {
236+
rawTx := txs[i]
237+
tx, err := txDecoder(rawTx)
238+
if err != nil {
239+
continue
240+
}
241+
memTxs[i] = tx
242+
243+
feeTx, ok := tx.(sdk.FeeTx)
244+
if !ok {
245+
continue
246+
}
247+
feePayer := sdk.AccAddress(feeTx.FeePayer())
248+
249+
// account key
250+
accKey, err := collections.EncodeKeyWithPrefix(
251+
authtypes.AddressStoreKeyPrefix,
252+
sdk.AccAddressKey,
253+
feePayer,
254+
)
255+
if err != nil {
256+
continue
257+
}
258+
259+
// balance key
260+
balanceKey, err := collections.EncodeKeyWithPrefix(
261+
banktypes.BalancesPrefix,
262+
collections.PairKeyCodec(sdk.AccAddressKey, collections.StringKey),
263+
collections.Join(feePayer, coinDenom),
264+
)
265+
if err != nil {
266+
continue
267+
}
268+
269+
estimates[i] = blockstm.MultiLocations{
270+
authStore: {accKey},
271+
bankStore: {balanceKey},
272+
}
273+
}
274+
}
275+
276+
blockSize := len(txs)
277+
chunk := (blockSize + workers - 1) / workers
278+
var wg sync.WaitGroup
279+
for i := 0; i < blockSize; i += chunk {
280+
start := i
281+
end := min(i+chunk, blockSize)
282+
wg.Add(1)
283+
go func() {
284+
defer wg.Done()
285+
job(start, end)
286+
}()
287+
}
288+
wg.Wait()
289+
290+
return memTxs, estimates
291+
}

baseapp/txexecutor.go

Lines changed: 0 additions & 55 deletions
This file was deleted.

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ require (
8585
github.com/cometbft/cometbft-db v0.14.1 // indirect
8686
github.com/cosmos/iavl v1.2.2 // indirect
8787
github.com/cosmos/ics23/go v0.11.0 // indirect
88+
github.com/crypto-org-chain/go-block-stm v0.0.0-20241213061541-7afe924fb4a6 // indirect
8889
github.com/danieljoos/wincred v1.1.2 // indirect
8990
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
9091
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
172172
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
173173
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
174174
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
175+
github.com/crypto-org-chain/go-block-stm v0.0.0-20241213061541-7afe924fb4a6 h1:6KPEi8dWkDSBddQb4NAvEXmNnTXymF3yVeTaT4Hz1iU=
176+
github.com/crypto-org-chain/go-block-stm v0.0.0-20241213061541-7afe924fb4a6/go.mod h1:iwQTX9xMX8NV9k3o2BiWXA0SswpsZrDk5q3gA7nWYiE=
175177
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
176178
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
177179
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

0 commit comments

Comments
 (0)