-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathsoftcache.go
290 lines (240 loc) · 8.91 KB
/
softcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package softcache
import (
"errors"
"strings"
"time"
"github.com/TritonHo/simplelock"
"github.com/go-redis/redis"
cache "github.com/patrickmn/go-cache"
)
//the single thread lock duration
//this setting applies to manager internal locking only.
const lockDuration = 1 * time.Minute
var ErrWaitTooLong = errors.New(`The transaction is not proceeded due to long waiting.`)
// CacheManager implements soft-TTL caching on top of redis: reads go through
// GetData, and entries past their SoftTtl are rebuilt in the background by
// goroutines started in New, coordinated through a redis sorted set.
type CacheManager struct {
// the prefix for the per-item lock name,
// i.e. the lock name is lockNamePrefix + `-` + cacheId
lockNamePrefix string
// the lock name used while updating the zset in redis;
// the lock is a casual (best-effort) mechanism to avoid multiple
// CacheManagers updating the zset at the same time
zsetLockName string
// the name of the redis sorted set storing the cacheIds that need a refresh
zsetName string
// the channel passing due cacheIds from the task picker to the rebuilder
taskChannel chan string
// to avoid overwhelming redis, registrations of the same cacheId are
// grouped locally for a short while
// NOTE(review): field name has a typo ("Registation"); kept as-is because
// renaming would break the other methods in this file.
recentRegistation *cache.Cache
// the registered result-set types, keyed by funcName; it is the function
// writer's responsibility to marshal the dataset to a string
resultSetTypes map[string]ResultSetType
// the lock manager enforcing global locking between multiple CacheManagers
// NOTE(review): field name has a typo ("Manger"); kept for the same reason.
lockManger *simplelock.LockManager
// the redis connection pool object
redisClient *redis.Client
}
// New creates a CacheManager and starts its two background goroutines
// (the task picker and the cache rebuilder). The goroutines run for the
// lifetime of the process; there is currently no way to stop them.
func New(lockNamePrefix, zsetLockName, zsetName string, lockManager *simplelock.LockManager, redisConn *redis.Client) *CacheManager {
	cm := &CacheManager{
		lockNamePrefix:    lockNamePrefix,
		zsetLockName:      zsetLockName,
		zsetName:          zsetName,
		taskChannel:       make(chan string),
		recentRegistation: cache.New(lockDuration, lockDuration),
		resultSetTypes:    map[string]ResultSetType{},
		// BUG FIX: the caller-supplied lockManager was previously ignored and a
		// brand-new manager was built from redisConn instead; honor the parameter.
		lockManger:  lockManager,
		redisClient: redisConn,
	}
	go cm.cacheRebuilder()
	go cm.taskPicker()
	return cm
}
// AddResultSetType registers the result-set type (database query worker plus
// TTL settings) under funcName. It returns an error when funcName is already
// registered; the existing registration is left untouched.
//
// NOTE(review): the map is written without synchronization, so this must be
// called during setup, before GetData/Refresh may run concurrently — confirm
// against callers.
func (cm *CacheManager) AddResultSetType(funcName string, rsType ResultSetType) error {
	if _, ok := cm.resultSetTypes[funcName]; ok {
		// BUG FIX: the error value was constructed but never returned, so a
		// duplicate registration silently overwrote the existing entry.
		return errors.New(`funcName has already existed.`)
	}
	cm.resultSetTypes[funcName] = rsType
	return nil
}
// getCacheId builds the redis key for a cached result set by joining the
// registered function name and the query context with a dash.
func getCacheId(funcName, context string) string {
	var key strings.Builder
	key.Grow(len(funcName) + 1 + len(context))
	key.WriteString(funcName)
	key.WriteByte('-')
	key.WriteString(context)
	return key.String()
}
// getFuncNameAndContext splits a cacheId produced by getCacheId back into its
// funcName and context. Only the first dash is a separator, so a context that
// itself contains dashes round-trips correctly.
//
// BUG FIX: a cacheId without any dash (possible when the zset contains ids
// written by a foreign or newer worker) used to panic with an index-out-of-
// range; it now yields the whole string as funcName and an empty context.
func getFuncNameAndContext(cacheId string) (funcName string, context string) {
	parts := strings.SplitN(cacheId, `-`, 2)
	if len(parts) < 2 {
		return parts[0], ``
	}
	return parts[0], parts[1]
}
// Refresh schedules the cache identified by (funcName, context) for a rebuild.
//
// If isHardRefresh is true the cache entry is deleted from redis immediately;
// otherwise its TTL is shortened so the rebuilder will no longer consider it
// fresh. Either way the cacheId is pushed into the redis zset with the current
// time as score, so the task picker hands it to the rebuilder on its next pass.
// Returns an error when funcName was never registered or a redis call fails.
func (cm *CacheManager) Refresh(funcName, context string, isHardRefresh bool) error {
rsType, ok := cm.resultSetTypes[funcName]
if !ok {
//unknown function: refuse the refresh rather than touching redis
return errors.New(`funcName is not registered.`)
}
cacheId := getCacheId(funcName, context)
if isHardRefresh {
//step 1a: delete the cache directly
if err := cm.redisClient.Del(cacheId).Err(); err != nil {
return err
}
} else {
//step 1b: shorten the cache TTL,
//otherwise the cache rebuilder may consider the cache still fresh and refuse to rebuild it.
//Derivation of the new TTL (with a 10s safety margin):
if ttl := rsType.HardTtl - rsType.SoftTtl - 10*time.Second; ttl > 0 {
// elapsedTime > rs.SoftTtl
// elapsedTime + ttl = rs.HardTtl
// i.e. ttl < rs.HardTtl - rs.SoftTtl
if err := cm.redisClient.Expire(cacheId, ttl).Err(); err != nil {
return err
}
}
}
//step 2: get a lock, to protect the ZSet.
//The update is performed whether or not the lock is acquired,
//as the race condition does not have a disastrous result.
if isSuccessful, lockToken := cm.lockManger.GetLock(cm.zsetLockName, lockDuration, lockDuration); isSuccessful {
defer cm.lockManger.ReleaseLock(cm.zsetLockName, lockToken)
}
//step 3: add the cacheId into the redis zset, due immediately (score = now)
return cm.redisClient.ZAdd(cm.zsetName, redis.Z{Score: float64(time.Now().Unix()), Member: cacheId}).Err()
}
// GetData returns the cached string for (funcName, context), loading it from
// the database (via the registered Worker) on a cache miss.
//
// Flow: first redis GET; on a hit, register a background soft-refresh and
// return. On a miss, acquire the per-item lock, GET again (another thread may
// have filled the cache meanwhile), and only then run the expensive database
// query and store the result with HardTtl.
// Returns ErrWaitTooLong when the item lock cannot be acquired within MaxWait.
func (cm *CacheManager) GetData(funcName, context string) (string, error) {
rsType, ok := cm.resultSetTypes[funcName]
if !ok {
//unknown function: refuse the request rather than querying anything
return ``, errors.New(`funcName is not registered.`)
}
cacheId := getCacheId(funcName, context)
//do the first cache GET
if s, err := cm.redisClient.Get(cacheId).Result(); err == nil && s != `` {
//We only need to register the Get event when there is a cache hit
go cm.register(cacheId, rsType)
return s, nil
} else {
//redis.Nil means "key not found" (a plain miss); anything else is a real error
if err != redis.Nil {
return ``, err
}
}
//cache miss: acquire the per-item lock so only one thread queries the database
itemLock := cm.lockNamePrefix + `-` + cacheId
hasLock, lockToken := cm.lockManger.GetLock(itemLock, rsType.MaxExec, rsType.MaxWait)
if !hasLock {
return ``, ErrWaitTooLong
}
defer cm.lockManger.ReleaseLock(itemLock, lockToken)
//do the second cache GET.
// Another thread may have queried the database and filled redis while we
// waited for the lock. Double-check before doing the expensive query.
// Reminder: the database query is MUCH more expensive than a redis GET.
if s, err := cm.redisClient.Get(cacheId).Result(); err == nil && s != `` {
//a hit here means the cache was just prepared by another thread,
//so there is no need to register a refresh event
return s, nil
} else {
if err != redis.Nil {
return ``, err
}
}
//still no cache: load the data from the database and store it
//NOTE(review): the Set error is ignored; the fresh data is returned anyway,
//so a failed write only costs a future cache miss.
s, err := rsType.Worker.Query(context)
if err == nil {
cm.redisClient.Set(cacheId, s, rsType.HardTtl)
}
return s, err
}
// register schedules a soft refresh for cacheId: it computes when the entry
// will pass its SoftTtl and adds the cacheId to the redis zset with that time
// as score. It runs in its own goroutine (fired from GetData) and is entirely
// best-effort: any failure only delays a refresh until the next cache read.
func (cm *CacheManager) register(cacheId string, rsType ResultSetType) {
	if _, found := cm.recentRegistation.Get(cacheId); found {
		// another thread on this machine registered the same event recently;
		// nothing to do
		return
	}
	// read the remaining TTL from redis
	// remarks: in rare cases d may be -2s because the cache just expired,
	// which does not break the arithmetic below, so it needs no special care
	d, err := cm.redisClient.TTL(cacheId).Result()
	if err != nil {
		// BUG FIX: this used to panic, which — inside a goroutine — would
		// crash the whole process on a transient redis error. Registration is
		// best-effort, so simply skip this round.
		return
	}
	// elapsedTime + d = rs.HardTtl
	//   thus elapsedTime = rs.HardTtl - d
	// and we want to trigger the data query once SoftTtl has passed:
	//   elapsedTime + t = rs.SoftTtl
	//   t = rs.SoftTtl - (rs.HardTtl - d)
	// t is the delay we schedule for
	t := rsType.SoftTtl - (rsType.HardTtl - d)
	// add 5 seconds to tolerate clock inconsistency between servers, so that
	// when the refresh worker picks up this event the SoftTtl has truly passed
	t = t + (5 * time.Second)
	score := time.Now().Add(t).Unix()
	// remember this registration locally to group duplicates
	if t > 0 {
		cm.recentRegistation.Add(cacheId, "DONOTCARE", t)
	}
	// get a lock to protect the zset; the update is performed whether or not
	// the lock is acquired, as the race condition is not disastrous
	if isSuccessful, lockToken := cm.lockManger.GetLock(cm.zsetLockName, lockDuration, lockDuration); isSuccessful {
		defer cm.lockManger.ReleaseLock(cm.zsetLockName, lockToken)
	}
	// add the cacheId into the redis zset, due at the computed time
	cm.redisClient.ZAdd(cm.zsetName, redis.Z{Score: float64(score), Member: cacheId})
}
// pick moves due refresh tasks from the redis zset into the local task
// channel. It takes the zset lock without waiting (skipping the iteration
// when another manager holds it), reads the first few entries, and forwards
// every entry whose score is in the past. The send on taskChannel blocks
// until the rebuilder accepts the task, which naturally throttles picking.
func (cm *CacheManager) pick() {
	// take the zset lock with zero wait, so only one manager syncs at a time
	hasLock, lockToken := cm.lockManger.GetLock(cm.zsetLockName, lockDuration, 0)
	if !hasLock {
		// cannot get the lock; skip the current iteration
		return
	}
	defer cm.lockManger.ReleaseLock(cm.zsetLockName, lockToken)

	now := float64(time.Now().Unix())
	candidates, err := cm.redisClient.ZRangeWithScores(cm.zsetName, 0, 10).Result()
	if err != nil {
		// BUG FIX: this used to panic, crashing the process on a transient
		// redis error; the pick is retried every second anyway, so just skip.
		return
	}
	for _, c := range candidates {
		// entries are score-ordered; stop at the first one not yet due
		if c.Score > now {
			break
		}
		// BUG FIX: guard the type assertion — a foreign, non-string member
		// used to panic the picker goroutine.
		cacheId, ok := c.Member.(string)
		if !ok {
			cm.redisClient.ZRem(cm.zsetName, c.Member)
			continue
		}
		// remove the record from the set, then hand it to the worker
		cm.redisClient.ZRem(cm.zsetName, cacheId)
		cm.taskChannel <- cacheId
	}
}
// taskPicker is the background polling loop started by New: roughly once per
// second it asks pick() to move due refresh tasks from redis into the local
// task channel. It never returns.
func (cm *CacheManager) taskPicker() {
	for {
		// only fetch new work when no task is already waiting in the channel
		if pending := len(cm.taskChannel); pending == 0 {
			cm.pick()
		}
		// throttle to about one poll per second so redis is not overloaded
		time.Sleep(time.Second)
	}
}
func (cm *CacheManager) cacheRebuilder() {
for {
cacheId := <-cm.taskChannel
funcName, context := getFuncNameAndContext(cacheId)
rsType, ok := cm.resultSetTypes[funcName]
if !ok {
// there is a cacheId we don't understand, there is two condition:
// 1. the current running manager is the old version, the cacheId comes from some new version worker
// 2. the cacheId is already deplicated
// In above both case, simply ignore the cacheId is okay
continue
}
//get the TTL from redis. if the freshness is still within the SoftTtl, then do nothing
//remarks: in rare case, d may be = -2 second as the cache has expired, but it will not break the code and thus no need to take care
ttl, err0 := cm.redisClient.TTL(cacheId).Result()
if err0 != nil {
panic(err0)
}
if rsType.IsFresh(ttl) {
//concurrency issue
//we doesn't enforce strict locking for the cache refresh, thus likely other machine refreshed the cache.
//we can simply ignore this event and then do nothing.
continue
}
if s, err := rsType.Worker.Query(context); err == nil {
cm.redisClient.Set(cacheId, s, rsType.HardTtl)
}
}
}