diff --git a/vochain/indexer/block.go b/vochain/indexer/block.go index 5d2118ef4..c6021e530 100644 --- a/vochain/indexer/block.go +++ b/vochain/indexer/block.go @@ -76,3 +76,17 @@ func (idx *Indexer) BlockList(limit, offset int, chainID, hash, proposerAddress } return list, uint64(results[0].TotalCount), nil } + +// CountBlocks returns how many blocks are indexed. +func (idx *Indexer) CountBlocks() (uint64, error) { + results, err := idx.readOnlyQuery.SearchBlocks(context.TODO(), indexerdb.SearchBlocksParams{ + Limit: 1, + }) + if err != nil { + return 0, err + } + if len(results) == 0 { + return 0, nil + } + return uint64(results[0].TotalCount), nil +} diff --git a/vochain/indexer/indexer.go b/vochain/indexer/indexer.go index 21821c10b..903f3c954 100644 --- a/vochain/indexer/indexer.go +++ b/vochain/indexer/indexer.go @@ -1,6 +1,7 @@ package indexer import ( + "bytes" "context" "database/sql" "embed" @@ -172,6 +173,12 @@ func (idx *Indexer) startDB() error { } goose.SetLogger(log.GooseLogger()) goose.SetBaseFS(embedMigrations) + + if gooseMigrationsPending(idx.readWriteDB, "migrations") { + log.Info("indexer db needs migration, scheduling a reindex after sync") + go idx.ReindexBlocks(false) + } + if err := goose.Up(idx.readWriteDB, "migrations"); err != nil { return fmt.Errorf("goose up: %w", err) } @@ -249,6 +256,27 @@ func (idx *Indexer) RestoreBackup(path string) error { return nil } +func gooseMigrationsPending(db *sql.DB, dir string) bool { + // Get the latest applied migration version + currentVersion, err := goose.GetDBVersion(db) + if err != nil { + log.Errorf("failed to get current database version: %v", err) + return false + } + + // Collect migrations after the current version + migrations, err := goose.CollectMigrations(dir, currentVersion, goose.MaxVersion) + if err != nil { + if errors.Is(err, goose.ErrNoMigrationFiles) { + return false + } + log.Errorf("failed to collect migrations: %v", err) + return false + } + + return len(migrations) > 0 +} + // SaveBackup backs up the database to a file on disk. // Note that writes to the database may be blocked until the backup finishes, // and an error may occur if a file at path already exists. @@ -402,27 +430,66 @@ func (idx *Indexer) AfterSyncBootstrap(inTest bool) { log.Infof("live results recovery computation finished, took %s", time.Since(startTime)) } -func (idx *Indexer) ReindexBlocks() { - queries := idx.blockTxQueries() +// ReindexBlocks reindexes all blocks found in blockstore +func (idx *Indexer) ReindexBlocks(inTest bool) { + if !inTest { + <-idx.App.WaitUntilSynced() + } - for i := idx.App.Node.BlockStore().Base(); i <= idx.App.Node.BlockStore().Height(); i++ { - if b := idx.App.GetBlockByHeight(int64(i)); b != nil { - idxBlock, err := idx.readOnlyQuery.GetBlockByHeight(context.TODO(), i) - if err == nil && idxBlock.Time != b.Time { - log.Errorf("while reindexing blocks, block %d timestamp in db (%s) differs from blockstore (%s), leaving untouched", i, idxBlock.Time, b.Time) - continue - } - // if we got here, the block doesn't exist - if _, err := queries.CreateBlock(context.TODO(), indexerdb.CreateBlockParams{ - ChainID: b.ChainID, - Height: b.Height, - Time: b.Time, - Hash: nonNullBytes(b.Hash()), - ProposerAddress: nonNullBytes(b.ProposerAddress), - LastBlockHash: nonNullBytes(b.LastBlockID.Hash), - }); err != nil { - log.Errorw(err, "cannot index new block") - } + // Note that holding blockMu means new votes aren't added until the reindex finishes. + idx.blockMu.Lock() + defer idx.blockMu.Unlock() + + idxBlockCount, err := idx.CountBlocks() + if err != nil { + log.Warnf("indexer CountBlocks returned error: %s", err) + } + log.Infow("start reindexing", + "blockStoreBase", idx.App.Node.BlockStore().Base(), + "blockStoreHeight", idx.App.Node.BlockStore().Height(), + "indexerBlockCount", idxBlockCount, + ) + queries := idx.blockTxQueries() + for height := idx.App.Node.BlockStore().Base(); height <= idx.App.Node.BlockStore().Height(); height++ { + if b := idx.App.GetBlockByHeight(int64(height)); b != nil { + // Blocks + func() { + idxBlock, err := idx.readOnlyQuery.GetBlockByHeight(context.TODO(), b.Height) + if err == nil && idxBlock.Time != b.Time { + log.Errorf("while reindexing blocks, block %d timestamp in db (%s) differs from blockstore (%s), leaving untouched", height, idxBlock.Time, b.Time) + return + } + if _, err := queries.CreateBlock(context.TODO(), indexerdb.CreateBlockParams{ + ChainID: b.ChainID, + Height: b.Height, + Time: b.Time, + Hash: nonNullBytes(b.Hash()), + ProposerAddress: nonNullBytes(b.ProposerAddress), + LastBlockHash: nonNullBytes(b.LastBlockID.Hash), + }); err != nil { + log.Errorw(err, "cannot index new block") + } + }() + + // Transactions + func() { + for index, tx := range b.Data.Txs { + idxTx, err := idx.readOnlyQuery.GetTransactionByHeightAndIndex(context.TODO(), indexerdb.GetTransactionByHeightAndIndexParams{ + BlockHeight: b.Height, + BlockIndex: int64(index), + }) + if err == nil && !bytes.Equal(idxTx.Hash, tx.Hash()) { + log.Errorf("while reindexing txs, tx %d/%d hash in db (%x) differs from blockstore (%x), leaving untouched", b.Height, index, idxTx.Hash, tx.Hash()) + return + } + vtx := new(vochaintx.Tx) + if err := vtx.Unmarshal(tx, b.ChainID); err != nil { + log.Errorw(err, fmt.Sprintf("cannot unmarshal tx %d/%d", b.Height, index)) + continue + } + idx.indexTx(vtx, uint32(b.Height), int32(index)) + } + }() } } } diff --git a/vochain/indexer/transaction.go b/vochain/indexer/transaction.go index 09ce738ab..5b949da30 100644 --- a/vochain/indexer/transaction.go +++ b/vochain/indexer/transaction.go @@ -92,9 +92,13 @@ func (idx *Indexer) OnNewTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32) idx.blockMu.Lock() defer idx.blockMu.Unlock() + idx.indexTx(tx, blockHeight, txIndex) +} + +func (idx *Indexer) indexTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32) { rawtx, err := proto.Marshal(tx.Tx) if err != nil { - log.Errorw(err, "indexer cannot marshal new transaction") + log.Errorw(err, "indexer cannot marshal transaction") return } @@ -119,6 +123,6 @@ func (idx *Indexer) OnNewTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32) Signature: nonNullBytes(tx.Signature), Signer: nonNullBytes(signer), }); err != nil { - log.Errorw(err, "cannot index new transaction") + log.Errorw(err, "cannot index transaction") } }