-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.py
More file actions
145 lines (113 loc) · 4.54 KB
/
consumer.py
File metadata and controls
145 lines (113 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
"""
Transaction Consumer
Consumes transactions from Redis Stream and dispatches to all modules.
This is pre-built - workshop developers don't modify this file.
"""
import sys
import time
from pathlib import Path
from typing import Dict
sys.path.insert(0, str(Path(__file__).parent.parent))
from lib.redis_client import get_redis
from lib.logger import setup_logger
# Import all module processors
from modules import ordered_transactions
from modules import store_transaction
from modules import spending_categories
from modules import spending_over_time
from modules import vector_search
logger = setup_logger("consumer")
def dispatch_transaction(redis_client, tx_data: Dict[str, str]) -> None:
    """
    Fan a single transaction out to every module processor.

    Every module receives the identical transaction payload and persists
    it into its own Redis data structure.
    """
    # One entry per workshop module, in dispatch order:
    #   1. ordered_transactions - ordered list
    #   2. store_transaction    - JSON document
    #   3. spending_categories  - spending category rankings
    #   4. spending_over_time   - time-series
    #   5. vector_search        - embedding for vector search
    processors = (
        ordered_transactions,
        store_transaction,
        spending_categories,
        spending_over_time,
        vector_search,
    )
    for module in processors:
        module.process_transaction(redis_client, tx_data)
def ensure_consumer_group(redis_client, stream_key: str, group_name: str) -> None:
    """Create the stream's consumer group, tolerating one that already exists.

    Redis reports an already-existing group with a BUSYGROUP error; that is
    treated as success. Any other error propagates to the caller.
    """
    try:
        redis_client.xgroup_create(stream_key, group_name, id='0', mkstream=True)
    except Exception as exc:
        # BUSYGROUP is detected by substring match because the concrete
        # exception type depends on the installed redis client library.
        if "BUSYGROUP" not in str(exc):
            raise
        logger.info(f"Consumer group '{group_name}' already exists")
    else:
        logger.info(f"Consumer group '{group_name}' created")
def main() -> None:
    """Main consumer loop - consumes once, dispatches to all modules."""
    STREAM_KEY = "stream:transactions"
    GROUP_NAME = "consumer-group:processor"
    CONSUMER_NAME = "processor-1"
    BATCH_SIZE = 10
    BLOCK_MS = 1000

    redis = get_redis()
    ensure_consumer_group(redis, STREAM_KEY, GROUP_NAME)

    # The vector index is optional: a missing/unconfigured index only
    # warrants a warning, the other modules keep working.
    try:
        vector_search.create_index(redis)
    except Exception as e:
        logger.warning(f"Vector search index not ready: {e}")

    banner = "=" * 70
    logger.info(banner)
    logger.info("Transaction Processor Starting")
    logger.info(banner)
    logger.info(f"Stream: {STREAM_KEY}")
    logger.info("Dispatching to 5 modules:")
    for module_line in (
        " 1. ordered_transactions - List",
        " 2. store_transaction - JSON",
        " 3. spending_categories - Sorted Sets",
        " 4. spending_over_time - TimeSeries",
        " 5. vector_search - Vector Search",
    ):
        logger.info(module_line)
    logger.info(banner)

    processed_count = 0
    start_time = time.time()

    try:
        while True:
            # Block up to BLOCK_MS for the next batch of new ('>') entries.
            batch = redis.xreadgroup(
                groupname=GROUP_NAME,
                consumername=CONSUMER_NAME,
                streams={STREAM_KEY: '>'},
                count=BATCH_SIZE,
                block=BLOCK_MS,
            )
            if not batch:
                continue

            for stream_name, entries in batch:
                for entry_id, raw_fields in entries:
                    # Normalize any bytes keys/values to str before dispatch.
                    tx_data = {}
                    for field, value in raw_fields.items():
                        if isinstance(field, bytes):
                            field = field.decode()
                        if isinstance(value, bytes):
                            value = value.decode()
                        tx_data[field] = value

                    dispatch_transaction(redis, tx_data)

                    # Acknowledge only after every module processed the entry.
                    redis.xack(stream_name, GROUP_NAME, entry_id)
                    processed_count += 1

                    # Periodic throughput report every 50 transactions.
                    if processed_count % 50 == 0:
                        elapsed = time.time() - start_time
                        tps = processed_count / elapsed if elapsed > 0 else 0
                        logger.info(f"Processed: {processed_count} | TPS: {tps:.2f}")
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop: print a summary, exit cleanly.
        logger.info("\n" + "=" * 70)
        logger.info("Processor Stopped")
        logger.info(f"Total Processed: {processed_count:,}")
        logger.info("=" * 70)
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        raise
# Entry-point guard: run the consumer loop only when executed as a script.
if __name__ == "__main__":
    main()