This repository has been archived by the owner on Sep 9, 2024. It is now read-only.
forked from gocassa/gocassa
-
Notifications
You must be signed in to change notification settings - Fork 13
/
timeseries_table.go
93 lines (82 loc) · 2.67 KB
/
timeseries_table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package gocassa
import (
"time"
)
// bucketFieldName is the name of the field that carries a row's bucket value.
// Set stores the computed bucket under this key, and Update, Delete, Read and
// List all constrain their queries on it.
const bucketFieldName = "bucket"
// timeSeriesT wraps a Table and adds time-bucketed access on top of it: every
// row is addressed by a bucket (derived from its timestamp and bucketSize),
// the timestamp itself, and an id.
type timeSeriesT struct {
	// t is the wrapped table that every operation is ultimately issued against.
	t Table
	// timeField names the field holding the row's time.Time value (Set panics
	// if the value under this key is not a time.Time); idField names the field
	// matched against the id argument of Update, Delete and Read.
	timeField, idField string
	// bucketSize is the duration handed to bucket() when computing which
	// bucket a timestamp belongs to.
	bucketSize time.Duration
}
// Table returns the Table that this time series wraps.
func (o *timeSeriesT) Table() Table {
	return o.t
}
// Create forwards to the wrapped table's Create and returns its error as is.
func (o *timeSeriesT) Create() error {
	return o.t.Create()
}
// CreateIfNotExist forwards to the wrapped table's CreateIfNotExist and
// returns its error as is.
func (o *timeSeriesT) CreateIfNotExist() error {
	return o.t.CreateIfNotExist()
}
// Name returns the name reported by the wrapped table.
func (o *timeSeriesT) Name() string {
	return o.t.Name()
}
// Recreate forwards to the wrapped table's Recreate and returns its error as is.
func (o *timeSeriesT) Recreate() error {
	return o.t.Recreate()
}
// CreateStatement returns whatever statement and error the wrapped table's
// CreateStatement produces.
func (o *timeSeriesT) CreateStatement() (Statement, error) {
	return o.t.CreateStatement()
}
// CreateIfNotExistStatement returns whatever statement and error the wrapped
// table's CreateIfNotExistStatement produces.
func (o *timeSeriesT) CreateIfNotExistStatement() (Statement, error) {
	return o.t.CreateIfNotExistStatement()
}
// Set builds the Op that writes v to the wrapped table.
//
// v is converted to a map with toMap. The value found under the configured
// time field is used to compute the row's bucket, which is added to the map
// under bucketFieldName before the map is handed to the table's Set.
//
// Set panics if toMap cannot convert v, or if the value under the time field
// is missing or is not a time.Time.
func (o *timeSeriesT) Set(v interface{}) Op {
	fields, converted := toMap(v)
	if !converted {
		panic("Can't set: not able to convert")
	}

	ts, isTime := fields[o.timeField].(time.Time)
	if !isTime {
		panic("timeField is not actually a time.Time")
	}

	fields[bucketFieldName] = bucket(ts, o.bucketSize)
	return o.t.Set(fields)
}
// Update builds the Op that applies the field changes in m to the row
// identified by timeStamp and id. The row is selected by equality on the
// bucket computed from timeStamp, on the time field, and on the id field.
func (o *timeSeriesT) Update(timeStamp time.Time, id interface{}, m map[string]interface{}) Op {
	byBucket := Eq(bucketFieldName, bucket(timeStamp, o.bucketSize))
	byTime := Eq(o.timeField, timeStamp)
	byID := Eq(o.idField, id)

	return o.t.Where(byBucket, byTime, byID).Update(m)
}
// Delete builds the Op that removes the row identified by timeStamp and id.
// The row is selected by equality on the bucket computed from timeStamp, on
// the time field, and on the id field.
func (o *timeSeriesT) Delete(timeStamp time.Time, id interface{}) Op {
	byBucket := Eq(bucketFieldName, bucket(timeStamp, o.bucketSize))
	byTime := Eq(o.timeField, timeStamp)
	byID := Eq(o.idField, id)

	return o.t.Where(byBucket, byTime, byID).Delete()
}
// Read builds the Op that loads the single row identified by timeStamp and id
// into pointer (via the filter's ReadOne). The row is selected by equality on
// the bucket computed from timeStamp, on the time field, and on the id field.
func (o *timeSeriesT) Read(timeStamp time.Time, id, pointer interface{}) Op {
	byBucket := Eq(bucketFieldName, bucket(timeStamp, o.bucketSize))
	byTime := Eq(o.timeField, timeStamp)
	byID := Eq(o.idField, id)

	return o.t.Where(byBucket, byTime, byID).ReadOne(pointer)
}
// List builds the Op that reads into pointerToASlice every row whose time
// field lies in the inclusive range [startTime, endTime].
//
// The query is restricted to the buckets obtained by walking o.Buckets from
// startTime, and additionally filtered with GTE/LTE on the time field so that
// rows in the first and last bucket that fall outside the range are excluded.
// If endTime is before startTime's bucket, no buckets are collected.
func (o *timeSeriesT) List(startTime time.Time, endTime time.Time, pointerToASlice interface{}) Op {
	buckets := []interface{}{}

	// The time filter below is inclusive of endTime (LTE), so the bucket walk
	// has to be inclusive too: keep going until a bucket lies strictly after
	// endTime. Stopping at Before(endTime) would leave out the bucket whose
	// value equals endTime, and rows stamped exactly endTime on a bucket
	// boundary would never be matched by the IN clause.
	// NOTE(review): this relies on Bucket() yielding the same truncated
	// timestamp that Set stores under bucketFieldName — confirm against
	// bucketIter.
	for it := o.Buckets(startTime); !it.Bucket().After(endTime); it = it.Next() {
		buckets = append(buckets, it.Bucket())
	}

	return o.Table().
		Where(In(bucketFieldName, buckets...),
			GTE(o.timeField, startTime),
			LTE(o.timeField, endTime)).
		Read(pointerToASlice)
}
// Buckets returns a bucket iterator seeded with start. The iterator steps by
// the table's bucket size, uses bucketFieldName as its field, and carries an
// unconstrained Where() filter on the wrapped table as its invariant.
func (o *timeSeriesT) Buckets(start time.Time) Buckets {
	iter := bucketIter{
		v:         start,
		step:      o.bucketSize,
		field:     bucketFieldName,
		invariant: o.t.Where(),
	}
	return iter
}
// WithOptions returns a new time series table whose wrapped table is the
// result of applying opt to the current one. The time field, id field and
// bucket size are carried over; the receiver's own fields are not modified.
func (o *timeSeriesT) WithOptions(opt Options) TimeSeriesTable {
	// Start from a shallow copy so every setting is inherited, then swap in
	// the table that has the options applied.
	clone := *o
	clone.t = o.t.WithOptions(opt)
	return &clone
}