-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspending_over_time.py
More file actions
34 lines (25 loc) · 1.02 KB
/
spending_over_time.py
File metadata and controls
34 lines (25 loc) · 1.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
"""
Module 4: Spending Over Time - SOLUTION
Track spending over time using Redis TimeSeries.
Enables time-range queries like "spending in last 7 days".
"""
from typing import List, Tuple, Dict
def process_transaction(redis_client, tx_data: Dict[str, str]) -> None:
    """
    Record one transaction as a sample in the "spending:timeseries" series.

    Args:
        redis_client: Client object exposing ``ts()``; ``add(key, timestamp,
            value)`` is called once on the handle it returns.
        tx_data: Transaction fields. ``'amount'`` is converted with ``float``
            and ``'timestamp'`` with ``int``; a missing field falls back to 0.

    Raises:
        ValueError: If a field is a string that does not parse as a number.
    """
    # Convert both fields before touching the client, so malformed input
    # fails without issuing any command.
    value = float(tx_data.get('amount', 0))
    # NOTE(review): presumably Unix-epoch milliseconds, the unit Redis
    # TimeSeries works in - confirm against the producer of tx_data.
    sample_time = int(tx_data.get('timestamp', 0))

    series = redis_client.ts()
    series.add("spending:timeseries", sample_time, value)
def get_spending_in_range(redis_client, start_time: int, end_time: int) -> List[Tuple[int, float]]:
    """
    Fetch the samples of "spending:timeseries" between two timestamps.

    Args:
        redis_client: Client object exposing ``ts()``; ``range(key, start,
            end)`` is called once on the handle it returns.
        start_time: Lower bound of the query, passed through unchanged.
        end_time: Upper bound of the query, passed through unchanged.

    Returns:
        The result of the range query, unmodified; annotated as a list of
        (timestamp, amount) tuples.
    """
    # NOTE(review): bounds are presumably inclusive and in the same unit as
    # the stored timestamps - confirm against the Redis TS.RANGE reference.
    series = redis_client.ts()
    samples = series.range("spending:timeseries", start_time, end_time)
    return samples
def get_total_spending_in_range(redis_client, start_time: int, end_time: int) -> float:
    """
    Get total spending in time range.

    Fetches the samples via ``get_spending_in_range`` and adds up the amount
    component of each (timestamp, amount) pair.

    Args:
        redis_client: Client object forwarded to ``get_spending_in_range``.
        start_time: Lower bound of the query, forwarded unchanged.
        end_time: Upper bound of the query, forwarded unchanged.

    Returns:
        The sum of the amounts as a float; ``0.0`` when the range contains
        no samples.
    """
    data_points = get_spending_in_range(redis_client, start_time, end_time)
    # Seed the sum with 0.0: a bare sum() over an empty range would return
    # the int 0, contradicting the declared float return type.
    return sum((amount for _, amount in data_points), 0.0)