Skip to content

Commit ba31d61

Browse files
state diff kv functions
1 parent 14282bb commit ba31d61

File tree

5 files changed

+1077
-1
lines changed

5 files changed

+1077
-1
lines changed

beacon-chain/db/kv/BUILD.bazel

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ go_library(
2727
"p2p.go",
2828
"schema.go",
2929
"state.go",
30+
"state_diff.go",
3031
"state_diff_cache.go",
32+
"state_diff_helpers.go",
3133
"state_summary.go",
3234
"state_summary_cache.go",
3335
"utils.go",
@@ -47,6 +49,7 @@ go_library(
4749
"//config/fieldparams:go_default_library",
4850
"//config/params:go_default_library",
4951
"//consensus-types/blocks:go_default_library",
52+
"//consensus-types/hdiff:go_default_library",
5053
"//consensus-types/interfaces:go_default_library",
5154
"//consensus-types/light-client:go_default_library",
5255
"//consensus-types/primitives:go_default_library",
@@ -55,6 +58,7 @@ go_library(
5558
"//encoding/ssz/detect:go_default_library",
5659
"//genesis:go_default_library",
5760
"//io/file:go_default_library",
61+
"//math:go_default_library",
5862
"//monitoring/progress:go_default_library",
5963
"//monitoring/tracing:go_default_library",
6064
"//monitoring/tracing/trace:go_default_library",
@@ -100,6 +104,7 @@ go_test(
100104
"migration_block_slot_index_test.go",
101105
"migration_state_validators_test.go",
102106
"p2p_test.go",
107+
"state_diff_test.go",
103108
"state_summary_test.go",
104109
"state_test.go",
105110
"utils_test.go",
@@ -113,6 +118,7 @@ go_test(
113118
"//beacon-chain/db/iface:go_default_library",
114119
"//beacon-chain/state:go_default_library",
115120
"//beacon-chain/state/state-native:go_default_library",
121+
"//cmd/beacon-chain/flags:go_default_library",
116122
"//config/features:go_default_library",
117123
"//config/fieldparams:go_default_library",
118124
"//config/params:go_default_library",
@@ -122,6 +128,7 @@ go_test(
122128
"//consensus-types/primitives:go_default_library",
123129
"//encoding/bytesutil:go_default_library",
124130
"//genesis:go_default_library",
131+
"//math:go_default_library",
125132
"//proto/dbval:go_default_library",
126133
"//proto/engine/v1:go_default_library",
127134
"//proto/prysm/v1alpha1:go_default_library",

beacon-chain/db/kv/state_diff.go

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
package kv
2+
3+
import (
4+
"context"
5+
6+
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
7+
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
8+
"github.com/OffchainLabs/prysm/v6/consensus-types/hdiff"
9+
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
10+
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
11+
"github.com/pkg/errors"
12+
bolt "go.etcd.io/bbolt"
13+
)
14+
15+
const (
16+
stateSuffix = "_s"
17+
validatorSuffix = "_v"
18+
balancesSuffix = "_b"
19+
)
20+
21+
/*
22+
We use a level-based approach to save state diffs. The levels are 0-6, where each level corresponds to an exponent of 2 (exponents[lvl]).
23+
The data at level 0 is saved every 2**exponent[0] slots and always contains a full state snapshot that is used as a base for the delta saved at other levels.
24+
*/
25+
26+
// saveStateByDiff takes a state and decides between saving a full state snapshot or a diff.
27+
func (s *Store) saveStateByDiff(ctx context.Context, st state.ReadOnlyBeaconState) error {
28+
_, span := trace.StartSpan(ctx, "BeaconDB.saveStateByDiff")
29+
defer span.End()
30+
31+
if st == nil {
32+
return errors.New("state is nil")
33+
}
34+
35+
slot := st.Slot()
36+
offset := s.getOffset()
37+
if uint64(slot) < offset {
38+
return ErrSlotBeforeOffset
39+
}
40+
41+
// Find the level to save the state.
42+
lvl := computeLevel(offset, slot)
43+
if lvl == -1 {
44+
return nil
45+
}
46+
47+
// Save full state if level is 0.
48+
if lvl == 0 {
49+
return s.saveFullSnapshot(st)
50+
}
51+
52+
// Get anchor state to compute the diff from.
53+
anchorState, err := s.getAnchorState(offset, lvl, slot)
54+
if err != nil {
55+
return err
56+
}
57+
58+
err = s.saveHdiff(lvl, anchorState, st)
59+
if err != nil {
60+
return err
61+
}
62+
63+
return nil
64+
}
65+
66+
// stateByDiff retrieves the full state for a given slot.
67+
func (s *Store) stateByDiff(ctx context.Context, slot primitives.Slot) (state.BeaconState, error) {
68+
offset := s.getOffset()
69+
if uint64(slot) < offset {
70+
return nil, ErrSlotBeforeOffset
71+
}
72+
73+
snapshot, diffChain, err := s.getBaseAndDiffChain(offset, slot)
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
for _, diff := range diffChain {
79+
snapshot, err = hdiff.ApplyDiff(ctx, snapshot, diff)
80+
if err != nil {
81+
return nil, err
82+
}
83+
}
84+
85+
return snapshot, nil
86+
}
87+
88+
// SaveHdiff computes the diff between the anchor state and the current state and saves it to the database.
89+
func (s *Store) saveHdiff(lvl int, anchor, st state.ReadOnlyBeaconState) error {
90+
slot := uint64(st.Slot())
91+
key := makeKey(lvl, slot)
92+
93+
diff, err := hdiff.Diff(anchor, st)
94+
if err != nil {
95+
return err
96+
}
97+
98+
err = s.db.Update(func(tx *bolt.Tx) error {
99+
bucket := tx.Bucket(stateDiffBucket)
100+
if bucket == nil {
101+
return bolt.ErrBucketNotFound
102+
}
103+
buf := append(key, stateSuffix...)
104+
if err := bucket.Put(buf, diff.StateDiff); err != nil {
105+
return err
106+
}
107+
buf = append(key, validatorSuffix...)
108+
if err := bucket.Put(buf, diff.ValidatorDiffs); err != nil {
109+
return err
110+
}
111+
buf = append(key, balancesSuffix...)
112+
if err := bucket.Put(buf, diff.BalancesDiff); err != nil {
113+
return err
114+
}
115+
return nil
116+
})
117+
if err != nil {
118+
return err
119+
}
120+
121+
// Save the full state to the cache (if not the last level).
122+
if lvl != len(flags.Get().StateDiffExponents)-1 {
123+
err = s.stateDiffCache.setAnchor(lvl, st)
124+
if err != nil {
125+
return err
126+
}
127+
}
128+
129+
return nil
130+
}
131+
132+
// SaveFullSnapshot saves the full level 0 state snapshot to the database.
133+
func (s *Store) saveFullSnapshot(st state.ReadOnlyBeaconState) error {
134+
slot := uint64(st.Slot())
135+
key := makeKey(0, slot)
136+
stateBytes, err := st.MarshalSSZ()
137+
if err != nil {
138+
return err
139+
}
140+
// add version key to value
141+
enc, err := addKey(st.Version(), stateBytes)
142+
if err != nil {
143+
return err
144+
}
145+
146+
err = s.db.Update(func(tx *bolt.Tx) error {
147+
bucket := tx.Bucket(stateDiffBucket)
148+
if bucket == nil {
149+
return bolt.ErrBucketNotFound
150+
}
151+
152+
if err := bucket.Put(key, enc); err != nil {
153+
return err
154+
}
155+
156+
return nil
157+
})
158+
if err != nil {
159+
return err
160+
}
161+
// Save the full state to the cache, and invalidate other levels.
162+
s.stateDiffCache.clearAnchors()
163+
err = s.stateDiffCache.setAnchor(0, st)
164+
if err != nil {
165+
return err
166+
}
167+
168+
return nil
169+
}
170+
171+
func (s *Store) getDiff(lvl int, slot uint64) (hdiff.HdiffBytes, error) {
172+
key := makeKey(lvl, slot)
173+
var stateDiff []byte
174+
var validatorDiff []byte
175+
var balancesDiff []byte
176+
177+
err := s.db.View(func(tx *bolt.Tx) error {
178+
bucket := tx.Bucket(stateDiffBucket)
179+
if bucket == nil {
180+
return bolt.ErrBucketNotFound
181+
}
182+
buf := append(key, stateSuffix...)
183+
stateDiff = bucket.Get(buf)
184+
if stateDiff == nil {
185+
return errors.New("state diff not found")
186+
}
187+
buf = append(key, validatorSuffix...)
188+
validatorDiff = bucket.Get(buf)
189+
if validatorDiff == nil {
190+
return errors.New("validator diff not found")
191+
}
192+
buf = append(key, balancesSuffix...)
193+
balancesDiff = bucket.Get(buf)
194+
if balancesDiff == nil {
195+
return errors.New("balances diff not found")
196+
}
197+
return nil
198+
})
199+
200+
if err != nil {
201+
return hdiff.HdiffBytes{}, err
202+
}
203+
204+
return hdiff.HdiffBytes{
205+
StateDiff: stateDiff,
206+
ValidatorDiffs: validatorDiff,
207+
BalancesDiff: balancesDiff,
208+
}, nil
209+
}
210+
211+
func (s *Store) getFullSnapshot(slot uint64) (state.BeaconState, error) {
212+
key := makeKey(0, slot)
213+
var enc []byte
214+
215+
err := s.db.View(func(tx *bolt.Tx) error {
216+
bucket := tx.Bucket(stateDiffBucket)
217+
if bucket == nil {
218+
return bolt.ErrBucketNotFound
219+
}
220+
enc = bucket.Get(key)
221+
if enc == nil {
222+
return errors.New("state not found")
223+
}
224+
return nil
225+
})
226+
227+
if err != nil {
228+
return nil, err
229+
}
230+
231+
return s.decodeStateSnapshot(enc)
232+
}

0 commit comments

Comments
 (0)