-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtask.go
147 lines (126 loc) · 3.49 KB
/
task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package timer
import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"
)
// Job is a unit of work that is executed by calling Run.
type Job interface {
	Run()
}

// JobFunc is an adapter that lets an ordinary func() be used as a Job.
type JobFunc func()

// Run implements Job by calling the wrapped function f.
func (f JobFunc) Run() {
	f()
}
var (
	// emptyJob is a Job that does nothing; NewTask uses it as the initial job.
	emptyJob = JobFunc(func() {})

	// Compile-time assertions that *Task implements TaskContainer and Job.
	_ TaskContainer = (*Task)(nil)
	_ Job           = (*Task)(nil)
)
// Task is a timer task: a job together with the delay duration configured for it.
// All of its methods use pointer receivers; the struct embeds a mutex and an
// atomic value, so it should be passed around as *Task rather than copied.
type Task struct {
delay atomic.Int64 // delay duration, stored as the int64 value of a time.Duration
job Job // the job to execute; set by NewTask/WithJob/WithJobFunc and invoked by Run
rw sync.RWMutex // protects the fields declared below it (taskEntry)
taskEntry *taskEntry // the taskEntry to which the task currently belongs; nil when it belongs to none
}
// NewTask creates a Task with delay duration d and a no-op job.
// Use WithJob or WithJobFunc to attach the work to perform.
// The accuracy is milliseconds.
func NewTask(d time.Duration) *Task {
	task := new(Task)
	task.job = emptyJob
	task.delay.Store(int64(d))
	return task
}
// NewTaskFunc creates a Task with delay duration d whose job is the function f.
// The accuracy is milliseconds.
func NewTaskFunc(d time.Duration, f func()) *Task {
	task := NewTask(d)
	return task.WithJobFunc(f)
}
// NewTaskJob creates a Task with delay duration d whose job is job.
// The accuracy is milliseconds.
func NewTaskJob(d time.Duration, job Job) *Task {
	task := NewTask(d)
	return task.WithJob(job)
}
// WithJobFunc replaces the task's job with the function f, wrapped as a JobFunc,
// and returns the task so that calls can be chained.
func (t *Task) WithJobFunc(f func()) *Task {
	return t.WithJob(JobFunc(f))
}
// WithJob replaces the task's job with j and returns the task so that calls
// can be chained.
func (t *Task) WithJob(j Job) *Task {
	t.job = j
	return t
}
// DerefTask implements TaskContainer; a *Task is its own underlying task,
// so the receiver is returned unchanged.
func (t *Task) DerefTask() *Task {
	return t
}
// Run calls the task's job immediately, implementing the Job interface.
// A panic raised by the job is recovered and reported on os.Stderr instead of
// propagating to the caller.
func (t *Task) Run() {
	defer func() {
		// recover must be called directly in the deferred function to take effect.
		r := recover()
		if r == nil {
			return
		}
		fmt.Fprintf(os.Stderr, "timer: Recovered from panic: %v\n", r)
	}()
	t.job.Run()
}
// Cancel cancels the task: if the task belongs to a task entry, that entry is
// removed and the task's reference to it is cleared. Otherwise it does nothing.
func (t *Task) Cancel() {
	t.rw.Lock()
	defer t.rw.Unlock()

	entry := t.taskEntry
	if entry == nil {
		return
	}
	entry.remove()
	t.taskEntry = nil
}
// Delay returns the delay duration currently configured for the task.
func (t *Task) Delay() time.Duration {
	ns := t.delay.Load()
	return time.Duration(ns)
}
// SetDelay stores a new delay duration and returns the task so that calls can
// be chained. The accuracy is milliseconds.
// NOTE: the new delay only takes effect when the task is re-added to a `Timer`;
// it has no effect on a task that is already running.
func (t *Task) SetDelay(d time.Duration) *Task {
	t.delay.Store(d.Nanoseconds())
	return t
}
// Activated reports whether the task is activated.
func (t *Task) Activated() bool {
	t.rw.RLock()
	defer t.rw.RUnlock()

	// Both conditions are needed: Cancel sets t.taskEntry to nil, whereas an
	// expired task only has its entry removed from the spoke and keeps the
	// reference, so the entry itself must also be asked.
	if t.taskEntry == nil {
		return false
	}
	return t.taskEntry.activated()
}
// Expiry returns the time at which the task will expire, as Unix time in
// milliseconds (the number of milliseconds elapsed since January 1, 1970 UTC).
// It returns -1 when the task is not activated.
func (t *Task) Expiry() int64 {
	t.rw.RLock()
	defer t.rw.RUnlock()

	entry := t.taskEntry
	if entry == nil || !entry.activated() {
		return -1
	}
	return entry.ExpirationMs()
}
// ExpiryAt returns the local time at which the task will expire.
// It returns the zero time.Time when the task is not activated.
func (t *Task) ExpiryAt() time.Time {
	ms := t.Expiry()
	if ms < 0 {
		// Expiry reports a non-activated task as -1.
		return time.Time{}
	}
	return time.UnixMilli(ms)
}
// setBelongTo records te as the task entry this task belongs to.
// If the task already belongs to a different entry, that entry is removed first.
func (t *Task) setBelongTo(te *taskEntry) {
	t.rw.Lock()
	defer t.rw.Unlock()

	if prev := t.taskEntry; prev != nil && prev != te {
		prev.remove()
	}
	t.taskEntry = te
}
// isBelongTo reports whether the task currently belongs to the task entry te.
func (t *Task) isBelongTo(te *taskEntry) bool {
	t.rw.RLock()
	owner := t.taskEntry
	t.rw.RUnlock()
	return owner == te
}