@@ -28,26 +28,32 @@ func Run(t T, cfg Config) Result {
2828
2929 q := makeReplicaQueue (cfg .NumReplicas )
3030
31- var wg sync.WaitGroup
3231 s := newAggStats ()
3332
34- statsCtx , stopStats := context .WithCancel (context .Background ())
35- defer stopStats ()
36- go statsLoop (t , statsCtx , cfg , s , raftEng , smEng )
33+ statsCtx , cancelCtx := context .WithCancel (context .Background ())
34+ defer cancelCtx ()
35+ var bgWG sync.WaitGroup
36+ bgWG .Add (1 )
37+ go func () {
38+ defer bgWG .Done ()
39+ statsLoop (t , statsCtx , cfg , s , raftEng , smEng )
40+ }()
3741
3842 o := writeOptions {
3943 cfg : cfg , smEng : smEng , raftEng : raftEng ,
4044 keyLen : keyLen , valueLen : valueLen , batchLen : batchLen ,
4145 }
4246 var durabilityCallbackCount atomic.Int64
4347 tStartWorkers := timeutil .Now ()
48+
49+ var workerWG sync.WaitGroup
4450 for i := 0 ; i < cfg .NumWorkers ; i ++ {
45- wg .Add (1 )
51+ workerWG .Add (1 )
4652 w := & worker {
4753 t : t , s : s , o : o , rng : rand .New (rand .NewSource (int64 (i ))),
4854 durabilityCallbackCount : & durabilityCallbackCount ,
4955 }
50- go w .run (t , q , & wg )
56+ go w .run (t , q , & workerWG )
5157 }
5258 logf (t , "started workers" )
5359
@@ -63,7 +69,9 @@ func Run(t T, cfg Config) Result {
6369 var bytesFlushed uint64
6470 var n int
6571 notifyCh := make (chan struct {}, 1 )
72+ bgWG .Add (1 )
6673 go func () {
74+ defer bgWG .Done ()
6775 for {
6876 select {
6977 case <- statsCtx .Done ():
@@ -87,8 +95,9 @@ func Run(t T, cfg Config) Result {
8795 })
8896 }
8997
90- wg .Wait ()
91- stopStats ()
98+ workerWG .Wait ()
99+ cancelCtx ()
100+ bgWG .Wait () // make sure all goroutines are stopped by time engine closes
92101 duration := timeutil .Since (tStartWorkers )
93102 logf (t , "done working" )
94103
0 commit comments