From 71de0dcc044f02dc1ac483aa6cfbf06d00ee5840 Mon Sep 17 00:00:00 2001 From: Gui Iribarren Date: Mon, 12 Feb 2024 10:33:11 -0300 Subject: [PATCH] vochain: implement StateSync * vochain: add StateSync* and SnapshotInterval flags * if RPCServers not specified, fallback to Seeds * only do Snapshots when node has an Indexer enabled state/snapshot: refactor the whole Snapshot feature into a new package snapshot * merge {Add,dump,End}Tree into a single DumpTree * refactor Trees into generic Blobs concept * implement indexer Export/Import * Import,ExportNoStateDB now uses NoState(withTxLock=true), grabbing lock once indexer: * add ExportBackupAsBytes() deduping code from api.chainIndexerExportHandler() * refactor RestoreBackup to use a reader instead of a file * handle errors of IterateVotes * if dbPath exists, always startDB and ignore ExpectBackupRestore statedb: implement statedb.Import method tests: * state: remove TestTreeExportImport, needs rewriting from scratch * testsuite: add test_statesync, uses gatewaySync to test State+NoState+Indexer Export/Import * testsuite: check for CONSENSUS FAILURE in logs dockerfiles: open port 26657 for CometBFT RPC used by StateSync --- api/chain.go | 22 +- cmd/node/main.go | 20 +- config/config.go | 15 + dockerfiles/testsuite/docker-compose.yml | 26 + dockerfiles/testsuite/env.gateway0 | 4 +- dockerfiles/testsuite/env.gatewaySync | 17 + dockerfiles/testsuite/start_test.sh | 35 +- dockerfiles/vocdoninode/docker-compose.yml | 1 + service/indexer.go | 46 ++ service/vochain.go | 4 +- snapshot/indexer.go | 54 ++ snapshot/nostatedb.go | 18 + snapshot/snapshot.go | 568 +++++++++++++++++++++ snapshot/state.go | 27 + statedb/statedb.go | 33 ++ statedb/treeupdate.go | 15 +- tree/arbo/tree.go | 19 +- tree/tree.go | 12 + vochain/app.go | 39 +- vochain/appsetup.go | 5 + vochain/cometbft.go | 204 ++++++-- vochain/indexer/indexer.go | 51 +- vochain/indexer/indexer_test.go | 2 +- vochain/indexer/migrations_test.go | 4 +- vochain/start.go | 41 ++ vochain/state/snapshot.go | 493 +++--------------- vochain/state/state.go | 22 +- vochain/state/state_snapshot_test.go | 119 ----- 28 files changed, 1288 insertions(+), 628 deletions(-) create mode 100644 dockerfiles/testsuite/env.gatewaySync create mode 100644 snapshot/indexer.go create mode 100644 snapshot/nostatedb.go create mode 100644 snapshot/snapshot.go create mode 100644 snapshot/state.go diff --git a/api/chain.go b/api/chain.go index 37c417c8c..c165d1398 100644 --- a/api/chain.go +++ b/api/chain.go @@ -6,9 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "io" - "os" - "path/filepath" "strconv" "time" @@ -16,7 +13,6 @@ import ( "go.vocdoni.io/dvote/crypto/zk/circuit" "go.vocdoni.io/dvote/httprouter" "go.vocdoni.io/dvote/httprouter/apirest" - "go.vocdoni.io/dvote/log" "go.vocdoni.io/dvote/types" "go.vocdoni.io/dvote/util" "go.vocdoni.io/dvote/vochain" @@ -1028,23 +1024,7 @@ func (a *API) chainListFeesByTypeHandler(_ *apirest.APIdata, ctx *httprouter.HTT func (a *API) chainIndexerExportHandler(_ *apirest.APIdata, ctx *httprouter.HTTPContext) error { exportCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() - tmpFilePath := filepath.Join(os.TempDir(), "indexer.sql") - if err := a.indexer.SaveBackup(exportCtx, tmpFilePath); err != nil { - return fmt.Errorf("error saving indexer backup: %w", err) - } - tmpFile, err := os.Open(tmpFilePath) - if err != nil { - return fmt.Errorf("error opening indexer backup: %w", err) - } - defer func() { - if err := tmpFile.Close(); err != nil { - 
log.Warnw("error closing indexer backup file", "err", err) - } - if err := os.Remove(tmpFilePath); err != nil { - log.Warnw("error removing indexer backup file", "err", err) - } - }() - data, err := io.ReadAll(tmpFile) + data, err := a.indexer.ExportBackupAsBytes(exportCtx) if err != nil { return fmt.Errorf("error reading indexer backup: %w", err) } diff --git a/cmd/node/main.go b/cmd/node/main.go index ecb528649..40274df1e 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -187,6 +187,18 @@ func loadConfig() *config.Config { "do not wait for Vochain to synchronize (for testing only)") flag.Int("vochainMempoolSize", 20000, "vochain mempool size") + flag.Int("vochainSnapshotInterval", 0, + "create state snapshot every N blocks (0 to disable)") + flag.Bool("vochainStateSyncEnabled", false, + "during startup, let cometBFT ask peers for available snapshots and use them to bootstrap the state") + flag.StringSlice("vochainStateSyncRPCServers", []string{}, + "list of RPC servers to bootstrap the StateSync (optional, defaults to using seeds)") + flag.String("vochainStateSyncTrustHash", "", + "hash of the trusted block (required if vochainStateSyncEnabled)") + flag.Int64("vochainStateSyncTrustHeight", 0, + "height of the trusted block (required if vochainStateSyncEnabled)") + flag.Int64("vochainStateSyncChunkSize", 10*(1<<20), // 10 MB + "cometBFT chunk size in bytes") flag.Int("vochainMinerTargetBlockTimeSeconds", 10, "vochain consensus block time target (in seconds)") @@ -440,7 +452,12 @@ func main() { conf.Vochain.OffChainDataDownload = conf.Vochain.OffChainDataDownload && conf.Mode == types.ModeGateway - // create the vochain service + // if there's no indexer, then never do snapshots because they would be incomplete + if !conf.Vochain.Indexer.Enabled { + conf.Vochain.SnapshotInterval = 0 + } + + // create the vochain service if err := srv.Vochain(); err != nil { log.Fatal(err) } @@ -467,6 +484,7 @@ func main() { } } // start the service and block until finish fast sync + // State Sync (if enabled) also happens during this step if err := srv.Start(); err != nil { log.Fatal(err) } diff --git a/config/config.go b/config/config.go index 957e531ff..d5bea9043 100644 --- a/config/config.go +++ b/config/config.go @@ -125,6 +125,21 @@ type VochainCfg struct { IsSeedNode bool // OffChainDataDownload specifies if the node is configured to download off-chain data OffChainDataDownload bool + // SnapshotInterval enables creating a state snapshot every N blocks (0 to disable) + SnapshotInterval int + // StateSyncEnabled allows cometBFT during startup, to ask peers for available snapshots + // and use them to bootstrap the state + StateSyncEnabled bool + // StateSyncRPCServers is the list of RPC servers to bootstrap the StateSync (optional, defaults to using seeds) + StateSyncRPCServers []string + // StateSyncTrustHeight and TrustHash must both be provided to force the trusting of a + // particular header. + StateSyncTrustHeight int64 + // StateSyncTrustHeight and TrustHash must both be provided to force the trusting of a + // particular header. 
+ StateSyncTrustHash string + // StateSyncChunkSize defines the size of the chunks when splitting a Snapshot for sending via StateSync + StateSyncChunkSize int64 } // IndexerCfg handles the configuration options of the indexer diff --git a/dockerfiles/testsuite/docker-compose.yml b/dockerfiles/testsuite/docker-compose.yml index 0cecc91fd..4191c1dae 100644 --- a/dockerfiles/testsuite/docker-compose.yml +++ b/dockerfiles/testsuite/docker-compose.yml @@ -90,6 +90,8 @@ services: environment: - GOCOVERDIR=/app/run/gocoverage - LOG_PANIC_ON_INVALIDCHARS + - VOCDONI_VOCHAIN_SNAPSHOTINTERVAL=3 + - VOCDONI_VOCHAIN_STATESYNCCHUNKSIZE=2000 test: image: "dvotenode-test:${TESTSUITE_BUILD_TAG:-latest}" @@ -117,6 +119,7 @@ services: - gocoverage-miner2:/app/run/gocoverage/miner2 - gocoverage-miner3:/app/run/gocoverage/miner3 - gocoverage-gateway0:/app/run/gocoverage/gateway0 + - gocoverage-gatewaySync:/app/run/gocoverage/gatewaySync - gocoverage-test:/app/run/gocoverage/test networks: - blockchain @@ -157,6 +160,27 @@ services: profiles: - grafana + gatewaySync: + image: "dvotenode:${TESTSUITE_BUILD_TAG:-latest}" + env_file: "${COMPOSE_HOST_PATH:-.}/env.gatewaySync" + networks: + - blockchain + volumes: + - data-gatewaySync:/app/run/ + - ${COMPOSE_HOST_PATH:-.}/genesis.json:/app/misc/genesis.json + - /tmp/.vochain-zkCircuits/:/app/run/dev/zkCircuits/ + - gocoverage-gatewaySync:/app/run/gocoverage + environment: + - GOCOVERDIR=/app/run/gocoverage + - LOG_PANIC_ON_INVALIDCHARS + - VOCDONI_LOGLEVEL=debug + - VOCDONI_VOCHAIN_STATESYNCENABLED=True + - VOCDONI_VOCHAIN_STATESYNCRPCSERVERS=miner0:26657,miner0:26657 + - VOCDONI_VOCHAIN_STATESYNCTRUSTHEIGHT + - VOCDONI_VOCHAIN_STATESYNCTRUSTHASH + profiles: + - statesync + networks: blockchain: @@ -167,12 +191,14 @@ volumes: data-miner2: {} data-miner3: {} data-gateway0: {} + data-gatewaySync: {} gocoverage-seed: {} gocoverage-miner0: {} gocoverage-miner1: {} gocoverage-miner2: {} gocoverage-miner3: {} gocoverage-gateway0: {} + gocoverage-gatewaySync: {} gocoverage-test: {} prometheus_data: {} grafana_data: {} diff --git a/dockerfiles/testsuite/env.gateway0 b/dockerfiles/testsuite/env.gateway0 index 10cd1b6c3..5f5a1344f 100755 --- a/dockerfiles/testsuite/env.gateway0 +++ b/dockerfiles/testsuite/env.gateway0 @@ -1,10 +1,12 @@ VOCDONI_DATADIR=/app/run +VOCDONI_MODE=gateway VOCDONI_LOGLEVEL=debug -VOCDONI_VOCHAIN_LOGLEVEL=error +VOCDONI_VOCHAIN_LOGLEVEL=info VOCDONI_DEV=True VOCDONI_ENABLEAPI=True VOCDONI_ENABLERPC=True VOCDONI_ENABLEFAUCETWITHAMOUNT=100000 +VOCDONI_VOCHAIN_PUBLICADDR=gateway0:26656 VOCDONI_VOCHAIN_SEEDS=3c3765494e758ae7baccb1f5b0661755302ddc47@seed:26656 VOCDONI_VOCHAIN_GENESIS=/app/misc/genesis.json VOCDONI_VOCHAIN_NOWAITSYNC=True diff --git a/dockerfiles/testsuite/env.gatewaySync b/dockerfiles/testsuite/env.gatewaySync new file mode 100644 index 000000000..3b3afbc6f --- /dev/null +++ b/dockerfiles/testsuite/env.gatewaySync @@ -0,0 +1,17 @@ +VOCDONI_DATADIR=/app/run +VOCDONI_MODE=gateway +VOCDONI_LOGLEVEL=debug +VOCDONI_VOCHAIN_LOGLEVEL=info +VOCDONI_DEV=True +VOCDONI_ENABLEAPI=True +VOCDONI_ENABLERPC=True +VOCDONI_ENABLEFAUCETWITHAMOUNT=100000 +VOCDONI_VOCHAIN_PUBLICADDR=gatewaySync:26656 +VOCDONI_VOCHAIN_SEEDS=3c3765494e758ae7baccb1f5b0661755302ddc47@seed:26656 +VOCDONI_VOCHAIN_GENESIS=/app/misc/genesis.json +VOCDONI_VOCHAIN_NOWAITSYNC=True +VOCDONI_METRICS_ENABLED=True +VOCDONI_METRICS_REFRESHINTERVAL=5 +VOCDONI_CHAIN=dev +VOCDONI_SIGNINGKEY=f50be23d315a2e6c1c30e0f1412b86d6ca9f2b318f1d243e35bc72d44c8b2f90 +VOCDONI_ARCHIVEURL=none diff --git 
a/dockerfiles/testsuite/start_test.sh b/dockerfiles/testsuite/start_test.sh index 00ec2ff06..72d41170c 100755 --- a/dockerfiles/testsuite/start_test.sh +++ b/dockerfiles/testsuite/start_test.sh @@ -58,6 +58,7 @@ tests_to_run=( "e2etest_cspelection" "e2etest_dynamicensuselection" "e2etest_electiontimebounds" + "test_statesync" ) # print help @@ -83,7 +84,7 @@ e2etest() { shift args=$@ id="${op}_$(echo $args | md5sum | awk '{print $1}')" - $COMPOSE_CMD_RUN --name ${TEST_PREFIX}_${FUNCNAME[0]}-${id}_${RANDOMID} test timeout 60000 \ + $COMPOSE_CMD_RUN --name ${TEST_PREFIX}_${FUNCNAME[0]}-${id}_${RANDOMID} test timeout 300 \ ./end2endtest --host $APIHOST --faucet=$FAUCET \ --logLevel=$LOGLEVEL \ --operation=$op \ @@ -157,6 +158,24 @@ e2etest_ballotelection() { e2etest ballotApproval } +test_statesync() { + HEIGHT=3 + HASH= + + log "### Waiting for height $HEIGHT ###" + for i in {1..20}; do + # very brittle hack to extract the hash without using jq, to avoid dependencies + HASH=$($COMPOSE_CMD_RUN test curl -s --fail $APIHOST/chain/blocks/$HEIGHT 2>/dev/null | grep -oP '"hash":"\K[^"]+' | head -1) + [ -n "$HASH" ] && break || sleep 2 + done + + export VOCDONI_VOCHAIN_STATESYNCTRUSTHEIGHT=$HEIGHT + export VOCDONI_VOCHAIN_STATESYNCTRUSTHASH=$HASH + $COMPOSE_CMD --profile statesync up gatewaySync -d + # watch logs for 2 minutes, until catching 'startup complete'. in case of timeout, or panic, or whatever, test will fail + timeout 120 sh -c "($COMPOSE_CMD logs gatewaySync -f | grep -m 1 'startup complete')" +} + ### end tests definition log "### Starting test suite ###" @@ -194,7 +213,9 @@ results="/tmp/.vocdoni-test$RANDOM" mkdir -p $results for test in ${tests_to_run[@]}; do - if [ $test == "e2etest_raceDuringCommit" ] || [ $CONCURRENT -eq 0 ] ; then + if [ $test == "e2etest_raceDuringCommit" ] || \ + [ $test == "test_statesync" ] || \ + [ $CONCURRENT -eq 0 ] ; then log "### Running test $test ###" ( set -o pipefail ; $test | tee $results/$test.stdout ; echo $? > $results/$test.retval ) else @@ -239,9 +260,17 @@ if [ -n "$GOCOVERDIR" ] ; then log "### Coverage data in binary fmt left in $GOCOVERDIR ###" fi +if $COMPOSE_CMD logs | grep -q "^panic:" ; then + RET=2 + log "### Panic detected! Look for panic in the logs above for full context, pasting here the 100 lines after panic ###" + $COMPOSE_CMD logs | grep -A 100 "^panic:" +fi + +if $COMPOSE_CMD logs | grep -q "CONSENSUS FAILURE" ; then RET=3 ; log "### CONSENSUS FAILURE detected! 
Look for it in the logs above ###"; fi + [ $CLEAN -eq 1 ] && { log "### Cleaning docker environment ###" - $COMPOSE_CMD down -v --remove-orphans + $COMPOSE_CMD --profile statesync down -v --remove-orphans } if [ -n "$failed" ]; then diff --git a/dockerfiles/vocdoninode/docker-compose.yml b/dockerfiles/vocdoninode/docker-compose.yml index 48f843bd7..22c3de866 100644 --- a/dockerfiles/vocdoninode/docker-compose.yml +++ b/dockerfiles/vocdoninode/docker-compose.yml @@ -14,6 +14,7 @@ services: - "4001:4001" # IPFS swarm - "[::1]:5001:5001" # IPFS api, never expose outside localhost - "26656:26656" # CometBFT p2p port (PublicAddr) + - "26657:26657" # CometBFT RPC port (used by StateSync) - "[::1]:26658:26658" # CometBFT PrivValidatorListenAddr (disabled by default) - "[::1]:61000-61100:61000-61100" # PprofPort (runtime profiling data, disabled by default) sysctls: diff --git a/service/indexer.go b/service/indexer.go index 685f6a714..5ce214e71 100644 --- a/service/indexer.go +++ b/service/indexer.go @@ -1,9 +1,15 @@ package service import ( + "context" + "fmt" + "io" + "os" "path/filepath" + "time" "go.vocdoni.io/dvote/log" + "go.vocdoni.io/dvote/snapshot" "go.vocdoni.io/dvote/vochain/indexer" ) @@ -14,6 +20,8 @@ func (vs *VocdoniService) VochainIndexer() error { vs.Indexer, err = indexer.New(vs.App, indexer.Options{ DataDir: filepath.Join(vs.Config.DataDir, "indexer"), IgnoreLiveResults: vs.Config.Indexer.IgnoreLiveResults, + // During StateSync, IndexerDB will be restored, so enable ExpectBackupRestore in that case + ExpectBackupRestore: vs.Config.StateSyncEnabled, }) if err != nil { return err @@ -21,6 +29,44 @@ func (vs *VocdoniService) VochainIndexer() error { // launch the indexer after sync routine (executed when the blockchain is ready) go vs.Indexer.AfterSyncBootstrap(false) + snapshot.SetFnImportIndexer(func(r io.Reader) error { + log.Debugf("restoring indexer backup") + + file, err := os.CreateTemp("", "indexer.sqlite3") + if err != nil { + return fmt.Errorf("creating tmpfile: %w", err) + } + defer func() { + if err := file.Close(); err != nil { + log.Warnw("error closing tmpfile", "path", file.Name(), "err", err) + } + if err := os.Remove(file.Name()); err != nil { + log.Warnw("error removing tmpfile", "path", file.Name(), "err", err) + } + }() + + if _, err := io.Copy(file, r); err != nil { + return fmt.Errorf("writing tmpfile: %w", err) + } + + return vs.Indexer.RestoreBackup(file.Name()) + }) + + snapshot.SetFnExportIndexer(func(w io.Writer) error { + log.Debugf("saving indexer backup") + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + data, err := vs.Indexer.ExportBackupAsBytes(ctx) + if err != nil { + return fmt.Errorf("creating indexer backup: %w", err) + } + if _, err := w.Write(data); err != nil { + return fmt.Errorf("writing data: %w", err) + } + return nil + }) + if vs.Config.Indexer.ArchiveURL != "" && vs.Config.Indexer.ArchiveURL != "none" { log.Infow("starting archive retrieval", "path", vs.Config.Indexer.ArchiveURL) go vs.Indexer.StartArchiveRetrieval(vs.DataDownloader, vs.Config.Indexer.ArchiveURL) diff --git a/service/vochain.go b/service/vochain.go index f9e0cc3b6..867517ef3 100644 --- a/service/vochain.go +++ b/service/vochain.go @@ -129,7 +129,7 @@ func (vs *VocdoniService) Start() error { go vs.Stats.Start(10) } - if !vs.Config.NoWaitSync { + if !vs.Config.NoWaitSync || vs.Config.StateSyncEnabled { log.Infof("waiting for vochain to synchronize") var lastHeight uint64 i := 0 @@ -154,7 +154,7 @@ func (vs *VocdoniService) 
Start() error { syncCounter-- } } - log.Infow("vochain fastsync completed", "height", vs.Stats.Height(), "time", time.Since(timeSyncCounter)) + log.Infow("vochain fastsync completed", "height", vs.Stats.Height(), "duration", time.Since(timeSyncCounter).String()) } go VochainPrintInfo(20*time.Second, vs.Stats) diff --git a/snapshot/indexer.go b/snapshot/indexer.go new file mode 100644 index 000000000..c64f96bfb --- /dev/null +++ b/snapshot/indexer.go @@ -0,0 +1,54 @@ +package snapshot + +import ( + "io" + "sync" +) + +// fnIndexer groups the funcs that export and import the Indexer +var fnIndexer struct { + sync.Mutex + exp func(w io.Writer) error + imp func(r io.Reader) error +} + +// SetFnExportIndexer sets the func that exports the Indexer +func SetFnExportIndexer(fn func(w io.Writer) error) { + fnIndexer.Lock() + defer fnIndexer.Unlock() + fnIndexer.exp = fn +} + +// FnExportIndexer returns the func that exports the Indexer +func FnExportIndexer() (fn func(w io.Writer) error) { + fnIndexer.Lock() + defer fnIndexer.Unlock() + return fnIndexer.exp +} + +// SetFnImportIndexer sets the func that imports the Indexer +func SetFnImportIndexer(fn func(r io.Reader) error) { + fnIndexer.Lock() + defer fnIndexer.Unlock() + fnIndexer.imp = fn +} + +// FnImportIndexer returns the func that imports the Indexer +func FnImportIndexer() (fn func(r io.Reader) error) { + fnIndexer.Lock() + defer fnIndexer.Unlock() + return fnIndexer.imp +} + +// DumpIndexer calls the passed fn to dump the Indexer to the snapshot. +func (s *Snapshot) DumpIndexer(fn func(w io.Writer) error) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.header.Blobs = append(s.header.Blobs, SnapshotBlobHeader{ + Type: snapshotBlobType_IndexerDB, + Size: 0, + }) + + return fn(s) +} diff --git a/snapshot/nostatedb.go b/snapshot/nostatedb.go new file mode 100644 index 000000000..170b2597f --- /dev/null +++ b/snapshot/nostatedb.go @@ -0,0 +1,18 @@ +package snapshot + +import ( + "go.vocdoni.io/dvote/vochain/state" +) + +// DumpNoStateDB dumps the NoStateDB to the snapshot. `Create` needs to be called first. +func (s *Snapshot) DumpNoStateDB(v *state.State) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.header.Blobs = append(s.header.Blobs, SnapshotBlobHeader{ + Type: snapshotBlobType_NoStateDB, + Size: 0, + }) + + return v.ExportNoStateDB(s) +} diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go new file mode 100644 index 000000000..bf8c4de35 --- /dev/null +++ b/snapshot/snapshot.go @@ -0,0 +1,568 @@ +package snapshot + +import ( + "bytes" + "crypto/md5" + "encoding/binary" + "encoding/gob" + "encoding/hex" + "errors" + "fmt" + "hash" + "io" + "math" + "os" + "path" + "path/filepath" + "sync" + "time" + + "go.vocdoni.io/dvote/log" + "go.vocdoni.io/dvote/vochain/state" +) + +const ( + snapshotHeaderVersion = 1 + snapshotHeaderLenSize = 32 +) + +const ( + snapshotBlobType_Tree = iota + snapshotBlobType_NoStateDB + snapshotBlobType_IndexerDB +) + +const chunksDir = "chunks" + +// Snapshot is a copy in a specific point in time of the blockchain state. +// The state is supposed to be a list of nested merkle trees. +// The Snapshot contains the methods for building a single file snapshot of +// the state containing multiple trees. +// The implementation allows the encoding and decoding of snapshots. +// +// The structure of the snapshot encoded file is: +// +// [headerLen][header][blob1][blob2][blobN] +// +// - headerlen is a fixed 32 bytes little endian number indicating the size of the header. 
+// +// - header is the Gob encoded structure containing the metadata of the blobs (type, size, name, etc.). +// +// - blobN is the raw bytes dump of all trees and databases. +type Snapshot struct { + path string + file *os.File + lock sync.Mutex + header SnapshotHeader + headerSize uint32 + currentBlob int // index of the current blob being read + currentBlobReader io.Reader // the io.Reader for reading the current blob +} + +var _ io.Writer = (*Snapshot)(nil) + +// SnapshotHeader is the header structure of StateSnapshot containing the list of blobs. +type SnapshotHeader struct { + Version int + Root []byte + ChainID string + Height uint32 + Blobs []SnapshotBlobHeader + Hash []byte + hasher hash.Hash +} + +// SnapshotBlobHeader represents a blob of data of the StateSnapshot, for example a merkle tree or a database. +type SnapshotBlobHeader struct { + Type int + Name string + Size uint32 + Parent string + Key []byte + Root []byte +} + +// DiskSnapshotInfo describes a file on disk +type DiskSnapshotInfo struct { + Path string + ModTime time.Time + Size int64 + Hash []byte +} + +func (h *SnapshotHeader) String() string { + return fmt.Sprintf("version=%d root=%s chainID=%s height=%d blobs=%+v", + h.Version, hex.EncodeToString(h.Root), h.ChainID, h.Height, h.Blobs) +} + +// Write implements the io.Writer interface. +// Writes a chunk of bytes, updates the Size of the last s.header.Blobs[] item, +// and updates the s.header.Hash +func (s *Snapshot) Write(b []byte) (int, error) { + if _, err := s.header.hasher.Write(b); err != nil { + return 0, fmt.Errorf("error calculating hash: %w", err) + } + s.header.Hash = s.header.hasher.Sum(nil) + + n, err := s.file.Write(b) + s.header.Blobs[len(s.header.Blobs)-1].Size += uint32(n) + return n, err +} + +// SetMainRoot sets the root for the main merkle tree of the state. +func (s *Snapshot) SetMainRoot(root []byte) { + s.header.Root = root +} + +// SetHeight sets the blockchain height for the snapshot. +func (s *Snapshot) SetHeight(height uint32) { + s.header.Height = height +} + +// SetChainID sets the blockchain identifier for the snapshot. +func (s *Snapshot) SetChainID(chainID string) { + s.header.ChainID = chainID +} + +// SeekToNextBlob seeks over the size of the current blob, leaving everything ready +// for reading the next blob. +// +// Returns io.EOF when there are no more blobs. +func (s *Snapshot) SeekToNextBlob() error { + s.lock.Lock() + defer s.lock.Unlock() + + s.currentBlob++ + + if s.currentBlob >= len(s.header.Blobs) { + // no more blobs available + return io.EOF + } + + // update the buffer Reader + s.currentBlobReader = io.LimitReader( + s.file, + int64(s.header.Blobs[s.currentBlob].Size), + ) + return nil +} + +// CurrentBlobReader returns the s.currentBlobReader +func (s *Snapshot) CurrentBlobReader() io.Reader { + return s.currentBlobReader +} + +// Header returns the header for the snapshot containing the information +// about all the blobs (merkle trees and DBs) +func (s *Snapshot) Header() *SnapshotHeader { + return &s.header +} + +// BlobHeader returns the header for the current blob. +func (s *Snapshot) BlobHeader() *SnapshotBlobHeader { + return &s.header.Blobs[s.currentBlob] +} + +// Path returns the file path of the snapshot file currently used. +func (s *Snapshot) Path() string { + return s.path +} + +// Finish builds the snapshot started with `New` and stores in disk its contents. +// After calling this method the snapshot is finished. 
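[Editor's note] To make the on-disk layout described above concrete, here is an illustrative sketch (not part of this patch) of a standalone reader: it reads the fixed 32-byte little-endian header length, gob-decodes the header, and prints its metadata. The struct definitions are copied from the header types declared earlier in this file; the file path argument is hypothetical.

```go
package main

import (
	"encoding/binary"
	"encoding/gob"
	"fmt"
	"io"
	"os"
)

// Copies of the exported header fields declared above (the unexported hasher is not gob-encoded).
type SnapshotBlobHeader struct {
	Type   int
	Name   string
	Size   uint32
	Parent string
	Key    []byte
	Root   []byte
}

type SnapshotHeader struct {
	Version int
	Root    []byte
	ChainID string
	Height  uint32
	Blobs   []SnapshotBlobHeader
	Hash    []byte
}

func main() {
	f, err := os.Open(os.Args[1]) // path to a snapshot file (hypothetical argument)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// [headerLen]: fixed 32 bytes, little-endian size of the gob-encoded header
	lenBuf := make([]byte, 32)
	if _, err := io.ReadFull(f, lenBuf); err != nil {
		panic(err)
	}
	headerLen := binary.LittleEndian.Uint32(lenBuf)

	// [header]: gob-encoded SnapshotHeader; the raw blobs follow it in declaration order
	var h SnapshotHeader
	if err := gob.NewDecoder(io.LimitReader(f, int64(headerLen))).Decode(&h); err != nil {
		panic(err)
	}
	fmt.Printf("version=%d chainID=%s height=%d blobs=%d hash=%x\n",
		h.Version, h.ChainID, h.Height, len(h.Blobs), h.Hash)
}
```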
+func (s *Snapshot) Finish() error { + s.lock.Lock() + defer s.lock.Unlock() + + // create the final file + finalFile, err := os.Create(s.path) + if err != nil { + return err + } + + defer func() { + if err := finalFile.Close(); err != nil && !errors.Is(err, os.ErrClosed) { + log.Warnf("error closing the file %v", err) + } + }() + + // build the header + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(s.header); err != nil { + return err + } + + // write the size of the header in the first 32 bytes + headerSize := make([]byte, snapshotHeaderLenSize) + binary.LittleEndian.PutUint32(headerSize, uint32(buf.Len())) + if _, err := finalFile.Write(headerSize); err != nil { + return err + } + + // write the header + hs, err := finalFile.Write(buf.Bytes()) + if err != nil { + return err + } + log.Debugf("snapshot header size is %d bytes", hs) + + // write the blobs (by copying the tmpFile) + if _, err := s.file.Seek(0, io.SeekStart); err != nil { + return err + } + bs, err := io.Copy(finalFile, s.file) + if err != nil { + return err + } + log.Debugf("snapshot blobs size is %d bytes", bs) + + // close and remove the temporary file + if err := s.file.Close(); err != nil { + return err + } + if err := finalFile.Close(); err != nil { + return err + } + return os.Remove(s.file.Name()) +} + +// Restore restores the State snapshot into a temp directory +// inside the passed dataDir, and returns the path. Caller is expected to move that tmpDir +// into the location normally used by statedb, +// or handle removing the directory, in case the returned err != nil +// It also restores the IndexerDB into dataDir/indexer (hardcoded) +func (s *Snapshot) Restore(dbType, dataDir string) (string, error) { + log.Infof("installing snapshot %+v into dir %s using dbType %s", s.Header(), dataDir, dbType) + + tmpDir, err := os.MkdirTemp(dataDir, "newState") + if err != nil { + return tmpDir, fmt.Errorf("error creating temp dir: %w", err) + } + + newState, err := state.New(dbType, tmpDir) + if err != nil { + return tmpDir, fmt.Errorf("error creating newState: %w", err) + } + + for s.SeekToNextBlob() == nil { + switch h := s.BlobHeader(); h.Type { + case snapshotBlobType_Tree: + log.Debugw("restoring snapshot, found a Tree blob", + "name", h.Name, + "root", hex.EncodeToString(h.Root), + "parent", h.Parent, + "key", hex.EncodeToString(h.Key)) + + err := newState.RestoreStateTree(s.CurrentBlobReader(), + state.TreeDescription{ + Name: h.Name, + Parent: h.Parent, + Key: h.Key, + }) + if err != nil { + return tmpDir, err + } + case snapshotBlobType_NoStateDB: + log.Debug("restoring snapshot, found a NoStateDB blob, will import") + if err := newState.ImportNoStateDB(s.CurrentBlobReader()); err != nil { + return tmpDir, err + } + case snapshotBlobType_IndexerDB: + if FnImportIndexer() == nil { + log.Debug("restoring snapshot, found a IndexerDB blob, but there's no indexer on this node, skipping...") + // TODO: we need to io.ReadAll because s.SeekToNextBlob() does not actually Seek, despite its name, + // it only assumes the Seek happened because the blob was read. 
+ _, _ = io.ReadAll(s.CurrentBlobReader()) + continue + } + log.Debug("restoring snapshot, found a IndexerDB blob, will import") + if err := FnImportIndexer()(s.CurrentBlobReader()); err != nil { + return tmpDir, err + } + } + } + + if err := newState.Commit(uint32(s.Header().Height)); err != nil { + return tmpDir, fmt.Errorf("error doing newState.Commit(%d): %w", s.Header().Height, err) + } + + if err := newState.Close(); err != nil { + return tmpDir, fmt.Errorf("error closing newState: %w", err) + } + return tmpDir, nil +} + +// SnapshotManager is the manager +type SnapshotManager struct { + // dataDir is the path for storing some files + dataDir string + // ChunkSize is the chunk size for slicing snapshots + ChunkSize int64 +} + +// NewManager creates a new SnapshotManager. +func NewManager(dataDir string, chunkSize int64) (*SnapshotManager, error) { + if err := os.MkdirAll(dataDir, 0o750); err != nil { + return nil, err + } + + if err := os.MkdirAll(filepath.Join(dataDir, chunksDir), 0o750); err != nil { + return nil, err + } + + return &SnapshotManager{ + dataDir: dataDir, + ChunkSize: chunkSize, + }, nil +} + +// Do performs a snapshot of the last committed state for all trees and dbs. +// The snapshot is stored in disk and the file path is returned. +func (sm *SnapshotManager) Do(v *state.State) (string, error) { + height, err := v.LastHeight() + if err != nil { + return "", err + } + + snap, err := sm.New(height) + if err != nil { + return "", err + } + + t := v.MainTreeView() + root, err := t.Root() + if err != nil { + return "", err + } + snap.SetMainRoot(root) + snap.SetHeight(height) + snap.SetChainID(v.ChainID()) + + // NoStateDB + if err := snap.DumpNoStateDB(v); err != nil { + return "", err + } + log.Debugf("dumped blob %d: %+v", len(snap.header.Blobs)-1, snap.header.Blobs[len(snap.header.Blobs)-1]) + + // State + list, err := v.DeepListStateTrees() + if err != nil { + return "", err + } + for _, treedesc := range list { + if err := snap.DumpTree(treedesc.Name, treedesc.Parent, treedesc.Key, treedesc.Tree); err != nil { + return "", err + } + log.Debugf("dumped blob %d: %+v", len(snap.header.Blobs)-1, snap.header.Blobs[len(snap.header.Blobs)-1]) + } + + // Indexer + if FnExportIndexer() != nil { + if err := snap.DumpIndexer(FnExportIndexer()); err != nil { + return "", err + } + log.Debugf("dumped blob %d: %+v", len(snap.header.Blobs)-1, snap.header.Blobs[len(snap.header.Blobs)-1]) + } + + return snap.Path(), snap.Finish() +} + +// New starts the creation of a new snapshot as a disk file. +// +// To open an existing snapshot file, use `Open` instead. +func (sm *SnapshotManager) New(height uint32) (*Snapshot, error) { + filePath := filepath.Join(sm.dataDir, fmt.Sprintf("%d", height)) + file, err := os.Create(filePath + ".tmp") + if err != nil { + return nil, err + } + return &Snapshot{ + path: filePath, + file: file, + header: SnapshotHeader{ + Version: snapshotHeaderVersion, + hasher: md5.New(), + }, + }, nil +} + +// Open reads an existing snapshot file, decodes the header and returns a Snapshot +// On the returned Snapshot, you are expected to call SeekToNextBlob(), read from CurrentBlobReader() +// and iterate until SeekToNextBlob() returns io.EOF. +// +// This method performs the opposite operation of `New`. 
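[Editor's note] The consumer contract just described (call SeekToNextBlob, read from CurrentBlobReader, repeat until io.EOF) can be exercised with a small loop. This is an illustrative sketch, not part of the patch; the listBlobs helper is hypothetical.

```go
package example

import (
	"errors"
	"fmt"
	"io"

	"go.vocdoni.io/dvote/snapshot"
)

// listBlobs is a hypothetical helper illustrating how a snapshot file is consumed.
func listBlobs(sm *snapshot.SnapshotManager, filePath string) error {
	snap, err := sm.Open(filePath)
	if err != nil {
		return err
	}
	for {
		if err := snap.SeekToNextBlob(); err != nil {
			if errors.Is(err, io.EOF) {
				return nil // no more blobs
			}
			return err
		}
		h := snap.BlobHeader()
		// each blob must be fully drained before the next SeekToNextBlob call,
		// since seeking only advances correctly once the LimitReader is consumed
		n, err := io.Copy(io.Discard, snap.CurrentBlobReader())
		if err != nil {
			return err
		}
		fmt.Printf("blob type=%d name=%q declared=%d read=%d bytes\n", h.Type, h.Name, h.Size, n)
	}
}
```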
+func (*SnapshotManager) Open(filePath string) (*Snapshot, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, err + } + headerSizeBytes := make([]byte, snapshotHeaderLenSize) + if _, err = io.ReadFull(file, headerSizeBytes); err != nil { + return nil, err + } + + s := &Snapshot{ + path: filePath, + file: file, + headerSize: binary.LittleEndian.Uint32(headerSizeBytes), + currentBlob: -1, // in order for the first SeekToNextBlob seek to blob 0 + } + decoder := gob.NewDecoder(io.LimitReader(s.file, int64(s.headerSize))) + if err := decoder.Decode(&s.header); err != nil { + return nil, fmt.Errorf("cannot decode header: %w", err) + } + if s.header.Version != snapshotHeaderVersion { + return nil, fmt.Errorf("snapshot version not compatible") + } + return s, nil +} + +// List returns the list of the current snapshots stored on disk, indexed by height +func (sm *SnapshotManager) List() map[uint32]DiskSnapshotInfo { + files, err := os.ReadDir(sm.dataDir) + if err != nil { + log.Fatal(err) + } + dsi := make(map[uint32]DiskSnapshotInfo) + for _, file := range files { + if !file.IsDir() { + if path.Ext(file.Name()) == ".tmp" { + // ignore incomplete snapshots + continue + } + fileInfo, err := file.Info() + if err != nil { + log.Errorw(err, "could not fetch snapshot file info") + continue + } + s, err := sm.Open(filepath.Join(sm.dataDir, file.Name())) + if err != nil { + log.Errorw(err, fmt.Sprintf("could not open snapshot file %q", filepath.Join(sm.dataDir, file.Name()))) + continue + } + + dsi[uint32(s.header.Height)] = DiskSnapshotInfo{ + Path: filepath.Join(sm.dataDir, file.Name()), + Size: fileInfo.Size(), + ModTime: fileInfo.ModTime(), + Hash: s.header.Hash, + } + } + } + return dsi +} + +// SliceChunk returns a chunk of a snapshot +func (sm *SnapshotManager) SliceChunk(height uint64, format uint32, chunk uint32) ([]byte, error) { + _ = format // TBD: we don't support different formats + + dsi := sm.List() + + snapshot, found := dsi[uint32(height)] + if !found { + return nil, fmt.Errorf("snapshot not found for height %d", height) + } + + file, err := os.Open(snapshot.Path) + if err != nil { + return nil, err + } + + defer file.Close() + + chunks := int(math.Ceil(float64(snapshot.Size) / float64(sm.ChunkSize))) + partSize := int(math.Min(float64(sm.ChunkSize), float64(snapshot.Size-int64(chunk)*sm.ChunkSize))) + partBuffer := make([]byte, partSize) + if _, err := file.ReadAt(partBuffer, int64(chunk)*sm.ChunkSize); err != nil && !errors.Is(err, io.EOF) { + return nil, err + } + + log.Debugf("splitting snapshot for height %d (size=%d, hash=%x), serving chunk %d of %d", + height, snapshot.Size, snapshot.Hash, chunk, chunks) + + return partBuffer, nil +} + +// JoinChunks joins all chunkFilenames in order, and returns the resulting Snapshot +func (sm *SnapshotManager) JoinChunks(chunks int32, height int64) (*Snapshot, error) { + snapshotFilename := filepath.Join(sm.dataDir, fmt.Sprintf("%d", height)) + + log.Debugf("joining %d chunks into snapshot (height %d) file %s", chunks, height, snapshotFilename) + // Create or truncate the destination file + if _, err := os.Create(snapshotFilename); err != nil { + return nil, err + } + + // Open for APPEND + snapFile, err := os.OpenFile(snapshotFilename, os.O_APPEND|os.O_WRONLY, os.ModeAppend) + if err != nil { + return nil, err + } + + // according to https://socketloop.com/tutorials/golang-recombine-chunked-files-example + // we shouldn't defer a file.Close when opening a file for APPEND mode, but no idea why + defer snapFile.Close() + + 
// we can't use range chunkFilenames since that would restore in random order + for i := int32(0); i < chunks; i++ { + file := filepath.Join(sm.dataDir, chunksDir, fmt.Sprintf("%d", i)) + // open a chunk + chunk, err := os.ReadFile(file) + if err != nil { + return nil, fmt.Errorf("error reading chunk file: %w", err) + } + + if _, err := snapFile.Write(chunk); err != nil { + return nil, fmt.Errorf("error writing chunk bytes: %w", err) + } + + // flush to disk + if err := snapFile.Sync(); err != nil { + return nil, fmt.Errorf("error flushing chunk to disk: %w", err) + } + + // remove the chunk file + if err := os.Remove(file); err != nil { + return nil, fmt.Errorf("error removing chunk file: %w", err) + } + } + + s, err := sm.Open(snapFile.Name()) + if err != nil { + return nil, fmt.Errorf("error opening resulting snapshot: %w", err) + } + + log.Debugf("snapshot file %s has header.Hash=%x", snapFile.Name(), s.header.Hash) + // TODO: hash the contents and check it matches the s.header.Hash, confirming no corruption + + return s, nil +} + +// WriteChunkToDisk writes the chunk bytes into a file (named according to index) inside the chunk storage dir +// +// When all chunks are on disk, you are expected to call sm.JoinChunks +func (sm *SnapshotManager) WriteChunkToDisk(index uint32, chunk []byte) error { + f, err := os.Create(filepath.Join(sm.dataDir, chunksDir, fmt.Sprintf("%d", index))) + if err != nil { + return err + } + defer f.Close() + + if _, err := f.Write(chunk); err != nil { + if err := os.Remove(f.Name()); err != nil { + log.Warnf("couldn't remove temp file %s: %s", f.Name(), err) + } + return err + } + return nil +} + +// CountChunksInDisk counts how many files are present inside the chunk storage dir +func (sm *SnapshotManager) CountChunksInDisk() int { + files, err := os.ReadDir(filepath.Join(sm.dataDir, chunksDir)) + if err != nil { + return 0 + } + return len(files) +} diff --git a/snapshot/state.go b/snapshot/state.go new file mode 100644 index 000000000..c03939bd5 --- /dev/null +++ b/snapshot/state.go @@ -0,0 +1,27 @@ +package snapshot + +import ( + "go.vocdoni.io/dvote/statedb" +) + +// DumpTree dumps a tree to the snapshot. 
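[Editor's note] Before the DumpTree implementation continues below, a short worked example of the chunk arithmetic used by SliceChunk and ListSnapshots: the snapshot file is split into ceil(size/ChunkSize) pieces, and the last piece is simply the remainder. The numbers here are assumptions for illustration (ChunkSize=2000 matches the value the testsuite sets via VOCDONI_VOCHAIN_STATESYNCCHUNKSIZE).

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// assumed example values: a 5000-byte snapshot and ChunkSize=2000
	size := int64(5000)
	chunkSize := int64(2000)

	chunks := int(math.Ceil(float64(size) / float64(chunkSize))) // 3 chunks
	for c := int64(0); c < int64(chunks); c++ {
		partSize := int(math.Min(float64(chunkSize), float64(size-c*chunkSize)))
		fmt.Printf("chunk %d: offset=%d size=%d\n", c, c*chunkSize, partSize)
	}
	// output: chunk 0: offset=0 size=2000
	//         chunk 1: offset=2000 size=2000
	//         chunk 2: offset=4000 size=1000
}
```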
+func (s *Snapshot) DumpTree(name, parent string, key []byte, tr statedb.TreeViewer) error { + s.lock.Lock() + defer s.lock.Unlock() + + root, err := tr.Root() + if err != nil { + return err + } + + s.header.Blobs = append(s.header.Blobs, SnapshotBlobHeader{ + Type: snapshotBlobType_Tree, + Name: name, + Parent: parent, + Key: key, + Root: root, + Size: 0, + }) + + return tr.Dump(s) +} diff --git a/statedb/statedb.go b/statedb/statedb.go index c2bc21566..c23f0c00e 100644 --- a/statedb/statedb.go +++ b/statedb/statedb.go @@ -55,6 +55,7 @@ import ( "bytes" "encoding/binary" "errors" + "io" "path" "sync" @@ -406,3 +407,35 @@ func (s *StateDB) TreeView(root []byte) (*TreeView, error) { cfg: MainTreeCfg, }, nil } + +// Import imports the contents from r into a state tree +func (s *StateDB) Import(cfg TreeConfig, parent *TreeConfig, r io.Reader) (err error) { + dbtx := s.db.WriteTx() + defer func() { + if err != nil { + dbtx.Discard() + } + }() + + var txTree db.WriteTx + if cfg.kindID == "" { // TreeMain + txTree = subWriteTx(dbtx, subKeyTree) + } else if parent != nil && parent.prefix != "" { // ChildTree (for example Votes) + txParent := subWriteTx(dbtx, path.Join(subKeySubTree, parent.prefix)) + txChild := subWriteTx(txParent, path.Join(subKeySubTree, cfg.prefix)) + txTree = subWriteTx(txChild, subKeyTree) + } else { + tx := subWriteTx(dbtx, path.Join(subKeySubTree, cfg.prefix)) + txTree = subWriteTx(tx, subKeyTree) + } + tree, err := tree.New(txTree, + tree.Options{DB: nil, MaxLevels: cfg.maxLevels, HashFunc: cfg.hashFunc}) + if err != nil { + return err + } + if err := tree.ImportDumpReaderWithTx(txTree, r); err != nil { + return err + } + + return txTree.Commit() +} diff --git a/statedb/treeupdate.go b/statedb/treeupdate.go index ab1358a30..4d86df6d6 100644 --- a/statedb/treeupdate.go +++ b/statedb/treeupdate.go @@ -1,7 +1,6 @@ package statedb import ( - "bytes" "fmt" "io" "path" @@ -101,13 +100,8 @@ func (u *TreeUpdate) Dump(w io.Writer) error { } // Import writes the content exported with Dump. -// TODO: use io.Reader once implemented in Arbo. func (u *TreeUpdate) Import(r io.Reader) error { - var buf bytes.Buffer - if _, err := buf.ReadFrom(r); err != nil { - return err - } - return u.tree.ImportDump(buf.Bytes()) + return u.tree.ImportDumpReaderWithTx(u.tx, r) } // noState returns a key-value database associated with this tree that doesn't @@ -117,6 +111,11 @@ func (u *TreeUpdate) noState() Updater { return subWriteTx(u.tx, subKeyNoState) } +// MarkDirty sets dirtyTree = true +func (u *TreeUpdate) MarkDirty() { + u.dirtyTree = true +} + // Add a new key-value to this tree. `key` is the path of the leaf, and // `value` is the content of the leaf. func (u *TreeUpdate) Add(key, value []byte) error { @@ -314,7 +313,7 @@ func (t *TreeTx) Commit(version uint32) error { if err := t.CommitOnTx(version); err != nil { return err } - return t.SaveWithoutCommit() + return t.tx.Commit() } // CommitOnTx do as Commit but without committing the transaction to database. diff --git a/tree/arbo/tree.go b/tree/arbo/tree.go index 0fd9354da..a5a531f3a 100644 --- a/tree/arbo/tree.go +++ b/tree/arbo/tree.go @@ -847,14 +847,24 @@ func (t *Tree) ImportDump(b []byte) error { // ImportDumpReader imports the leafs (that have been exported with the Dump // method) in the Tree, reading them from the given reader. 
func (t *Tree) ImportDumpReader(r io.Reader) error { + wTx := t.db.WriteTx() + defer wTx.Discard() + + if err := t.ImportDumpReaderWithTx(wTx, r); err != nil { + return err + } + return wTx.Commit() +} + +// ImportDumpReaderWithTx imports the leafs (that have been exported with the Dump +// method) in the Tree (using the given db.WriteTx), reading them from the given reader. +func (t *Tree) ImportDumpReaderWithTx(wTx db.WriteTx, r io.Reader) error { if !t.editable() { return ErrSnapshotNotEditable } - wTx := t.db.WriteTx() - defer wTx.Discard() // create the root node if it does not exist - _, err := t.db.Get(dbKeyRoot) + _, err := wTx.Get(dbKeyRoot) if err == db.ErrKeyNotFound { // store new root 0 (empty) if err := t.setToEmptyTree(wTx); err != nil { @@ -889,6 +899,7 @@ func (t *Tree) ImportDumpReader(r io.Reader) error { keys = append(keys, k) values = append(values, v) } + invalid, err := t.AddBatchWithTx(wTx, keys, values) if err != nil { return fmt.Errorf("error adding batch: %w", err) @@ -896,7 +907,7 @@ func (t *Tree) ImportDumpReader(r io.Reader) error { if len(invalid) > 0 { return fmt.Errorf("%d invalid keys found in batch", len(invalid)) } - return wTx.Commit() + return nil } // Graphviz iterates across the full tree to generate a string Graphviz diff --git a/tree/tree.go b/tree/tree.go index 880f2e070..8a5fc217e 100644 --- a/tree/tree.go +++ b/tree/tree.go @@ -285,6 +285,18 @@ func (t *Tree) ImportDump(b []byte) error { return t.tree.ImportDump(b) } +// ImportDumpReader imports the leafs (that have been exported with the Dump method) +// in the Tree (using a byte reader) +func (t *Tree) ImportDumpReader(r io.Reader) error { + return t.tree.ImportDumpReader(r) +} + +// ImportDumpReaderWithTx imports the leafs (that have been exported with the Dump method) +// in the Tree (using a byte reader) +func (t *Tree) ImportDumpReaderWithTx(wTx db.WriteTx, r io.Reader) error { + return t.tree.ImportDumpReaderWithTx(wTx, r) +} + func (t *Tree) PrintGraphviz(rTx db.Reader) error { if rTx == nil { rTx = t.db diff --git a/vochain/app.go b/vochain/app.go index 5ba5a3dd5..51786cfa5 100644 --- a/vochain/app.go +++ b/vochain/app.go @@ -17,6 +17,7 @@ import ( lru "github.com/hashicorp/golang-lru/v2" "go.vocdoni.io/dvote/config" "go.vocdoni.io/dvote/crypto/zk/circuit" + "go.vocdoni.io/dvote/snapshot" "go.vocdoni.io/dvote/test/testcommon/testutil" "go.vocdoni.io/dvote/types" "go.vocdoni.io/dvote/vochain/ist" @@ -38,6 +39,13 @@ const ( // maxPendingTxAttempts is the number of times a transaction can be included in a block // and fail before being removed from the mempool. maxPendingTxAttempts = 3 + + // StateDataDir is the subdirectory inside app.DataDir where State data will be saved + StateDataDir = "vcstate" + // TxHandlerDataDir is the subdirectory inside app.DataDir where TransactionHandler data will be saved + TxHandlerDataDir = "txHandler" + // TxHandlerDataDir is the subdirectory inside app.DataDir where Snapshots data will be saved + SnapshotsDataDir = "snapshots" ) var ( @@ -55,6 +63,7 @@ type BaseApplication struct { NodeClient *cometcli.Local NodeAddress ethcommon.Address TransactionHandler *transaction.TransactionHandler + Snapshots *snapshot.SnapshotManager isSynchronizingFn func() bool // tendermint WaitSync() function is racy, we need to use a mutex in order to avoid // data races when querying about the sync status of the blockchain. 
@@ -76,10 +85,14 @@ type BaseApplication struct { // blockTime is the target block time that miners use blockTime time.Duration + // snapshotInterval create state snapshot every N blocks (0 to disable) + snapshotInterval int + // endBlockTimestamp is the last block end timestamp calculated from local time. endBlockTimestamp atomic.Int64 chainID string dataDir string + dbType string genesisInfo *comettypes.GenesisDoc // lastDeliverTxResponse is used to store the last DeliverTxResponse, so validators @@ -125,7 +138,7 @@ type ExecuteBlockResponse struct { // Node still needs to be initialized with SetNode. // Callback functions still need to be initialized. func NewBaseApplication(vochainCfg *config.VochainCfg) (*BaseApplication, error) { - state, err := vstate.New(vochainCfg.DBType, vochainCfg.DataDir) + state, err := vstate.New(vochainCfg.DBType, filepath.Join(vochainCfg.DataDir, StateDataDir)) if err != nil { return nil, fmt.Errorf("cannot create state: (%v)", err) } @@ -136,9 +149,14 @@ func NewBaseApplication(vochainCfg *config.VochainCfg) (*BaseApplication, error) transactionHandler := transaction.NewTransactionHandler( state, istc, - filepath.Join(vochainCfg.DataDir, "txHandler"), + filepath.Join(vochainCfg.DataDir, TxHandlerDataDir), ) + snaps, err := snapshot.NewManager(filepath.Join(vochainCfg.DataDir, SnapshotsDataDir), vochainCfg.StateSyncChunkSize) + if err != nil { + return nil, fmt.Errorf("cannot create snapshot manager: %w", err) + } + // Initialize the zk circuit if err := circuit.Init(); err != nil { return nil, fmt.Errorf("cannot load zk circuit: %w", err) @@ -152,8 +170,11 @@ func NewBaseApplication(vochainCfg *config.VochainCfg) (*BaseApplication, error) State: state, Istc: istc, TransactionHandler: transactionHandler, + Snapshots: snaps, blockCache: blockCache, dataDir: vochainCfg.DataDir, + dbType: vochainCfg.DBType, + snapshotInterval: vochainCfg.SnapshotInterval, genesisInfo: &comettypes.GenesisDoc{}, }, nil } @@ -209,15 +230,17 @@ func (app *BaseApplication) CommitState() ([]byte, error) { if err != nil { return nil, fmt.Errorf("cannot save state: %w", err) } - // perform state snapshot (DISABLED) - if false && app.Height()%50000 == 0 && !app.IsSynchronizing() { // DISABLED + + // perform state snapshot + if app.snapshotInterval > 0 && + app.Height()%uint32(app.snapshotInterval) == 0 && + !app.IsSynchronizing() { startTime := time.Now() - log.Infof("performing a state snapshot on block %d", app.Height()) - if _, err := app.State.Snapshot(); err != nil { - return hash, fmt.Errorf("cannot make state snapshot: %w", err) + log.Infof("performing a snapshot on block %d", app.Height()) + if _, err := app.Snapshots.Do(app.State); err != nil { + return hash, fmt.Errorf("cannot make snapshot: %w", err) } log.Infof("snapshot created successfully, took %s", time.Since(startTime)) - log.Debugf("%+v", app.State.ListSnapshots()) } return hash, err } diff --git a/vochain/appsetup.go b/vochain/appsetup.go index db2c2a035..54fa0d529 100644 --- a/vochain/appsetup.go +++ b/vochain/appsetup.go @@ -34,6 +34,11 @@ func (app *BaseApplication) SetNode(vochaincfg *config.VochainCfg, genesis []byt return err } app.genesisInfo = nodeGenesis.Genesis + + // immediately init isSynchronizing to true, + // else this will be (incorrectly) false until the first beginBlock, which can take some time + app.isSynchronizing.Store(true) + return nil } diff --git a/vochain/cometbft.go b/vochain/cometbft.go index d78aa585f..138380c7b 100644 --- a/vochain/cometbft.go +++ b/vochain/cometbft.go @@ -8,7 +8,11 @@ 
import ( "encoding/json" "errors" "fmt" + "math" + "os" + "path/filepath" "slices" + "sync/atomic" "time" cometabcitypes "github.com/cometbft/cometbft/abci/types" @@ -17,9 +21,10 @@ import ( ethcommon "github.com/ethereum/go-ethereum/common" "go.vocdoni.io/dvote/crypto/ethereum" "go.vocdoni.io/dvote/log" + "go.vocdoni.io/dvote/snapshot" "go.vocdoni.io/dvote/vochain/genesis" "go.vocdoni.io/dvote/vochain/ist" - vstate "go.vocdoni.io/dvote/vochain/state" + "go.vocdoni.io/dvote/vochain/state" "go.vocdoni.io/dvote/vochain/transaction" "go.vocdoni.io/dvote/vochain/transaction/vochaintx" "go.vocdoni.io/proto/build/go/models" @@ -34,7 +39,6 @@ import ( // We use this method to initialize some state variables. func (app *BaseApplication) Info(_ context.Context, req *cometabcitypes.InfoRequest) (*cometabcitypes.InfoResponse, error) { - app.isSynchronizing.Store(true) lastHeight, err := app.State.LastHeight() if err != nil { return nil, fmt.Errorf("cannot get State.LastHeight: %w", err) @@ -46,8 +50,11 @@ func (app *BaseApplication) Info(_ context.Context, } // print some basic version info about tendermint components log.Infow("cometbft info", "cometVersion", req.Version, "p2pVersion", - req.P2PVersion, "blockVersion", req.BlockVersion, "lastHeight", - lastHeight, "appHash", hex.EncodeToString(appHash)) + req.P2PVersion, "blockVersion", req.BlockVersion) + log.Infow("telling cometbft our state", + "LastBlockHeight", lastHeight, + "LastBlockAppHash", hex.EncodeToString(appHash), + ) return &cometabcitypes.InfoResponse{ LastBlockHeight: int64(lastHeight), @@ -71,7 +78,7 @@ func (app *BaseApplication) InitChain(_ context.Context, for _, acc := range genesisAppState.Accounts { addr := ethcommon.BytesToAddress(acc.Address) if err := app.State.CreateAccount(addr, "", nil, acc.Balance); err != nil { - if err != vstate.ErrAccountAlreadyExists { + if err != state.ErrAccountAlreadyExists { return nil, fmt.Errorf("cannot create acount %x: %w", addr, err) } if err := app.State.InitChainMintBalance(addr, acc.Balance); err != nil { @@ -104,7 +111,7 @@ func (app *BaseApplication) InitChain(_ context.Context, } addr := ethcommon.BytesToAddress(v.Address) if err := app.State.CreateAccount(addr, "", nil, 0); err != nil { - if err != vstate.ErrAccountAlreadyExists { + if err != state.ErrAccountAlreadyExists { return nil, fmt.Errorf("cannot create validator acount %x: %w", addr, err) } } @@ -134,7 +141,7 @@ func (app *BaseApplication) InitChain(_ context.Context, } // create burn account - if err := app.State.SetAccount(vstate.BurnAddress, &vstate.Account{}); err != nil { + if err := app.State.SetAccount(state.BurnAddress, &state.Account{}); err != nil { return nil, fmt.Errorf("unable to set burn address") } @@ -263,12 +270,10 @@ func (app *BaseApplication) FinalizeBlock(_ context.Context, Info: tx.Info, } } - if len(req.Txs) > 0 { - log.Debugw("finalize block", "height", height, - "txs", len(req.Txs), "hash", hex.EncodeToString(root), - "milliSeconds", time.Since(start).Milliseconds(), - "proposer", hex.EncodeToString(req.GetProposerAddress())) - } + log.Debugw("finalize block", "height", height, + "txs", len(req.Txs), "hash", hex.EncodeToString(root), + "milliSeconds", time.Since(start).Milliseconds(), + "proposer", hex.EncodeToString(req.GetProposerAddress())) // update validator score as an IST action for the next block. 
Note that at this point, // we cannot modify the state or we would break ProcessProposal optimistic execution @@ -497,28 +502,169 @@ func (app *BaseApplication) ProcessProposal(_ context.Context, }, nil } -// ListSnapshots returns a list of available snapshots. -func (*BaseApplication) ListSnapshots(context.Context, - *cometabcitypes.ListSnapshotsRequest) (*cometabcitypes.ListSnapshotsResponse, error) { - return &cometabcitypes.ListSnapshotsResponse{}, nil +// Example StateSync (Snapshot) successful flow: +// Alice is a comet node, up-to-date with RPC open on port 26657 +// Bob is a fresh node that is bootstrapping, with params: +// * StateSync.Enable = true +// * StateSync.RPCServers = alice:26657, alice:26657 +// * Bob comet will ask Alice for snapshots (app doesn't intervene here) +// * Alice comet calls ListSnapshots, App returns a []*v1.Snapshot +// * Bob comet calls OfferSnapshot passing a single *v1.Snapshot, App returns ACCEPT +// * Bob comet asks Alice for chunks (app doesn't intervene here) +// * Alice comet calls (N times) LoadSnapshotChunk passing height, format, chunk index. App returns []byte +// * Bob comet calls (N times) ApplySnapshotChunk passing []byte, chunk index and sender. App returns ACCEPT + +// ListSnapshots provides cometbft with a list of available snapshots. +func (app *BaseApplication) ListSnapshots(_ context.Context, + req *cometabcitypes.ListSnapshotsRequest, +) (*cometabcitypes.ListSnapshotsResponse, error) { + list := app.Snapshots.List() + + response := &cometabcitypes.ListSnapshotsResponse{} + for height, dsi := range list { + chunks := uint32(math.Ceil(float64(dsi.Size) / float64(app.Snapshots.ChunkSize))) + + response.Snapshots = append(response.Snapshots, &cometabcitypes.Snapshot{ + Height: uint64(height), + Format: 0, + Chunks: chunks, + Hash: dsi.Hash, + Metadata: []byte{}, + }) + } + log.Debugf("cometbft requests our list of snapshots, we offer %d options", len(response.Snapshots)) + return response, nil +} + +// snapshotFromComet is used when receiving a Snapshot from CometBFT. +// Only 1 snapshot at a time is processed by CometBFT, hence we keep it simple. +var snapshotFromComet struct { + height atomic.Int64 + chunks atomic.Int32 } -// OfferSnapshot returns the response to a snapshot offer. -func (*BaseApplication) OfferSnapshot(context.Context, - *cometabcitypes.OfferSnapshotRequest) (*cometabcitypes.OfferSnapshotResponse, error) { - return &cometabcitypes.OfferSnapshotResponse{}, nil +// OfferSnapshot is called by cometbft during StateSync, when another node offers a Snapshot. +func (app *BaseApplication) OfferSnapshot(_ context.Context, + req *cometabcitypes.OfferSnapshotRequest, +) (*cometabcitypes.OfferSnapshotResponse, error) { + log.Debugw("cometbft offers us a snapshot", + "appHash", hex.EncodeToString(req.AppHash), + "height", req.Snapshot.Height, "format", req.Snapshot.Format, "chunks", req.Snapshot.Chunks) + + snapshotFromComet.height.Store(int64(req.Snapshot.Height)) + snapshotFromComet.chunks.Store(int32(req.Snapshot.Chunks)) + + return &cometabcitypes.OfferSnapshotResponse{ + Result: cometabcitypes.OFFER_SNAPSHOT_RESULT_ACCEPT, + }, nil } -// LoadSnapshotChunk returns the response to a snapshot chunk loading request. -func (*BaseApplication) LoadSnapshotChunk(context.Context, - *cometabcitypes.LoadSnapshotChunkRequest) (*cometabcitypes.LoadSnapshotChunkResponse, error) { - return &cometabcitypes.LoadSnapshotChunkResponse{}, nil +// LoadSnapshotChunk provides cometbft with a snapshot chunk, during StateSync. 
+// +// cometbft will reject a len(Chunk) > 16M +func (app *BaseApplication) LoadSnapshotChunk(_ context.Context, + req *cometabcitypes.LoadSnapshotChunkRequest, +) (*cometabcitypes.LoadSnapshotChunkResponse, error) { + log.Debugw("cometbft requests a chunk from our snapshot", + "height", req.Height, "format", req.Format, "chunk", req.Chunk) + + buf, err := app.Snapshots.SliceChunk(req.Height, req.Format, req.Chunk) + + if err != nil { + return nil, err + } + + return &cometabcitypes.LoadSnapshotChunkResponse{ + Chunk: buf, + }, nil +} + +// ApplySnapshotChunk saves to disk a snapshot chunk provided by cometbft StateSync. +// When all chunks of a Snapshot are in disk, the Snapshot is restored. +// +// cometbft will never pass a Chunk bigger than 16M +func (app *BaseApplication) ApplySnapshotChunk(_ context.Context, + req *cometabcitypes.ApplySnapshotChunkRequest, +) (*cometabcitypes.ApplySnapshotChunkResponse, error) { + log.Debugw("cometbft provides us a chunk", + "index", req.Index, "size", len(req.Chunk)) + + if err := app.Snapshots.WriteChunkToDisk(req.Index, req.Chunk); err != nil { + return nil, err + } + + if app.Snapshots.CountChunksInDisk() == int(snapshotFromComet.chunks.Load()) { + // if we got here, all chunks are on disk + s, err := app.Snapshots.JoinChunks(snapshotFromComet.chunks.Load(), snapshotFromComet.height.Load()) + if err != nil { + log.Error(err) + return &cometabcitypes.ApplySnapshotChunkResponse{ + Result: cometabcitypes.APPLY_SNAPSHOT_CHUNK_RESULT_REJECT_SNAPSHOT, + }, nil + } + + if err := app.RestoreStateFromSnapshot(s); err != nil { + log.Error(err) + return nil, err + } + } + + return &cometabcitypes.ApplySnapshotChunkResponse{ + Result: cometabcitypes.APPLY_SNAPSHOT_CHUNK_RESULT_ACCEPT, + }, nil } -// ApplySnapshotChunk returns the response to a snapshot chunk applying request. -func (*BaseApplication) ApplySnapshotChunk(context.Context, - *cometabcitypes.ApplySnapshotChunkRequest) (*cometabcitypes.ApplySnapshotChunkResponse, error) { - return &cometabcitypes.ApplySnapshotChunkResponse{}, nil +func (app *BaseApplication) RestoreStateFromSnapshot(snap *snapshot.Snapshot) error { + tmpDir, err := snap.Restore(app.dbType, app.dataDir) + if err != nil { + log.Error(err) + return err + } + + // some components (Indexer, OffChainDataHandler) are initialized before statesync, + // and their EventListeners are added to the old state. 
So we need to copy them to the new state + oldEventListeners := app.State.EventListeners() + + if err := app.State.Close(); err != nil { + return fmt.Errorf("cannot close old state: %w", err) + } + + if err := os.RemoveAll(filepath.Join(app.dataDir, StateDataDir)); err != nil { + return fmt.Errorf("error removing existing dataDir: %w", err) + } + + if err := os.Rename(tmpDir, filepath.Join(app.dataDir, StateDataDir)); err != nil { + return fmt.Errorf("error moving newDataDir into dataDir: %w", err) + } + + // TODO: dedup this from vochain/app.go NewBaseApplication + newState, err := state.New(app.dbType, filepath.Join(app.dataDir, StateDataDir)) + if err != nil { + return fmt.Errorf("cannot open new state: %w", err) + } + + // we also need to SetChainID again (since app.SetChainID propagated chainID to the old state) + newState.SetChainID(app.ChainID()) + + newState.CleanEventListeners() + for _, l := range oldEventListeners { + newState.AddEventListener(l) + } + + istc := ist.NewISTC(newState) + // Create the transaction handler for checking and processing transactions + transactionHandler := transaction.NewTransactionHandler( + newState, + istc, + filepath.Join(app.dataDir, TxHandlerDataDir), + ) + + // This looks racy but actually it's OK, since State Sync happens during very early init, + // when the app is blocked waiting for cometbft to finish startup. + app.State = newState + app.Istc = istc + app.TransactionHandler = transactionHandler + return nil } // Query does nothing diff --git a/vochain/indexer/indexer.go b/vochain/indexer/indexer.go index 85d462407..231b5f0f9 100644 --- a/vochain/indexer/indexer.go +++ b/vochain/indexer/indexer.go @@ -5,6 +5,7 @@ import ( "database/sql" "embed" "encoding/hex" + "errors" "fmt" "io" "math/big" @@ -16,6 +17,7 @@ import ( "time" "go.vocdoni.io/dvote/log" + "go.vocdoni.io/dvote/statedb" "go.vocdoni.io/dvote/types" "go.vocdoni.io/dvote/vochain" indexerdb "go.vocdoni.io/dvote/vochain/indexer/db" @@ -39,6 +41,8 @@ import ( //go:embed migrations/*.sql var embedMigrations embed.FS +const dbFilename = "db.sqlite" + // EventListener is an interface used for executing custom functions during the // events of the tally of a process. type EventListener interface { @@ -98,6 +102,7 @@ type Options struct { // ExpectBackupRestore should be set to true if a call to Indexer.RestoreBackup // will be made shortly after New is called, and before any indexing or queries happen. + // If the DB file on disk exists, this flag will be ignored and the existing DB will be loaded. ExpectBackupRestore bool IgnoreLiveResults bool @@ -125,10 +130,13 @@ func New(app *vochain.BaseApplication, opts Options) (*Indexer, error) { if err := os.MkdirAll(opts.DataDir, os.ModePerm); err != nil { return nil, err } - idx.dbPath = filepath.Join(opts.DataDir, "db.sqlite") + idx.dbPath = filepath.Join(opts.DataDir, dbFilename) - // If we are expecting a restore shortly after, that will initialize the DB. - if !opts.ExpectBackupRestore { + // if dbPath exists, always startDB (ExpectBackupRestore is ignored) + // if dbPath doesn't exist, and we're not expecting a BackupRestore, startDB + // if dbPath doesn't exist and we're expecting a backup, skip startDB, it will be triggered after the restore + if _, err := os.Stat(idx.dbPath); err == nil || + (os.IsNotExist(err) && !opts.ExpectBackupRestore) { if err := idx.startDB(); err != nil { return nil, err } @@ -228,7 +236,7 @@ func (idx *Indexer) Close() error { // BackupPath restores the database from a backup created via SaveBackup. 
// Note that this must be called with ExpectBackupRestore set to true,
// and before any indexing or queries happen.
-func (idx *Indexer) RestoreBackup(ctx context.Context, path string) error {
+func (idx *Indexer) RestoreBackup(path string) error {
 	if idx.readWriteDB != nil {
 		panic("Indexer.RestoreBackup called after the database was initialized")
 	}
@@ -251,6 +259,28 @@ func (idx *Indexer) SaveBackup(ctx context.Context, path string) error {
 	return err
 }
 
+// ExportBackupAsBytes backs up the database and returns the contents as []byte.
+//
+// Note that writes to the database may be blocked until the backup finishes.
+//
+// For sqlite, this is done via "VACUUM INTO", so the resulting file is also a database.
+func (idx *Indexer) ExportBackupAsBytes(ctx context.Context) ([]byte, error) {
+	tmpDir, err := os.MkdirTemp("", "indexer")
+	if err != nil {
+		return nil, fmt.Errorf("error creating tmpDir: %w", err)
+	}
+	defer func() {
+		if err := os.RemoveAll(tmpDir); err != nil {
+			log.Warnw("error removing indexer backup dir", "path", tmpDir, "err", err)
+		}
+	}()
+	tmpFilePath := filepath.Join(tmpDir, "indexer.sqlite3")
+	if err := idx.SaveBackup(ctx, tmpFilePath); err != nil {
+		return nil, fmt.Errorf("error saving indexer backup: %w", err)
+	}
+	return os.ReadFile(tmpFilePath)
+}
+
 // blockTxQueries assumes that lockPool is locked.
 func (idx *Indexer) blockTxQueries() *indexerdb.Queries {
 	if idx.blockMu.TryLock() {
@@ -353,12 +384,20 @@ func (idx *Indexer) AfterSyncBootstrap(inTest bool) {
 			EnvelopeType: process.Envelope,
 		}
 		// Get the votes from the state
-		idx.App.State.IterateVotes(p, true, func(vote *models.StateDBVote) bool {
+		if err := idx.App.State.IterateVotes(p, true, func(vote *models.StateDBVote) bool {
 			if err := idx.addLiveVote(process, vote.VotePackage,
 				new(big.Int).SetBytes(vote.Weight), partialResults); err != nil {
 				log.Errorw(err, "could not add live vote")
 			}
 			return false
-		})
+		}); err != nil {
+			if errors.Is(err, statedb.ErrEmptyTree) {
+				log.Debugf("process %x doesn't have any votes yet, skipping", p)
+				continue
+			}
+			log.Errorw(err, "unexpected error during iterate votes")
+			continue
+		}
+
 		// Store the results on the persistent database
 		if err := idx.commitVotesUnsafe(queries, p, indxR, partialResults, nil, idx.App.Height()); err != nil {
 			log.Errorw(err, "could not commit live votes")
diff --git a/vochain/indexer/indexer_test.go b/vochain/indexer/indexer_test.go
index 4da5e5342..bd2cea58e 100644
--- a/vochain/indexer/indexer_test.go
+++ b/vochain/indexer/indexer_test.go
@@ -110,7 +110,7 @@ func TestBackup(t *testing.T) {
 	idx.Close()
 	idx, err = New(app, Options{DataDir: t.TempDir(), ExpectBackupRestore: true})
 	qt.Assert(t, err, qt.IsNil)
-	err = idx.RestoreBackup(context.TODO(), backupPath)
+	err = idx.RestoreBackup(backupPath)
 	qt.Assert(t, err, qt.IsNil)
 
 	wantTotalVotes(10)
diff --git a/vochain/indexer/migrations_test.go b/vochain/indexer/migrations_test.go
index 064056c17..e5d0848d7 100644
--- a/vochain/indexer/migrations_test.go
+++ b/vochain/indexer/migrations_test.go
@@ -1,7 +1,6 @@
 package indexer
 
 import (
-	"context"
 	"io"
 	"os"
 	"path/filepath"
@@ -45,8 +44,7 @@ func TestRestoreBackupAndMigrate(t *testing.T) {
 	// Restore the backup.
 	// Note that the indexer prepares all queries upfront,
 	// which means sqlite will fail if any of them reference missing columns or tables.
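As an aside to the ExportBackupAsBytes comment above, a minimal, driver-agnostic sketch of the "VACUUM INTO" technique (not code from this repository; the package and helper names are hypothetical, and it only assumes a *sql.DB backed by sqlite):

    package indexerbackup

    import (
        "context"
        "database/sql"
        "fmt"
    )

    // backupSQLite copies a live sqlite database into dstPath using VACUUM INTO.
    // The destination is a regular sqlite file, so it can later be opened (or
    // restored from) like any other database.
    func backupSQLite(ctx context.Context, db *sql.DB, dstPath string) error {
        // VACUUM INTO accepts any string expression as the destination filename,
        // so a bound parameter works with drivers that support it.
        if _, err := db.ExecContext(ctx, `VACUUM INTO ?`, dstPath); err != nil {
            return fmt.Errorf("vacuum into %q: %w", dstPath, err)
        }
        return nil
    }

Because the copy produced this way is itself a complete database file, reading it back as bytes and later restoring it (as the indexer backup/restore path does) needs no extra serialization step.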
-	ctx := context.Background()
-	err = idx.RestoreBackup(ctx, backupPath)
+	err = idx.RestoreBackup(backupPath)
 	qt.Assert(t, err, qt.IsNil)
 
 	// Sanity check that the data is there, and can be used.
diff --git a/vochain/start.go b/vochain/start.go
index 6d9107fb7..a6528da0a 100644
--- a/vochain/start.go
+++ b/vochain/start.go
@@ -184,6 +184,47 @@ func newTendermint(app *BaseApplication,
 	tconfig.Mempool.MaxTxsBytes = int64(tconfig.Mempool.Size * tconfig.Mempool.MaxTxBytes)
 	tconfig.Mempool.CacheSize = 100000
 	tconfig.Mempool.Broadcast = true
+	tconfig.StateSync.Enable = localConfig.StateSyncEnabled
+	if tconfig.StateSync.Enable {
+		tconfig.StateSync.RPCServers = func() []string {
+			// prefer the most specific flag first
+			if len(localConfig.StateSyncRPCServers) > 0 {
+				return localConfig.StateSyncRPCServers
+			}
+
+			// else, resort to seeds (replacing the P2P port with the RPC port)
+			replacePorts := func(slice []string) []string {
+				for i, v := range slice {
+					slice[i] = strings.ReplaceAll(v, ":26656", ":26657")
+				}
+				return slice
+			}
+
+			// first, fall back to Seeds
+			if len(localConfig.Seeds) > 0 {
+				return replacePorts(localConfig.Seeds)
+			}
+
+			// if no Seeds are specified either, fall back to genesis
+			if _, ok := vocdoniGenesis.Genesis[localConfig.Network]; ok {
+				return replacePorts(vocdoniGenesis.Genesis[localConfig.Network].SeedNodes)
+			}
+
+			return nil
+		}()
+
+		// after parsing flags and fallbacks, if we still have only 1 server specified,
+		// duplicate it as a quick workaround since cometbft requires passing 2 (primary and witness)
+		if len(tconfig.StateSync.RPCServers) == 1 {
+			tconfig.StateSync.RPCServers = append(tconfig.StateSync.RPCServers, tconfig.StateSync.RPCServers...)
+		}
+
+		log.Infof("state sync rpc servers: %s", tconfig.StateSync.RPCServers)
+
+		tconfig.StateSync.TrustHeight = localConfig.StateSyncTrustHeight
+		tconfig.StateSync.TrustHash = localConfig.StateSyncTrustHash
+	}
+
 	tconfig.RPC.ListenAddress = "tcp://0.0.0.0:26657"
 	log.Debugf("mempool config: %+v", tconfig.Mempool)
 	// tmdbBackend defaults to goleveldb, but switches to cleveldb if
diff --git a/vochain/state/snapshot.go b/vochain/state/snapshot.go
index b1cddb621..bf76a9dcb 100644
--- a/vochain/state/snapshot.go
+++ b/vochain/state/snapshot.go
@@ -2,439 +2,122 @@ package state
 
 import (
 	"bytes"
-	"encoding/binary"
 	"encoding/gob"
 	"errors"
 	"fmt"
 	"io"
-	"os"
-	"path/filepath"
-	"strconv"
-	"sync"
-	"time"
 
 	"go.vocdoni.io/dvote/log"
 	"go.vocdoni.io/dvote/statedb"
 	"go.vocdoni.io/dvote/tree/arbo"
 )
 
-const (
-	snapshotHeaderVersion = 1
-	snapshotHeaderLenSize = 32
-)
-
-// StateSnapshot is a copy in a specific point in time of the blockchain state.
-// The state is supposed to be a list of nested merkle trees.
-// The StateSnapshot contains the methods for building a single file snapshot of
-// the state containing multiple trees.
-// The implementation allows the encoding and decoding of snapshots.
-//
-// The structure of the snapshot encoded file is:
-//
-// [headerLen][header][noState][tree1][tree2][treeN]
-//
-// - headerlen is a fixed 32 bytes little endian number indicating the size of the header.
-//
-// - header is the Gob encoded structure containing the information of the trees (size, name, etc.).
-//
-// - treeN is the raw bytes dump of all trees.
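For illustration only, a standalone sketch of what the seed fallback above yields for a single seed entry, assuming cometbft's default ports (26656 for P2P, 26657 for RPC); the seed address is made up, and the duplication mirrors the primary-and-witness requirement:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // a single seed in cometbft "nodeID@host:port" form (hypothetical value)
        seeds := []string{"seednodeid@seed1.example.org:26656"}

        // swap the P2P port for the RPC port
        rpcServers := make([]string, 0, len(seeds))
        for _, s := range seeds {
            rpcServers = append(rpcServers, strings.ReplaceAll(s, ":26656", ":26657"))
        }

        // cometbft StateSync needs at least two RPC servers (primary and witness),
        // so a single entry is duplicated
        if len(rpcServers) == 1 {
            rpcServers = append(rpcServers, rpcServers...)
        }

        fmt.Println(rpcServers)
        // [seednodeid@seed1.example.org:26657 seednodeid@seed1.example.org:26657]
    }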
-type StateSnapshot struct { - path string - file *os.File - lock sync.Mutex - header SnapshotHeader - headerSize uint32 - currentTree int // indicates the current tree - currentTreeReadBuffer io.Reader // the buffer for reading the current tree -} - -// SnapshotHeader is the header structure of StateSnapshot containing the list of merkle trees. -type SnapshotHeader struct { - Version int - Root []byte - ChainID string - Height uint32 - NoStateSize uint32 - Trees []SnapshotHeaderTree -} - -// SnapshotHeaderTree represents a merkle tree of the StateSnapshot. -type SnapshotHeaderTree struct { +// TreeDescription has the details needed for a tree to be backed up +type TreeDescription struct { Name string - Size uint32 Parent string - Root []byte -} - -// SetMainRoot sets the root for the mail merkle tree of the state. -func (s *StateSnapshot) SetMainRoot(root []byte) { - s.header.Root = root -} - -// SetHeight sets the blockchain height for the snapshot. -func (s *StateSnapshot) SetHeight(height uint32) { - s.header.Height = height + Key []byte + Tree statedb.TreeViewer } -// SetChainID sets the blockchain identifier for the snapshot. -func (s *StateSnapshot) SetChainID(chainID string) { - s.header.ChainID = chainID -} +// DeepListStateTrees returns a flat list of all trees in statedb +func (v *State) DeepListStateTrees() ([]TreeDescription, error) { + list := []TreeDescription{} -// SetNoStateSize sets the noState database size -func (s *StateSnapshot) SetNoStateSize(size uint32) { - s.header.NoStateSize = size -} - -// Open reads an existing snapshot file and decodes the header. -// After calling this method everything is ready for reading the first -// merkle tree. No need to execute `FetchNextTree` until io.EOF is reached. -// -// This method performs the opposite operation of `Create`, one of both needs -// to be called (but not both). -func (s *StateSnapshot) Open(filePath string) error { - var err error - s.path = filePath - s.file, err = os.Open(filePath) - if err != nil { - return err - } - headerSizeBytes := make([]byte, snapshotHeaderLenSize) - if _, err = io.ReadFull(s.file, headerSizeBytes); err != nil { - return err - } - s.headerSize = binary.LittleEndian.Uint32(headerSizeBytes) - if _, err := s.file.Seek(snapshotHeaderLenSize, 0); err != nil { - return err - } - decoder := gob.NewDecoder(io.LimitReader(s.file, int64(s.headerSize))) - if err := decoder.Decode(&s.header); err != nil { - return fmt.Errorf("cannot decode header: %w", err) - } - if s.header.Version != snapshotHeaderVersion { - return fmt.Errorf("snapshot version not compatible") - } - // we call FetchNextTree which increase s.currentTree, so we set - // currentTree to -1 value (thus the first tree with index 0 is loaded) - s.currentTree-- - return s.FetchNextTree() -} - -// FetchNextTree prepares everything for reading the next tree. -// Returns io.EOF when there are no more trees. 
-func (s *StateSnapshot) FetchNextTree() error { - s.lock.Lock() - defer s.lock.Unlock() - - // check if no more trees available - if s.currentTree >= len(s.header.Trees)-1 { - return io.EOF - } - s.currentTree++ - - // move the pointer is in the start of the next tree - seekPointer := snapshotHeaderLenSize + int(s.headerSize) - for i := 0; i < s.currentTree; i++ { - seekPointer += int(s.header.Trees[i].Size) - } - _, err := s.file.Seek(int64(seekPointer), io.SeekStart) - - // update the buffer Reader - s.currentTreeReadBuffer = io.LimitReader( - s.file, - int64(s.header.Trees[s.currentTree].Size), - ) - return err -} - -// Read implements the io.Reader interface. Returns io.EOF error when no -// more bytes available in the current three. -func (s *StateSnapshot) Read(b []byte) (int, error) { - s.lock.Lock() - defer s.lock.Unlock() - return s.currentTreeReadBuffer.Read(b) -} - -// ReadAll reads the full content of the current tree and returns its bytes. -// io.EOF error is returned if the bytes have been already read. -func (s *StateSnapshot) ReadAll() ([]byte, error) { - s.lock.Lock() - defer s.lock.Unlock() - b, err := io.ReadAll(s.currentTreeReadBuffer) - if err != nil { - return nil, err - } - if len(b) == 0 { - return nil, io.EOF - } - return b, nil -} - -// Header returns the header for the snapshot containing the information -// about all the merkle trees. -func (s *StateSnapshot) Header() *SnapshotHeader { - return &s.header -} - -// TreeHeader returns the header for the current tree. -func (s *StateSnapshot) TreeHeader() *SnapshotHeaderTree { - return &s.header.Trees[s.currentTree] -} - -// Path returns the file path of the snapshot file currently used. -func (s *StateSnapshot) Path() string { - return s.path -} - -// Create starts the creation of a new snapshot as a disk file. -// This method must be called only once and its operation is opposed to `Open`. -func (s *StateSnapshot) Create(filePath string) error { - s.lock.Lock() - defer s.lock.Unlock() - var err error - s.header.Version = snapshotHeaderVersion - s.file, err = os.Create(filePath + ".tmp") - s.path = filePath - return err -} - -// AddTree adds a new tree to the snapshot. `Create` needs to be called first. -func (s *StateSnapshot) AddTree(name, parent string, root []byte) { - s.lock.Lock() // only 1 tree at time is allowed - s.header.Trees = append(s.header.Trees, SnapshotHeaderTree{ - Name: name, - Parent: parent, - Root: root, - Size: 0, + // main tree + list = append(list, TreeDescription{ + Name: TreeMain, + Parent: "", + Key: nil, + Tree: v.mainTreeViewer(true), }) -} - -// EndTree finishes the addition of a tree. This method should be called after `AddTree`. -func (s *StateSnapshot) EndTree() { - s.currentTree++ - s.lock.Unlock() -} - -// Save builds the snapshot started with `Create` and stores in disk its contents. -// After calling this method the snapshot is finished. -// `EndTree` must be called before saving. 
-func (s *StateSnapshot) Save() error { - s.lock.Lock() - defer s.lock.Unlock() - - // create the final file - finalFile, err := os.Create(s.path) - if err != nil { - return err - } - - defer func() { - if err := finalFile.Close(); err != nil { - log.Warnf("error closing the file %v", err) - } - }() - - // build the header - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - if err := enc.Encode(s.header); err != nil { - return err - } - - // write the size of the header in the first 32 bytes - headerSize := make([]byte, snapshotHeaderLenSize) - binary.LittleEndian.PutUint32(headerSize, uint32(buf.Len())) - if _, err := finalFile.Write(headerSize); err != nil { - return err - } - - // write the header - hs, err := finalFile.Write(buf.Bytes()) - if err != nil { - return err - } - log.Debugf("snapshot header size is %d bytes", hs) - // write the trees (by copying the tmpFile) - if _, err := s.file.Seek(0, io.SeekStart); err != nil { - return err - } - bs, err := io.Copy(finalFile, s.file) - if err != nil { - return err - } - log.Debugf("snapshot trees size is %d bytes", bs) - - // close and remove the temporary file - if err := s.file.Close(); err != nil { - return err - } - if err := finalFile.Close(); err != nil { - return err - } - return os.Remove(s.file.Name()) -} - -// Write implements the io.Writer interface. -// Writes a chunk of bytes as part of the current merkle tree. -func (s *StateSnapshot) Write(b []byte) (int, error) { - n, err := s.file.Write(b) - s.header.Trees[s.currentTree].Size += uint32(n) - return n, err -} - -// Snapshot performs a snapshot of the last committed state for all trees. -// The snapshot is stored in disk and the file path is returned. -func (v *State) Snapshot() (string, error) { - t := v.MainTreeView() - height, err := v.LastHeight() - if err != nil { - return "", err - } - root, err := t.Root() - if err != nil { - return "", err - } - - if err := os.MkdirAll(filepath.Join( - v.dataDir, - storageDirectory, - snapshotsDirectory), 0750); err != nil { - return "", err - } - - var snap StateSnapshot - if err := snap.Create(filepath.Join( - v.dataDir, - storageDirectory, - snapshotsDirectory, - fmt.Sprintf("%d", height), - )); err != nil { - return "", err - } - snap.SetMainRoot(root) - snap.SetHeight(height) - snap.SetChainID(v.chainID) - - noStateSize, err := ExportNoStateDB(&snap, v.NoState(false)) - if err != nil { - return "", err - } - snap.SetNoStateSize(noStateSize) - - dumpTree := func(name, parent string, tr statedb.TreeViewer) error { - root, err := tr.Root() + // main subtrees + for name := range MainTrees { + tv, err := v.mainTreeViewer(true).SubTree(StateTreeCfg(name)) if err != nil { - return err - } - snap.AddTree(name, parent, root) - if err := tr.Dump(&snap); err != nil { - return fmt.Errorf("cannot dump tree: %w", err) - } - snap.EndTree() - return nil - } - - // dump main tree - if err := dumpTree("Main", "", v.mainTreeViewer(true)); err != nil { - return "", err - } - - // dump main subtrees - for k := range MainTrees { - t, err := v.mainTreeViewer(true).SubTree(StateTreeCfg(k)) - if err != nil { - return "", err - } - if err := dumpTree(k, "", t); err != nil { - return "", err + return nil, err } + list = append(list, TreeDescription{ + Name: name, + Parent: "", + Key: nil, + Tree: tv, + }) } // dump child trees that depend on process pids, err := v.ListProcessIDs(true) if err != nil { - return "", err + return nil, err } log.Debugf("found %d processes", len(pids)) + for name := range ChildTrees { - for _, p := range pids { - 
childTreeCfg := StateChildTreeCfg(name)
-			processTree, err := v.mainTreeViewer(true).SubTree(StateTreeCfg(TreeProcess))
-			if err != nil {
-				return "", fmt.Errorf("cannot load process tree: %w", err)
-			}
-			childTree, err := processTree.SubTree(childTreeCfg.WithKey(p))
+		for _, pid := range pids {
+			childTree, err := v.mainTreeViewer(true).DeepSubTree(StateTreeCfg(TreeProcess), StateChildTreeCfg(name).WithKey(pid))
 			if err != nil {
-				// key might not exist (i.e process does not have census)
+				// key might not exist (i.e. the process does not have votes)
 				if !errors.Is(err, arbo.ErrKeyNotFound) &&
 					!errors.Is(err, ErrProcessChildLeafRootUnknown) &&
 					!errors.Is(err, statedb.ErrEmptyTree) {
-					return "", fmt.Errorf("child tree (%s) cannot be loaded with key %x: %w", name, p, err)
+					return nil, fmt.Errorf("child tree (%s) cannot be loaded with key %x: %w", name, pid, err)
 				}
 				continue
 			}
-			if err := dumpTree(name, TreeProcess, childTree); err != nil {
-				return "", err
-			}
+			// according to the statedb docs, the voteTree arbo.Tree (a non-singleton under processTree) uses the prefix
+			// `s/procs/s/votes{pID}/t/` (pID, the processID, is the id of the subTree),
+			// so we need to pass pid here in order to import the subtree WithKey(pid) during restore
+			list = append(list, TreeDescription{
+				Name:   name,
+				Parent: TreeProcess,
+				Key:    pid,
+				Tree:   childTree,
+			})
+
 		}
 	}
 
-	return snap.Path(), snap.Save()
+	return list, nil
 }
 
-/*
-// TODO install Snapshot
-func (v *State) installSnapshot(height uint32) error {
-	var snap StateSnapshot
-	if err := snap.Open(filepath.Join(
-		v.dataDir,
-		storageDirectory,
-		snapshotsDirectory,
-		fmt.Sprintf("%d", height),
-	)); err != nil {
-		return err
+func (v *State) RestoreStateTree(r io.Reader, h TreeDescription) error {
+	if h.Name == TreeMain {
+		log.Debug("found main tree, but it will simply be rebuilt at the end, skipping...")
+		_, _ = io.ReadAll(r)
+		return nil
 	}
-	log.Infof("installing snapshot %+v", snap.Header())
-	return fmt.Errorf("unimplemented")
-}
-*/
 
-type DiskSnapshotInfo struct {
-	ModTime time.Time
-	Height  uint32
-	Size    int64
-}
+	var treeCfg, parent statedb.TreeConfig
+	switch {
+	case h.Parent != "":
+		parent = StateTreeCfg(h.Parent)
+		treeCfg = StateChildTreeCfg(h.Name).WithKey(h.Key)
+	default:
+		treeCfg = StateTreeCfg(h.Name)
+	}
 
-// ListSnapshots returns the list of the current state snapshots stored in disk.
-func (v *State) ListSnapshots() []DiskSnapshotInfo { - files, err := os.ReadDir(filepath.Join( - v.dataDir, - storageDirectory, - snapshotsDirectory)) - if err != nil { - log.Fatal(err) + if err := v.store.Import(treeCfg, &parent, r); err != nil { + log.Errorw(err, "import tree failed:") + return err } - var list []DiskSnapshotInfo - for _, file := range files { - if !file.IsDir() { - height, err := strconv.Atoi(file.Name()) - if err != nil { - log.Errorw(err, "could not list snapshot file height") - continue - } - fileInfo, err := file.Info() - if err != nil { - log.Errorw(err, "could not list snapshot file") - continue - } - list = append(list, DiskSnapshotInfo{ - Size: fileInfo.Size(), - ModTime: fileInfo.ModTime(), - Height: uint32(height), - }) - } + + var st *statedb.TreeUpdate + var err error + switch { + case h.Parent != "": + st, err = v.tx.TreeUpdate.DeepSubTree(parent, treeCfg) + default: + st, err = v.tx.TreeUpdate.SubTree(treeCfg) } - return list + if err != nil { + return err + } + // TODO: v.store.Import() does not mark the tree as dirty, + // but it should, to indicate that the root needs to be updated and propagated, + // so we do it here + st.MarkDirty() + + return nil } // DBPair is a key value pair for the no state db. @@ -444,42 +127,36 @@ type DBPair struct { } // ExportNoStateDB exports the no state db to a gob encoder and writes it to the given writer. -func ExportNoStateDB(w io.Writer, reader *NoState) (uint32, error) { +func (v *State) ExportNoStateDB(w io.Writer) error { pairs := []DBPair{} - err := reader.Iterate(nil, func(key []byte, value []byte) bool { + err := v.NoState(true).Iterate(nil, func(key []byte, value []byte) bool { pairs = append(pairs, DBPair{Key: bytes.Clone(key), Value: bytes.Clone(value)}) return true }) if err != nil { - return 0, err + return err } - cw := &countingWriter{w: w} - enc := gob.NewEncoder(cw) - return cw.n, enc.Encode(pairs) + enc := gob.NewEncoder(w) + return enc.Encode(pairs) } // ImportNoStateDB imports the no state db from a gob decoder and writes it to the given db updater. -func ImportNoStateDB(r io.Reader, db *NoState) error { +func (v *State) ImportNoStateDB(r io.Reader) error { + pairs := []DBPair{} dec := gob.NewDecoder(r) - var pairs []DBPair if err := dec.Decode(&pairs); err != nil { return err } + ns := v.NoState(true) for _, pair := range pairs { - if err := db.Set(pair.Key, pair.Value); err != nil { + if err := ns.Set(pair.Key, pair.Value); err != nil { return err } } return nil } -type countingWriter struct { - w io.Writer - n uint32 -} - -func (cw *countingWriter) Write(p []byte) (int, error) { - n, err := cw.w.Write(p) - cw.n += uint32(n) - return n, err +// Commit calls v.tx.Commit(height) +func (v *State) Commit(height uint32) error { + return v.tx.Commit(height) } diff --git a/vochain/state/state.go b/vochain/state/state.go index c2ebc311c..91c1a1cba 100644 --- a/vochain/state/state.go +++ b/vochain/state/state.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "os" - "path/filepath" "sync" "sync/atomic" "time" @@ -30,10 +29,6 @@ const ( // is the value of arbo.HashFunctionSha256.Len(), arbo.HashFunctionPoseidon.Len() and // arbo.HashFunctionBlake2b.Len() defaultHashLen = 32 - // storageDirectory is the final directory name where all files are stored. - storageDirectory = "vcstate" - // snapshotsDirectory is the final directory name where the snapshots are stored. - snapshotsDirectory = "snapshots" // timestampKey is the key used to store the timestamp of the last block. 
	timestampKey = "timestamp"
 
@@ -49,8 +44,6 @@ var (
 
 // State represents the state of the vochain application
 type State struct {
-	// data directory for storing files
-	dataDir string
 	// db is the underlying key-value database used by the StateDB
 	db db.Database
 	// Store contains the StateDB. We match every StateDB commit version
@@ -80,12 +73,13 @@ type State struct {
 	mtxValidSIKRoots sync.Mutex
 }
 
-// New creates a new State
+// New creates a new State stored under dataDir, using dbType as the database backend
 func New(dbType, dataDir string) (*State, error) {
-	database, err := metadb.New(dbType, filepath.Join(dataDir, storageDirectory))
+	database, err := metadb.New(dbType, dataDir)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("cannot init metadb: %w", err)
 	}
+
 	sdb, err := initStateDB(database)
 	if err != nil {
 		return nil, fmt.Errorf("cannot init StateDB: %s", err)
@@ -102,8 +96,8 @@ func New(dbType, dataDir string) (*State, error) {
 	if err != nil {
 		return nil, err
 	}
-	log.Infof("state database is ready at version %d with hash %x",
-		version, root)
+	log.Infof("state database at %s is ready at version %d with hash %x",
+		dataDir, version, root)
 	tx, err := sdb.BeginTx()
 	if err != nil {
 		return nil, err
@@ -113,7 +107,6 @@ func New(dbType, dataDir string) (*State, error) {
 		return nil, err
 	}
 	s := &State{
-		dataDir: dataDir,
 		db:      database,
 		store:   sdb,
 		tx:      treeTxWithMutex{TreeTx: tx},
@@ -130,7 +123,7 @@ func New(dbType, dataDir string) (*State, error) {
 	if err := s.FetchValidSIKRoots(); err != nil {
 		return nil, fmt.Errorf("cannot update valid SIK roots: %w", err)
 	}
-	return s, os.MkdirAll(filepath.Join(dataDir, storageDirectory, snapshotsDirectory), 0750)
+	return s, os.MkdirAll(dataDir, 0o750)
 }
 
 // initStateDB initializes the StateDB with the default subTrees
@@ -152,6 +145,7 @@ func initStateDB(database db.Database) (*statedb.StateDB, error) {
 		return nil, err
 	}
 	defer update.Discard()
+
 	// Create the all the Main subtrees (from
 	// mainTree) by adding leaves in the mainTree that contain the
 	// corresponding tree roots, and opening the subTrees for the first
diff --git a/vochain/state/state_snapshot_test.go b/vochain/state/state_snapshot_test.go
index 1eaa6bfdd..7f193fc7b 100644
--- a/vochain/state/state_snapshot_test.go
+++ b/vochain/state/state_snapshot_test.go
@@ -1,16 +1,10 @@
 package state
 
 import (
-	"io"
-	"path/filepath"
 	"testing"
 
 	qt "github.com/frankban/quicktest"
 	"go.vocdoni.io/dvote/db"
-	"go.vocdoni.io/dvote/db/metadb"
-	"go.vocdoni.io/dvote/test/testcommon/testutil"
-	"go.vocdoni.io/dvote/tree"
-	"go.vocdoni.io/dvote/tree/arbo"
 	"go.vocdoni.io/proto/build/go/models"
 )
 
@@ -41,119 +35,6 @@ func TestSnapshot(t *testing.T) {
 	qt.Assert(t, hash, qt.Not(qt.IsNil))
 }
 
-func TestTreeExportImport(t *testing.T) {
-	rnd := testutil.NewRandom(10)
-	mainRoot := rnd.RandomBytes(32)
-
-	var snap StateSnapshot
-	err := snap.Create(filepath.Join(t.TempDir(), "snapshot1"))
-	qt.Assert(t, err, qt.IsNil)
-	snap.SetMainRoot(mainRoot)
-
-	tree1 := newTreeForTest(t, 0)
-	root1, err := tree1.Root(nil)
-	qt.Assert(t, err, qt.IsNil)
-	snap.AddTree("Tree1", "", root1)
-	err = tree1.DumpWriter(&snap)
-	qt.Assert(t, err, qt.IsNil)
-	snap.EndTree()
-
-	tree2 := newTreeForTest(t, 1)
-	root2, err := tree2.Root(nil)
-	qt.Assert(t, err, qt.IsNil)
-	snap.AddTree("Tree2", "", root2)
-	err = tree2.DumpWriter(&snap)
-	qt.Assert(t, err, qt.IsNil)
-	snap.EndTree()
-
-	tree3 := newTreeForTest(t, 2)
-	root3, err := tree3.Root(nil)
-	qt.Assert(t, err, qt.IsNil)
-	snap.AddTree("Tree3", "Tree1", root3)
-	err = tree3.DumpWriter(&snap)
-	qt.Assert(t, err, qt.IsNil)
-	
snap.EndTree() - - err = snap.Save() - qt.Assert(t, err, qt.IsNil) - - var snap2 StateSnapshot - err = snap2.Open(snap.Path()) - qt.Assert(t, err, qt.IsNil) - - // tree1 - treeImp := newEmptyTreeForTest(t) - b, err := snap2.ReadAll() - qt.Assert(t, err, qt.IsNil) - qt.Assert(t, len(b), qt.Not(qt.Equals), 0) - _, err = snap2.ReadAll() - qt.Assert(t, err, qt.ErrorIs, io.EOF) - err = treeImp.ImportDump(b) - qt.Assert(t, err, qt.IsNil) - _, err = snap2.ReadAll() - qt.Assert(t, err, qt.ErrorIs, io.EOF) - root, err := treeImp.Root(nil) - qt.Assert(t, err, qt.IsNil) - qt.Assert(t, root1, qt.DeepEquals, root) - - // tree2 - err = snap2.FetchNextTree() - qt.Assert(t, err, qt.IsNil) - treeImp = newEmptyTreeForTest(t) - b, err = snap2.ReadAll() - qt.Assert(t, err, qt.IsNil) - err = treeImp.ImportDump(b) - qt.Assert(t, err, qt.IsNil) - root, err = treeImp.Root(nil) - qt.Assert(t, err, qt.IsNil) - qt.Assert(t, root2, qt.DeepEquals, root) - - // tree3 - err = snap2.FetchNextTree() - qt.Assert(t, err, qt.IsNil) - treeImp = newEmptyTreeForTest(t) - b, err = snap2.ReadAll() - qt.Assert(t, err, qt.IsNil) - err = treeImp.ImportDump(b) - qt.Assert(t, err, qt.IsNil) - root, err = treeImp.Root(nil) - qt.Assert(t, err, qt.IsNil) - qt.Assert(t, root3, qt.DeepEquals, root) - - // end of trees - err = snap2.FetchNextTree() - qt.Assert(t, err, qt.ErrorIs, io.EOF) - -} - -func newTreeForTest(t *testing.T, rndGenerator int64) *tree.Tree { - tree := newEmptyTreeForTest(t) - rnd := testutil.NewRandom(rndGenerator) - - // Add 3 values to the tree - wtx := tree.DB().WriteTx() - err := tree.Add(wtx, rnd.RandomBytes(32), rnd.RandomBytes(32)) - qt.Assert(t, err, qt.IsNil) - err = tree.Add(wtx, rnd.RandomBytes(32), rnd.RandomBytes(32)) - qt.Assert(t, err, qt.IsNil) - err = tree.Add(wtx, rnd.RandomBytes(32), rnd.RandomBytes(32)) - qt.Assert(t, err, qt.IsNil) - err = tree.Add(wtx, rnd.RandomBytes(32), rnd.RandomBytes(32)) - qt.Assert(t, err, qt.IsNil) - err = wtx.Commit() - qt.Assert(t, err, qt.IsNil) - - return tree -} - -func newEmptyTreeForTest(t *testing.T) *tree.Tree { - db, err := metadb.New(db.TypePebble, t.TempDir()) - qt.Assert(t, err, qt.IsNil) - tr, err := tree.New(nil, tree.Options{DB: db, MaxLevels: 256, HashFunc: arbo.HashFunctionBlake2b}) - qt.Assert(t, err, qt.IsNil) - return tr -} - func newStateForTest(t *testing.T) *State { state, err := New(db.TypePebble, t.TempDir()) qt.Assert(t, err, qt.IsNil)