forked from rosedblabs/rosedb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb_zset.go
349 lines (273 loc) · 9.29 KB
/
db_zset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
package rosedb
import (
"github.com/roseduan/rosedb/ds/zset"
"github.com/roseduan/rosedb/storage"
"github.com/roseduan/rosedb/utils"
"math"
"sync"
"time"
)
// ZsetIdx the zset idx.
// It pairs the in-memory sorted-set index with the read/write lock that the
// RoseDB zset methods in this file acquire before touching that index.
type ZsetIdx struct {
	// mu is locked for reading by query methods and for writing by mutating methods.
	mu *sync.RWMutex
	// indexes holds the sorted sets; the methods in this file address them by string(key).
	indexes *zset.SortedSet
}
// newZsetIdx builds an empty zset index together with a fresh lock guarding it.
func newZsetIdx() *ZsetIdx {
	idx := new(ZsetIdx)
	idx.mu = &sync.RWMutex{}
	idx.indexes = zset.New()
	return idx
}
// ZAdd adds the specified member with the specified score to the sorted set stored at key.
// It returns the error reported by checkKeyValue or by store; when the member
// already carries exactly this score nothing is written and nil is returned.
func (db *RoseDB) ZAdd(key []byte, score float64, member []byte) error {
	if err := db.checkKeyValue(key, member); err != nil {
		return err
	}

	// ZScore acquires the read lock on its own, so the lookup has to happen
	// before the write lock below is taken.
	current := db.ZScore(key, member)
	if current == score {
		return nil
	}

	idx := db.zsetIndex
	idx.mu.Lock()
	defer idx.mu.Unlock()

	// The score travels in the entry's extra field, encoded as a string.
	entry := storage.NewEntry(key, member, []byte(utils.Float64ToStr(score)), ZSet, ZSetZAdd)
	if err := db.store(entry); err != nil {
		return err
	}

	// Only update the in-memory index once the entry has been stored.
	idx.indexes.ZAdd(string(key), score, string(member))
	return nil
}
// ZScore returns the score of member in the sorted set at key.
// If checkExpired reports the key as expired, math.MinInt64 is returned instead.
func (db *RoseDB) ZScore(key, member []byte) float64 {
	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); !expired {
		return idx.indexes.ZScore(string(key), string(member))
	}
	return math.MinInt64
}
// ZCard returns the sorted set cardinality (number of elements) of the sorted set stored at key.
// An expired key yields 0.
func (db *RoseDB) ZCard(key []byte) int {
	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); !expired {
		return idx.indexes.ZCard(string(key))
	}
	return 0
}
// ZRank returns the rank of member in the sorted set stored at key, with the scores ordered from low to high.
// The rank (or index) is 0-based, which means that the member with the lowest score has rank 0.
// -1 is returned when checkKeyValue rejects the arguments or the key has expired.
func (db *RoseDB) ZRank(key, member []byte) int64 {
	const noRank int64 = -1

	if db.checkKeyValue(key, member) != nil {
		return noRank
	}

	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if db.checkExpired(key, ZSet) {
		return noRank
	}
	return idx.indexes.ZRank(string(key), string(member))
}
// ZRevRank returns the rank of member in the sorted set stored at key, with the scores ordered from high to low.
// The rank (or index) is 0-based, which means that the member with the highest score has rank 0.
// -1 is returned when checkKeyValue rejects the arguments or the key has expired.
func (db *RoseDB) ZRevRank(key, member []byte) int64 {
	const noRank int64 = -1

	if db.checkKeyValue(key, member) != nil {
		return noRank
	}

	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if db.checkExpired(key, ZSet) {
		return noRank
	}
	return idx.indexes.ZRevRank(string(key), string(member))
}
// ZIncrBy increments the score of member in the sorted set stored at key by increment.
// If member does not exist in the sorted set, it is added with increment as its score (as if its previous score was 0.0).
// If key does not exist, a new sorted set with the specified member as its sole member is created.
//
// It returns the resulting score. When checkKeyValue rejects the arguments the
// unchanged increment is returned together with the error. When store fails the
// new score is returned together with the error.
func (db *RoseDB) ZIncrBy(key []byte, increment float64, member []byte) (float64, error) {
	if err := db.checkKeyValue(key, member); err != nil {
		return increment, err
	}

	db.zsetIndex.mu.Lock()
	defer db.zsetIndex.mu.Unlock()

	// Evaluate the key's expiry before touching the score, like the other zset
	// operations do, so the increment is not applied on top of a set whose
	// deadline has already passed. The result is deliberately ignored: expired
	// or not, the increment proceeds.
	// NOTE(review): this relies on checkExpired discarding the expired set and
	// its deadline as a side effect — confirm against its implementation.
	_ = db.checkExpired(key, ZSet)

	// The index computes the new score, which is then stored as a ZSetZAdd
	// entry. Note the index is updated before the entry is stored, so a store
	// failure leaves the in-memory score already changed.
	newScore := db.zsetIndex.indexes.ZIncrBy(string(key), increment, string(member))

	extra := utils.Float64ToStr(newScore)
	e := storage.NewEntry(key, member, []byte(extra), ZSet, ZSetZAdd)
	if err := db.store(e); err != nil {
		return newScore, err
	}

	return newScore, nil
}
// ZRange returns the specified range of elements in the sorted set stored at key.
// nil is returned when checkKeyValue rejects the key or the key has expired.
func (db *RoseDB) ZRange(key []byte, start, stop int) []interface{} {
	if db.checkKeyValue(key, nil) != nil {
		return nil
	}

	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); expired {
		return nil
	}
	return idx.indexes.ZRange(string(key), start, stop)
}
// ZRangeWithScores returns the specified range of elements in the sorted set stored at key,
// as produced by the index's ZRangeWithScores.
// nil is returned when checkKeyValue rejects the key or the key has expired.
func (db *RoseDB) ZRangeWithScores(key []byte, start, stop int) []interface{} {
	if db.checkKeyValue(key, nil) != nil {
		return nil
	}

	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); expired {
		return nil
	}
	return idx.indexes.ZRangeWithScores(string(key), start, stop)
}
// ZRevRange returns the specified range of elements in the sorted set stored at key.
// The elements are considered to be ordered from the highest to the lowest score.
// Descending lexicographical order is used for elements with equal score.
// nil is returned when checkKeyValue rejects the key or the key has expired.
func (db *RoseDB) ZRevRange(key []byte, start, stop int) []interface{} {
	if db.checkKeyValue(key, nil) != nil {
		return nil
	}

	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); expired {
		return nil
	}
	return idx.indexes.ZRevRange(string(key), start, stop)
}
// ZRevRangeWithScores returns the specified range of elements in the sorted set stored at key,
// as produced by the index's ZRevRangeWithScores.
// The elements are considered to be ordered from the highest to the lowest score.
// Descending lexicographical order is used for elements with equal score.
// nil is returned when checkKeyValue rejects the key or the key has expired.
func (db *RoseDB) ZRevRangeWithScores(key []byte, start, stop int) []interface{} {
	if db.checkKeyValue(key, nil) != nil {
		return nil
	}

	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); expired {
		return nil
	}
	return idx.indexes.ZRevRangeWithScores(string(key), start, stop)
}
// ZRem removes the specified member from the sorted set stored at key. Non existing members are ignored.
// ok reports whether the index actually removed the member; err carries the
// failure from checkKeyValue or from storing the removal entry.
// An expired key yields (false, nil).
func (db *RoseDB) ZRem(key, member []byte) (ok bool, err error) {
	if err = db.checkKeyValue(key, member); err != nil {
		return false, err
	}

	idx := db.zsetIndex
	idx.mu.Lock()
	defer idx.mu.Unlock()

	if db.checkExpired(key, ZSet) {
		return false, nil
	}

	if removed := idx.indexes.ZRem(string(key), string(member)); !removed {
		return false, nil
	}

	// The member is gone from the index; store the removal entry. ok stays
	// true even when storing fails.
	err = db.store(storage.NewEntryNoExtra(key, member, ZSet, ZSetZRem))
	return true, err
}
// ZGetByRank gets the member at key by rank, the rank is ordered from lowest to highest.
// The rank of lowest is 0 and so on.
// nil is returned when the key has expired.
func (db *RoseDB) ZGetByRank(key []byte, rank int) []interface{} {
	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); !expired {
		return idx.indexes.ZGetByRank(string(key), rank)
	}
	return nil
}
// ZRevGetByRank gets the member at key by rank, the rank is ordered from highest to lowest.
// The rank of highest is 0 and so on.
// nil is returned when the key has expired.
func (db *RoseDB) ZRevGetByRank(key []byte, rank int) []interface{} {
	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); !expired {
		return idx.indexes.ZRevGetByRank(string(key), rank)
	}
	return nil
}
// ZScoreRange returns all the elements in the sorted set at key with a score between min and max
// (including elements with score equal to min or max).
// The elements are considered to be ordered from low to high scores.
// nil is returned when checkKeyValue rejects the key or the key has expired.
func (db *RoseDB) ZScoreRange(key []byte, min, max float64) []interface{} {
	if db.checkKeyValue(key, nil) != nil {
		return nil
	}

	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); expired {
		return nil
	}
	return idx.indexes.ZScoreRange(string(key), min, max)
}
// ZRevScoreRange returns all the elements in the sorted set at key with a score between max and min
// (including elements with score equal to max or min).
// In contrary to the default ordering of sorted sets, for this command the elements are considered
// to be ordered from high to low scores.
// nil is returned when checkKeyValue rejects the key or the key has expired.
func (db *RoseDB) ZRevScoreRange(key []byte, max, min float64) []interface{} {
	if db.checkKeyValue(key, nil) != nil {
		return nil
	}

	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	if expired := db.checkExpired(key, ZSet); expired {
		return nil
	}
	return idx.indexes.ZRevScoreRange(string(key), max, min)
}
// ZKeyExists checks if the key exists in zset.
// It reports false when checkKeyValue rejects the key or the key has expired.
func (db *RoseDB) ZKeyExists(key []byte) (ok bool) {
	if db.checkKeyValue(key, nil) != nil {
		return false
	}

	idx := db.zsetIndex
	idx.mu.RLock()
	defer idx.mu.RUnlock()

	// Short-circuit: the index is consulted only when the key has not expired.
	return !db.checkExpired(key, ZSet) && idx.indexes.ZKeyExists(string(key))
}
// ZClear clears the specified key in zset.
// It returns ErrKeyNotExist when ZKeyExists reports false for the key, or the
// error from storing the clear entry; in both cases the index is left untouched.
func (db *RoseDB) ZClear(key []byte) (err error) {
	if !db.ZKeyExists(key) {
		return ErrKeyNotExist
	}

	db.zsetIndex.mu.Lock()
	defer db.zsetIndex.mu.Unlock()

	e := storage.NewEntryNoExtra(key, nil, ZSet, ZSetZClear)
	if err = db.store(e); err != nil {
		return err
	}
	db.zsetIndex.indexes.ZClear(string(key))

	// Forget any deadline ZExpire recorded for this key. Otherwise a sorted set
	// created later under the same key would inherit the stale expiry.
	delete(db.expires[ZSet], string(key))
	return nil
}
// ZExpire sets the expired time for the key in zset.
// duration is added to the current Unix time (seconds) to form the deadline.
// It returns ErrInvalidTTL for a non-positive duration, ErrKeyNotExist when
// ZKeyExists reports false, or the error from storing the expire entry.
func (db *RoseDB) ZExpire(key []byte, duration int64) (err error) {
	switch {
	case duration <= 0:
		return ErrInvalidTTL
	case !db.ZKeyExists(key):
		return ErrKeyNotExist
	}

	db.zsetIndex.mu.Lock()
	defer db.zsetIndex.mu.Unlock()

	expireAt := duration + time.Now().Unix()
	if err = db.store(storage.NewEntryWithExpire(key, nil, expireAt, ZSet, ZSetZExpire)); err != nil {
		return err
	}

	// Record the deadline in memory only after the entry has been stored.
	db.expires[ZSet][string(key)] = expireAt
	return nil
}
// ZTTL returns the time to live of the key: the recorded deadline minus the current Unix time.
// 0 is returned when ZKeyExists reports false or no deadline is recorded for the key.
func (db *RoseDB) ZTTL(key []byte) (ttl int64) {
	if !db.ZKeyExists(key) {
		return 0
	}

	db.zsetIndex.mu.RLock()
	defer db.zsetIndex.mu.RUnlock()

	if expireAt, found := db.expires[ZSet][string(key)]; found {
		return expireAt - time.Now().Unix()
	}
	return 0
}