diff --git a/stats/view/worker.go b/stats/view/worker.go index 9255d27d2..6fef75115 100644 --- a/stats/view/worker.go +++ b/stats/view/worker.go @@ -40,9 +40,9 @@ type worker struct { views map[string]*viewInternal startTimes map[*viewInternal]time.Time - timer *time.Ticker - c chan command - quit, done chan bool + timer *time.Ticker + c chan command + quitCh, doneCh, flushCh chan struct{} } var defaultWorker *worker @@ -107,6 +107,15 @@ func RetrieveData(viewName string) ([]*Row, error) { return resp.rows, resp.err } +// Flush force aggregates all buffered in-memory data. +// Otherwise, aggregation is going to happen at each period +// set by SetReportingPeriod. +// +// Flush is useful before program termination to avoid data loss. +func Flush() { + defaultWorker.flushAll() +} + func record(tags *tag.Map, ms interface{}) { req := &recordReq{ tm: tags, @@ -140,8 +149,9 @@ func newWorker() *worker { startTimes: make(map[*viewInternal]time.Time), timer: time.NewTicker(defaultReportingDuration), c: make(chan command, 1024), - quit: make(chan bool), - done: make(chan bool), + quitCh: make(chan struct{}), + doneCh: make(chan struct{}), + flushCh: make(chan struct{}), } } @@ -152,18 +162,26 @@ func (w *worker) start() { cmd.handleCommand(w) case <-w.timer.C: w.reportUsage(time.Now()) - case <-w.quit: + case <-w.flushCh: + w.reportUsage(time.Now()) + w.doneCh <- struct{}{} + case <-w.quitCh: w.timer.Stop() close(w.c) - w.done <- true + w.doneCh <- struct{}{} return } } } +func (w *worker) flushAll() { + w.flushCh <- struct{}{} + <-w.doneCh +} + func (w *worker) stop() { - w.quit <- true - <-w.done + w.quitCh <- struct{}{} + <-w.doneCh } func (w *worker) getMeasureRef(name string) *measureRef { diff --git a/stats/view/worker_test.go b/stats/view/worker_test.go index d43014648..4b136a9ac 100644 --- a/stats/view/worker_test.go +++ b/stats/view/worker_test.go @@ -397,6 +397,36 @@ func TestUnregisterReportsUsage(t *testing.T) { } } +func TestFlush(t *testing.T) { + restart() + + ctx := context.Background() + + m := stats.Int64("measure", "desc", "unit") + SetReportingPeriod(time.Hour) + + e := &vdExporter{} + RegisterExporter(e) + + if err := Register(&View{Name: "count", Measure: m, Aggregation: Count()}); err != nil { + t.Fatal(err) + } + + stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) + stats.Record(ctx, m.M(1)) + + Flush() + + e.Lock() + got := len(e.vds) + e.Unlock() + + if got == 0 { + t.Errorf("got %v aggregations; want at least one", got) + } +} + type countExporter struct { sync.Mutex count int64