diff --git a/v2/cmd/bench/bench.go b/v2/cmd/bench/bench.go
new file mode 100644
index 000000000..d12e25d7e
--- /dev/null
+++ b/v2/cmd/bench/bench.go
@@ -0,0 +1,177 @@
+package bench
+
+import (
+	"log/slog"
+	"net/http"
+	"os"
+	"runtime/pprof"
+	"time"
+
+	"github.com/cosmos/iavl/v2"
+	"github.com/cosmos/iavl/v2/metrics"
+	"github.com/cosmos/iavl/v2/testutil"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"github.com/spf13/cobra"
+)
+
+// Command returns the parent "bench" command with the benchmark subcommands attached.
+func Command() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "bench",
+		Short: "run benchmarks",
+	}
+	cmd.AddCommand(benchCommand())
+	return cmd
+}
+
+// benchCommand returns the "std" subcommand. It opens the trees under --db at version 1
+// (importing a snapshot when --snapshot is set), runs TestBuild over the changelogs at
+// --changelog up to a fixed version, and fails if the resulting root hash is not the expected one.
+func benchCommand() *cobra.Command {
+	var (
+		dbPath        string
+		changelogPath string
+		loadSnapshot  bool
+		usePrometheus bool
+		cpuProfile    string
+	)
+	cmd := &cobra.Command{
+		Use:   "std",
+		Short: "run the std development benchmark",
+		Long: `Runs a longer benchmark for the IAVL tree. This is useful for development and testing.
+Pre-requisites this command:
+$ go run ./cmd gen tree --db /tmp/iavl-v2 --limit 1 --type osmo-like-many
+mkdir -p /tmp/osmo-like-many/v2 && go run ./cmd gen emit --start 2 --limit 1000 --type osmo-like-many --out /tmp/osmo-like-many/v2
+
+Optional for --snapshot arg:
+$ go run ./cmd snapshot --db /tmp/iavl-v2 --version 1
+`,
+
+		RunE: func(_ *cobra.Command, _ []string) error {
+			if cpuProfile != "" {
+				f, err := os.Create(cpuProfile)
+				if err != nil {
+					return err
+				}
+				if err := pprof.StartCPUProfile(f); err != nil {
+					// the deferred cleanup below is not registered yet; close the file here so it does not leak
+					f.Close()
+					return err
+				}
+				defer func() {
+					pprof.StopCPUProfile()
+					f.Close()
+				}()
+			}
+			treeOpts := iavl.DefaultTreeOptions()
+			treeOpts.CheckpointInterval = 80
+			treeOpts.StateStorage = true
+			treeOpts.HeightFilter = 1
+			treeOpts.EvictionDepth = 22
+			treeOpts.MetricsProxy = metrics.NewStructMetrics()
+			if usePrometheus {
+				treeOpts.MetricsProxy = newPrometheusMetricsProxy()
+			}
+
+			logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
+			var multiTree *iavl.MultiTree
+			if loadSnapshot {
+				var err error
+				multiTree, err = iavl.ImportMultiTree(logger, 1, dbPath, treeOpts)
+				if err != nil {
+					return err
+				}
+			} else {
+				multiTree = iavl.NewMultiTree(logger, dbPath, treeOpts)
+				if err := multiTree.MountTrees(); err != nil {
+					return err
+				}
+				if err := multiTree.LoadVersion(1); err != nil {
+					return err
+				}
+				if err := multiTree.WarmLeaves(); err != nil {
+					return err
+				}
+			}
+
+			opts := testutil.CompactedChangelogs(changelogPath)
+			opts.SampleRate = 250_000
+
+			// opts.Until = 1_000
+			// opts.UntilHash = "557663181d9ab97882ecfc6538e3b4cfe31cd805222fae905c4b4f4403ca5cda"
+			opts.Until = 500
+			opts.UntilHash = "2670bd5767e70f2bf9e4f723b5f205759e39afdb5d8cfb6b54a4a3ecc27a1377"
+
+			_, err := multiTree.TestBuild(opts)
+			return err
+		},
+	}
+	cmd.Flags().StringVar(&dbPath, "db", "/tmp/iavl-v2", "the path to the database at version 1")
+	cmd.Flags().StringVar(&changelogPath, "changelog", "/tmp/osmo-like-many/v2", "the path to the changelog")
+	cmd.Flags().BoolVar(&loadSnapshot, "snapshot", false, "load the snapshot at version 1 before running the benchmarks (loads full tree into memory)")
+	cmd.Flags().BoolVar(&usePrometheus, "prometheus", false, "enable prometheus metrics")
+	cmd.Flags().StringVar(&cpuProfile, "cpu-profile", "", "write cpu profile to file")
+
+	if err := cmd.MarkFlagRequired("changelog"); err != nil {
+		panic(err)
+	}
+	if err := cmd.MarkFlagRequired("db"); err != nil {
+		panic(err)
+	}
+	return cmd
+}
+
+var _ metrics.Proxy = &prometheusMetricsProxy{}
+
+// prometheusMetricsProxy is a metrics.Proxy that exports the working_size and
+// working_bytes gauges to Prometheus; counters and timings are discarded.
+type prometheusMetricsProxy struct {
+	workingSize  prometheus.Gauge
+	workingBytes prometheus.Gauge
+}
+
+// newPrometheusMetricsProxy creates both gauges through promauto and serves the
+// Prometheus handler at :2112/metrics from a background goroutine.
+// The goroutine panics if http.ListenAndServe returns an error.
+func newPrometheusMetricsProxy() *prometheusMetricsProxy {
+	p := &prometheusMetricsProxy{}
+	p.workingSize = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "iavl_working_size",
+		Help: "working size",
+	})
+	p.workingBytes = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "iavl_working_bytes",
+		Help: "working bytes",
+	})
+	http.Handle("/metrics", promhttp.Handler())
+	go func() {
+		err := http.ListenAndServe(":2112", nil)
+		if err != nil {
+			panic(err)
+		}
+	}()
+	return p
+}
+
+// IncrCounter is a no-op; counters are not exported to Prometheus.
+func (p *prometheusMetricsProxy) IncrCounter(_ float32, _ ...string) {
+}
+
+// SetGauge updates the gauge named by keys[1] ("working_size" or "working_bytes").
+// Calls with fewer than two keys, or naming any other gauge, are ignored.
+func (p *prometheusMetricsProxy) SetGauge(val float32, keys ...string) {
+	if len(keys) < 2 {
+		return
+	}
+	switch keys[1] {
+	case "working_size":
+		p.workingSize.Set(float64(val))
+	case "working_bytes":
+		p.workingBytes.Set(float64(val))
+	}
+}
+
+// MeasureSince is a no-op; timings are not exported to Prometheus.
+func (p *prometheusMetricsProxy) MeasureSince(_ time.Time, _ ...string) {}
diff --git a/v2/cmd/gen/gen.go b/v2/cmd/gen/gen.go
index a9872e63e..19e4b51c6 100644
--- a/v2/cmd/gen/gen.go
+++ b/v2/cmd/gen/gen.go
@@ -144,8 +144,8 @@ func treeCommand() *cobra.Command {
 	cmd := &cobra.Command{
 		Use:   "tree",
 		Short: "build and save a Tree to disk, taking generated changesets as input",
-		RunE: func(cmd *cobra.Command, args []string) error {
-			multiTree := iavl.NewMultiTree(iavl.NewTestLogger(), dbPath, iavl.TreeOptions{StateStorage: true})
+		
RunE: func(_ *cobra.Command, _ []string) error { + multiTree := iavl.NewMultiTree(iavl.NewDebugLogger(), dbPath, iavl.DefaultTreeOptions()) defer func(mt *iavl.MultiTree) { err := mt.Close() if err != nil { diff --git a/v2/cmd/root.go b/v2/cmd/root.go index 95a76b1b7..5991e1c0a 100644 --- a/v2/cmd/root.go +++ b/v2/cmd/root.go @@ -1,6 +1,7 @@ package main import ( + "github.com/cosmos/iavl/v2/cmd/bench" "github.com/cosmos/iavl/v2/cmd/gen" "github.com/cosmos/iavl/v2/cmd/rollback" "github.com/cosmos/iavl/v2/cmd/scan" @@ -19,6 +20,7 @@ func RootCommand() (*cobra.Command, error) { rollback.Command(), scan.Command(), latestCommand(), + bench.Command(), ) return cmd, nil } diff --git a/v2/iterator_test.go b/v2/iterator_test.go index 767fd0b13..6d7b61456 100644 --- a/v2/iterator_test.go +++ b/v2/iterator_test.go @@ -13,7 +13,8 @@ func Test_Iterator(t *testing.T) { sql, err := iavl.NewInMemorySqliteDb(pool) require.NoError(t, err) - tree := iavl.NewTree(sql, pool, iavl.TreeOptions{StateStorage: false}) + opts := iavl.DefaultTreeOptions() + tree := iavl.NewTree(sql, pool, opts) set := func(key string, value string) { _, err := tree.Set([]byte(key), []byte(value)) require.NoError(t, err) @@ -226,7 +227,8 @@ func Test_IteratorTree(t *testing.T) { sql, err := iavl.NewSqliteDb(pool, iavl.SqliteDbOptions{Path: tmpDir}) require.NoError(t, err) - tree := iavl.NewTree(sql, pool, iavl.TreeOptions{StateStorage: true}) + opts := iavl.DefaultTreeOptions() + tree := iavl.NewTree(sql, pool, opts) set := func(key string, value string) { _, err := tree.Set([]byte(key), []byte(value)) require.NoError(t, err) @@ -241,7 +243,7 @@ func Test_IteratorTree(t *testing.T) { _, version, err := tree.SaveVersion() require.NoError(t, err) - tree = iavl.NewTree(sql, pool, iavl.TreeOptions{StateStorage: true}) + tree = iavl.NewTree(sql, pool, opts) require.NoError(t, tree.LoadVersion(version)) cases := []struct { name string @@ -304,5 +306,4 @@ func Test_IteratorTree(t *testing.T) { require.NoError(t, 
itr.Close()) }) } - } diff --git a/v2/logger.go b/v2/logger.go index 2bfe1a401..91e66d4f9 100644 --- a/v2/logger.go +++ b/v2/logger.go @@ -1,6 +1,9 @@ package iavl -import "log/slog" +import ( + "log/slog" + "os" +) // Logger defines basic logger that IAVL expects. // It is a subset of the cosmossdk.io/core/log.Logger interface. @@ -39,6 +42,10 @@ func NewTestLogger() Logger { return &testLogger{} } +func NewDebugLogger() Logger { + return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) +} + type testLogger struct{} func (l *testLogger) Info(msg string, keys ...any) { diff --git a/v2/metrics/metrics.go b/v2/metrics/metrics.go index 0ce25dddf..ba356f1ba 100644 --- a/v2/metrics/metrics.go +++ b/v2/metrics/metrics.go @@ -20,6 +20,89 @@ type Proxy interface { MeasureSince(start time.Time, keys ...string) } +var ( + _ Proxy = &StructMetrics{} + _ Proxy = &NilMetrics{} +) + +type NilMetrics struct{} + +func (n NilMetrics) IncrCounter(_ float32, _ ...string) {} + +func (n NilMetrics) SetGauge(_ float32, _ ...string) {} + +func (n NilMetrics) MeasureSince(_ time.Time, _ ...string) {} + +type StructMetrics struct { + *TreeMetrics + *DbMetrics +} + +func NewStructMetrics() *StructMetrics { + return &StructMetrics{ + TreeMetrics: &TreeMetrics{}, + DbMetrics: &DbMetrics{}, + } +} + +func (s *StructMetrics) IncrCounter(val float32, keys ...string) { + if len(keys) != 2 { + return + } + k := keys[1] + switch k { + case "pool_get": + s.PoolGet += int64(val) + case "pool_return": + s.PoolReturn += int64(val) + case "pool_evict": + s.PoolEvict += int64(val) + case "pool_evict_miss": + s.PoolEvictMiss += int64(val) + case "pool_fault": + s.PoolFault += int64(val) + + case "tree_update": + s.TreeUpdate += int64(val) + case "tree_new_node": + s.TreeNewNode += int64(val) + case "tree_delete": + s.TreeDelete += int64(val) + case "tree_hash": + s.TreeHash += int64(val) + + case "db_get_leaf": + s.QueryLeafCount += int64(val) + case "db_get_branch": 
+ s.QueryBranchCount += int64(val) + case "db_leaf_miss": + s.QueryLeafMiss += int64(val) + case "db_write_leaf": + s.WriteLeaves += int64(val) + case "db_write_branch": + s.WriteBranch += int64(val) + } +} + +func (s *StructMetrics) SetGauge(_ float32, _ ...string) {} + +func (s *StructMetrics) MeasureSince(start time.Time, keys ...string) { + dur := time.Since(start) + if len(keys) != 2 { + return + } + k := keys[1] + switch k { + case "db_get": + s.QueryDurations = append(s.QueryDurations, dur) + s.QueryTime += dur + s.QueryCount++ + case "db_write": + s.WriteDurations = append(s.WriteDurations, dur) + s.WriteTime += dur + } +} + type TreeMetrics struct { PoolGet int64 PoolReturn int64 @@ -30,12 +113,14 @@ type TreeMetrics struct { TreeUpdate int64 TreeNewNode int64 TreeDelete int64 + TreeHash int64 } type DbMetrics struct { WriteDurations []time.Duration WriteTime time.Duration WriteLeaves int64 + WriteBranch int64 QueryDurations []time.Duration QueryTime time.Duration @@ -60,24 +145,24 @@ func (m *TreeMetrics) Report() { humanize.Comma(m.TreeDelete)) } -func (m *DbMetrics) QueryReport(bins int) error { - if m.QueryCount == 0 { +func (s *StructMetrics) QueryReport(bins int) error { + if s.QueryCount == 0 { return nil } fmt.Printf("queries=%s q/s=%s dur/q=%s dur=%s leaf-q=%s branch-q=%s leaf-miss=%s\n", - humanize.Comma(m.QueryCount), - humanize.Comma(int64(float64(m.QueryCount)/m.QueryTime.Seconds())), - time.Duration(int64(m.QueryTime)/m.QueryCount), - m.QueryTime.Round(time.Millisecond), - humanize.Comma(m.QueryLeafCount), - humanize.Comma(m.QueryBranchCount), - humanize.Comma(m.QueryLeafMiss), + humanize.Comma(s.QueryCount), + humanize.Comma(int64(float64(s.QueryCount)/s.QueryTime.Seconds())), + time.Duration(int64(s.QueryTime)/s.QueryCount), + s.QueryTime.Round(time.Millisecond), + humanize.Comma(s.QueryLeafCount), + humanize.Comma(s.QueryBranchCount), + humanize.Comma(s.QueryLeafMiss), ) if bins > 0 { var histData []float64 - for _, d := range 
m.QueryDurations { + for _, d := range s.QueryDurations { if d > 50*time.Microsecond { continue } @@ -92,29 +177,30 @@ func (m *DbMetrics) QueryReport(bins int) error { } } - m.SetQueryZero() + s.SetQueryZero() return nil } -func (m *DbMetrics) SetQueryZero() { - m.QueryDurations = nil - m.QueryTime = 0 - m.QueryCount = 0 - m.QueryLeafMiss = 0 - m.QueryLeafCount = 0 - m.QueryBranchCount = 0 +func (s *StructMetrics) SetQueryZero() { + s.QueryDurations = nil + s.QueryTime = 0 + s.QueryCount = 0 + s.QueryLeafMiss = 0 + s.QueryLeafCount = 0 + s.QueryBranchCount = 0 } -func (m *DbMetrics) Add(o *DbMetrics) { - m.WriteDurations = append(m.WriteDurations, o.WriteDurations...) - m.WriteTime += o.WriteTime - m.WriteLeaves += o.WriteLeaves - - m.QueryDurations = append(m.QueryDurations, o.QueryDurations...) - m.QueryTime += o.QueryTime - m.QueryCount += o.QueryCount - m.QueryLeafMiss += o.QueryLeafMiss - m.QueryLeafCount += o.QueryLeafCount - m.QueryBranchCount += o.QueryBranchCount +func (s *StructMetrics) Add(os *StructMetrics) { + s.WriteDurations = append(s.WriteDurations, os.WriteDurations...) + s.WriteTime += os.WriteTime + s.WriteLeaves += os.WriteLeaves + s.WriteBranch += os.WriteBranch + + s.QueryDurations = append(s.QueryDurations, os.QueryDurations...) 
+	s.QueryTime += os.QueryTime
+	s.QueryCount += os.QueryCount
+	s.QueryLeafMiss += os.QueryLeafMiss
+	s.QueryLeafCount += os.QueryLeafCount
+	s.QueryBranchCount += os.QueryBranchCount
 }
diff --git a/v2/migrate/go.mod b/v2/migrate/go.mod
index e40056133..538f8c58e 100644
--- a/v2/migrate/go.mod
+++ b/v2/migrate/go.mod
@@ -1,6 +1,8 @@
 module github.com/cosmos/iavl/v2/migrate
 
-go 1.18
+go 1.21.0
+
+toolchain go1.23.6
 
 require (
 	cosmossdk.io/api v0.7.2
@@ -53,4 +55,4 @@ require (
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
 	google.golang.org/grpc v1.58.3 // indirect
 	google.golang.org/protobuf v1.31.0 // indirect
-)
\ No newline at end of file
+)
diff --git a/v2/multitree.go b/v2/multitree.go
index fa46e1933..00353a48f 100644
--- a/v2/multitree.go
+++ b/v2/multitree.go
@@ -5,9 +5,12 @@ import (
 	"errors"
 	"fmt"
 	"path/filepath"
+	"runtime"
 	"sync/atomic"
+	"time"
 
 	"github.com/cosmos/iavl/v2/metrics"
+	"github.com/cosmos/iavl/v2/testutil"
 	"github.com/dustin/go-humanize"
 	"golang.org/x/exp/slices"
 )
@@ -29,6 +32,9 @@ type MultiTree struct {
 }
 
 func NewMultiTree(logger Logger, rootPath string, opts TreeOptions) *MultiTree {
+	if opts.MetricsProxy == nil {
+		opts.MetricsProxy = metrics.NilMetrics{}
+	}
 	return &MultiTree{
 		Trees:  make(map[string]*Tree),
 		doneCh: make(chan saveVersionResult, 1000),
@@ -36,10 +42,11 @@ func NewMultiTree(logger Logger, rootPath string, opts TreeOptions) *MultiTree {
 		treeOpts: opts,
 		pool:     NewNodePool(),
 		rootPath: rootPath,
+		logger:   logger,
 	}
 }
 
-func ImportMultiTree(logger Logger, pool *NodePool, version int64, path string, treeOpts TreeOptions) (*MultiTree, error) {
+func ImportMultiTree(logger Logger, version int64, path string, treeOpts TreeOptions) (*MultiTree, error) {
 	mt := NewMultiTree(logger, path, treeOpts)
 	paths, err := FindDbsInPath(path)
 	if err != nil {
@@ -55,12 +62,12 @@ func ImportMultiTree(logger Logger, pool *NodePool, version int64, path string,
 	)
 	for _, dbPath := range paths {
 		cnt++
-		sql, err := NewSqliteDb(pool, defaultSqliteDbOptions(SqliteDbOptions{Path: dbPath}))
-		if err != nil {
-			return nil, err
-		}
 		go func(p string) {
-			tree := NewTree(sql, pool, mt.treeOpts)
+			tree, err := mt.newTree(p)
+			if err != nil {
+				errs <- err
+				return // newTree returned a nil tree; it must not be used below
+			}
 			importErr := tree.LoadSnapshot(version, PreOrder)
 
 			if importErr != nil {
@@ -89,14 +96,11 @@ func ImportMultiTree(logger Logger, pool *NodePool, version int64, path string,
 }
 
 func (mt *MultiTree) MountTree(storeKey string) error {
-	opts := defaultSqliteDbOptions(SqliteDbOptions{
-		Path: mt.rootPath + "/" + storeKey,
-	})
-	sql, err := NewSqliteDb(mt.pool, opts)
+	dbPath := filepath.Join(mt.rootPath, storeKey)
+	tree, err := mt.newTree(dbPath)
 	if err != nil {
 		return err
 	}
-	tree := NewTree(sql, mt.pool, mt.treeOpts)
 	mt.Trees[storeKey] = tree
 	return nil
 }
@@ -108,18 +112,32 @@ func (mt *MultiTree) MountTrees() error {
 	}
 	for _, dbPath := range paths {
 		prefix := filepath.Base(dbPath)
-		sqlOpts := defaultSqliteDbOptions(SqliteDbOptions{})
-		sqlOpts.Path = dbPath
-		sql, err := NewSqliteDb(mt.pool, sqlOpts)
+		tree, err := mt.newTree(dbPath)
 		if err != nil {
 			return err
 		}
-		tree := NewTree(sql, mt.pool, mt.treeOpts)
 		mt.Trees[prefix] = tree
 	}
 	return nil
 }
 
+func (mt *MultiTree) newTree(dbPath string) (*Tree, error) {
+	pool := NewNodePool()
+	opts := mt.treeOpts
+	if _, ok := mt.treeOpts.MetricsProxy.(*metrics.StructMetrics); ok {
+		opts.MetricsProxy = metrics.NewStructMetrics()
+	}
+	sql, err := NewSqliteDb(pool, defaultSqliteDbOptions(SqliteDbOptions{
+		Path:    dbPath,
+		Metrics: opts.MetricsProxy,
+		Logger:  mt.logger,
+	}))
+	if err != nil {
+		return nil, err
+	}
+	return NewTree(sql, pool, opts), nil
+}
+
 func (mt *MultiTree) LoadVersion(version int64) error {
 	for k, tree := range mt.Trees {
 		if err := tree.LoadVersion(version); err != nil {
@@ -187,10 +205,10 @@ func (mt *MultiTree) SaveVersionConcurrently() ([]byte, int64, error) {
 	mt.shouldCheckpoint = false
 
 	if mt.treeOpts.MetricsProxy != nil {
-		bz := workingBytes.Load()
-		sz := workingSize.Load()
-		fmt.Printf("version=%d work-bytes=%s work-size=%s mem-ceiling=%s\n",
-			version, humanize.IBytes(bz), humanize.Comma(sz), humanize.IBytes(mt.treeOpts.CheckpointMemory))
+		// bz := workingBytes.Load()
+		// sz := workingSize.Load()
+		// fmt.Printf("version=%d work-bytes=%s work-size=%s mem-ceiling=%s\n",
+		// 	version, humanize.IBytes(bz), humanize.Comma(sz), humanize.IBytes(mt.treeOpts.CheckpointMemory))
 		mt.treeOpts.MetricsProxy.SetGauge(float32(workingBytes.Load()), "iavl_v2", "working_bytes")
 		mt.treeOpts.MetricsProxy.SetGauge(float32(workingSize.Load()), "iavl_v2", "working_size")
 	}
@@ -281,10 +299,169 @@ func (mt *MultiTree) WarmLeaves() error {
 }
 
 func (mt *MultiTree) QueryReport(bins int) error {
-	m := &metrics.DbMetrics{}
+	m := metrics.NewStructMetrics()
 	for _, tree := range mt.Trees {
-		m.Add(tree.sql.metrics)
-		tree.sql.metrics.SetQueryZero()
+		sm, ok := tree.metricsProxy.(*metrics.StructMetrics)
+		if !ok {
+			continue
+		}
+		m.Add(sm)
+		sm.SetQueryZero()
 	}
 	return m.QueryReport(bins)
 }
+
+// TestBuild applies every changeset produced by opts.Iterator to the mounted trees
+// (mounting unknown store keys on demand), saves a new version after each changeset,
+// and prints a progress report every opts.SampleRate leaves (default 100,000).
+// It stops once version opts.Until has been saved and returns the leaf counter,
+// failing if opts.UntilHash is set and differs from the final root hash.
+func (mt *MultiTree) TestBuild(opts *testutil.TreeBuildOptions) (int64, error) {
+	var (
+		version  int64
+		err      error
+		cnt      = int64(1)
+		memUsage = func() string {
+			var m runtime.MemStats
+			runtime.ReadMemStats(&m)
+			// For info on each, see: https://golang.org/pkg/runtime/#MemStats
+			s := fmt.Sprintf("alloc=%s sys=%s gc=%d",
+				humanize.Bytes(m.HeapAlloc),
+				humanize.Bytes(m.Sys),
+				m.NumGC)
+			return s
+		}
+	)
+
+	// generator
+	itr := opts.Iterator
+	fmt.Printf("Initial memory usage from generators:\n%s\n", memUsage())
+
+	sampleRate := int64(100_000)
+	if opts.SampleRate != 0 {
+		sampleRate = opts.SampleRate
+	}
+
+	since := time.Now()
+	itrStart := time.Now()
+
+	report := func() error {
+		dur := time.Since(since)
+
+		var (
+			workingBytes uint64
+			workingSize  int64
+			writeCount   int64
+			writeTime    time.Duration
+			hashCount    int64
+		)
+		for _, tr := range mt.Trees {
+			sm, ok := tr.metricsProxy.(*metrics.StructMetrics)
+			if !ok {
+				return nil
+			}
+			workingBytes += tr.workingBytes
+			workingSize += tr.workingSize
+			writeTime += sm.WriteTime
+			writeCount += sm.WriteLeaves + sm.WriteBranch
+			hashCount += sm.TreeHash
+			sm.WriteDurations = nil
+			sm.WriteLeaves = 0
+			sm.WriteBranch = 0
+			sm.WriteTime = 0
+			sm.TreeHash = 0
+		}
+		fmt.Printf("leaves=%s time=%s last=%s μ=%s version=%d work-size=%s work-bytes=%s %s\n",
+			humanize.Comma(cnt),
+			dur.Round(time.Millisecond),
+			humanize.Comma(int64(float64(sampleRate)/time.Since(since).Seconds())),
+			humanize.Comma(int64(float64(cnt)/time.Since(itrStart).Seconds())),
+			version,
+			humanize.Comma(workingSize),
+			humanize.Bytes(workingBytes),
+			memUsage())
+
+		// the integer division for dur/wr below panics when writeCount is zero
+		if writeTime > 0 && writeCount > 0 {
+			fmt.Printf("writes: cnt=%s wr/s=%s dur/wr=%s dur=%s hashes=%s\n",
+				humanize.Comma(writeCount),
+				humanize.Comma(int64(float64(writeCount)/writeTime.Seconds())),
+				time.Duration(int64(writeTime)/writeCount),
+				writeTime.Round(time.Millisecond),
+				humanize.Comma(hashCount),
+			)
+		}
+
+		if err := mt.QueryReport(0); err != nil {
+			return fmt.Errorf("query report err: %w", err)
+		}
+
+		fmt.Println()
+		since = time.Now()
+		return nil
+	}
+
+	for ; itr.Valid(); err = itr.Next() {
+		if err != nil {
+			return cnt, err
+		}
+		changeset := itr.Nodes()
+		for ; changeset.Valid(); err = changeset.Next() {
+			cnt++
+			if err != nil {
+				return cnt, err
+			}
+			node := changeset.GetNode()
+			key := node.Key
+
+			tree, ok := mt.Trees[node.StoreKey]
+			if !ok {
+				if err := mt.MountTree(node.StoreKey); err != nil {
+					return cnt, err
+				}
+				tree = mt.Trees[node.StoreKey]
+			}
+
+			if !node.Delete {
+				_, err = tree.set(key, node.Value)
+				if err != nil {
+					return cnt, err
+				}
+			} else {
+				_, _, err := tree.Remove(key)
+				if err != nil {
+					return cnt, err
+				}
+			}
+
+			if cnt%sampleRate == 0 {
+				if err := report(); err != nil {
+					return cnt, err
+				}
+			}
+		}
+
+		_, version, err = mt.SaveVersionConcurrently()
+		if err != nil {
+			return cnt, err
+		}
+		if version%1000 == 0 {
+			fmt.Printf("version: %d, hash: %x\n", version, mt.Hash())
+		}
+		if version == opts.Until {
+			break
+		}
+	}
+	fmt.Printf("final version: %d, hash: %x\n", version, mt.Hash())
+	for sk, tree := range mt.Trees {
+		fmt.Printf("storekey: %s height: %d, size: %d\n", sk, tree.Height(), tree.Size())
+	}
+	// a run shorter than one millisecond yields Milliseconds() == 0; skip the mean rather than divide by zero
+	if ms := time.Since(itrStart).Milliseconds(); ms > 0 {
+		fmt.Printf("mean leaves/ms %s\n", humanize.Comma(cnt/ms))
+	}
+	if opts.UntilHash != "" && opts.UntilHash != fmt.Sprintf("%x", mt.Hash()) {
+		return cnt, fmt.Errorf("invalid hash; expected %s, got %x", opts.UntilHash, mt.Hash())
+	}
+	return cnt, nil
+}
diff --git a/v2/pool_test.go b/v2/pool_test.go
deleted file mode 100644
index d8a90ac0a..000000000
--- a/v2/pool_test.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package iavl
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestNodePool_Get(t *testing.T) {
-	pool := NewNodePool()
-	node := pool.Get()
-	node.key = []byte("hello")
-	require.Equal(t, node.key, pool.nodes[node.poolId].key)
-	pool.Put(node)
-	require.Equal(t, []byte(nil), pool.nodes[node.poolId].key)
-}
diff --git a/v2/sqlite.go b/v2/sqlite.go
index 3db493dd7..d11fa0f94 100644
--- a/v2/sqlite.go
+++ b/v2/sqlite.go
@@ -25,7 +25,8 @@ type SqliteDbOptions struct {
 	ConnArgs   string
 	ShardTrees bool
 
-	Logger Logger
+	Logger  Logger
+	Metrics metrics.Proxy
 
 	walPages int
 }
@@ -51,7 +52,7 @@ type SqliteDb struct {
 	shards       *VersionRange
 	shardQueries map[int64]*sqlite3.Stmt
 
-	metrics *metrics.DbMetrics
+	metrics metrics.Proxy
 	logger  Logger
 }
 
@@ -65,9 +66,14 @@ func defaultSqliteDbOptions(opts SqliteDbOptions) SqliteDbOptions {
 	if opts.WalSize == 0 {
 		opts.WalSize = 1024 * 1024 * 100
 	}
+	if opts.Metrics == nil {
+		opts.Metrics = metrics.NilMetrics{}
+	}
 	opts.walPages = opts.WalSize / os.Getpagesize()
 
-	opts.Logger = NewNopLogger()
+	if opts.Logger == nil {
+		opts.Logger = NewNopLogger()
+	}
 
 	return opts
 }
@@ -135,7 +141,7 @@ func NewSqliteDb(pool *NodePool, opts SqliteDbOptions) (*SqliteDb, error) {
 		iterators: make(map[int]*sqlite3.Stmt),
 		opts:      opts,
 		pool:      pool,
-		metrics:   &metrics.DbMetrics{},
+		
metrics: opts.Metrics, logger: opts.Logger, } @@ -304,7 +310,10 @@ func (sql *SqliteDb) getReadConn() (*sqlite3.Conn, error) { func (sql *SqliteDb) getLeaf(nodeKey NodeKey) (*Node, error) { start := time.Now() - + defer func() { + sql.metrics.MeasureSince(start, metricsNamespace, "db_get") + sql.metrics.IncrCounter(1, metricsNamespace, "db_get_leaf") + }() var err error if sql.queryLeaf == nil { sql.queryLeaf, err = sql.readConn.Prepare("SELECT bytes FROM changelog.leaf WHERE version = ? AND sequence = ?") @@ -336,17 +345,19 @@ func (sql *SqliteDb) getLeaf(nodeKey NodeKey) (*Node, error) { return nil, err } - dur := time.Since(start) - sql.metrics.QueryDurations = append(sql.metrics.QueryDurations, dur) - sql.metrics.QueryTime += dur - sql.metrics.QueryCount++ - sql.metrics.QueryLeafCount++ - return node, nil } -func (sql *SqliteDb) getNode(nodeKey NodeKey, q *sqlite3.Stmt) (*Node, error) { +func (sql *SqliteDb) getNode(nodeKey NodeKey) (*Node, error) { start := time.Now() + q, err := sql.getShardQuery(nodeKey.Version()) + if err != nil { + return nil, err + } + defer func() { + sql.metrics.MeasureSince(start, metricsNamespace, "db_get") + sql.metrics.IncrCounter(1, metricsNamespace, "db_get_branch") + }() if err := q.Reset(); err != nil { return nil, err @@ -376,23 +387,9 @@ func (sql *SqliteDb) getNode(nodeKey NodeKey, q *sqlite3.Stmt) (*Node, error) { return nil, err } - dur := time.Since(start) - sql.metrics.QueryDurations = append(sql.metrics.QueryDurations, dur) - sql.metrics.QueryTime += dur - sql.metrics.QueryCount++ - sql.metrics.QueryBranchCount++ - return node, nil } -func (sql *SqliteDb) Get(nodeKey NodeKey) (*Node, error) { - q, err := sql.getShardQuery(nodeKey.Version()) - if err != nil { - return nil, err - } - return sql.getNode(nodeKey, q) -} - func (sql *SqliteDb) Close() error { for _, q := range sql.shardQueries { err := q.Close() @@ -710,20 +707,23 @@ func (sql *SqliteDb) WarmLeaves() error { return stmt.Close() } +func isLeafSeq(seq uint32) 
bool {
+	return seq&(1<<31) != 0
+}
+
 func (sql *SqliteDb) getRightNode(node *Node) (*Node, error) {
+	if node.isLeaf() {
+		return nil, errors.New("leaf node has no children")
+	}
 	var err error
-	if node.subtreeHeight == 1 || node.subtreeHeight == 2 {
+	if isLeafSeq(node.rightNodeKey.Sequence()) {
 		node.rightNode, err = sql.getLeaf(node.rightNodeKey)
-		if err != nil {
-			return nil, err
-		}
-		if node.rightNode != nil {
-			return node.rightNode, nil
-		}
-		sql.metrics.QueryLeafMiss++
+	} else {
+		node.rightNode, err = sql.getNode(node.rightNodeKey)
+	}
+	if err == nil && node.rightNode == nil { // keep a real lookup error instead of masking it as "not found"
+		err = errors.New("not found")
 	}
-
-	node.rightNode, err = sql.Get(node.rightNodeKey)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get right node node_key=%s height=%d path=%s: %w",
 			node.rightNodeKey, node.subtreeHeight, sql.opts.Path, err)
@@ -732,23 +732,23 @@ func (sql *SqliteDb) getRightNode(node *Node) (*Node, error) {
 }
 
 func (sql *SqliteDb) getLeftNode(node *Node) (*Node, error) {
+	if node.isLeaf() {
+		return nil, errors.New("leaf node has no children")
+	}
 	var err error
-	if node.subtreeHeight == 1 || node.subtreeHeight == 2 {
+	if isLeafSeq(node.leftNodeKey.Sequence()) {
 		node.leftNode, err = sql.getLeaf(node.leftNodeKey)
-		if err != nil {
-			return nil, err
-		}
-		if node.leftNode != nil {
-			return node.leftNode, nil
-		}
-		sql.metrics.QueryLeafMiss++
+	} else {
+		node.leftNode, err = sql.getNode(node.leftNodeKey)
+	}
+	if err == nil && node.leftNode == nil { // keep a real lookup error instead of masking it as "not found"
+		err = errors.New("not found")
 	}
-
-	node.leftNode, err = sql.Get(node.leftNodeKey)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed to get left node node_key=%s height=%d path=%s: %w",
+			node.leftNodeKey, node.subtreeHeight, sql.opts.Path, err)
 	}
-	return node.leftNode, err
+	return node.leftNode, nil
 }
 
 func (sql *SqliteDb) isSharded() (bool, error) {
@@ -990,7 +990,7 @@ func (sql *SqliteDb) replayChangelog(tree *Tree, toVersion int64, targetHash []b
 		if version-1 != lastVersion {
 			tree.leaves, tree.branches, 
tree.leafOrphans, tree.deletes = nil, nil, nil, nil tree.version = int64(version - 1) - tree.sequence = 0 + tree.resetSequences() lastVersion = version - 1 } if bz != nil { @@ -1002,9 +1002,9 @@ func (sql *SqliteDb) replayChangelog(tree *Tree, toVersion int64, targetHash []b if _, err = tree.Set(node.key, node.hash); err != nil { return err } - if sequence != int(tree.sequence) { + if sequence != int(tree.leafSequence) { return fmt.Errorf("sequence mismatch version=%d; expected %d got %d; path=%s", - version, sequence, tree.sequence, sql.opts.Path) + version, sequence, tree.leafSequence, sql.opts.Path) } } else { if _, _, err = tree.Remove(key); err != nil { @@ -1013,7 +1013,7 @@ func (sql *SqliteDb) replayChangelog(tree *Tree, toVersion int64, targetHash []b deleteSequence := tree.deletes[len(tree.deletes)-1].deleteKey.Sequence() if sequence != int(deleteSequence) { return fmt.Errorf("sequence delete mismatch; version=%d expected %d got %d; path=%s", - version, sequence, tree.sequence, sql.opts.Path) + version, sequence, tree.leafSequence, sql.opts.Path) } } if count%250_000 == 0 { @@ -1027,7 +1027,7 @@ func (sql *SqliteDb) replayChangelog(tree *Tree, toVersion int64, targetHash []b return fmt.Errorf("root hash mismatch; expected %x got %x", targetHash, rootHash) } tree.leaves, tree.branches, tree.leafOrphans, tree.deletes = nil, nil, nil, nil - tree.sequence = 0 + tree.resetSequences() tree.version = toVersion sql.opts.Logger.Info(fmt.Sprintf("replayed changelog to version=%d count=%s dur=%s root=%v", tree.version, humanize.Comma(count), time.Since(start).Round(time.Millisecond), tree.root), logPath) diff --git a/v2/sqlite_batch.go b/v2/sqlite_batch.go index 743ed9b71..b55752d5e 100644 --- a/v2/sqlite_batch.go +++ b/v2/sqlite_batch.go @@ -5,14 +5,16 @@ import ( "time" "github.com/bvinc/go-sqlite-lite/sqlite3" + "github.com/cosmos/iavl/v2/metrics" "github.com/dustin/go-humanize" ) type sqliteBatch struct { - tree *Tree - sql *SqliteDb - size int64 - logger Logger 
+ tree *Tree + sql *SqliteDb + size int64 + logger Logger + metrics metrics.Proxy treeCount int64 treeSince time.Time @@ -147,8 +149,6 @@ func (b *sqliteBatch) treeMaybeCommit(shardID int64) (err error) { } func (b *sqliteBatch) saveLeaves() (int64, error) { - var byteCount int64 - err := b.newChangeLogBatch() if err != nil { return 0, err @@ -169,7 +169,6 @@ func (b *sqliteBatch) saveLeaves() (int64, error) { if err != nil { return 0, err } - byteCount += int64(len(bz)) if err = b.leafInsert.Exec(leaf.nodeKey.Version(), int(leaf.nodeKey.Sequence()), bz); err != nil { return 0, err } @@ -225,10 +224,10 @@ func (b *sqliteBatch) saveLeaves() (int64, error) { err = tree.sql.leafWrite.Exec("CREATE UNIQUE INDEX IF NOT EXISTS leaf_idx ON leaf (version, sequence)") if err != nil { - return byteCount, err + return b.leafCount, err } - return byteCount, nil + return b.leafCount, nil } func (b *sqliteBatch) isCheckpoint() bool { diff --git a/v2/sqlite_test.go b/v2/sqlite_test.go index 9906ae88d..9b31d9199 100644 --- a/v2/sqlite_test.go +++ b/v2/sqlite_test.go @@ -1,8 +1,7 @@ package iavl import ( - "fmt" - "math/rand" + "sync" "testing" "time" @@ -10,49 +9,12 @@ import ( "github.com/cosmos/iavl/v2/testutil" "github.com/dustin/go-humanize" "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" ) -/* -Benchmarks measured from these leafRead-leafWrite tests below: - -# SQLite - -## Writes - -no index: -- structured batch insert (node table) - 507,700 nodes/sec -- unstructured batch insert (tree table) re-use same bytes.Buffer - 444,000 nodes/sec -- unstructured batch insert (tree table) alloc new bytes.Buffer - 473,800 nodes/sec - - !! surprising, why? because GC is async? probably worse in long tail run? - -indexed: -- structured batch insert (node table) - 441,000 nodes/sec - - the difference between indexed and not is not significant. the only way I can explain this is that the writes - below are sequential proceeding version number. 
this will always be the case with the current node key. -- unstructured batch insert (tree table) - 414,000 nodes/sec - -writing into a trie based table (indexed on key) will likely be *much* slower since it requires an order insertion -and potentially re-balancing the BTree index. -^^^ True, initial test started at 200k and quickly declined to 75k - -## Reads - -- fully memory mapped unstructured (tree table) - ~160,000 nodes/sec -- fully memory mapped structured (node table) - ~172,000 nodes/sec -- fully memory mapped structured (node table) leafRead by key []byte - ~160,000 nodes/sec - -# LevelDB -Writes: 245,000 nodes/sec -Reads: 30,000 nodes/sec !!! - -*/ - -var testDbLocation = "/tmp/sqlite_test" - func TestBuildSqlite(t *testing.T) { - //dir := t.TempDir() - dir := testDbLocation - t.Logf("using temp dir %s", dir) + dir := t.TempDir() + t.Logf("dir: %s", dir) sql, err := NewSqliteDb(NewNodePool(), SqliteDbOptions{Path: dir}) @@ -65,43 +27,58 @@ func TestBuildSqlite(t *testing.T) { since := time.Now() - err = sql.leafWrite.Exec("CREATE TABLE node (seq INTEGER, version INTEGER, hash BLOB, key BLOB, height INTEGER, size INTEGER, l_seq INTEGER, l_version INTEGER, r_seq INTEGER, r_version INTEGER)") + require.NoError(t, err) + _, err = sql.nextShard(1) + require.NoError(t, err) + conn := sql.treeWrite + + err = conn.Exec("CREATE TABLE node (seq INTEGER, version INTEGER, hash BLOB, key BLOB, height INTEGER, size INTEGER, l_seq INTEGER, l_version INTEGER, r_seq INTEGER, r_version INTEGER)") require.NoError(t, err) - err = sql.leafWrite.Exec("CREATE INDEX trie_idx ON node (key)") - //err = sql.leafWrite.Exec("CREATE INDEX node_idx ON node (version, seq)") + // enable this to demonstrate the slowness of a blob key index + // err = conn.Exec("CREATE INDEX node_key_idx ON node (key)") + err = conn.Exec("CREATE INDEX node_key_idx ON node (version, seq)") require.NoError(t, err) - err = sql.leafWrite.Exec("CREATE INDEX tree_idx ON tree (version, sequence)") + + err = 
conn.Exec("CREATE INDEX tree_idx ON tree_1 (version, sequence)") require.NoError(t, err) - require.NoError(t, sql.leafWrite.Begin()) + require.NoError(t, conn.Begin()) var stmt *sqlite3.Stmt - //stmt, err = sql.leafWrite.Prepare("INSERT INTO tree(version, sequence, bytes) VALUES (?, ?, ?)") - stmt, err = sql.leafWrite.Prepare("INSERT INTO node(version, seq, hash, key, height, size, l_seq, l_version, r_seq, r_version)" + + stmt, err = conn.Prepare("INSERT INTO node(version, seq, hash, key, height, size, l_seq, l_version, r_seq, r_version)" + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") require.NoError(t, err) startTime := time.Now() batchSize := 200_000 - //nodeBz := new(bytes.Buffer) + // set to -1 to run the full set of 40M nodes + limit := 600_000 + // nodeBz := new(bytes.Buffer) for ; version1.Valid(); err = version1.Next() { + require.NoError(t, err) + if count >= limit { + break + } + node := version1.GetNode() lnk := NewNodeKey(1, uint32(count+1)) rnk := NewNodeKey(1, uint32(count+2)) - n := &Node{key: node.Key, hash: node.Key[:32], - subtreeHeight: 13, size: 4, leftNodeKey: lnk, rightNodeKey: rnk} + n := &Node{ + key: node.Key, hash: node.Key[:32], + subtreeHeight: 13, size: 4, leftNodeKey: lnk, rightNodeKey: rnk, + } - //nodeBz.Reset() - //require.NoError(t, n.WriteBytes(nodeBz)) + // nodeBz.Reset() + // require.NoError(t, n.WriteBytes(nodeBz)) // tree table - //nk := NewNodeKey(1, uint32(count)) - //nodeBz, err := n.Bytes() - //require.NoError(t, err) - //err = stmt.Exec(int(nk.Version()), int(nk.Sequence()), nodeBz) - //require.NoError(t, err) + // nk := NewNodeKey(1, uint32(count)) + // nodeBz, err := n.Bytes() + // require.NoError(t, err) + // err = stmt.Exec(int(nk.Version()), int(nk.Sequence()), nodeBz) + // require.NoError(t, err) // node table err = stmt.Exec( @@ -118,11 +95,11 @@ func TestBuildSqlite(t *testing.T) { ) if count%batchSize == 0 { - err := sql.leafWrite.Commit() + err := conn.Commit() require.NoError(t, err) - //stmt, err = newBatch() - 
//require.NoError(t, err) - require.NoError(t, sql.leafWrite.Begin()) + // stmt, err = newBatch() + // require.NoError(t, err) + require.NoError(t, conn.Begin()) t.Logf("nodes=%s dur=%s; rate=%s", humanize.Comma(int64(count)), time.Since(since).Round(time.Millisecond), @@ -134,7 +111,7 @@ func TestBuildSqlite(t *testing.T) { } t.Log("final commit") - require.NoError(t, sql.leafWrite.Commit()) + require.NoError(t, conn.Commit()) t.Logf("total dur=%s rate=%s", time.Since(startTime).Round(time.Millisecond), humanize.Comma(int64(40_000_000/time.Since(startTime).Seconds())), @@ -143,128 +120,11 @@ func TestBuildSqlite(t *testing.T) { require.NoError(t, sql.Close()) } -func TestReadSqlite_Trie(t *testing.T) { - dir := testDbLocation - sql, err := NewSqliteDb(NewNodePool(), SqliteDbOptions{Path: dir}) - require.NoError(t, err) - - read, err := sql.getReadConn() - require.NoError(t, err) - - query, err := read.Prepare("SELECT version, seq, hash, key, height, size, l_seq, l_version, r_seq, r_version FROM node WHERE key = ?") - require.NoError(t, err) - - var hash, key []byte - var version, seq, height, size, lSeq, lVersion, rSeq, rVersion int - - i := int64(1) - since := time.Now() - gen := testutil.OsmoLike() - version1 := gen.Iterator.Nodes() - for ; version1.Valid(); err = version1.Next() { - node := version1.GetNode() - require.NoError(t, query.Bind(node.Key)) - hasRow, err := query.Step() - require.NoError(t, err) - require.True(t, hasRow) - require.NoError(t, query.Scan(&version, &seq, &hash, &key, &height, &size, &lSeq, &lVersion, &rSeq, &rVersion)) - require.NoError(t, err) - - if i%100_000 == 0 { - i++ - t.Logf("nodes=%s dur=%s; rate=%s", - humanize.Comma(i), - time.Since(since), - humanize.Comma(int64(float64(100_000)/time.Since(since).Seconds()))) - since = time.Now() - } - require.NoError(t, query.Reset()) - i++ - } - -} - -func TestReadSqlite(t *testing.T) { - //pool := NewNodePool() - //dir := t.TempDir() - var err error - dir := testDbLocation - t.Logf("using 
temp dir %s", dir) - sql, err := NewSqliteDb(NewNodePool(), SqliteDbOptions{Path: dir}) - require.NoError(t, err) - - var stmt *sqlite3.Stmt - //stmt, err = sql.leafWrite.Prepare("SELECT bytes FROM tree WHERE node_key = ?") - - sqlRead, err := sql.getReadConn() - require.NoError(t, err) - //stmt, err = sqlRead.Prepare("SELECT bytes FROM tree WHERE version = ? AND sequence = ?") - stmt, err = sqlRead.Prepare("SELECT hash, key, height, size, l_seq, l_version, r_seq, r_version FROM node WHERE seq = ? AND version = ?") - require.NoError(t, err) - - var hash, key []byte - var height, size, lSeq, lVersion, rSeq, rVersion int - - since := time.Now() - for i := 1; i < 40_000_000; i++ { - j := rand.Intn(40_000_000) - - // unstructured leafRead: - //nk := NewNodeKey(1, uint32(j)) - //require.NoError(t, stmt.Bind(1, j)) - //hasRow, err := stmt.Step() - //require.Truef(t, hasRow, "no row for %d", j) - //require.NoError(t, err) - //nodeBz, err := stmt.ColumnBlob(0) - //require.NoError(t, err) - //_, err = MakeNode(pool, nk, nodeBz) - //require.NoError(t, err) - - // structured leafRead: - require.NoError(t, stmt.Bind(j, 1)) - hasRow, err := stmt.Step() - require.NoError(t, err) - require.True(t, hasRow) - require.NoError(t, stmt.Scan(&hash, &key, &height, &size, &lSeq, &lVersion, &rSeq, &rVersion)) - - if i%100_000 == 0 { - t.Logf("nodes=%s dur=%s; rate=%s", - humanize.Comma(int64(i)), - time.Since(since), - humanize.Comma(int64(float64(100_000)/time.Since(since).Seconds()))) - since = time.Now() - } - require.NoError(t, stmt.Reset()) - } - - //gen := testutil.OsmoLike() - //version1 := gen.TreeIterator.Nodes() - //var count int - //require.Equal(t, int64(1), gen.TreeIterator.Version()) -} - func TestNodeKeyFormat(t *testing.T) { nk := NewNodeKey(100, 2) + require.NotNil(t, nk) k := (int(nk.Version()) << 32) | int(nk.Sequence()) - fmt.Printf("k: %d - %x\n", k, k) -} - -func TestFetchNode(t *testing.T) { - pool := NewNodePool() - conn, err := sqlite3.Open("/tmp/iavl-v2.db") - 
require.NoError(t, err) - q := "SELECT bytes FROM tree_1 WHERE version = 1 and sequence = 6756148" - stmt, err := conn.Prepare(q) - require.NoError(t, err) - hasRow, err := stmt.Step() - require.NoError(t, err) - require.True(t, hasRow) - nodeBz, err := stmt.ColumnBlob(0) - require.NoError(t, err) - nk := NewNodeKey(1, 6756148) - node, err := MakeNode(pool, nk, nodeBz) - require.NoError(t, err) - fmt.Printf("node: %v\n", node) + t.Logf("k: %d - %x\n", k, k) } func TestMmap(t *testing.T) { @@ -285,7 +145,7 @@ func TestMmap(t *testing.T) { res, ok, err := stmt.ColumnRawString(0) require.True(t, ok) require.NoError(t, err) - fmt.Printf("res: %s\n", res) + t.Logf("res: %s\n", res) } func Test_NewSqliteDb(t *testing.T) { @@ -294,3 +154,171 @@ func Test_NewSqliteDb(t *testing.T) { require.NoError(t, err) require.NotNil(t, sql) } + +func Test_ConcurrentDetach(t *testing.T) { + t.Skipf("this test will sometimes panic within a panic, kept to show what doesn't work") + + dir := t.TempDir() + conn, err := sqlite3.Open(dir + "/one.db") + require.NoError(t, err) + err = conn.Exec("CREATE TABLE foo (id INTEGER PRIMARY KEY)") + require.NoError(t, err) + err = conn.Exec("INSERT INTO foo VALUES (4)") + require.NoError(t, err) + + conn2, err := sqlite3.Open(dir + "/two.db") + require.NoError(t, err) + err = conn2.Exec("CREATE TABLE bar (id INTEGER PRIMARY KEY)") + require.NoError(t, err) + err = conn2.Exec("INSERT INTO bar VALUES (7)") + require.NoError(t, err) + require.NoError(t, conn2.Close()) + + require.NoError(t, conn.Exec("ATTACH DATABASE ? 
AS two", dir+"/two.db")) + err = conn.Exec("SELECT * FROM bar") + require.NoError(t, err) + + conn3, err := sqlite3.Open(dir + "/three.db") + require.NoError(t, err) + err = conn3.Exec("CREATE TABLE bar (id INTEGER PRIMARY KEY)") + require.NoError(t, err) + err = conn3.Exec("INSERT INTO bar VALUES (8)") + require.NoError(t, err) + require.NoError(t, conn3.Close()) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + q, err := conn.Prepare("SELECT * FROM bar") + require.NoError(t, err) + for i := 0; i < 500_000; i++ { + hasRow, err := q.Step() + require.NoError(t, err) + require.True(t, hasRow) + var v int + err = q.Scan(&v) + require.NoError(t, err) + require.Equal(t, 7, v) + require.NoError(t, q.Reset()) + } + }() + + require.NoError(t, conn.Exec("ATTACH DATABASE ? AS three", dir+"/three.db")) + require.Error(t, conn.Exec("DETACH DATABASE ?", dir+"/two.db")) + + wg.Wait() +} + +func Test_ConcurrentQuery(t *testing.T) { + t.Skipf("this test will panic within a panic, kept to show what doesn't work") + dir := t.TempDir() + conn, err := sqlite3.Open(dir + "/one.db") + require.NoError(t, err) + err = conn.Exec("CREATE TABLE foo (id INTEGER PRIMARY KEY)") + require.NoError(t, err) + for i := 0; i < 100; i++ { + err = conn.Exec("INSERT INTO foo VALUES (?)", i) + require.NoError(t, err) + } + + times := 10 + errs := make(chan error, times) + checkErr := func(err error) { + if err != nil { + errs <- err + } + } + wg := sync.WaitGroup{} + + for i := 0; i < times; i++ { + wg.Add(1) + go func(i int) { + defer func() { + wg.Done() + if r := recover(); r != nil { + errs <- r.(error) + } + }() + q, err := conn.Prepare("SELECT * FROM foo WHERE id = ?") + require.NoError(t, err) + checkErr(q.Bind(i)) + hasRow, err := q.Step() + checkErr(err) + require.True(t, hasRow) + var v int + err = q.Scan(&v) + checkErr(err) + require.Equal(t, i, v) + checkErr(q.Close()) + }(i) + } + + wg.Wait() + close(errs) + + for i := 0; i < times; i++ { + err := <-errs + if err == 
nil { + require.Fail(t, "expected an error") + return + } + } +} + +func Test_ConcurrentIndexRead(t *testing.T) { + dir := t.TempDir() + seed := uint64(1234) + count := 1_000_000 + conn, err := sqlite3.Open(dir + "/db.db") + require.NoError(t, err) + r := rand.New(rand.NewSource(seed)) + require.NoError(t, conn.Exec("PRAGMA synchronous = OFF")) + require.NoError(t, conn.Exec("PRAGMA journal_mode = WAL")) + err = conn.Exec("CREATE TABLE foo (id BLOB, val INTEGER)") + require.NoError(t, err) + err = conn.Exec("CREATE TABLE bar (id BLOB, val INTEGER)") + require.NoError(t, err) + bz := make([]byte, 32) + t.Log("insert 100_000 rows") + require.NoError(t, conn.Begin()) + for i := 0; i < count; i++ { + _, err = r.Read(bz) + require.NoError(t, err) + err = conn.Exec("INSERT INTO foo VALUES (?, ?)", bz, i) + require.NoError(t, err) + err = conn.Exec("INSERT INTO bar VALUES (?, ?)", bz, i) + require.NoError(t, err) + if i%100_000 == 0 { + require.NoError(t, conn.Commit()) + require.NoError(t, conn.Begin()) + t.Log("inserted", i) + } + } + require.NoError(t, conn.Commit()) + r = rand.New(rand.NewSource(seed)) + reader, err := sqlite3.Open(dir + "/db.db") + require.NoError(t, err) + t.Log("query 100_000 rows") + stmt, err := reader.Prepare("SELECT val FROM bar WHERE id = ?") + require.NoError(t, err) + var v int + go func(q *sqlite3.Stmt) { + for i := 0; i < count; i++ { + n, err := r.Read(bz) + require.NoError(t, err) + require.Equal(t, 32, n) + err = q.Bind(bz) + require.NoError(t, err) + hasRow, err := q.Step() + require.NoError(t, err) + require.True(t, hasRow) + err = q.Scan(&v) + require.NoError(t, err) + require.Equal(t, i, v) + require.NoError(t, q.Reset()) + } + }(stmt) + err = conn.Exec("CREATE INDEX foo_idx ON foo (id)") + require.NoError(t, err) +} diff --git a/v2/sqlite_writer.go b/v2/sqlite_writer.go index 5a52fa19c..d7cab668a 100644 --- a/v2/sqlite_writer.go +++ b/v2/sqlite_writer.go @@ -42,9 +42,8 @@ type sqlWriter struct { } func (sql *SqliteDb) 
newSQLWriter() *sqlWriter { - return &sqlWriter{ + writer := &sqlWriter{ sql: sql, - logger: sql.logger, leafPruneCh: make(chan *pruneSignal), treePruneCh: make(chan *pruneSignal), leafCh: make(chan *saveSignal), @@ -52,6 +51,10 @@ func (sql *SqliteDb) newSQLWriter() *sqlWriter { leafResult: make(chan *saveResult), treeResult: make(chan *saveResult), } + if sql != nil { + writer.logger = sql.logger + } + return writer } func (w *sqlWriter) start(ctx context.Context) { @@ -438,8 +441,7 @@ func (w *sqlWriter) treeLoop(ctx context.Context) error { } func (w *sqlWriter) saveTree(tree *Tree) error { - saveStart := time.Now() - + defer tree.metrics.MeasureSince(time.Now(), metricsNamespace, "db_write") batch := &sqliteBatch{ sql: tree.sql, tree: tree, @@ -454,10 +456,8 @@ func (w *sqlWriter) saveTree(tree *Tree) error { w.leafCh <- saveSig treeResult := <-w.treeResult leafResult := <-w.leafResult - dur := time.Since(saveStart) - tree.sql.metrics.WriteDurations = append(tree.sql.metrics.WriteDurations, dur) - tree.sql.metrics.WriteTime += dur - tree.sql.metrics.WriteLeaves += int64(len(tree.leaves)) + tree.metrics.IncrCounter(float32(batch.leafCount), metricsNamespace, "db_write_leaf") + tree.metrics.IncrCounter(float32(batch.treeCount), metricsNamespace, "db_write_branch") err := errors.Join(treeResult.err, leafResult.err) diff --git a/v2/tree.go b/v2/tree.go index 5c4800838..f0fea75e4 100644 --- a/v2/tree.go +++ b/v2/tree.go @@ -11,6 +11,11 @@ import ( "github.com/cosmos/iavl/v2/metrics" ) +const ( + metricsNamespace = "iavl_v2" + leafSequenceStart = uint32(1 << 31) +) + type nodeDelete struct { // the sequence in which this deletion was processed deleteKey NodeKey @@ -21,7 +26,7 @@ type nodeDelete struct { type Tree struct { version int64 root *Node - metrics *metrics.TreeMetrics + metrics metrics.Proxy sql *SqliteDb sqlWriter *sqlWriter writerCancel context.CancelFunc @@ -42,14 +47,15 @@ type Tree struct { metricsProxy metrics.Proxy // state - branches []*Node - 
leaves []*Node - branchOrphans []NodeKey - leafOrphans []NodeKey - deletes []*nodeDelete - sequence uint32 - isReplaying bool - evictionDepth int8 + branches []*Node + leaves []*Node + branchOrphans []NodeKey + leafOrphans []NodeKey + deletes []*nodeDelete + leafSequence uint32 + branchSequence uint32 + isReplaying bool + evictionDepth int8 } type TreeOptions struct { @@ -67,6 +73,7 @@ func DefaultTreeOptions() TreeOptions { StateStorage: true, HeightFilter: 1, EvictionDepth: -1, + MetricsProxy: &metrics.NilMetrics{}, } } @@ -78,7 +85,7 @@ func NewTree(sql *SqliteDb, pool *NodePool, opts TreeOptions) *Tree { writerCancel: cancel, pool: pool, checkpoints: &VersionRange{}, - metrics: &metrics.TreeMetrics{}, + metrics: opts.MetricsProxy, maxWorkingSize: 1.5 * 1024 * 1024 * 1024, checkpointInterval: opts.CheckpointInterval, checkpointMemory: opts.CheckpointMemory, @@ -87,6 +94,7 @@ func NewTree(sql *SqliteDb, pool *NodePool, opts TreeOptions) *Tree { heightFilter: opts.HeightFilter, metricsProxy: opts.MetricsProxy, evictionDepth: opts.EvictionDepth, + leafSequence: leafSequenceStart, } tree.sqlWriter.start(ctx) @@ -155,7 +163,7 @@ func (tree *Tree) SaveSnapshot() (err error) { func (tree *Tree) SaveVersion() ([]byte, int64, error) { tree.version++ - tree.sequence = 0 + tree.resetSequences() if err := tree.sql.closeHangingIterators(); err != nil { return nil, 0, err @@ -281,7 +289,7 @@ func (tree *Tree) deepHash(node *Node, depth int8) { func (tree *Tree) Get(key []byte) ([]byte, error) { if tree.metricsProxy != nil { - defer tree.metricsProxy.MeasureSince(time.Now(), "iavl_v2", "get") + defer tree.metricsProxy.MeasureSince(time.Now(), metricsNamespace, "tree_get") } var ( res []byte @@ -300,7 +308,7 @@ func (tree *Tree) Get(key []byte) ([]byte, error) { func (tree *Tree) Has(key []byte) (bool, error) { if tree.metricsProxy != nil { - defer tree.metricsProxy.MeasureSince(time.Now(), "iavl_v2", "has") + defer tree.metricsProxy.MeasureSince(time.Now(), metricsNamespace, 
"tree_has") } var ( err error @@ -326,16 +334,16 @@ func (tree *Tree) Has(key []byte) (bool, error) { // updated, while false means it was a new key. func (tree *Tree) Set(key, value []byte) (updated bool, err error) { if tree.metricsProxy != nil { - defer tree.metricsProxy.MeasureSince(time.Now(), "iavl_v2", "set") + defer tree.metricsProxy.MeasureSince(time.Now(), metricsNamespace, "tree_set") } updated, err = tree.set(key, value) if err != nil { return false, err } if updated { - tree.metrics.TreeUpdate++ + tree.metrics.IncrCounter(1, metricsNamespace, "tree_update") } else { - tree.metrics.TreeNewNode++ + tree.metrics.IncrCounter(1, metricsNamespace, "tree_new_node") } return updated, nil } @@ -346,7 +354,7 @@ func (tree *Tree) set(key []byte, value []byte) (updated bool, err error) { } if tree.root == nil { - tree.root = tree.NewNode(key, value) + tree.root = tree.NewLeafNode(key, value) return updated, nil } @@ -363,21 +371,21 @@ func (tree *Tree) recursiveSet(node *Node, key []byte, value []byte) ( if node.isLeaf() { switch bytes.Compare(key, node.key) { case -1: // setKey < leafKey - tree.metrics.PoolGet += 2 + tree.metrics.IncrCounter(2, metricsNamespace, "pool_get") parent := tree.pool.Get() parent.nodeKey = tree.nextNodeKey() parent.key = node.key parent.subtreeHeight = 1 parent.size = 2 parent.dirty = true - parent.setLeft(tree.NewNode(key, value)) + parent.setLeft(tree.NewLeafNode(key, value)) parent.setRight(node) tree.workingBytes += parent.sizeBytes() tree.workingSize++ return parent, false, nil case 1: // setKey > leafKey - tree.metrics.PoolGet += 2 + tree.metrics.IncrCounter(2, metricsNamespace, "pool_get") parent := tree.pool.Get() parent.nodeKey = tree.nextNodeKey() parent.key = key @@ -385,7 +393,7 @@ func (tree *Tree) recursiveSet(node *Node, key []byte, value []byte) ( parent.size = 2 parent.dirty = true parent.setLeft(node) - parent.setRight(tree.NewNode(key, value)) + parent.setRight(tree.NewLeafNode(key, value)) tree.workingBytes += 
parent.sizeBytes() tree.workingSize++ @@ -448,7 +456,7 @@ func (tree *Tree) recursiveSet(node *Node, key []byte, value []byte) ( // after this call, since it may point to data stored inside IAVL. func (tree *Tree) Remove(key []byte) ([]byte, bool, error) { if tree.metricsProxy != nil { - tree.metricsProxy.MeasureSince(time.Now(), "iavL_v2", "remove") + defer tree.metricsProxy.MeasureSince(time.Now(), metricsNamespace, "tree_remove") } if tree.root == nil { @@ -462,7 +470,7 @@ func (tree *Tree) Remove(key []byte) ([]byte, bool, error) { return nil, false, nil } - tree.metrics.TreeDelete++ + tree.metrics.IncrCounter(1, metricsNamespace, "tree_delete") tree.root = newRoot return value, true, nil @@ -569,11 +577,25 @@ func (tree *Tree) Height() int8 { } func (tree *Tree) nextNodeKey() NodeKey { - tree.sequence++ - nk := NewNodeKey(tree.version+1, tree.sequence) + tree.branchSequence++ + nk := NewNodeKey(tree.version+1, tree.branchSequence) + return nk +} + +func (tree *Tree) nextLeafNodeKey() NodeKey { + tree.leafSequence++ + if tree.leafSequence < leafSequenceStart { + panic("leaf sequence underflow") + } + nk := NewNodeKey(tree.version+1, tree.leafSequence) return nk } +func (tree *Tree) resetSequences() { + tree.leafSequence = leafSequenceStart + tree.branchSequence = 0 +} + func (tree *Tree) mutateNode(node *Node) { // this second conditional is only relevant in replay; or more specifically, in cases where hashing has been // deferred between versions @@ -581,7 +603,11 @@ func (tree *Tree) mutateNode(node *Node) { return } node.hash = nil - node.nodeKey = tree.nextNodeKey() + if node.isLeaf() { + node.nodeKey = tree.nextLeafNodeKey() + } else { + node.nodeKey = tree.nextNodeKey() + } if node.dirty { return @@ -611,17 +637,17 @@ func (tree *Tree) addDelete(node *Node) { return } del := &nodeDelete{ - deleteKey: tree.nextNodeKey(), + deleteKey: tree.nextLeafNodeKey(), leafKey: node.key, } tree.deletes = append(tree.deletes, del) } -// NewNode returns a new node from 
a key, value and version. -func (tree *Tree) NewNode(key []byte, value []byte) *Node { +// NewLeafNode returns a new node from a key, value and version. +func (tree *Tree) NewLeafNode(key []byte, value []byte) *Node { node := tree.pool.Get() - node.nodeKey = tree.nextNodeKey() + node.nodeKey = tree.nextLeafNodeKey() node.key = key node.subtreeHeight = 0 diff --git a/v2/tree_test.go b/v2/tree_test.go index 2083722f7..cd1693319 100644 --- a/v2/tree_test.go +++ b/v2/tree_test.go @@ -1,14 +1,9 @@ -// TODO move to package iavl_test -// this means an audit of exported fields and types. package iavl import ( - "context" "crypto/sha256" "encoding/hex" "fmt" - "net/http" - "runtime" "sort" "testing" "time" @@ -19,147 +14,12 @@ import ( "github.com/cosmos/iavl/v2/testutil" "github.com/dustin/go-humanize" api "github.com/kocubinski/costor-api" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/require" ) -func MemUsage() string { - var m runtime.MemStats - runtime.ReadMemStats(&m) - // For info on each, see: https://golang.org/pkg/runtime/#MemStats - s := fmt.Sprintf("alloc=%s sys=%s gc=%d", - humanize.Bytes(m.HeapAlloc), - //humanize.Bytes(m.TotalAlloc), - humanize.Bytes(m.Sys), - m.NumGC) - return s -} - -func testTreeBuild(t *testing.T, multiTree *MultiTree, opts *testutil.TreeBuildOptions) (cnt int64) { - var ( - version int64 - err error - ) - cnt = 1 - - // generator - itr := opts.Iterator - fmt.Printf("Initial memory usage from generators:\n%s\n", MemUsage()) - - sampleRate := int64(100_000) - if opts.SampleRate != 0 { - sampleRate = opts.SampleRate - } - - since := time.Now() - itrStart := time.Now() - - report := func() { - dur := time.Since(since) - - var ( - workingBytes uint64 - workingSize int64 - writeLeaves int64 - writeTime time.Duration - ) - for _, tr := range multiTree.Trees { - m := tr.sql.metrics - 
workingBytes += tr.workingBytes - workingSize += tr.workingSize - writeLeaves += m.WriteLeaves - writeTime += m.WriteTime - m.WriteDurations = nil - m.WriteLeaves = 0 - m.WriteTime = 0 - } - fmt.Printf("leaves=%s time=%s last=%s μ=%s version=%d work-size=%s work-bytes=%s %s\n", - humanize.Comma(cnt), - dur.Round(time.Millisecond), - humanize.Comma(int64(float64(sampleRate)/time.Since(since).Seconds())), - humanize.Comma(int64(float64(cnt)/time.Since(itrStart).Seconds())), - version, - humanize.Comma(workingSize), - humanize.Bytes(workingBytes), - MemUsage()) - - if writeTime > 0 { - fmt.Printf("writes: cnt=%s wr/s=%s dur/wr=%s dur=%s\n", - humanize.Comma(writeLeaves), - humanize.Comma(int64(float64(writeLeaves)/writeTime.Seconds())), - time.Duration(int64(writeTime)/writeLeaves), - writeTime.Round(time.Millisecond), - ) - } - - if err := multiTree.QueryReport(0); err != nil { - t.Fatalf("query report err %v", err) - } - - fmt.Println() - - since = time.Now() - } - - for ; itr.Valid(); err = itr.Next() { - require.NoError(t, err) - changeset := itr.Nodes() - for ; changeset.Valid(); err = changeset.Next() { - cnt++ - require.NoError(t, err) - node := changeset.GetNode() - - //var keyBz bytes.Buffer - //keyBz.Write([]byte(node.StoreKey)) - //keyBz.Write(node.Key) - //key := keyBz.Bytes() - key := node.Key - - tree, ok := multiTree.Trees[node.StoreKey] - if !ok { - require.NoError(t, multiTree.MountTree(node.StoreKey)) - tree = multiTree.Trees[node.StoreKey] - } - - if !node.Delete { - _, err = tree.Set(key, node.Value) - require.NoError(t, err) - } else { - _, _, err := tree.Remove(key) - require.NoError(t, err) - } - - if cnt%sampleRate == 0 { - report() - } - } - - _, version, err = multiTree.SaveVersionConcurrently() - require.NoError(t, err) - - require.NoError(t, err) - if version == opts.Until { - break - } - } - fmt.Printf("final version: %d, hash: %x\n", version, multiTree.Hash()) - for sk, tree := range multiTree.Trees { - fmt.Printf("storekey: %s height: 
%d, size: %d\n", sk, tree.Height(), tree.Size()) - } - fmt.Printf("mean leaves/ms %s\n", humanize.Comma(cnt/time.Since(itrStart).Milliseconds())) - require.Equal(t, version, opts.Until) - require.Equal(t, opts.UntilHash, fmt.Sprintf("%x", multiTree.Hash())) - return cnt -} - func TestTree_Hash(t *testing.T) { var err error - tmpDir := t.TempDir() - //tmpDir := "/tmp/iavl-test" - t.Logf("levelDb tmpDir: %s\n", tmpDir) require.NoError(t, err) opts := testutil.BigTreeOptions_100_000() @@ -169,7 +29,9 @@ func TestTree_Hash(t *testing.T) { // at this commit tree: https://github.com/cosmos/iavl-bench/blob/3a6a1ec0a8cbec305e46239454113687da18240d/iavl-v0/main.go#L136 opts.Until = 100 opts.UntilHash = "0101e1d6f3158dcb7221acd7ed36ce19f2ef26847ffea7ce69232e362539e5cf" - treeOpts := TreeOptions{CheckpointInterval: 10, HeightFilter: 1, StateStorage: true, EvictionDepth: 8} + treeOpts := TreeOptions{ + CheckpointInterval: 10, HeightFilter: 1, StateStorage: true, EvictionDepth: 8, MetricsProxy: metrics.NewStructMetrics(), + } testStart := time.Now() multiTree := NewMultiTree(NewTestLogger(), tmpDir, treeOpts) @@ -178,7 +40,8 @@ func TestTree_Hash(t *testing.T) { for _, sk := range itrs.StoreKeys() { require.NoError(t, multiTree.MountTree(sk)) } - leaves := testTreeBuild(t, multiTree, opts) + leaves, err := multiTree.TestBuild(opts) + require.NoError(t, err) treeDuration := time.Since(testStart) fmt.Printf("mean leaves/s: %s\n", humanize.Comma(int64(float64(leaves)/treeDuration.Seconds()))) @@ -187,17 +50,19 @@ func TestTree_Hash(t *testing.T) { func TestTree_Build_Load(t *testing.T) { // build the initial version of the tree with periodic checkpoints - //tmpDir := t.TempDir() - tmpDir := "/tmp/iavl-v2-test" + tmpDir := t.TempDir() opts := testutil.NewTreeBuildOptions().With10_000() - multiTree := NewMultiTree(NewTestLogger(), tmpDir, TreeOptions{CheckpointInterval: 4000, HeightFilter: 0, StateStorage: false}) + multiTree := NewMultiTree(NewTestLogger(), tmpDir, TreeOptions{ + 
CheckpointInterval: 4000, HeightFilter: 0, StateStorage: false, MetricsProxy: metrics.NewStructMetrics(), + }) itrs, ok := opts.Iterator.(*bench.ChangesetIterators) require.True(t, ok) for _, sk := range itrs.StoreKeys() { require.NoError(t, multiTree.MountTree(sk)) } t.Log("building initial tree to version 10,000") - testTreeBuild(t, multiTree, opts) + _, err := multiTree.TestBuild(opts) + require.NoError(t, err) t.Log("snapshot tree at version 10,000") // take a snapshot at version 10,000 @@ -205,7 +70,7 @@ func TestTree_Build_Load(t *testing.T) { require.NoError(t, multiTree.Close()) t.Log("import snapshot into new tree") - mt, err := ImportMultiTree(NewTestLogger(), multiTree.pool, 10_000, tmpDir, DefaultTreeOptions()) + mt, err := ImportMultiTree(NewTestLogger(), 10_000, tmpDir, DefaultTreeOptions()) require.NoError(t, err) t.Log("build tree to version 12,000 and verify hash") @@ -213,126 +78,9 @@ func TestTree_Build_Load(t *testing.T) { require.Equal(t, int64(10_001), opts.Iterator.Version()) opts.Until = 12_000 opts.UntilHash = "3a037f8dd67a5e1a9ef83a53b81c619c9ac0233abee6f34a400fb9b9dfbb4f8d" - testTreeBuild(t, mt, opts) - require.NoError(t, mt.Close()) - - t.Log("export the tree at version 12,000 and import it into a sql db in pre-order") - traverseOrder := PreOrder - restorePreOrderMt := NewMultiTree(NewTestLogger(), t.TempDir(), TreeOptions{CheckpointInterval: 4000}) - for sk, tree := range multiTree.Trees { - require.NoError(t, restorePreOrderMt.MountTree(sk)) - exporter := tree.Export(traverseOrder) - - restoreTree := restorePreOrderMt.Trees[sk] - _, err := restoreTree.sql.WriteSnapshot(context.Background(), tree.Version(), exporter.Next, SnapshotOptions{WriteCheckpoint: true, TraverseOrder: traverseOrder}) - require.NoError(t, err) - require.NoError(t, restoreTree.LoadSnapshot(tree.Version(), traverseOrder)) - } - require.NoError(t, restorePreOrderMt.Close()) - - t.Log("export the tree at version 12,000 and import it into a sql db in post-order") - 
traverseOrder = PostOrder - restorePostOrderMt := NewMultiTree(NewTestLogger(), t.TempDir(), TreeOptions{CheckpointInterval: 4000}) - for sk, tree := range multiTree.Trees { - require.NoError(t, restorePostOrderMt.MountTree(sk)) - exporter := tree.Export(traverseOrder) - - restoreTree := restorePostOrderMt.Trees[sk] - _, err := restoreTree.sql.WriteSnapshot(context.Background(), tree.Version(), exporter.Next, SnapshotOptions{WriteCheckpoint: true, TraverseOrder: traverseOrder}) - require.NoError(t, err) - require.NoError(t, restoreTree.LoadSnapshot(tree.Version(), traverseOrder)) - } - require.Equal(t, restorePostOrderMt.Hash(), restorePreOrderMt.Hash()) - - t.Log("build tree to version 20,000 and verify hash") - require.NoError(t, opts.Iterator.Next()) - require.Equal(t, int64(12_001), opts.Iterator.Version()) - opts.Until = 20_000 - opts.UntilHash = "25907b193c697903218d92fa70a87ef6cdd6fa5b9162d955a4d70a9d5d2c4824" - testTreeBuild(t, restorePostOrderMt, opts) - require.NoError(t, restorePostOrderMt.Close()) -} - -// pre-requisites for the 2 tests below: -// $ go run ./cmd gen tree --db /tmp/iavl-v2 --limit 1 --type osmo-like-many -// $ go run ./cmd snapshot --db /tmp/iavl-v2 --version 1 -// mkdir -p /tmp/osmo-like-many/v2 && go run ./cmd gen emit --start 2 --limit 5000 --type osmo-like-many --out /tmp/osmo-like-many/v2 -func TestOsmoLike_HotStart(t *testing.T) { - tmpDir := "/tmp/iavl-v2" - // logDir := "/tmp/osmo-like-many-v2" - logDir := "/Users/mattk/src/scratch/osmo-like-many/v2" - pool := NewNodePool() - multiTree, err := ImportMultiTree(NewTestLogger(), pool, 1, tmpDir, TreeOptions{HeightFilter: 0, StateStorage: false}) + _, err = mt.TestBuild(opts) require.NoError(t, err) - require.NotNil(t, multiTree) - opts := testutil.CompactedChangelogs(logDir) - opts.SampleRate = 250_000 - - opts.Until = 1_000 - opts.UntilHash = "557663181d9ab97882ecfc6538e3b4cfe31cd805222fae905c4b4f4403ca5cda" - - testTreeBuild(t, multiTree, opts) -} - -func TestOsmoLike_ColdStart(t 
*testing.T) { - tmpDir := "/tmp/iavl-v2" - - treeOpts := DefaultTreeOptions() - treeOpts.CheckpointInterval = -1 - treeOpts.CheckpointMemory = 1.5 * 1024 * 1024 * 1024 - treeOpts.StateStorage = false - treeOpts.HeightFilter = 1 - treeOpts.EvictionDepth = 16 - treeOpts.MetricsProxy = newPrometheusMetricsProxy() - multiTree := NewMultiTree(NewTestLogger(), tmpDir, treeOpts) - require.NoError(t, multiTree.MountTrees()) - require.NoError(t, multiTree.LoadVersion(1)) - // require.NoError(t, multiTree.WarmLeaves()) - - // logDir := "/tmp/osmo-like-many-v2" - opts := testutil.CompactedChangelogs("/Users/mattk/src/scratch/osmo-like-many/v2") - opts.SampleRate = 250_000 - - opts.Until = 1_000 - opts.UntilHash = "557663181d9ab97882ecfc6538e3b4cfe31cd805222fae905c4b4f4403ca5cda" - - testTreeBuild(t, multiTree, opts) -} - -func TestTree_Import(t *testing.T) { - tmpDir := "/Users/mattk/src/scratch/sqlite/height-zero" - - pool := NewNodePool() - sql, err := NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir}) - require.NoError(t, err) - - root, err := sql.ImportSnapshotFromTable(1, PreOrder, true) - require.NoError(t, err) - require.NotNil(t, root) -} - -func TestTree_Rehash(t *testing.T) { - pool := NewNodePool() - sql, err := NewSqliteDb(pool, SqliteDbOptions{Path: "/Users/mattk/src/scratch/sqlite/height-zero"}) - require.NoError(t, err) - tree := NewTree(sql, pool, TreeOptions{}) - require.NoError(t, tree.LoadVersion(1)) - - savedHash := make([]byte, 32) - n := copy(savedHash, tree.root.hash) - require.Equal(t, 32, n) - var step func(node *Node) - step = func(node *Node) { - if node.isLeaf() { - return - } - node.hash = nil - step(node.left(tree)) - step(node.right(tree)) - node._hash() - } - step(tree.root) - require.Equal(t, savedHash, tree.root.hash) + require.NoError(t, mt.Close()) } func TestTreeSanity(t *testing.T) { @@ -347,7 +95,7 @@ func TestTreeSanity(t *testing.T) { pool := NewNodePool() sql, err := NewInMemorySqliteDb(pool) require.NoError(t, err) - return 
NewTree(sql, pool, TreeOptions{}) + return NewTree(sql, pool, DefaultTreeOptions()) }, hashFn: func(tree *Tree) []byte { hash, _, err := tree.SaveVersion() @@ -359,7 +107,7 @@ func TestTreeSanity(t *testing.T) { name: "no db", treeFn: func() *Tree { pool := NewNodePool() - return NewTree(nil, pool, TreeOptions{}) + return NewTree(nil, pool, DefaultTreeOptions()) }, hashFn: func(tree *Tree) []byte { rehashTree(tree.root) @@ -411,9 +159,10 @@ func TestTreeSanity(t *testing.T) { func Test_EmptyTree(t *testing.T) { pool := NewNodePool() - sql, err := NewInMemorySqliteDb(pool) + dbPath := t.TempDir() + sql, err := NewSqliteDb(pool, SqliteDbOptions{Path: dbPath}) require.NoError(t, err) - tree := NewTree(sql, pool, TreeOptions{}) + tree := NewTree(sql, pool, DefaultTreeOptions()) _, err = tree.Set([]byte("foo"), []byte("bar")) require.NoError(t, err) @@ -439,15 +188,6 @@ func Test_EmptyTree(t *testing.T) { require.NoError(t, err) } -func Test_Replay_Tmp(t *testing.T) { - pool := NewNodePool() - sql, err := NewSqliteDb(pool, SqliteDbOptions{Path: "/Users/mattk/src/scratch/icahost"}) - require.NoError(t, err) - tree := NewTree(sql, pool, TreeOptions{StateStorage: true}) - err = tree.LoadVersion(13946707) - require.NoError(t, err) -} - func Test_Replay(t *testing.T) { unsafeBytesToStr := func(b []byte) string { return *(*string)(unsafe.Pointer(&b)) @@ -473,7 +213,9 @@ func Test_Replay(t *testing.T) { tmpDir := t.TempDir() sql, err := NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir}) require.NoError(t, err) - tree := NewTree(sql, pool, TreeOptions{StateStorage: true, CheckpointInterval: 100}) + opts := DefaultTreeOptions() + opts.CheckpointInterval = 100 + tree := NewTree(sql, pool, opts) // we must buffer all sets/deletes and order them first for replay to work properly. // store v1 and v2 already do this via cachekv write buffering. 
@@ -532,7 +274,7 @@ func Test_Replay(t *testing.T) { sql, err = NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir}) require.NoError(t, err) - tree = NewTree(sql, pool, TreeOptions{StateStorage: true}) + tree = NewTree(sql, pool, opts) err = tree.LoadVersion(140) require.NoError(t, err) itr, err = gen.Iterator() @@ -541,32 +283,12 @@ func Test_Replay(t *testing.T) { sql, err = NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir}) require.NoError(t, err) - tree = NewTree(sql, pool, TreeOptions{StateStorage: true, CheckpointInterval: 100}) + tree = NewTree(sql, pool, opts) err = tree.LoadVersion(170) require.NoError(t, err) itr, err = gen.Iterator() require.NoError(t, err) ingest(171, 250) - - //sql, err = NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir}) - //require.NoError(t, err) - //tree = NewTree(sql, pool, TreeOptions{StateStorage: true}) - //require.NoError(t, err) - //require.NoError(t, tree.Close()) - // - //sql, err = NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir}) - //require.NoError(t, err) - //tree = NewTree(sql, pool, TreeOptions{StateStorage: true}) - //err = tree.LoadVersion(5) - //require.NoError(t, err) - // - //tree = NewTree(sql, pool, TreeOptions{StateStorage: true}) - //err = tree.LoadVersion(555) - //require.NoError(t, err) - // - //tree = NewTree(sql, pool, TreeOptions{StateStorage: true}) - //err = tree.LoadVersion(1000) - //require.NoError(t, err) } func Test_Prune_Logic(t *testing.T) { @@ -588,11 +310,12 @@ func Test_Prune_Logic(t *testing.T) { require.NoError(t, err) pool := NewNodePool() - // tmpDir := "/tmp/prune-logic" tmpDir := t.TempDir() - sql, err := NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir, ShardTrees: false}) + sql, err := NewSqliteDb(pool, SqliteDbOptions{Path: tmpDir, ShardTrees: false, Logger: NewDebugLogger()}) require.NoError(t, err) - tree := NewTree(sql, pool, TreeOptions{StateStorage: true, CheckpointInterval: 100}) + treeOpts := DefaultTreeOptions() + treeOpts.CheckpointInterval = 100 + tree := NewTree(sql, pool, treeOpts) 
for ; itr.Valid(); err = itr.Next() { require.NoError(t, err) @@ -609,7 +332,7 @@ func Test_Prune_Logic(t *testing.T) { } } _, version, err := tree.SaveVersion() - // fmt.Printf("version=%d, hash=%x\n", version, tree.Hash()) + fmt.Printf("version=%d, hash=%x\n", version, tree.Hash()) switch version { case 30: require.NoError(t, tree.DeleteVersionsTo(20)) @@ -623,165 +346,3 @@ func Test_Prune_Logic(t *testing.T) { require.NoError(t, err) } } - -func Test_Prune_Performance(t *testing.T) { - tmpDir := "/tmp/iavl-v2" - - multiTree := NewMultiTree(NewTestLogger(), tmpDir, TreeOptions{CheckpointInterval: 50, StateStorage: false}) - require.NoError(t, multiTree.MountTrees()) - require.NoError(t, multiTree.LoadVersion(1)) - require.NoError(t, multiTree.WarmLeaves()) - - // logDir := "/tmp/osmo-like-many-v2" - opts := testutil.CompactedChangelogs("/Users/mattk/src/scratch/osmo-like-many/v2") - opts.SampleRate = 250_000 - - opts.Until = 1_000 - opts.UntilHash = "557663181d9ab97882ecfc6538e3b4cfe31cd805222fae905c4b4f4403ca5cda" - - itr := opts.Iterator - var ( - err error - cnt int64 - version int64 - since = time.Now() - itrStart = time.Now() - lastPrune = 1 - ) - report := func() { - dur := time.Since(since) - - var ( - workingBytes uint64 - workingSize int64 - writeLeaves int64 - writeTime time.Duration - ) - for _, tr := range multiTree.Trees { - m := tr.sql.metrics - workingBytes += tr.workingBytes - workingSize += tr.workingSize - writeLeaves += m.WriteLeaves - writeTime += m.WriteTime - m.WriteDurations = nil - m.WriteLeaves = 0 - m.WriteTime = 0 - } - fmt.Printf("leaves=%s time=%s last=%s μ=%s version=%d work-bytes=%s work-size=%s %s\n", - humanize.Comma(cnt), - dur.Round(time.Millisecond), - humanize.Comma(int64(float64(opts.SampleRate)/time.Since(since).Seconds())), - humanize.Comma(int64(float64(cnt)/time.Since(itrStart).Seconds())), - version, - humanize.Bytes(workingBytes), - humanize.Comma(workingSize), - MemUsage()) - - if writeTime > 0 { - fmt.Printf("writes: 
cnt=%s wr/s=%s dur/wr=%s dur=%s\n", - humanize.Comma(writeLeaves), - humanize.Comma(int64(float64(writeLeaves)/writeTime.Seconds())), - time.Duration(int64(writeTime)/writeLeaves), - writeTime.Round(time.Millisecond), - ) - } - - if err := multiTree.QueryReport(0); err != nil { - t.Fatalf("query report err %v", err) - } - - fmt.Println() - - since = time.Now() - } - - for ; itr.Valid(); err = itr.Next() { - require.NoError(t, err) - changeset := itr.Nodes() - for ; changeset.Valid(); err = changeset.Next() { - cnt++ - require.NoError(t, err) - node := changeset.GetNode() - key := node.Key - - tree, ok := multiTree.Trees[node.StoreKey] - require.True(t, ok) - - if !node.Delete { - _, err = tree.Set(key, node.Value) - require.NoError(t, err) - } else { - _, _, err := tree.Remove(key) - require.NoError(t, err) - } - - if cnt%opts.SampleRate == 0 { - report() - } - } - - _, version, err = multiTree.SaveVersionConcurrently() - require.NoError(t, err) - - require.NoError(t, err) - if version == opts.Until { - break - } - - lastPrune++ - // trigger two prunes close together in order to test the receipt of a prune signal before a previous prune has completed - if lastPrune == 80 || lastPrune == 85 { - pruneTo := version - 1 - t.Logf("prune to version %d", pruneTo) - for _, tree := range multiTree.Trees { - require.NoError(t, tree.DeleteVersionsTo(pruneTo)) - } - t.Log("prune signals sent") - if lastPrune == 85 { - lastPrune = 0 - } - } - } -} - -var _ metrics.Proxy = &prometheusMetricsProxy{} - -type prometheusMetricsProxy struct { - workingSize prometheus.Gauge - workingBytes prometheus.Gauge -} - -func newPrometheusMetricsProxy() *prometheusMetricsProxy { - p := &prometheusMetricsProxy{} - p.workingSize = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "iavl_working_size", - Help: "working size", - }) - p.workingBytes = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "iavl_working_bytes", - Help: "working bytes", - }) - http.Handle("/metrics", promhttp.Handler()) - go 
func() { - err := http.ListenAndServe(":2112", nil) - if err != nil { - panic(err) - } - }() - return p -} - -func (p *prometheusMetricsProxy) IncrCounter(_ float32, _ ...string) { -} - -func (p *prometheusMetricsProxy) SetGauge(val float32, keys ...string) { - k := keys[1] - switch k { - case "working_size": - p.workingSize.Set(float64(val)) - case "working_bytes": - p.workingBytes.Set(float64(val)) - } -} - -func (p *prometheusMetricsProxy) MeasureSince(_ time.Time, _ ...string) {}