Skip to content

Commit

Permalink
indexer: new method to reindex blocks and transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
altergui committed Sep 2, 2024
1 parent dc74400 commit 9709c56
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 22 deletions.
14 changes: 14 additions & 0 deletions vochain/indexer/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,17 @@ func (idx *Indexer) BlockList(limit, offset int, chainID, hash, proposerAddress
}
return list, uint64(results[0].TotalCount), nil
}

// CountBlocks returns how many blocks are indexed.
func (idx *Indexer) CountBlocks() (uint64, error) {
results, err := idx.readOnlyQuery.SearchBlocks(context.TODO(), indexerdb.SearchBlocksParams{
Limit: 1,
})
if err != nil {
return 0, err
}
if len(results) == 0 {
return 0, nil
}
return uint64(results[0].TotalCount), nil
}
107 changes: 87 additions & 20 deletions vochain/indexer/indexer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package indexer

import (
"bytes"
"context"
"database/sql"
"embed"
Expand Down Expand Up @@ -172,6 +173,12 @@ func (idx *Indexer) startDB() error {
}
goose.SetLogger(log.GooseLogger())
goose.SetBaseFS(embedMigrations)

if gooseMigrationsPending(idx.readWriteDB, "migrations") {
log.Info("indexer db needs migration, scheduling a reindex after sync")
go idx.ReindexBlocks(false)
}

if err := goose.Up(idx.readWriteDB, "migrations"); err != nil {
return fmt.Errorf("goose up: %w", err)
}
Expand Down Expand Up @@ -249,6 +256,27 @@ func (idx *Indexer) RestoreBackup(path string) error {
return nil
}

func gooseMigrationsPending(db *sql.DB, dir string) bool {
// Get the latest applied migration version
currentVersion, err := goose.GetDBVersion(db)
if err != nil {
log.Errorf("failed to get current database version: %v", err)
return false
}

// Collect migrations after the current version
migrations, err := goose.CollectMigrations(dir, currentVersion, goose.MaxVersion)
if err != nil {
if errors.Is(err, goose.ErrNoMigrationFiles) {
return false
}
log.Errorf("failed to collect migrations: %v", err)
return false
}

return len(migrations) > 0
}

// SaveBackup backs up the database to a file on disk.
// Note that writes to the database may be blocked until the backup finishes,
// and an error may occur if a file at path already exists.
Expand Down Expand Up @@ -402,27 +430,66 @@ func (idx *Indexer) AfterSyncBootstrap(inTest bool) {
log.Infof("live results recovery computation finished, took %s", time.Since(startTime))
}

func (idx *Indexer) ReindexBlocks() {
queries := idx.blockTxQueries()
// ReindexBlocks reindexes all blocks found in blockstore
func (idx *Indexer) ReindexBlocks(inTest bool) {
if !inTest {
<-idx.App.WaitUntilSynced()
}

for i := idx.App.Node.BlockStore().Base(); i <= idx.App.Node.BlockStore().Height(); i++ {
if b := idx.App.GetBlockByHeight(int64(i)); b != nil {
idxBlock, err := idx.readOnlyQuery.GetBlockByHeight(context.TODO(), i)
if err == nil && idxBlock.Time != b.Time {
log.Errorf("while reindexing blocks, block %d timestamp in db (%s) differs from blockstore (%s), leaving untouched", i, idxBlock.Time, b.Time)
continue
}
// if we got here, the block doesn't exist
if _, err := queries.CreateBlock(context.TODO(), indexerdb.CreateBlockParams{
ChainID: b.ChainID,
Height: b.Height,
Time: b.Time,
Hash: nonNullBytes(b.Hash()),
ProposerAddress: nonNullBytes(b.ProposerAddress),
LastBlockHash: nonNullBytes(b.LastBlockID.Hash),
}); err != nil {
log.Errorw(err, "cannot index new block")
}
// Note that holding blockMu means new votes aren't added until the reindex finishes.
idx.blockMu.Lock()
defer idx.blockMu.Unlock()

idxBlockCount, err := idx.CountBlocks()
if err != nil {
log.Warnf("indexer CountBlocks returned error: %s", err)
}
log.Infow("start reindexing",
"blockStoreBase", idx.App.Node.BlockStore().Base(),
"blockStoreHeight", idx.App.Node.BlockStore().Height(),
"indexerBlockCount", idxBlockCount,
)
queries := idx.blockTxQueries()
for height := idx.App.Node.BlockStore().Base(); height <= idx.App.Node.BlockStore().Height(); height++ {
if b := idx.App.GetBlockByHeight(int64(height)); b != nil {
// Blocks
func() {
idxBlock, err := idx.readOnlyQuery.GetBlockByHeight(context.TODO(), b.Height)
if err == nil && idxBlock.Time != b.Time {
log.Errorf("while reindexing blocks, block %d timestamp in db (%s) differs from blockstore (%s), leaving untouched", height, idxBlock.Time, b.Time)
return
}
if _, err := queries.CreateBlock(context.TODO(), indexerdb.CreateBlockParams{
ChainID: b.ChainID,
Height: b.Height,
Time: b.Time,
Hash: nonNullBytes(b.Hash()),
ProposerAddress: nonNullBytes(b.ProposerAddress),
LastBlockHash: nonNullBytes(b.LastBlockID.Hash),
}); err != nil {
log.Errorw(err, "cannot index new block")
}
}()

// Transactions
func() {
for index, tx := range b.Data.Txs {
idxTx, err := idx.readOnlyQuery.GetTransactionByHeightAndIndex(context.TODO(), indexerdb.GetTransactionByHeightAndIndexParams{
BlockHeight: b.Height,
BlockIndex: int64(index),
})
if err == nil && !bytes.Equal(idxTx.Hash, tx.Hash()) {
log.Errorf("while reindexing txs, tx %d/%d hash in db (%x) differs from blockstore (%x), leaving untouched", b.Height, index, idxTx.Hash, tx.Hash())
return
}
vtx := new(vochaintx.Tx)
if err := vtx.Unmarshal(tx, b.ChainID); err != nil {
log.Errorw(err, fmt.Sprintf("cannot unmarshal tx %d/%d", b.Height, index))
continue
}
idx.indexTx(vtx, uint32(b.Height), int32(index))
}
}()
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions vochain/indexer/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,13 @@ func (idx *Indexer) OnNewTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32)
idx.blockMu.Lock()
defer idx.blockMu.Unlock()

idx.indexTx(tx, blockHeight, txIndex)
}

func (idx *Indexer) indexTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32) {
rawtx, err := proto.Marshal(tx.Tx)
if err != nil {
log.Errorw(err, "indexer cannot marshal new transaction")
log.Errorw(err, "indexer cannot marshal transaction")
return
}

Expand All @@ -119,6 +123,6 @@ func (idx *Indexer) OnNewTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32)
Signature: nonNullBytes(tx.Signature),
Signer: nonNullBytes(signer),
}); err != nil {
log.Errorw(err, "cannot index new transaction")
log.Errorw(err, "cannot index transaction")
}
}

0 comments on commit 9709c56

Please sign in to comment.