From 2adb7e63b1ce6eab41da295ef420a99c3d3c5c00 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Mon, 6 Jan 2025 15:00:13 +0530 Subject: [PATCH] added config --- dgraph/cmd/alpha/run.go | 6 +++- dgraph/cmd/debug/run.go | 2 +- posting/list.go | 7 +++++ posting/list_test.go | 2 +- posting/lists.go | 4 +-- posting/mvcc.go | 55 +++++++++++++++++++++++------------- posting/mvcc_test.go | 2 +- worker/config.go | 5 ++++ worker/mutation_unit_test.go | 2 +- worker/sort_test.go | 10 +++---- 10 files changed, 64 insertions(+), 31 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index e2a6e0f95ce..8d217543c0f 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -145,6 +145,8 @@ they form a Raft group and provide synchronous replication. Flag("percentage", "Cache percentages summing up to 100 for various caches (FORMAT: PostingListCache,"+ "PstoreBlockCache,PstoreIndexCache)"). + Flag("keep-updates", + "Should carry updates in cache or not (bool)"). String()) flag.String("raft", worker.RaftDefaults, z.NewSuperFlagHelp(worker.RaftDefaults). @@ -633,6 +635,7 @@ func run() { x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative") cachePercentage := cache.GetString("percentage") + keepUpdates := cache.GetBool("keep-updates") cachePercent, err := x.GetCachePercentages(cachePercentage, 3) x.Check(err) postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100 @@ -655,6 +658,7 @@ func run() { WALDir: Alpha.Conf.GetString("wal"), CacheMb: totalCache, CachePercentage: cachePercentage, + KeepUpdates: keepUpdates, MutationsMode: worker.AllowMutations, AuthToken: security.GetString("token"), @@ -782,7 +786,7 @@ func run() { // Posting will initialize index which requires schema. Hence, initialize // schema before calling posting.Init(). 
schema.Init(worker.State.Pstore) - posting.Init(worker.State.Pstore, postingListCacheSize) + posting.Init(worker.State.Pstore, postingListCacheSize, keepUpdates) defer posting.Cleanup() worker.Init(worker.State.Pstore) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 630c0b634ee..d6e3de9eafd 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -1024,7 +1024,7 @@ func run() { db, err := badger.OpenManaged(bopts) x.Check(err) // Not using posting list cache - posting.Init(db, 0) + posting.Init(db, 0, false) defer db.Close() printSummary(db) diff --git a/posting/list.go b/posting/list.go index fd83da3cd78..635508265de 100644 --- a/posting/list.go +++ b/posting/list.go @@ -121,6 +121,13 @@ func newMutableLayer() *MutableLayer { } } +func (mm *MutableLayer) setTs(readTs uint64) { + if mm == nil { + return + } + mm.readTs = readTs +} + // This function clones an existing mutable layer for the new transactions. This function makes sure we copy the right // things from the existing mutable layer for the new list. It basically copies committedEntries using reference and // ignores currentEntires and readTs. Similarly, all the cache items related to currentEntries are ignored and diff --git a/posting/list_test.go b/posting/list_test.go index 8fc44d8db61..3fa4b506778 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -1806,7 +1806,7 @@ func TestMain(m *testing.M) { ps, err = badger.OpenManaged(badger.DefaultOptions(dir)) x.Panic(err) // Not using posting list cache - Init(ps, 0) + Init(ps, 0, false) schema.Init(ps) m.Run() diff --git a/posting/lists.go b/posting/lists.go index 9bf37d2a0ec..a43e2b82092 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -41,7 +41,7 @@ var ( ) // Init initializes the posting lists package, the in memory and dirty list hash. 
-func Init(ps *badger.DB, cacheSize int64) { +func Init(ps *badger.DB, cacheSize int64, keepUpdates bool) { pstore = ps closer = z.NewCloser(1) go x.MonitorMemoryMetrics(closer) @@ -52,7 +52,7 @@ func Init(ps *badger.DB, cacheSize int64) { return } - memoryLayer = initMemoryLayer(cacheSize) + memoryLayer = initMemoryLayer(cacheSize, keepUpdates) } func UpdateMaxCost(maxCost int64) { diff --git a/posting/mvcc.go b/posting/mvcc.go index 24eba428e3e..3b7c1094383 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -351,14 +351,17 @@ func RemoveCacheFor(key []byte) { type MemoryLayer struct { cache *ristretto.Cache[[]byte, *CachePL] + keepUpdates bool + numCacheRead int numCacheReadFails int numDisksRead int numCacheSave int } -func initMemoryLayer(cacheSize int64) *MemoryLayer { - sm := &MemoryLayer{} +func initMemoryLayer(cacheSize int64, keepUpdates bool) *MemoryLayer { + ml := &MemoryLayer{} + ml.keepUpdates = keepUpdates cache, err := ristretto.NewCache[[]byte, *CachePL](&ristretto.Config[[]byte, *CachePL]{ // Use 5% of cache memory for storing counters. 
NumCounters: int64(float64(cacheSize) * 0.05 * 2), @@ -382,12 +385,17 @@ func initMemoryLayer(cacheSize int64) *MemoryLayer { ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio())) } }() - sm.cache = cache - return sm + if cacheSize > 0 { + ml.cache = cache + } + return ml } -func (sm *MemoryLayer) get(key []byte) (*CachePL, bool) { - val, ok := sm.cache.Get(key) +func (ml *MemoryLayer) get(key []byte) (*CachePL, bool) { + if ml.cache == nil { + return nil, false + } + val, ok := ml.cache.Get(key) if !ok { return val, ok } @@ -397,16 +405,25 @@ func (sm *MemoryLayer) get(key []byte) (*CachePL, bool) { return val, true } -func (sm *MemoryLayer) set(key []byte, i *CachePL) { - sm.cache.Set(key, i, 1) +func (ml *MemoryLayer) set(key []byte, i *CachePL) { + if ml.cache == nil { + return + } + ml.cache.Set(key, i, 1) } -func (sm *MemoryLayer) del(key []byte) { - sm.cache.Del(key) +func (ml *MemoryLayer) del(key []byte) { + if ml.cache == nil { + return + } + ml.cache.Del(key) } -func (sm *MemoryLayer) clear() { - sm.cache.Clear() +func (ml *MemoryLayer) clear() { + if ml.cache == nil { + return + } + ml.cache.Clear() } func NewCachePL() *CachePL { @@ -434,9 +451,8 @@ func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, comm return } - updateItemAfterCommit := false - - if !updateItemAfterCommit { + if !ml.keepUpdates { + // TODO We should mark the key as deleted instead of directly deleting from the cache. 
ml.del([]byte(key)) return } @@ -449,7 +465,7 @@ func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, comm val.lastUpdate = commitTs val.count -= 1 - if val.list != nil && updateItemAfterCommit { + if val.list != nil && ml.keepUpdates { p := new(pb.PostingList) x.Check(proto.Unmarshal(delta, p)) @@ -551,9 +567,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return l, nil case BitDeltaPosting: err := item.Value(func(val []byte) error { - if l.mutationMap == nil { - l.mutationMap = newMutableLayer() - } pl := &pb.PostingList{} if err := proto.Unmarshal(val, pl); err != nil { return err @@ -640,6 +653,7 @@ func (ml *MemoryLayer) readFromDisk(key []byte, pstore *badger.DB, readTs uint64 return l, nil } +// Saves the data in the cache. The caller must ensure that the list provided is the latest possible. func (ml *MemoryLayer) saveInCache(key []byte, l *List) { l.RLock() defer l.RUnlock() @@ -657,6 +671,7 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (* l := ml.readFromCache(key, readTs) if l != nil { ml.numCacheRead += 1 + l.mutationMap.setTs(readTs) return l, nil } else { ml.numCacheReadFails += 1 @@ -667,6 +682,7 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (* } ml.saveInCache(key, l) if l.minTs == 0 || readTs >= l.minTs { + l.mutationMap.setTs(readTs) return l, nil } @@ -675,6 +691,7 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (* return nil, err } + l.mutationMap.setTs(readTs) return l, nil } diff --git a/posting/mvcc_test.go b/posting/mvcc_test.go index bb616399161..4f0d98bd74d 100644 --- a/posting/mvcc_test.go +++ b/posting/mvcc_test.go @@ -109,7 +109,7 @@ func BenchmarkTestCache(b *testing.B) { ps, err = badger.OpenManaged(badger.DefaultOptions(dir)) x.Panic(err) - Init(ps, 10000000) + Init(ps, 10000000, true) schema.Init(ps) attr := x.GalaxyAttr("cache") diff --git a/worker/config.go b/worker/config.go 
index 96d04c19b23..7a1fd7f62f4 100644 --- a/worker/config.go +++ b/worker/config.go @@ -63,6 +63,11 @@ type Options struct { CachePercentage string // CacheMb is the total memory allocated between all the caches. CacheMb int64 + // KeepUpdates is the parameter that allows the user to set if the cache should keep the items that were + // just mutated. Keeping these items is good when there is a mixed workload where you are updating the + // same element multiple times. However, for a heavy mutation workload, not keeping these items would be better, + // as keeping these elements bloats the cache, making it slow. + KeepUpdates bool Audit *x.LoggerConf diff --git a/worker/mutation_unit_test.go b/worker/mutation_unit_test.go index 6eb47fe832b..2483f924814 100644 --- a/worker/mutation_unit_test.go +++ b/worker/mutation_unit_test.go @@ -42,7 +42,7 @@ func TestReverseEdge(t *testing.T) { x.Check(err) pstore = ps // Not using posting list cache - posting.Init(ps, 0) + posting.Init(ps, 0, false) Init(ps) err = schema.ParseBytes([]byte("revc: [uid] @reverse @count ."), 1) require.NoError(t, err) diff --git a/worker/sort_test.go b/worker/sort_test.go index 73f3305a581..29564df6b10 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -75,7 +75,7 @@ func TestSingleUid(t *testing.T) { ps, err := badger.OpenManaged(opt) x.Check(err) pstore = ps - posting.Init(ps, 0) + posting.Init(ps, 0, false) Init(ps) err = schema.ParseBytes([]byte("singleUidTest: string @index(exact) @unique ."), 1) require.NoError(t, err) @@ -170,7 +170,7 @@ func TestLangExact(t *testing.T) { x.Check(err) pstore = ps // Not using posting list cache - posting.Init(ps, 0) + posting.Init(ps, 0, false) Init(ps) err = schema.ParseBytes([]byte("testLang: string @index(term) @lang ."), 1) require.NoError(t, err) @@ -218,7 +218,7 @@ func TestLangExact(t *testing.T) { const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" -func RandStringBytes(n int) string { +func randStringBytes(n int) 
string { b := make([]byte, n) for i := range b { b[i] = letterBytes[rand.Intn(len(letterBytes))] @@ -236,7 +236,7 @@ func BenchmarkAddMutationWithIndex(b *testing.B) { x.Check(err) pstore = ps // Not using posting list cache - posting.Init(ps, 0) + posting.Init(ps, 0, false) Init(ps) err = schema.ParseBytes([]byte("benchmarkadd: string @index(term) ."), 1) fmt.Println(err) @@ -250,7 +250,7 @@ func BenchmarkAddMutationWithIndex(b *testing.B) { n := uint64(1000) values := make([]string, 0) for range n { - values = append(values, RandStringBytes(5)) + values = append(values, randStringBytes(5)) } for i := 0; i < b.N; i++ {