Skip to content

Commit

Permalink
vochain: implement StateSync
Browse files Browse the repository at this point in the history
* vochain: add StateSync* and SnapshotInterval flags
  * if RPCServers not specified, fallback to Seeds
  * only do Snapshots when node has an Indexer enabled

state/snapshot: refactor the whole Snapshot feature into a new package snapshot
  * merge {Add,dump,End}Tree into a single DumpTree
  * refactor Trees into generic Blobs concept
  * implement indexer Export/Import
  * Import,ExportNoStateDB now uses NoState(withTxLock=true), grabbing lock once

indexer:
  * add ExportBackupAsBytes() deduping code from api.chainIndexerExportHandler()
  * refactor RestoreBackup to use a reader instead of a file
  * handle errors of IterateVotes
  * if dbPath exists, always startDB and ignore ExpectBackupRestore

statedb: implement statedb.Import method

tests:
  * state: remove TestTreeExportImport, needs rewriting from scratch
  * testsuite: add test_statesync, uses gatewaySync to test State+NoState+Indexer Export/Import
  * testsuite: check for CONSENSUS FAILURE in logs

dockerfiles: open port 26657 for CometBFT RPC used by StateSync
  • Loading branch information
altergui committed Mar 6, 2024
1 parent 8dfe265 commit 71de0dc
Show file tree
Hide file tree
Showing 28 changed files with 1,288 additions and 628 deletions.
22 changes: 1 addition & 21 deletions api/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"time"

comettypes "github.com/cometbft/cometbft/types"
"go.vocdoni.io/dvote/crypto/zk/circuit"
"go.vocdoni.io/dvote/httprouter"
"go.vocdoni.io/dvote/httprouter/apirest"
"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/types"
"go.vocdoni.io/dvote/util"
"go.vocdoni.io/dvote/vochain"
Expand Down Expand Up @@ -1028,23 +1024,7 @@ func (a *API) chainListFeesByTypeHandler(_ *apirest.APIdata, ctx *httprouter.HTT
func (a *API) chainIndexerExportHandler(_ *apirest.APIdata, ctx *httprouter.HTTPContext) error {
exportCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
tmpFilePath := filepath.Join(os.TempDir(), "indexer.sql")
if err := a.indexer.SaveBackup(exportCtx, tmpFilePath); err != nil {
return fmt.Errorf("error saving indexer backup: %w", err)
}
tmpFile, err := os.Open(tmpFilePath)
if err != nil {
return fmt.Errorf("error opening indexer backup: %w", err)
}
defer func() {
if err := tmpFile.Close(); err != nil {
log.Warnw("error closing indexer backup file", "err", err)
}
if err := os.Remove(tmpFilePath); err != nil {
log.Warnw("error removing indexer backup file", "err", err)
}
}()
data, err := io.ReadAll(tmpFile)
data, err := a.indexer.ExportBackupAsBytes(exportCtx)
if err != nil {
return fmt.Errorf("error reading indexer backup: %w", err)
}
Expand Down
20 changes: 19 additions & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ func loadConfig() *config.Config {
"do not wait for Vochain to synchronize (for testing only)")
flag.Int("vochainMempoolSize", 20000,
"vochain mempool size")
flag.Int("vochainSnapshotInterval", 0,
"create state snapshot every N blocks (0 to disable)")
flag.Bool("vochainStateSyncEnabled", false,
"during startup, let cometBFT ask peers for available snapshots and use them to bootstrap the state")
flag.StringSlice("vochainStateSyncRPCServers", []string{},
"list of RPC servers to bootstrap the StateSync (optional, defaults to using seeds)")
flag.String("vochainStateSyncTrustHash", "",
"hash of the trusted block (required if vochainStateSyncEnabled)")
flag.Int64("vochainStateSyncTrustHeight", 0,
"height of the trusted block (required if vochainStateSyncEnabled)")
flag.Int64("vochainStateSyncChunkSize", 10*(1<<20), // 10 MB
"cometBFT chunk size in bytes")

flag.Int("vochainMinerTargetBlockTimeSeconds", 10,
"vochain consensus block time target (in seconds)")
Expand Down Expand Up @@ -440,7 +452,12 @@ func main() {
conf.Vochain.OffChainDataDownload = conf.Vochain.OffChainDataDownload &&
conf.Mode == types.ModeGateway

// create the vochain service
// if there's no indexer, then never do snapshots because they would be incomplete
if !conf.Vochain.Indexer.Enabled {
conf.Vochain.SnapshotInterval = 0
}

// create the vochain service
if err := srv.Vochain(); err != nil {
log.Fatal(err)
}
Expand All @@ -467,6 +484,7 @@ func main() {
}
}
// start the service and block until finish fast sync
// State Sync (if enabled) also happens during this step
if err := srv.Start(); err != nil {
log.Fatal(err)
}
Expand Down
15 changes: 15 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,21 @@ type VochainCfg struct {
IsSeedNode bool
// OffChainDataDownload specifies if the node is configured to download off-chain data
OffChainDataDownload bool
// SnapshotInterval enables creating a state snapshot every N blocks (0 to disable)
SnapshotInterval int
// StateSyncEnabled allows cometBFT during startup, to ask peers for available snapshots
// and use them to bootstrap the state
StateSyncEnabled bool
// StateSyncRPCServers is the list of RPC servers to bootstrap the StateSync (optional, defaults to using seeds)
StateSyncRPCServers []string
// StateSyncTrustHeight and TrustHash must both be provided to force the trusting of a
// particular header.
StateSyncTrustHeight int64
// StateSyncTrustHeight and TrustHash must both be provided to force the trusting of a
// particular header.
StateSyncTrustHash string
// StateSyncChunkSize defines the size of the chunks when splitting a Snapshot for sending via StateSync
StateSyncChunkSize int64
}

// IndexerCfg handles the configuration options of the indexer
Expand Down
26 changes: 26 additions & 0 deletions dockerfiles/testsuite/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ services:
environment:
- GOCOVERDIR=/app/run/gocoverage
- LOG_PANIC_ON_INVALIDCHARS
- VOCDONI_VOCHAIN_SNAPSHOTINTERVAL=3
- VOCDONI_VOCHAIN_STATESYNCCHUNKSIZE=2000

test:
image: "dvotenode-test:${TESTSUITE_BUILD_TAG:-latest}"
Expand Down Expand Up @@ -117,6 +119,7 @@ services:
- gocoverage-miner2:/app/run/gocoverage/miner2
- gocoverage-miner3:/app/run/gocoverage/miner3
- gocoverage-gateway0:/app/run/gocoverage/gateway0
- gocoverage-gatewaySync:/app/run/gocoverage/gatewaySync
- gocoverage-test:/app/run/gocoverage/test
networks:
- blockchain
Expand Down Expand Up @@ -157,6 +160,27 @@ services:
profiles:
- grafana

gatewaySync:
image: "dvotenode:${TESTSUITE_BUILD_TAG:-latest}"
env_file: "${COMPOSE_HOST_PATH:-.}/env.gatewaySync"
networks:
- blockchain
volumes:
- data-gatewaySync:/app/run/
- ${COMPOSE_HOST_PATH:-.}/genesis.json:/app/misc/genesis.json
- /tmp/.vochain-zkCircuits/:/app/run/dev/zkCircuits/
- gocoverage-gatewaySync:/app/run/gocoverage
environment:
- GOCOVERDIR=/app/run/gocoverage
- LOG_PANIC_ON_INVALIDCHARS
- VOCDONI_LOGLEVEL=debug
- VOCDONI_VOCHAIN_STATESYNCENABLED=True
- VOCDONI_VOCHAIN_STATESYNCRPCSERVERS=miner0:26657,miner0:26657
- VOCDONI_VOCHAIN_STATESYNCTRUSTHEIGHT
- VOCDONI_VOCHAIN_STATESYNCTRUSTHASH
profiles:
- statesync

networks:
blockchain:

Expand All @@ -167,12 +191,14 @@ volumes:
data-miner2: {}
data-miner3: {}
data-gateway0: {}
data-gatewaySync: {}
gocoverage-seed: {}
gocoverage-miner0: {}
gocoverage-miner1: {}
gocoverage-miner2: {}
gocoverage-miner3: {}
gocoverage-gateway0: {}
gocoverage-gatewaySync: {}
gocoverage-test: {}
prometheus_data: {}
grafana_data: {}
Expand Down
4 changes: 3 additions & 1 deletion dockerfiles/testsuite/env.gateway0
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
VOCDONI_DATADIR=/app/run
VOCDONI_MODE=gateway
VOCDONI_LOGLEVEL=debug
VOCDONI_VOCHAIN_LOGLEVEL=error
VOCDONI_VOCHAIN_LOGLEVEL=info
VOCDONI_DEV=True
VOCDONI_ENABLEAPI=True
VOCDONI_ENABLERPC=True
VOCDONI_ENABLEFAUCETWITHAMOUNT=100000
VOCDONI_VOCHAIN_PUBLICADDR=gateway0:26656
VOCDONI_VOCHAIN_SEEDS=3c3765494e758ae7baccb1f5b0661755302ddc47@seed:26656
VOCDONI_VOCHAIN_GENESIS=/app/misc/genesis.json
VOCDONI_VOCHAIN_NOWAITSYNC=True
Expand Down
17 changes: 17 additions & 0 deletions dockerfiles/testsuite/env.gatewaySync
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
VOCDONI_DATADIR=/app/run
VOCDONI_MODE=gateway
VOCDONI_LOGLEVEL=debug
VOCDONI_VOCHAIN_LOGLEVEL=info
VOCDONI_DEV=True
VOCDONI_ENABLEAPI=True
VOCDONI_ENABLERPC=True
VOCDONI_ENABLEFAUCETWITHAMOUNT=100000
VOCDONI_VOCHAIN_PUBLICADDR=gatewaySync:26656
VOCDONI_VOCHAIN_SEEDS=3c3765494e758ae7baccb1f5b0661755302ddc47@seed:26656
VOCDONI_VOCHAIN_GENESIS=/app/misc/genesis.json
VOCDONI_VOCHAIN_NOWAITSYNC=True
VOCDONI_METRICS_ENABLED=True
VOCDONI_METRICS_REFRESHINTERVAL=5
VOCDONI_CHAIN=dev
VOCDONI_SIGNINGKEY=f50be23d315a2e6c1c30e0f1412b86d6ca9f2b318f1d243e35bc72d44c8b2f90
VOCDONI_ARCHIVEURL=none
35 changes: 32 additions & 3 deletions dockerfiles/testsuite/start_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ tests_to_run=(
"e2etest_cspelection"
"e2etest_dynamicensuselection"
"e2etest_electiontimebounds"
"test_statesync"
)

# print help
Expand All @@ -83,7 +84,7 @@ e2etest() {
shift
args=$@
id="${op}_$(echo $args | md5sum | awk '{print $1}')"
$COMPOSE_CMD_RUN --name ${TEST_PREFIX}_${FUNCNAME[0]}-${id}_${RANDOMID} test timeout 60000 \
$COMPOSE_CMD_RUN --name ${TEST_PREFIX}_${FUNCNAME[0]}-${id}_${RANDOMID} test timeout 300 \
./end2endtest --host $APIHOST --faucet=$FAUCET \
--logLevel=$LOGLEVEL \
--operation=$op \
Expand Down Expand Up @@ -157,6 +158,24 @@ e2etest_ballotelection() {
e2etest ballotApproval
}

test_statesync() {
HEIGHT=3
HASH=

log "### Waiting for height $HEIGHT ###"
for i in {1..20}; do
# very brittle hack to extract the hash without using jq, to avoid dependencies
HASH=$($COMPOSE_CMD_RUN test curl -s --fail $APIHOST/chain/blocks/$HEIGHT 2>/dev/null | grep -oP '"hash":"\K[^"]+' | head -1)
[ -n "$HASH" ] && break || sleep 2
done

export VOCDONI_VOCHAIN_STATESYNCTRUSTHEIGHT=$HEIGHT
export VOCDONI_VOCHAIN_STATESYNCTRUSTHASH=$HASH
$COMPOSE_CMD --profile statesync up gatewaySync -d
# watch logs for 2 minutes, until catching 'startup complete'. in case of timeout, or panic, or whatever, test will fail
timeout 120 sh -c "($COMPOSE_CMD logs gatewaySync -f | grep -m 1 'startup complete')"
}

### end tests definition

log "### Starting test suite ###"
Expand Down Expand Up @@ -194,7 +213,9 @@ results="/tmp/.vocdoni-test$RANDOM"
mkdir -p $results

for test in ${tests_to_run[@]}; do
if [ $test == "e2etest_raceDuringCommit" ] || [ $CONCURRENT -eq 0 ] ; then
if [ $test == "e2etest_raceDuringCommit" ] || \
[ $test == "test_statesync" ] || \
[ $CONCURRENT -eq 0 ] ; then
log "### Running test $test ###"
( set -o pipefail ; $test | tee $results/$test.stdout ; echo $? > $results/$test.retval )
else
Expand Down Expand Up @@ -239,9 +260,17 @@ if [ -n "$GOCOVERDIR" ] ; then
log "### Coverage data in binary fmt left in $GOCOVERDIR ###"
fi

if $COMPOSE_CMD logs | grep -q "^panic:" ; then
RET=2
log "### Panic detected! Look for panic in the logs above for full context, pasting here the 100 lines after panic ###"
$COMPOSE_CMD logs | grep -A 100 "^panic:"
fi

if $COMPOSE_CMD logs | grep -q "CONSENSUS FAILURE" ; then RET=3 ; log "### CONSENSUS FAILURE detected! Look for it in the logs above ###"; fi

[ $CLEAN -eq 1 ] && {
log "### Cleaning docker environment ###"
$COMPOSE_CMD down -v --remove-orphans
$COMPOSE_CMD --profile statesync down -v --remove-orphans
}

if [ -n "$failed" ]; then
Expand Down
1 change: 1 addition & 0 deletions dockerfiles/vocdoninode/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ services:
- "4001:4001" # IPFS swarm
- "[::1]:5001:5001" # IPFS api, never expose outside localhost
- "26656:26656" # CometBFT p2p port (PublicAddr)
- "26657:26657" # CometBFT RPC port (used by StateSync)
- "[::1]:26658:26658" # CometBFT PrivValidatorListenAddr (disabled by default)
- "[::1]:61000-61100:61000-61100" # PprofPort (runtime profiling data, disabled by default)
sysctls:
Expand Down
46 changes: 46 additions & 0 deletions service/indexer.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package service

import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"

"go.vocdoni.io/dvote/log"
"go.vocdoni.io/dvote/snapshot"
"go.vocdoni.io/dvote/vochain/indexer"
)

Expand All @@ -14,13 +20,53 @@ func (vs *VocdoniService) VochainIndexer() error {
vs.Indexer, err = indexer.New(vs.App, indexer.Options{
DataDir: filepath.Join(vs.Config.DataDir, "indexer"),
IgnoreLiveResults: vs.Config.Indexer.IgnoreLiveResults,
// During StateSync, IndexerDB will be restored, so enable ExpectBackupRestore in that case
ExpectBackupRestore: vs.Config.StateSyncEnabled,
})
if err != nil {
return err
}
// launch the indexer after sync routine (executed when the blockchain is ready)
go vs.Indexer.AfterSyncBootstrap(false)

snapshot.SetFnImportIndexer(func(r io.Reader) error {
log.Debugf("restoring indexer backup")

file, err := os.CreateTemp("", "indexer.sqlite3")
if err != nil {
return fmt.Errorf("creating tmpfile: %w", err)
}
defer func() {
if err := file.Close(); err != nil {
log.Warnw("error closing tmpfile", "path", file.Name(), "err", err)
}
if err := os.Remove(file.Name()); err != nil {
log.Warnw("error removing tmpfile", "path", file.Name(), "err", err)
}
}()

if _, err := io.Copy(file, r); err != nil {
return fmt.Errorf("writing tmpfile: %w", err)
}

return vs.Indexer.RestoreBackup(file.Name())
})

snapshot.SetFnExportIndexer(func(w io.Writer) error {
log.Debugf("saving indexer backup")

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
data, err := vs.Indexer.ExportBackupAsBytes(ctx)
if err != nil {
return fmt.Errorf("creating indexer backup: %w", err)
}
if _, err := w.Write(data); err != nil {
return fmt.Errorf("writing data: %w", err)
}
return nil
})

if vs.Config.Indexer.ArchiveURL != "" && vs.Config.Indexer.ArchiveURL != "none" {
log.Infow("starting archive retrieval", "path", vs.Config.Indexer.ArchiveURL)
go vs.Indexer.StartArchiveRetrieval(vs.DataDownloader, vs.Config.Indexer.ArchiveURL)
Expand Down
4 changes: 2 additions & 2 deletions service/vochain.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (vs *VocdoniService) Start() error {
go vs.Stats.Start(10)
}

if !vs.Config.NoWaitSync {
if !vs.Config.NoWaitSync || vs.Config.StateSyncEnabled {
log.Infof("waiting for vochain to synchronize")
var lastHeight uint64
i := 0
Expand All @@ -154,7 +154,7 @@ func (vs *VocdoniService) Start() error {
syncCounter--
}
}
log.Infow("vochain fastsync completed", "height", vs.Stats.Height(), "time", time.Since(timeSyncCounter))
log.Infow("vochain fastsync completed", "height", vs.Stats.Height(), "duration", time.Since(timeSyncCounter).String())
}
go VochainPrintInfo(20*time.Second, vs.Stats)

Expand Down
Loading

0 comments on commit 71de0dc

Please sign in to comment.