Skip to content

Commit 56b0444

Browse files
authored
Merge pull request #21 from abhishek-singla-97/singla/remove-keys
feat: add RemoveKeys #17
2 parents 48ae696 + 110178d commit 56b0444

File tree

12 files changed

+655
-23
lines changed

12 files changed

+655
-23
lines changed

group.go

Lines changed: 84 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,8 @@ type Group interface {
4242
Remove(context.Context, string) error
4343
UsedBytes() (int64, int64)
4444
Name() string
45+
RemoveKeys(ctx context.Context, keys ...string) error
46+
GroupStats() GroupStats
4547
}
4648

4749
// A Getter loads data for a key.
@@ -108,6 +110,11 @@ func (g *group) Name() string {
108110
return g.name
109111
}
110112

113+
// GroupStats returns the stats for this group.
114+
func (g *group) GroupStats() GroupStats {
115+
return g.Stats
116+
}
117+
111118
// UsedBytes returns the total number of bytes used by the main and hot caches
112119
func (g *group) UsedBytes() (mainCache int64, hotCache int64) {
113120
return g.mainCache.Bytes(), g.hotCache.Bytes()
@@ -443,6 +450,79 @@ func (g *group) LocalRemove(key string) {
443450
})
444451
}
445452

453+
func (g *group) RemoveKeys(ctx context.Context, keys ...string) error {
454+
if len(keys) == 0 {
455+
return nil
456+
}
457+
458+
g.Stats.RemoveKeysRequests.Add(1)
459+
g.Stats.RemovedKeys.Add(int64(len(keys)))
460+
461+
keysByOwner := make(map[peer.Client][]string)
462+
var localKeys []string
463+
464+
for _, key := range keys {
465+
owner, isRemote := g.instance.PickPeer(key)
466+
if isRemote {
467+
keysByOwner[owner] = append(keysByOwner[owner], key)
468+
} else {
469+
localKeys = append(localKeys, key)
470+
}
471+
}
472+
473+
for _, key := range localKeys {
474+
g.LocalRemove(key)
475+
}
476+
477+
multiErr := &MultiError{}
478+
errCh := make(chan error)
479+
480+
// Send removeKeys requests to owners (parallel)
481+
var wg sync.WaitGroup
482+
for owner, ownerKeys := range keysByOwner {
483+
wg.Add(1)
484+
go func(p peer.Client, k []string) {
485+
errCh <- p.RemoveKeys(ctx, &pb.RemoveKeysRequest{
486+
Group: &g.name,
487+
Keys: k,
488+
})
489+
wg.Done()
490+
}(owner, ownerKeys)
491+
}
492+
493+
allPeers := g.instance.getAllPeers()
494+
for _, p := range allPeers {
495+
if p.PeerInfo().IsSelf {
496+
continue
497+
}
498+
if _, isOwner := keysByOwner[p]; isOwner {
499+
continue
500+
}
501+
502+
wg.Add(1)
503+
go func(peer peer.Client) {
504+
errCh <- peer.RemoveKeys(ctx, &pb.RemoveKeysRequest{
505+
Group: &g.name,
506+
Keys: keys,
507+
})
508+
wg.Done()
509+
}(p)
510+
}
511+
512+
go func() {
513+
wg.Wait()
514+
close(errCh)
515+
}()
516+
517+
for err := range errCh {
518+
if err != nil {
519+
multiErr.Add(err)
520+
}
521+
}
522+
523+
return multiErr.NilOrError()
524+
}
525+
446526
func (g *group) populateCache(key string, value transport.ByteView, cache Cache) {
447527
if g.maxCacheBytes <= 0 {
448528
return
@@ -524,6 +604,8 @@ func (g *group) registerInstruments(meter otelmetric.Meter) error {
524604
o.ObserveInt64(instruments.LocalLoadsCounter(), g.Stats.LocalLoads.Get(), observeOptions...)
525605
o.ObserveInt64(instruments.LocalLoadErrsCounter(), g.Stats.LocalLoadErrs.Get(), observeOptions...)
526606
o.ObserveInt64(instruments.GetFromPeersLatencyMaxGauge(), g.Stats.GetFromPeersLatencyLower.Get(), observeOptions...)
607+
o.ObserveInt64(instruments.RemoveKeysRequestsCounter(), g.Stats.RemoveKeysRequests.Get(), observeOptions...)
608+
o.ObserveInt64(instruments.RemovedKeysCounter(), g.Stats.RemovedKeys.Get(), observeOptions...)
527609

528610
return nil
529611
},
@@ -536,6 +618,8 @@ func (g *group) registerInstruments(meter otelmetric.Meter) error {
536618
instruments.LocalLoadsCounter(),
537619
instruments.LocalLoadErrsCounter(),
538620
instruments.GetFromPeersLatencyMaxGauge(),
621+
instruments.RemoveKeysRequestsCounter(),
622+
instruments.RemovedKeysCounter(),
539623
)
540624

541625
return err

instance_test.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -523,11 +523,13 @@ func TestNewGroupRegistersMetricsWithMeterProvider(t *testing.T) {
523523
"groupcache.group.loads.deduped",
524524
"groupcache.group.local.loads",
525525
"groupcache.group.local.load_errors",
526+
"groupcache.group.remove_keys.requests",
527+
"groupcache.group.removed_keys",
526528
}
527529
assert.Equal(t, expectedCounters, recMeter.counterNames)
528530
assert.Equal(t, []string{"groupcache.group.peer.latency_max_ms"}, recMeter.updownNames)
529531
assert.True(t, recMeter.callbackRegistered, "expected callback registration for metrics")
530-
assert.Equal(t, 9, recMeter.instrumentCount)
532+
assert.Equal(t, 11, recMeter.instrumentCount)
531533
}
532534

533535
func TestNewGroupFailsWhenMetricRegistrationFails(t *testing.T) {

remove_keys_test.go

Lines changed: 260 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,260 @@
1+
/*
2+
Copyright 2024 Groupcache Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package groupcache_test
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"testing"
23+
"time"
24+
25+
"github.com/groupcache/groupcache-go/v3"
26+
"github.com/groupcache/groupcache-go/v3/cluster"
27+
"github.com/groupcache/groupcache-go/v3/transport"
28+
"github.com/stretchr/testify/assert"
29+
"github.com/stretchr/testify/require"
30+
)
31+
32+
func TestRemoveKeys(t *testing.T) {
33+
ctx := context.Background()
34+
35+
err := cluster.Start(ctx, 3, groupcache.Options{})
36+
require.NoError(t, err)
37+
defer func() { _ = cluster.Shutdown(ctx) }()
38+
39+
callCount := make(map[string]int)
40+
getter := groupcache.GetterFunc(func(ctx context.Context, key string, dest transport.Sink) error {
41+
callCount[key]++
42+
return dest.SetString(fmt.Sprintf("value-%s", key), time.Now().Add(time.Minute*5))
43+
})
44+
45+
// Register the group on ALL daemons (required for broadcast)
46+
group, err := cluster.DaemonAt(0).NewGroup("test-remove-keys", 3000000, getter)
47+
require.NoError(t, err)
48+
for i := 1; i < 3; i++ {
49+
_, err := cluster.DaemonAt(i).NewGroup("test-remove-keys", 3000000, getter)
50+
require.NoError(t, err)
51+
}
52+
53+
keys := []string{"key1", "key2", "key3"}
54+
55+
// First, populate the cache by getting each key
56+
for _, key := range keys {
57+
var value string
58+
err := group.Get(ctx, key, transport.StringSink(&value))
59+
require.NoError(t, err)
60+
assert.Equal(t, fmt.Sprintf("value-%s", key), value)
61+
}
62+
63+
// Verify getter was called for each key
64+
for _, key := range keys {
65+
assert.Equal(t, 1, callCount[key], "getter should be called once for %s", key)
66+
}
67+
68+
// Now remove all keys using variadic signature
69+
err = group.RemoveKeys(ctx, "key1", "key2", "key3")
70+
require.NoError(t, err)
71+
72+
// Fetch again - getter should be called again since keys were removed
73+
for _, key := range keys {
74+
var value string
75+
err := group.Get(ctx, key, transport.StringSink(&value))
76+
require.NoError(t, err)
77+
}
78+
79+
// Verify getter was called again for each key
80+
for _, key := range keys {
81+
assert.Equal(t, 2, callCount[key], "getter should be called twice for %s after removal", key)
82+
}
83+
}
84+
85+
func TestRemoveKeysEmpty(t *testing.T) {
86+
ctx := context.Background()
87+
88+
err := cluster.Start(ctx, 2, groupcache.Options{})
89+
require.NoError(t, err)
90+
defer func() { _ = cluster.Shutdown(ctx) }()
91+
92+
getter := groupcache.GetterFunc(func(ctx context.Context, key string, dest transport.Sink) error {
93+
return dest.SetString("value", time.Now().Add(time.Minute))
94+
})
95+
96+
// Register the group on ALL daemons
97+
group, err := cluster.DaemonAt(0).NewGroup("test-remove-empty", 3000000, getter)
98+
require.NoError(t, err)
99+
_, err = cluster.DaemonAt(1).NewGroup("test-remove-empty", 3000000, getter)
100+
require.NoError(t, err)
101+
102+
// Test RemoveKeys with no keys - should not error
103+
err = group.RemoveKeys(ctx)
104+
require.NoError(t, err)
105+
}
106+
107+
func TestRemoveKeysWithSlice(t *testing.T) {
108+
ctx := context.Background()
109+
110+
err := cluster.Start(ctx, 2, groupcache.Options{})
111+
require.NoError(t, err)
112+
defer func() { _ = cluster.Shutdown(ctx) }()
113+
114+
getter := groupcache.GetterFunc(func(ctx context.Context, key string, dest transport.Sink) error {
115+
return dest.SetString(fmt.Sprintf("value-%s", key), time.Now().Add(time.Minute*5))
116+
})
117+
118+
// Register the group on ALL daemons
119+
group, err := cluster.DaemonAt(0).NewGroup("test-remove-slice", 3000000, getter)
120+
require.NoError(t, err)
121+
_, err = cluster.DaemonAt(1).NewGroup("test-remove-slice", 3000000, getter)
122+
require.NoError(t, err)
123+
124+
keys := []string{"key1", "key2", "key3"}
125+
126+
// Populate cache
127+
for _, key := range keys {
128+
var value string
129+
err := group.Get(ctx, key, transport.StringSink(&value))
130+
require.NoError(t, err)
131+
}
132+
133+
// Test RemoveKeys with slice expansion
134+
err = group.RemoveKeys(ctx, keys...)
135+
require.NoError(t, err)
136+
}
137+
138+
func TestRemoveKeysStats(t *testing.T) {
139+
ctx := context.Background()
140+
141+
err := cluster.Start(ctx, 2, groupcache.Options{})
142+
require.NoError(t, err)
143+
defer func() { _ = cluster.Shutdown(ctx) }()
144+
145+
getter := groupcache.GetterFunc(func(ctx context.Context, key string, dest transport.Sink) error {
146+
return dest.SetString(fmt.Sprintf("value-%s", key), time.Now().Add(time.Minute*5))
147+
})
148+
149+
// Register the group on ALL daemons
150+
transportGroup, err := cluster.DaemonAt(0).NewGroup("test-remove-stats", 3000000, getter)
151+
require.NoError(t, err)
152+
_, err = cluster.DaemonAt(1).NewGroup("test-remove-stats", 3000000, getter)
153+
require.NoError(t, err)
154+
155+
// Cast to groupcache.Group to access GroupStats()
156+
group, ok := transportGroup.(groupcache.Group)
157+
require.True(t, ok, "expected transportGroup to implement groupcache.Group")
158+
159+
// Capture stats before RemoveKeys
160+
statsBefore := group.GroupStats()
161+
removeKeysRequestsBefore := statsBefore.RemoveKeysRequests.Get()
162+
removedKeysBefore := statsBefore.RemovedKeys.Get()
163+
164+
err = group.RemoveKeys(ctx, "key1", "key2", "key3")
165+
require.NoError(t, err)
166+
167+
// Verify stats were incremented correctly
168+
statsAfter := group.GroupStats()
169+
assert.Equal(t, removeKeysRequestsBefore+1, statsAfter.RemoveKeysRequests.Get(), "RemoveKeysRequests should be incremented by 1")
170+
assert.Equal(t, removedKeysBefore+3, statsAfter.RemovedKeys.Get(), "RemovedKeys should be incremented by 3")
171+
}
172+
173+
func BenchmarkRemoveKeys(b *testing.B) {
174+
ctx := context.Background()
175+
176+
err := cluster.Start(ctx, 3, groupcache.Options{})
177+
if err != nil {
178+
b.Fatal(err)
179+
}
180+
defer func() { _ = cluster.Shutdown(ctx) }()
181+
182+
getter := groupcache.GetterFunc(func(ctx context.Context, key string, dest transport.Sink) error {
183+
return dest.SetString(fmt.Sprintf("value-%s", key), time.Now().Add(time.Minute*5))
184+
})
185+
186+
// Register the group on ALL daemons
187+
group, err := cluster.DaemonAt(0).NewGroup("bench-remove", 3000000, getter)
188+
if err != nil {
189+
b.Fatal(err)
190+
}
191+
for i := 1; i < 3; i++ {
192+
_, err := cluster.DaemonAt(i).NewGroup("bench-remove", 3000000, getter)
193+
if err != nil {
194+
b.Fatal(err)
195+
}
196+
}
197+
198+
// Prepare keys
199+
keys := make([]string, 100)
200+
for i := 0; i < 100; i++ {
201+
keys[i] = fmt.Sprintf("key-%d", i)
202+
}
203+
204+
// Populate cache first
205+
for _, key := range keys {
206+
var value string
207+
_ = group.Get(ctx, key, transport.StringSink(&value))
208+
}
209+
210+
b.ResetTimer()
211+
for i := 0; i < b.N; i++ {
212+
_ = group.RemoveKeys(ctx, keys...)
213+
}
214+
}
215+
216+
func BenchmarkRemoveKeysVsLoop(b *testing.B) {
217+
ctx := context.Background()
218+
219+
err := cluster.Start(ctx, 3, groupcache.Options{})
220+
if err != nil {
221+
b.Fatal(err)
222+
}
223+
defer func() { _ = cluster.Shutdown(ctx) }()
224+
225+
getter := groupcache.GetterFunc(func(ctx context.Context, key string, dest transport.Sink) error {
226+
return dest.SetString(fmt.Sprintf("value-%s", key), time.Now().Add(time.Minute*5))
227+
})
228+
229+
// Register the group on ALL daemons
230+
group, err := cluster.DaemonAt(0).NewGroup("bench-compare", 3000000, getter)
231+
if err != nil {
232+
b.Fatal(err)
233+
}
234+
for i := 1; i < 3; i++ {
235+
_, err := cluster.DaemonAt(i).NewGroup("bench-compare", 3000000, getter)
236+
if err != nil {
237+
b.Fatal(err)
238+
}
239+
}
240+
241+
// Prepare keys
242+
keys := make([]string, 50)
243+
for i := 0; i < 50; i++ {
244+
keys[i] = fmt.Sprintf("key-%d", i)
245+
}
246+
247+
b.Run("RemoveKeys", func(b *testing.B) {
248+
for i := 0; i < b.N; i++ {
249+
_ = group.RemoveKeys(ctx, keys...)
250+
}
251+
})
252+
253+
b.Run("LoopRemove", func(b *testing.B) {
254+
for i := 0; i < b.N; i++ {
255+
for _, key := range keys {
256+
_ = group.Remove(ctx, key)
257+
}
258+
}
259+
})
260+
}

0 commit comments

Comments
 (0)