Skip to content

Commit

Permalink
Fix interim finalized update (#1194)
Browse files Browse the repository at this point in the history
* fix interim finalized update

* cleanup

* fix

* fixes
  • Loading branch information
claravanstaden authored May 11, 2024
1 parent 378f89d commit b02684b
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 149 deletions.
2 changes: 1 addition & 1 deletion relayer/cmd/import_beacon_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func importBeaconState(cmd *cobra.Command, _ []string) error {

p := protocol.New(conf.Source.Beacon.Spec)
store := store.New(conf.Source.Beacon.DataStore.Location, conf.Source.Beacon.DataStore.MaxEntries, *p)
beaconClient := api.NewBeaconClient(conf.Source.Beacon.Endpoint)
beaconClient := api.NewBeaconClient(conf.Source.Beacon.Endpoint, conf.Source.Beacon.StateEndpoint)
syncer := syncer.New(beaconClient, &store, p)

err = store.Connect()
Expand Down
28 changes: 19 additions & 9 deletions relayer/relays/beacon/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,18 @@ func (h *Header) SyncCommitteePeriodUpdate(ctx context.Context, period uint64) e

// If the gap between the last two finalized headers is more than the sync committee period, sync an interim
// finalized header
maxLatency := h.cache.Finalized.LastSyncedSlot + (h.protocol.Settings.SlotsInEpoch * h.protocol.Settings.EpochsPerSyncCommitteePeriod)
if maxLatency < uint64(update.Payload.FinalizedHeader.Slot) {
err = h.syncInterimFinalizedUpdate(ctx, h.cache.Finalized.LastSyncedSlot)
if err != nil {
return fmt.Errorf("sync interim finalized header update: %w", err)
if uint64(update.Payload.FinalizedHeader.Slot) > h.cache.Finalized.LastSyncedSlot {
diff := uint64(update.Payload.FinalizedHeader.Slot) - h.cache.Finalized.LastSyncedSlot
log.WithFields(log.Fields{"diff": diff, "last_finalized_slot": h.cache.Finalized.LastSyncedSlot, "new_finalized_slot": uint64(update.Payload.FinalizedHeader.Slot)}).Info("checking max latency")
if diff > h.protocol.Settings.SlotsInEpoch*h.protocol.Settings.EpochsPerSyncCommitteePeriod {
log.Info("syncing an interim update")
err = h.syncInterimFinalizedUpdate(ctx, h.cache.Finalized.LastSyncedSlot, uint64(update.Payload.FinalizedHeader.Slot))
if err != nil {
return fmt.Errorf("sync interim finalized header update: %w", err)
}
}
} else {
log.Info("interim update not required")
}

log.WithFields(log.Fields{
Expand Down Expand Up @@ -237,14 +243,18 @@ func (h *Header) SyncHeaders(ctx context.Context) error {
return nil
}

func (h *Header) syncInterimFinalizedUpdate(ctx context.Context, lastSyncedSlot uint64) error {
checkpointSlot := h.protocol.CalculateNextCheckpointSlot(lastSyncedSlot)
func (h *Header) syncInterimFinalizedUpdate(ctx context.Context, lastSyncedSlot, newCheckpointSlot uint64) error {
// Calculate the range that the interim finalized header update may be in
minSlot := newCheckpointSlot - h.protocol.SlotsPerHistoricalRoot
maxSlot := lastSyncedSlot + h.protocol.SlotsPerHistoricalRoot

finalizedUpdate, err := h.syncer.GetLatestPossibleFinalizedUpdate(checkpointSlot, lastSyncedSlot)
finalizedUpdate, err := h.syncer.GetFinalizedUpdateAtAttestedSlot(minSlot, maxSlot, false)
if err != nil {
return fmt.Errorf("get interim checkpoint to update chain (checkpoint slot %d, original slot: %d): %w", checkpointSlot, lastSyncedSlot, err)
return fmt.Errorf("get interim checkpoint to update chain (last synced slot %d, new slot: %d): %w", lastSyncedSlot, newCheckpointSlot, err)
}

log.WithField("slot", finalizedUpdate.Payload.FinalizedHeader.Slot).Info("syncing an interim update to on-chain")

err = h.updateFinalizedHeaderOnchain(ctx, finalizedUpdate)
if err != nil {
return fmt.Errorf("update interim finalized header on-chain: %w", err)
Expand Down
10 changes: 5 additions & 5 deletions relayer/relays/beacon/header/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestSyncInterimFinalizedUpdate_WithDataFromAPI(t *testing.T) {
)

// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
err = h.syncInterimFinalizedUpdate(context.Background(), 4570722)
err = h.syncInterimFinalizedUpdate(context.Background(), 4563072, 4571360)
require.NoError(t, err)
}

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestSyncInterimFinalizedUpdate_WithDataFromStore(t *testing.T) {
)

// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
err = h.syncInterimFinalizedUpdate(context.Background(), 4570722)
err = h.syncInterimFinalizedUpdate(context.Background(), 4563072, 4571360)
require.NoError(t, err)
}

Expand Down Expand Up @@ -196,7 +196,7 @@ func TestSyncInterimFinalizedUpdate_WithDataFromStoreWithDifferentBlocks(t *test
)

// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
err = h.syncInterimFinalizedUpdate(context.Background(), 4570722)
err = h.syncInterimFinalizedUpdate(context.Background(), 4563072, 4571360)
require.NoError(t, err)
}

Expand Down Expand Up @@ -241,7 +241,7 @@ func TestSyncInterimFinalizedUpdate_BeaconStateNotAvailableInAPIAndStore(t *test
)

// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
err = h.syncInterimFinalizedUpdate(context.Background(), 4570722)
err = h.syncInterimFinalizedUpdate(context.Background(), 4570722, 4578922)
require.Error(t, err)
}

Expand Down Expand Up @@ -279,6 +279,6 @@ func TestSyncInterimFinalizedUpdate_NoValidBlocksFound(t *testing.T) {
)

// Find a checkpoint for a slot that is just out of the on-chain synced finalized header block roots range
err = h.syncInterimFinalizedUpdate(context.Background(), 4570722)
err = h.syncInterimFinalizedUpdate(context.Background(), 4570722, 4578922)
require.Errorf(t, err, "cannot find blocks at boundaries")
}
125 changes: 24 additions & 101 deletions relayer/relays/beacon/header/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (s *Syncer) GetSyncCommitteePeriodUpdate(period uint64, lastFinalizedSlot u
update, err := s.GetSyncCommitteePeriodUpdateFromEndpoint(period)
if err != nil {
log.WithFields(log.Fields{"period": period, "err": err}).Warn("fetch sync committee update period light client failed, trying building update manually")
update, err = s.GetFinalizedUpdateWithSyncCommittee(period, lastFinalizedSlot)
update, err = s.GetFinalizedUpdateWithSyncCommittee(period)
if err != nil {
return update, fmt.Errorf("build sync committee update: %w", err)
}
Expand Down Expand Up @@ -483,7 +483,6 @@ func (s *Syncer) GetHeaderUpdate(blockRoot common.Hash, checkpoint *cache.Proof)

func (s *Syncer) getBeaconStateAtSlot(slot uint64) (state.BeaconState, error) {
var beaconState state.BeaconState
log.WithField("slot", slot).Info("downloading state at slot")
beaconData, err := s.getBeaconState(slot)
if err != nil {
return beaconState, fmt.Errorf("fetch beacon state: %w", err)
Expand All @@ -510,45 +509,20 @@ func (s *Syncer) UnmarshalBeaconState(slot uint64, data []byte) (state.BeaconSta
return beaconState, nil
}

// Sanity check the finalized and attested header are at 32 boundary blocks, so we can download the beacon state
func (s *Syncer) FindLatestAttestedHeadersAtInterval(initialSlot, lowestSlot uint64) (uint64, error) {
slot := initialSlot

for {
finalizedSlot, attestedSlot, err := s.findValidUpdatePair(slot)
if err != nil {
if lowestSlot > slot {
return 0, fmt.Errorf("unable to find valid slot")
}

slot -= s.protocol.Settings.SlotsInEpoch

continue
}

log.WithFields(log.Fields{"attested": attestedSlot, "finalized": finalizedSlot}).Info("found boundary headers")
return attestedSlot, nil
}
}

// FindOldestAttestedHeaderAtInterval finds a set of headers (finalized and attested headers) that are at 32 boundary
// blocks (with a sync committee super majority signature), so we can download the beacon state.
func (s *Syncer) FindOldestAttestedHeaderAtInterval(initialSlot, highestSlot uint64) (uint64, error) {
// special case where the finalized beacon state is not set at genesis
if initialSlot == 0 {
initialSlot = 2 * s.protocol.Settings.SlotsInEpoch
}
slot := initialSlot

head, err := s.Client.GetHeaderAtHead()
if err != nil {
return 0, fmt.Errorf("get chain head: %w", err)
// FindValidAttestedHeader Find a valid beacon header attested and finalized header pair.
func (s *Syncer) FindValidAttestedHeader(minSlot, maxSlot uint64) (uint64, error) {
var slot uint64
// make sure the starting slot is in a multiple of 32
if minSlot%32 == 0 {
slot = minSlot
} else {
slot = ((minSlot / s.protocol.Settings.SlotsInEpoch) + 1) * s.protocol.Settings.SlotsInEpoch
}

for {
finalizedSlot, attestedSlot, err := s.findValidUpdatePair(slot)
if err != nil {
if highestSlot < slot || head.Slot < slot {
if slot > maxSlot {
return 0, fmt.Errorf("unable to find valid slot")
}

Expand Down Expand Up @@ -594,36 +568,32 @@ func (s *Syncer) findValidUpdatePair(slot uint64) (uint64, uint64, error) {
return finalizedHeader.Slot, attestedHeader.Slot, nil
}

func (s *Syncer) GetLatestPossibleFinalizedUpdate(attestedSlot uint64, boundary uint64) (scale.Update, error) {
attestedSlot, err := s.FindLatestAttestedHeadersAtInterval(attestedSlot, boundary)
func (s *Syncer) GetFinalizedUpdateWithSyncCommittee(syncCommitteePeriod uint64) (scale.Update, error) {
minSlot := syncCommitteePeriod * s.protocol.SlotsPerHistoricalRoot
maxSlot := ((syncCommitteePeriod + 1) * s.protocol.SlotsPerHistoricalRoot) - s.protocol.Settings.SlotsInEpoch // just before the new sync committee boundary

attestedSlot, err := s.FindValidAttestedHeader(minSlot, maxSlot)
if err != nil {
return scale.Update{}, fmt.Errorf("cannot find blocks at boundaries: %w", err)
}

return s.GetFinalizedUpdateAtAttestedSlot(attestedSlot, boundary, false)
return s.GetFinalizedUpdateAtAttestedSlot(attestedSlot, maxSlot, true)
}

func (s *Syncer) GetFinalizedUpdateWithSyncCommittee(syncCommitteePeriod, lastFinalizedSlot uint64) (scale.Update, error) {
slot := (syncCommitteePeriod) * s.protocol.Settings.SlotsInEpoch * s.protocol.Settings.EpochsPerSyncCommitteePeriod

boundary := (syncCommitteePeriod + 1) * s.protocol.Settings.SlotsInEpoch * s.protocol.Settings.EpochsPerSyncCommitteePeriod
func (s *Syncer) GetFinalizedUpdateAtAttestedSlot(minSlot, maxSlot uint64, fetchNextSyncCommittee bool) (scale.Update, error) {
var update scale.Update

attestedSlot, err := s.FindOldestAttestedHeaderAtInterval(slot, boundary)
attestedSlot, err := s.FindValidAttestedHeader(minSlot, maxSlot)
if err != nil {
return scale.Update{}, fmt.Errorf("cannot find blocks at boundaries: %w", err)
}

return s.GetFinalizedUpdateAtAttestedSlot(attestedSlot, boundary, true)
}

func (s *Syncer) GetFinalizedUpdateAtAttestedSlot(attestedSlot uint64, boundary uint64, fetchNextSyncCommittee bool) (scale.Update, error) {
var update scale.Update

// Try getting beacon data from the API first
data, err := s.getBeaconDataFromClient(attestedSlot)
if err != nil {
log.WithFields(log.Fields{"minSlot": minSlot, "maxSlot": maxSlot}).Info("attempting to find in beacon store")
// If it fails, using the beacon store and look for a relevant finalized update
data, err = s.getBeaconDataFromStore(attestedSlot, boundary, fetchNextSyncCommittee)
data, err = s.getBestMatchBeaconDataFromStore(minSlot, maxSlot)
if err != nil {
return update, fmt.Errorf("fetch beacon data from api and data store failure: %w", err)
}
Expand Down Expand Up @@ -732,7 +702,7 @@ func (s *Syncer) GetFinalizedUpdateAtAttestedSlot(attestedSlot uint64, boundary
}

func (s *Syncer) getBlockHeaderAncestryProof(slot int, blockRoot common.Hash, blockRootTree *ssz.Node) ([]types.H256, error) {
maxSlotsPerHistoricalRoot := int(s.protocol.Settings.SlotsInEpoch * s.protocol.Settings.EpochsPerSyncCommitteePeriod)
maxSlotsPerHistoricalRoot := int(s.protocol.SlotsPerHistoricalRoot)
indexInArray := slot % maxSlotsPerHistoricalRoot
leafIndex := maxSlotsPerHistoricalRoot + indexInArray

Expand Down Expand Up @@ -793,25 +763,11 @@ func (s *Syncer) getBeaconDataFromClient(attestedSlot uint64) (finalizedUpdateCo
return response, nil
}

// Get the best, latest finalized and attested beacon states including the slot provided in the finalized state block
// roots, from the Beacon store.
func (s *Syncer) getBeaconDataFromStore(slot, boundary uint64, findMin bool) (finalizedUpdateContainer, error) {
response, err := s.getExactMatchFromStore(slot)
if err != nil {
response, err = s.getBestMatchBeaconDataFromStore(slot, boundary, findMin)
if err != nil {
return finalizedUpdateContainer{}, fmt.Errorf("unable to find exact slot or best other slot beacon data")
}
}

return response, nil
}

func (s *Syncer) getBestMatchBeaconDataFromStore(slot, boundary uint64, findMin bool) (finalizedUpdateContainer, error) {
func (s *Syncer) getBestMatchBeaconDataFromStore(minSlot, maxSlot uint64) (finalizedUpdateContainer, error) {
var response finalizedUpdateContainer
var err error

data, err := s.store.FindBeaconStateWithinSyncPeriod(slot, boundary, findMin)
data, err := s.store.FindBeaconStateWithinRange(minSlot, maxSlot)
if err != nil {
return finalizedUpdateContainer{}, err
}
Expand All @@ -836,44 +792,11 @@ func (s *Syncer) getBestMatchBeaconDataFromStore(slot, boundary uint64, findMin
return response, nil
}

func (s *Syncer) getExactMatchFromStore(slot uint64) (finalizedUpdateContainer, error) {
var response finalizedUpdateContainer
attestedStateData, err := s.store.GetBeaconStateData(slot)
if err != nil {
return finalizedUpdateContainer{}, err
}

response.AttestedSlot = slot
response.AttestedState, err = s.UnmarshalBeaconState(slot, attestedStateData)
if err != nil {
return finalizedUpdateContainer{}, err
}

response.FinalizedCheckPoint = *response.AttestedState.GetFinalizedCheckpoint()

response.FinalizedHeader, err = s.Client.GetHeaderByBlockRoot(common.BytesToHash(response.FinalizedCheckPoint.Root))
if err != nil {
return response, fmt.Errorf("fetch header: %w", err)
}

finalizedStateData, err := s.store.GetBeaconStateData(response.FinalizedHeader.Slot)
if err != nil {
return finalizedUpdateContainer{}, err
}

response.FinalizedState, err = s.UnmarshalBeaconState(response.FinalizedHeader.Slot, finalizedStateData)
if err != nil {
return finalizedUpdateContainer{}, err
}

return response, nil
}
func (s *Syncer) getBeaconState(slot uint64) ([]byte, error) {
data, err := s.Client.GetBeaconState(strconv.FormatUint(slot, 10))
if err != nil {
log.WithFields(log.Fields{"slot": slot, "err": err}).Warn("unable to download ssz state from api, trying store")
data, err = s.store.GetBeaconStateData(slot)
log.WithFields(log.Fields{"slot": slot, "err": err}).Warn("error after store is")
if err != nil {
return nil, fmt.Errorf("fetch beacon state from store: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions relayer/relays/beacon/header/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ func TestGetFinalizedUpdateAtSlot(t *testing.T) {
require.NoError(t, err)
lodestarUpdateJSON := lodestarUpdate.Payload.ToJSON()

attestedSlot, err := syncer.FindLatestAttestedHeadersAtInterval(uint64(lodestarUpdate.Payload.AttestedHeader.Slot), 9331)
attestedSlot, err := syncer.FindValidAttestedHeader(uint64(lodestarUpdate.Payload.AttestedHeader.Slot), 9331)
require.NoError(t, err)

// Manually construct the finalized update for the same block
manualUpdate, err := syncer.GetLatestPossibleFinalizedUpdate(attestedSlot, 9331)
manualUpdate, err := syncer.GetFinalizedUpdateAtAttestedSlot(attestedSlot, 9331, false)
require.NoError(t, err)
manualUpdateJSON := manualUpdate.Payload.ToJSON()

Expand Down Expand Up @@ -167,7 +167,7 @@ func TestFindAttestedAndFinalizedHeadersAtBoundary(t *testing.T) {
DenebForkEpoch: 0,
}))

attested, err := syncer.FindLatestAttestedHeadersAtInterval(8192, 100)
attested, err := syncer.FindValidAttestedHeader(8000, 8160)
assert.NoError(t, err)
assert.Equal(t, "8064", strconv.FormatUint(attested, 10))

Expand Down Expand Up @@ -197,7 +197,7 @@ func TestFindAttestedAndFinalizedHeadersAtBoundary(t *testing.T) {
DenebForkEpoch: 0,
}))

attested, err = syncer.FindLatestAttestedHeadersAtInterval(32768, 25076)
attested, err = syncer.FindValidAttestedHeader(32576, 32704)
assert.NoError(t, err)
assert.Equal(t, "32704", strconv.FormatUint(attested, 10))

Expand Down Expand Up @@ -227,7 +227,7 @@ func TestFindAttestedAndFinalizedHeadersAtBoundary(t *testing.T) {
DenebForkEpoch: 0,
}))

attested, err = syncer.FindLatestAttestedHeadersAtInterval(32768, 25076)
attested, err = syncer.FindValidAttestedHeader(25076, 32736)
assert.NoError(t, err)
assert.Equal(t, "32704", strconv.FormatUint(attested, 10))

Expand All @@ -251,6 +251,6 @@ func TestFindAttestedAndFinalizedHeadersAtBoundary(t *testing.T) {
DenebForkEpoch: 0,
}))

attested, err = syncer.FindLatestAttestedHeadersAtInterval(32768, 32540)
attested, err = syncer.FindValidAttestedHeader(32540, 32768)
assert.Error(t, err)
}
8 changes: 4 additions & 4 deletions relayer/relays/beacon/mock/mock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ type Store struct {
BeaconStateData map[uint64][]byte
}

func (m *Store) FindBeaconStateWithinRange(slot, boundary uint64) (store.StoredBeaconData, error) {
return m.StoredBeaconStateData, nil
}

func (m *Store) WriteEntry(attestedSlot, finalizedSlot uint64, attestedStateData, finalizedStateData []byte) error {
return nil
}
Expand All @@ -29,7 +33,3 @@ func (m *Store) Connect() error {
func (m *Store) Close() {

}

func (m *Store) FindBeaconStateWithinSyncPeriod(slot, boundary uint64, findMax bool) (store.StoredBeaconData, error) {
return m.StoredBeaconStateData, nil
}
4 changes: 4 additions & 0 deletions relayer/relays/beacon/mock/mock_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ type Writer struct {
LastFinalizedState state.FinalizedHeader
}

func (m *Writer) GetLastExecutionHeaderState() (state.ExecutionHeader, error) {
return state.ExecutionHeader{}, nil
}

func (m *Writer) GetLastFinalizedStateIndex() (types.U32, error) {
return 0, nil
}
Expand Down
8 changes: 6 additions & 2 deletions relayer/relays/beacon/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ import (
)

type Protocol struct {
Settings config.SpecSettings
Settings config.SpecSettings
SlotsPerHistoricalRoot uint64
}

func New(setting config.SpecSettings) *Protocol {
return &Protocol{Settings: setting}
return &Protocol{
Settings: setting,
SlotsPerHistoricalRoot: setting.SlotsInEpoch * setting.EpochsPerSyncCommitteePeriod,
}
}

func (p *Protocol) ComputeSyncPeriodAtSlot(slot uint64) uint64 {
Expand Down
Loading

0 comments on commit b02684b

Please sign in to comment.