-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathtodo.txt
83 lines (50 loc) · 1.88 KB
/
todo.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
### Dataframe / Series
- to and from pandas
- to and from Protobuf
- persist to and from DB
- serialize and deserialize via network
- register to globalcontext
- offline (batch) vs online (realtime update) mode
- DAG (batch mode, pull from child, recursively up to the root)
- parent
store lasttimestamp
return value since timestamp (can be empty array)
- child
store parentId
store last update timestamp & last result (conflated into a dict) from each parent
store last updatetimestamp
batch calculate and store the result.
when compute / evaluate, check lastcomputed timestamp
- subscribe (realtime update, push from parent)
- rxpy
- publish
- when subscriber receive update, update the parent timestamp
- multi-index
DataFrame
- holding multiple series
EvaluationContext (global)
def evaluate():
- for each input:
input[self.last_ts:] -> timeseries
put the ts into a dict
if the dict of timeseries is empty:
no further update.
if the dict of timeseries is non empty:
merge into a DF
iterate the DF (backfill for na?)
call the update method
acutally....
covert to pandas in global context
just use pandas to calculate the value
convert back to series
Expression
def evaluate():
create a EvaluationContext
slice for each parent since last calculate into a series, add to EvaluationContext
EvaluationContext.evaluate()
call the compute()
def on_event(timestamp: long, input: str, data: Dict):
def compute(timestamp: long, input_data: Dict[str, double]):
def publish(timestamp, output_data:Dict[str, double]):
- slice for each parebnt
def update(input, ):