Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize historical range #3658

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 15 additions & 20 deletions x/merkledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/exp/maps"

"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
Expand Down Expand Up @@ -331,8 +330,9 @@ func newDatabase(
rootChange: change[maybe.Maybe[*node]]{
after: trieDB.root,
},
values: map[Key]*change[maybe.Maybe[[]byte]]{},
nodes: map[Key]*change[*node]{},
values: map[Key]*keyChange{},
nodes: map[Key]*change[*node]{},
sortedKeyChanges: []*keyChange{},
})

// mark that the db has not yet been cleanly closed
Expand Down Expand Up @@ -731,29 +731,23 @@ func (db *merkleDB) GetChangeProof(
return nil, database.ErrClosed
}

// [changes] contains a subset of the keys that were added or had their
// values modified between [startRootID] to [endRootID].
changes, err := db.history.getValueChanges(startRootID, endRootID, start, end, maxLength)
if err != nil {
return nil, err
}

// [changedKeys] are a subset of the keys that were added or had their
// values modified between [startRootID] to [endRootID] sorted in increasing
// order.
changedKeys := maps.Keys(changes.values)
utils.Sort(changedKeys)

result := &ChangeProof{
KeyChanges: make([]KeyChange, 0, len(changedKeys)),
KeyChanges: make([]KeyChange, len(changes)),
}

for _, key := range changedKeys {
change := changes.values[key]

result.KeyChanges = append(result.KeyChanges, KeyChange{
Key: key.Bytes(),
for i, keyChange := range changes {
result.KeyChanges[i] = KeyChange{
Key: keyChange.key.Bytes(),
// create a copy so edits of the []byte don't affect the db
Value: maybe.Bind(change.after, slices.Clone[[]byte]),
})
Value: maybe.Bind(keyChange.after, slices.Clone[[]byte]),
}
}

largestKey := end
Expand Down Expand Up @@ -1367,9 +1361,10 @@ func (db *merkleDB) Clear() error {
// Clear history
db.history = newTrieHistory(db.history.maxHistoryLen)
db.history.record(&changeSummary{
rootID: db.rootID,
values: map[Key]*change[maybe.Maybe[[]byte]]{},
nodes: map[Key]*change[*node]{},
rootID: db.rootID,
values: map[Key]*keyChange{},
nodes: map[Key]*change[*node]{},
sortedKeyChanges: make([]*keyChange, 0),
})
return nil
}
Expand Down
225 changes: 161 additions & 64 deletions x/merkledb/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"bytes"
"errors"
"fmt"
"slices"

"golang.org/x/exp/maps"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/buffer"
"github.com/ava-labs/avalanchego/utils/heap"
"github.com/ava-labs/avalanchego/utils/maybe"
"github.com/ava-labs/avalanchego/utils/set"
)

var (
Expand Down Expand Up @@ -52,22 +54,29 @@ type changeSummaryAndInsertNumber struct {
insertNumber uint64
}

type keyChange struct {
*change[maybe.Maybe[[]byte]]
key Key
}

// Tracks all the node and value changes that resulted in the rootID.
type changeSummary struct {
// The ID of the trie after these changes.
rootID ids.ID
// The root before/after this change.
// Set in [applyValueChanges].
rootChange change[maybe.Maybe[*node]]
nodes map[Key]*change[*node]
values map[Key]*change[maybe.Maybe[[]byte]]
rootChange change[maybe.Maybe[*node]]
nodes map[Key]*change[*node]
values map[Key]*keyChange
sortedKeyChanges []*keyChange
}

func newChangeSummary(estimatedSize int) *changeSummary {
return &changeSummary{
nodes: make(map[Key]*change[*node], estimatedSize),
values: make(map[Key]*change[maybe.Maybe[[]byte]], estimatedSize),
rootChange: change[maybe.Maybe[*node]]{},
nodes: make(map[Key]*change[*node], estimatedSize),
values: make(map[Key]*keyChange, estimatedSize),
sortedKeyChanges: make([]*keyChange, 0, estimatedSize),
rootChange: change[maybe.Maybe[*node]]{},
}
}

Expand All @@ -79,7 +88,7 @@ func newTrieHistory(maxHistoryLookback int) *trieHistory {
}
}

// Returns up to [maxLength] key-value pair changes with keys in
// Returns up to [maxLength] sorted changes with keys in
// [start, end] that occurred between [startRoot] and [endRoot].
// If [start] is Nothing, there's no lower bound on the range.
// If [end] is Nothing, there's no upper bound on the range.
Expand All @@ -93,13 +102,13 @@ func (th *trieHistory) getValueChanges(
start maybe.Maybe[[]byte],
end maybe.Maybe[[]byte],
maxLength int,
) (*changeSummary, error) {
) ([]*keyChange, error) {
if maxLength <= 0 {
return nil, fmt.Errorf("%w but was %d", ErrInvalidMaxLength, maxLength)
}

if startRoot == endRoot {
return newChangeSummary(maxLength), nil
return []*keyChange{}, nil
}

// [endRootChanges] is the last change in the history resulting in [endRoot].
Expand Down Expand Up @@ -156,20 +165,30 @@ func (th *trieHistory) getValueChanges(
}
}

var (
// Keep track of changed keys so the largest can be removed
// in order to stay within the [maxLength] limit if necessary.
changedKeys = set.Set[Key]{}
// historyChangesIndex is used for tracking keyChanges index from each historical root.
type historyChangesIndex struct {
changes *changeSummaryAndInsertNumber
kvChangeIndex int
}

// historyChangesIndexHeap is used to traverse the changes sorted by ASC [key] and ASC [insertNumber].
historyChangesIndexHeap := heap.NewQueue[*historyChangesIndex](func(a, b *historyChangesIndex) bool {
keyComparison := a.changes.sortedKeyChanges[a.kvChangeIndex].key.Compare(b.changes.sortedKeyChanges[b.kvChangeIndex].key)
if keyComparison != 0 {
return keyComparison < 0
}

return a.changes.insertNumber < b.changes.insertNumber
})

var (
startKey = maybe.Bind(start, ToKey)
endKey = maybe.Bind(end, ToKey)

// For each element in the history in the range between [startRoot]'s
// last appearance (exclusive) and [endRoot]'s last appearance (inclusive),
// add the changes to keys in [start, end] to [combinedChanges].
// Only the key-value pairs with the greatest [maxLength] keys will be kept.
combinedChanges = newChangeSummary(maxLength)

// The difference between the index of [startRootChanges] and [endRootChanges] in [th.history].
startToEndOffset = int(endRootChanges.insertNumber - startRootChanges.insertNumber)

Expand All @@ -178,62 +197,107 @@ func (th *trieHistory) getValueChanges(
startRootIndex = endRootIndex - startToEndOffset
)

// For each change after [startRootChanges] up to and including
// [endRootChanges], record the change in [combinedChanges].
// Push in the heap first key in [startKey, endKey] for each historical root.
for i := startRootIndex + 1; i <= endRootIndex; i++ {
changes, _ := th.history.Index(i)
historyChanges, ok := th.history.Index(i)
if !ok {
return nil, fmt.Errorf("missing history changes at index %d", i)
}

startKeyIndex := 0
if startKey.HasValue() {
// Binary search for [startKey] index.
startKeyIndex, _ = slices.BinarySearchFunc(historyChanges.sortedKeyChanges, startKey, func(k *keyChange, m maybe.Maybe[Key]) int {
return k.key.Compare(m.Value())
})

// Add the changes from this commit to [combinedChanges].
for key, valueChange := range changes.values {
// The key is outside the range [start, end].
if (startKey.HasValue() && key.Less(startKey.Value())) ||
(end.HasValue() && key.Greater(endKey.Value())) {
if startKeyIndex >= len(historyChanges.sortedKeyChanges) {
// [startKey] is after last key of [sortedKeyChanges].
continue
}
}

// A change to this key already exists in [combinedChanges]
// so update its before value with the earlier before value
if existing, ok := combinedChanges.values[key]; ok {
existing.after = valueChange.after
if existing.before.HasValue() == existing.after.HasValue() &&
bytes.Equal(existing.before.Value(), existing.after.Value()) {
// The change to this key is a no-op, so remove it from [combinedChanges].
delete(combinedChanges.values, key)
changedKeys.Remove(key)
}
} else {
combinedChanges.values[key] = &change[maybe.Maybe[[]byte]]{
before: valueChange.before,
after: valueChange.after,
keyChange := historyChanges.sortedKeyChanges[startKeyIndex]
if end.HasValue() && keyChange.key.Greater(endKey.Value()) {
// [keyChange] is after [endKey].
continue
}

// [startKeyIndex] is the index of the first key in [startKey, endKey] from [sortedKeyChanges].
historyChangesIndexHeap.Push(&historyChangesIndex{
changes: historyChanges,
kvChangeIndex: startKeyIndex,
})
}

var (
combinedKeyChanges = make([]*keyChange, 0, maxLength)

// Used for combining the changes of all the historical changes, for the current smallest key.
currentKeyChange *keyChange
)

for historyChangesIndexHeap.Len() > 0 {
historyRootChanges, _ := historyChangesIndexHeap.Pop()
kvChange := historyRootChanges.changes.sortedKeyChanges[historyRootChanges.kvChangeIndex]

if end.HasValue() && kvChange.key.Greater(endKey.Value()) {
// Skip processing the current [historyRootChanges] if we are after [endKey].
continue
}

if len(historyRootChanges.changes.sortedKeyChanges) > 1+historyRootChanges.kvChangeIndex {
// If there are remaining changes in the current [historyRootChanges], push to minheap.
historyRootChanges.kvChangeIndex++
historyChangesIndexHeap.Push(historyRootChanges)
}

if currentKeyChange != nil {
if currentKeyChange.key.value == kvChange.key.value {
// Same key, update [after] value.
currentKeyChange.after = kvChange.after

continue
}

// New key

// Add the last [currentKeyChange] to [combinedKeyChanges] if there is an actual change.
if !maybe.Equal(currentKeyChange.before, currentKeyChange.after, bytes.Equal) {
combinedKeyChanges = append(combinedKeyChanges, currentKeyChange)

if len(combinedKeyChanges) >= maxLength {
// If we have [maxLength] changes, we can return the current [combinedKeyChanges].
return combinedKeyChanges, nil
}
changedKeys.Add(key)
}
}
}

// If we have <= [maxLength] elements, we're done.
if changedKeys.Len() <= maxLength {
return combinedChanges, nil
currentKeyChange = &keyChange{
change: &change[maybe.Maybe[[]byte]]{
before: kvChange.before,
after: kvChange.after,
},
key: kvChange.key,
}
}

// Keep only the smallest [maxLength] items in [combinedChanges.values].
sortedChangedKeys := changedKeys.List()
utils.Sort(sortedChangedKeys)
for len(sortedChangedKeys) > maxLength {
greatestKey := sortedChangedKeys[len(sortedChangedKeys)-1]
sortedChangedKeys = sortedChangedKeys[:len(sortedChangedKeys)-1]
delete(combinedChanges.values, greatestKey)
if currentKeyChange != nil {
// Add the last [currentKeyChange] to [combinedKeyChanges] if there is an actual change.
if !maybe.Equal(currentKeyChange.before, currentKeyChange.after, bytes.Equal) {
combinedKeyChanges = append(combinedKeyChanges, currentKeyChange)
}
}

return combinedChanges, nil
return combinedKeyChanges, nil
}

// Returns the changes to go from the current trie state back to the requested [rootID]
// for the keys in [start, end].
// If [start] is Nothing, all keys are considered > [start].
// If [end] is Nothing, all keys are considered < [end].
func (th *trieHistory) getChangesToGetToRoot(rootID ids.ID, start maybe.Maybe[[]byte], end maybe.Maybe[[]byte]) (*changeSummary, error) {
// [lastRootChange] is the last change in the history resulting in [rootID].
// [lastRootChange] is the last change in the historyChanges resulting in [rootID].
lastRootChange, ok := th.lastChanges[rootID]
if !ok {
return nil, ErrInsufficientHistory
Expand Down Expand Up @@ -268,21 +332,54 @@ func (th *trieHistory) getChangesToGetToRoot(rootID ids.ID, start maybe.Maybe[[]
}
}

for key, valueChange := range changes.values {
if (startKey.IsNothing() || !key.Less(startKey.Value())) &&
(endKey.IsNothing() || !key.Greater(endKey.Value())) {
if existing, ok := combinedChanges.values[key]; ok {
existing.after = valueChange.before
} else {
combinedChanges.values[key] = &change[maybe.Maybe[[]byte]]{
before: valueChange.after,
after: valueChange.before,
}
}
startKeyIndex := 0
if startKey.HasValue() {
// Binary search for [startKey] index.
startKeyIndex, _ = slices.BinarySearchFunc(changes.sortedKeyChanges, startKey, func(k *keyChange, m maybe.Maybe[Key]) int {
return k.key.Compare(m.Value())
})
}

for _, kc := range changes.sortedKeyChanges[startKeyIndex:] {
if end.HasValue() && kc.key.Greater(endKey.Value()) {
break
}

if existing, ok := combinedChanges.values[kc.key]; ok {
// Update existing [after] with current [before]
existing.after = kc.before

continue
}

keyChange := keyChange{
change: &change[maybe.Maybe[[]byte]]{
before: kc.after,
after: kc.before,
},
key: kc.key,
}

combinedChanges.values[kc.key] = &keyChange
}
}

sortedKeys := maps.Keys(combinedChanges.values)
slices.SortFunc(sortedKeys, func(a, b Key) int {
return a.Compare(b)
})

for _, key := range sortedKeys {
if maybe.Equal(combinedChanges.values[key].before, combinedChanges.values[key].after, bytes.Equal) {
// Remove changed key, if there is a no-op.
delete(combinedChanges.values, key)

continue
}

combinedChanges.sortedKeyChanges = append(combinedChanges.sortedKeyChanges, combinedChanges.values[key])
}

return combinedChanges, nil
}

Expand Down
Loading
Loading