-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtimer.go
217 lines (187 loc) · 5.24 KB
/
timer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package timer
import (
"errors"
"sync"
"sync/atomic"
"time"
"github.com/thinkgos/timer/delayqueue"
)
const (
// DefaultTickMs is the tick, in milliseconds, that NewTimer uses unless
// WithTickMs overrides it.
DefaultTickMs = 1
// DefaultWheelSize is the wheel size that NewTimer uses unless
// WithWheelSize overrides it.
DefaultWheelSize = 128
)
// ErrClosed is returned by AddTask (and therefore by AfterFunc and
// AddDerefTask) when the timer is not running, i.e. before Start has been
// called or after Stop.
var ErrClosed = errors.New("timer: use of closed timer")
// goroutinePool is the default GoPool installed by NewTimer. It does not pool
// anything: its Go method starts a new goroutine for every job.
var goroutinePool = goroutine{}
// GoPool abstracts how the timer executes jobs. The timer hands a job to the
// pool when a task is found to be already expired while being added to the
// wheel.
type GoPool interface {
// Go schedules f for execution.
Go(f func())
}
// TaskContainer is implemented by types that hold a Task, so that they can be
// passed to Timer.AddDerefTask.
type TaskContainer interface {
// DerefTask returns the Task held by the container.
DerefTask() *Task
}
// goroutine is the stateless default implementation of GoPool.
type goroutine struct{}
// Go implements the GoPool interface by running f on a new goroutine.
func (goroutine) Go(f func()) {
go f()
}
// Option customizes a Timer. NewTimer applies the given options in order, on
// top of the defaults, before validating the result.
type Option func(*Timer)
// WithTickMs returns an Option that overrides the basic tick, expressed in
// milliseconds. NewTimer panics if the resulting tick is not positive.
func WithTickMs(tickMs int64) Option {
	return func(tm *Timer) { tm.tickMs = tickMs }
}
// WithWheelSize returns an Option that sets the wheel size. The requested size
// is passed through NextPowOf2, and the wheel mask is kept in sync with it as
// wheelSize - 1.
func WithWheelSize(size int) Option {
	return func(tm *Timer) {
		rounded := NextPowOf2(size)
		tm.wheelSize, tm.wheelMask = rounded, rounded-1
	}
}
// WithGoPool returns an Option that replaces the pool used to run jobs.
func WithGoPool(p GoPool) Option {
	return func(tm *Timer) { tm.goPool = p }
}
// Timer schedules tasks on a timing wheel. While the timer is running, a
// worker goroutine launched by Start takes spokes from the delay queue,
// advances the wheel clock to their expiration and flushes them.
type Timer struct {
tickMs int64 // basic time span, in milliseconds.
wheelSize int // wheel size, a power of 2.
wheelMask int // wheel mask, wheelSize - 1.
taskCounter atomic.Int64 // total task count.
delayQueue *delayqueue.DelayQueue[*Spoke] // delay queue; the priority queue uses the spoke's expiration time as `cmp`.
goPool GoPool // pool that runs the jobs of already-expired tasks.
waitGroup sync.WaitGroup // tracks the worker goroutine so that Stop can wait for it.
rw sync.RWMutex // protects the following fields.
wheel *TimingWheel // timing wheel; tasks are added concurrently under the read lock, the clock is advanced under the write lock.
quit chan struct{} // created by Start and closed by Stop to end the worker goroutine.
closed bool // true while the timer is not running (initially, and after Stop).
}
// NewTimer creates a Timer configured by opts. Without options the tick is
// DefaultTickMs (1 millisecond), the wheel size is DefaultWheelSize (128) and
// jobs are run by the built-in pool, which starts one goroutine per job.
//
// The returned timer is not running yet: call Start before adding tasks,
// otherwise AddTask returns ErrClosed.
//
// NewTimer panics if, after applying opts, the tick or the wheel size is not
// positive.
func NewTimer(opts ...Option) *Timer {
	// taskCounter and quit are deliberately left at their zero values.
	t := &Timer{
		tickMs:     DefaultTickMs,
		wheelSize:  DefaultWheelSize,
		wheelMask:  DefaultWheelSize - 1,
		delayQueue: delayqueue.NewDelayQueue(CompareSpoke),
		goPool:     goroutinePool,
		closed:     true, // a new timer stays closed until Start is called
	}
	for _, opt := range opts {
		opt(t)
	}
	if t.tickMs <= 0 {
		panic("timer: tick must be greater than or equal to 1ms")
	}
	if t.wheelSize <= 0 {
		panic("timer: wheel size must be greater than 0")
	}
	if t.goPool == nil {
		// WithGoPool(nil) falls back to the default pool.
		t.goPool = goroutinePool
	}
	t.wheel = newTimingWheel(t, t.tickMs, time.Now().UnixMilli())
	return t
}
// TickMs reports the basic tick of the timer, in milliseconds.
func (t *Timer) TickMs() int64 {
	return t.tickMs
}
// WheelSize reports the configured wheel size.
func (t *Timer) WheelSize() int {
	return t.wheelSize
}
// WheelMask reports the wheel mask that goes with the configured wheel size.
func (t *Timer) WheelMask() int {
	return t.wheelMask
}
// TaskCounter reports the current value of the timer's task counter. The
// value is read atomically.
func (t *Timer) TaskCounter() int64 {
	return t.taskCounter.Load()
}
// AfterFunc creates a task from the delay d and the job f, and adds it to the
// timer. On success the new task is returned; if AddTask fails, the task is
// discarded and only the error is returned.
func (t *Timer) AfterFunc(d time.Duration, f func()) (*Task, error) {
	task := NewTask(d).WithJobFunc(f)
	if err := t.AddTask(task); err != nil {
		return nil, err
	}
	return task, nil
}
// AddTask adds task to the timer. It returns ErrClosed when the timer is not
// running. The read lock is held for the whole call, so tasks may be added
// concurrently with each other.
func (t *Timer) AddTask(task *Task) error {
	t.rw.RLock()
	defer t.rw.RUnlock()

	if !t.closed {
		t.addTaskEntry(newTaskEntry(task))
		return nil
	}
	return ErrClosed
}
// AddDerefTask adds the task held by tc to the timer. It behaves like AddTask
// called with tc.DerefTask().
func (t *Timer) AddDerefTask(tc TaskContainer) error {
	task := tc.DerefTask()
	return t.AddTask(task)
}
// Started reports whether the timer is currently running, i.e. it has been
// started and not stopped since.
func (t *Timer) Started() bool {
	t.rw.RLock()
	running := !t.closed
	t.rw.RUnlock()
	return running
}
// Start launches the worker goroutine if the timer is currently closed.
// Calling Start on a timer that is already running does nothing.
//
// The worker repeatedly takes a spoke from the delay queue, then advances the
// wheel clock and flushes spokes for as long as Poll keeps yielding more. It
// returns as soon as Take reports that it should exit.
func (t *Timer) Start() {
	t.rw.Lock()
	defer t.rw.Unlock()

	if !t.closed {
		return // already running
	}

	quit := make(chan struct{})
	t.closed = false
	t.quit = quit
	t.waitGroup.Add(1)

	go func() {
		defer t.waitGroup.Done()
		for {
			spoke, exit := t.delayQueue.Take(quit)
			if exit {
				return
			}
			// Handle the spoke just taken, then drain whatever Poll still returns.
			for {
				t.advanceWheelClock(spoke.GetExpiration())
				t.flushSpoke(spoke)

				next, more := t.delayQueue.Poll()
				if !more {
					break
				}
				spoke = next
			}
		}
	}()
}
// Stop shuts the timer down gracefully: it marks the timer closed, closes the
// quit channel and waits until the worker goroutine launched by Start has
// exited. Calling Stop on a timer that is not running does nothing.
//
// The write lock is held only while the state is flipped and is released
// before waiting. The worker acquires the same lock in advanceWheelClock and
// flushSpoke, so waiting for it with the lock still held would deadlock
// whenever Stop raced with a spoke being processed.
//
// Start should not be called while a Stop is still waiting: a sync.WaitGroup
// must not receive a new Add before the pending Wait has returned.
func (t *Timer) Stop() {
	t.rw.Lock()
	if t.closed {
		t.rw.Unlock()
		return
	}
	// Close first so that AddTask rejects new tasks while the worker winds down.
	t.closed = true
	close(t.quit)
	t.rw.Unlock()

	t.waitGroup.Wait() // ensure the worker goroutine has finished
}
// advanceWheelClock passes expiration to the wheel's advanceClock while
// holding the write lock, which excludes every reader of the lock (AddTask,
// flushSpoke, Started) for the duration of the call.
func (t *Timer) advanceWheelClock(expiration int64) {
t.rw.Lock()
defer t.rw.Unlock()
t.wheel.advanceClock(expiration)
}
// flushSpoke flushes spoke under the read lock, with reinsert as the callback
// given to Spoke.Flush.
// NOTE(review): presumably Flush invokes the callback once per task entry
// held by the spoke - confirm against Spoke.Flush.
func (t *Timer) flushSpoke(spoke *Spoke) {
t.rw.RLock()
defer t.rw.RUnlock()
spoke.Flush(t.reinsert)
}
// addSpokeToDelayQueue adds spoke to the timer's delay queue.
// NOTE(review): no caller is visible in this file; presumably the timing
// wheel calls it when a spoke gets a new expiration - confirm.
func (t *Timer) addSpokeToDelayQueue(spoke *Spoke) {
t.delayQueue.Add(spoke)
}
// addTaskEntry offers te to the timing wheel and acts on the outcome:
//   - added successfully: nothing more to do, the entry now lives in the wheel;
//   - cancelled: the entry is ignored;
//   - already expired: the task's job is handed to the goroutine pool.
func (t *Timer) addTaskEntry(te *taskEntry) {
	if t.wheel.add(te) != Result_AlreadyExpired {
		return
	}
	t.goPool.Go(te.task.Run)
}
// reinsert is the callback flushSpoke gives to Spoke.Flush; it feeds te back
// through addTaskEntry. It takes no lock itself: flushSpoke holds the read
// lock around the Flush call.
func (t *Timer) reinsert(te *taskEntry) {
t.addTaskEntry(te)
}