@@ -306,24 +306,28 @@ func TestMockKV_Watch(t *testing.T) {
306
306
// returned wait group.
307
307
setupWatchTest := func (key string , prefix bool ) (* mockKV , context.CancelFunc , chan * clientv3.Event , * sync.WaitGroup ) {
308
308
kv := newMockKV ()
309
- // Use a condition to make sure the goroutine has started using the watch before
309
+ // Use a WaitGroup to make sure the goroutine has started using the watch before
310
310
// we do anything to the mockKV that would emit an event the watcher is expecting
311
- cond := sync .NewCond (& sync.Mutex {})
312
- wg := sync.WaitGroup {}
311
+ started := sync.WaitGroup {}
312
+ // Use another WaitGroup so that callers can tell when the channel returned by watch
313
+ // method is closed and the watch is complete.
314
+ complete := sync.WaitGroup {}
315
+
313
316
ch := make (chan * clientv3.Event )
314
317
ctx , cancel := context .WithCancel (context .Background ())
315
318
316
- wg .Add (1 )
319
+ started .Add (1 )
320
+ complete .Add (1 )
317
321
go func () {
318
- defer wg .Done ()
322
+ defer complete .Done ()
319
323
320
324
var ops []clientv3.OpOption
321
325
if prefix {
322
326
ops = []clientv3.OpOption {clientv3 .WithPrefix ()}
323
327
}
324
328
325
329
watch := kv .Watch (ctx , key , ops ... )
326
- cond . Broadcast ()
330
+ started . Done ()
327
331
328
332
for e := range watch {
329
333
if len (e .Events ) > 0 {
@@ -332,33 +336,29 @@ func TestMockKV_Watch(t *testing.T) {
332
336
}
333
337
}()
334
338
335
- // Wait for the watcher goroutine to start actually watching
336
- cond .L .Lock ()
337
- cond .Wait ()
338
- cond .L .Unlock ()
339
-
340
- return kv , cancel , ch , & wg
339
+ started .Wait ()
340
+ return kv , cancel , ch , & complete
341
341
}
342
342
343
- t .Run ("watch stopped by context" , func (t * testing.T ) {
343
+ t .Run ("watch stopped by context" , func (* testing.T ) {
344
344
// Ensure we can use the cancel method of the context given to the watch
345
345
// to stop the watch
346
- _ , cancel , _ , wg := setupWatchTest ("/bar" , false )
346
+ _ , cancel , _ , complete := setupWatchTest ("/bar" , false )
347
347
cancel ()
348
- wg .Wait ()
348
+ complete .Wait ()
349
349
})
350
350
351
- t .Run ("watch stopped by close" , func (t * testing.T ) {
351
+ t .Run ("watch stopped by close" , func (* testing.T ) {
352
352
// Ensure we can use the Close method of the mockKV given to the watch
353
353
// to stop the watch
354
- kv , _ , _ , wg := setupWatchTest ("/bar" , false )
354
+ kv , _ , _ , complete := setupWatchTest ("/bar" , false )
355
355
_ = kv .Close ()
356
- wg .Wait ()
356
+ complete .Wait ()
357
357
})
358
358
359
359
t .Run ("watch exact key" , func (t * testing.T ) {
360
360
// watch for events with key "/bar" and send them via the channel
361
- kv , cancel , ch , wg := setupWatchTest ("/bar" , false )
361
+ kv , cancel , ch , complete := setupWatchTest ("/bar" , false )
362
362
363
363
_ , err := kv .Put (context .Background (), "/foo" , "1" )
364
364
require .NoError (t , err )
@@ -371,12 +371,12 @@ func TestMockKV_Watch(t *testing.T) {
371
371
assert .Equal (t , []byte ("/bar" ), event .Kv .Key )
372
372
373
373
cancel ()
374
- wg .Wait ()
374
+ complete .Wait ()
375
375
})
376
376
377
377
t .Run ("watch prefix match" , func (t * testing.T ) {
378
378
// watch for events with the prefix "/b" and send them via the channel
379
- kv , cancel , ch , wg := setupWatchTest ("/b" , true )
379
+ kv , cancel , ch , complete := setupWatchTest ("/b" , true )
380
380
381
381
_ , err := kv .Delete (context .Background (), "/foo" )
382
382
require .NoError (t , err )
@@ -389,6 +389,6 @@ func TestMockKV_Watch(t *testing.T) {
389
389
assert .Equal (t , []byte ("/bar" ), event .Kv .Key )
390
390
391
391
cancel ()
392
- wg .Wait ()
392
+ complete .Wait ()
393
393
})
394
394
}
0 commit comments