-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
117 lines (95 loc) · 3.39 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import json
import random
import threading
import time
import uuid
from importlib.metadata import metadata
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
import logging
KAFKA_BROKERS = "localhost:29092,localhost:39092,localhost:49092"
NUM_PARTITIONS = 5
REPLICATION_FACTOR = 3
TOPIC_NAME = 'financial_transactions'
logging.basicConfig(
level=logging.INFO
)
logger = logging.getLogger(__name__)
producer_conf = {
'bootstrap.servers': KAFKA_BROKERS,
'queue.buffering.max.messages': 10000,
'queue.buffering.max.kbytes': 512000,
'batch.num.messages': 1000,
'linger.ms': 10,
'acks': 1,
'compression.type': 'gzip'
}
producer = Producer(producer_conf)
def create_topic(topic_name):
admin_client = AdminClient({"bootstrap.servers": KAFKA_BROKERS})
try:
metadata = admin_client.list_topics(timeout=10)
if topic_name not in metadata.topics:
topic = NewTopic(
topic=topic_name,
num_partitions=NUM_PARTITIONS,
replication_factor=REPLICATION_FACTOR,
)
fs = admin_client.create_topics([topic])
for topic, future in fs.items():
try:
future.result()
logger.info(f"Topic '{topic_name} created successfully!")
except Exception as e:
logger.error(f"Failed to create topic '{topic_name}': {e}")
else:
logger.info(f"Topic '{topic_name}' already exists!")
except Exception as e:
logger.error(f"Error creating Topic: {e}")
def generate_transaction():
return dict(
transactionId=str(uuid.uuid4()),
userId=f"user_{random.randint(1, 100)}",
amount=round(random.uniform(50000, 150000), 2),
transactionTime=int(time.time()),
merchantId=random.choice(['merchant_1', 'merchant_2', 'merchant_3']),
transactionType=random.choice(['purchase', 'refund']),
location=f'location_{random.randint(1, 50)}',
paymentMethod=random.choice(['credit_card', 'paypal', 'bank_transfer']),
isInternational=random.choice(['True', 'False']),
currency=random.choice(['USD', 'EUR', 'GBP'])
)
def delivery_report(err, msg):
if err is not None:
print(f'Delivery failed for record {msg.key()}')
else:
print(f'Record {msg.key()} successfully produced')
def produce_transaction(thread_id):
while True:
transaction = generate_transaction()
try:
producer.produce(
topic=TOPIC_NAME,
key=transaction['userId'],
value=json.dumps(transaction).encode('utf-8'),
on_delivery=delivery_report
)
#print(f' Thread {thread_id} - Produced transaction: {transaction}')
producer.flush()
except Exception as e:
print(f'Error sending transaction: {e}')
def producer_data_in_parallel(num_threads):
threads = []
try:
for i in range(num_threads):
thread = threading.Thread(target=produce_transaction, args=(i,))
thread.daemon = True
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
except Exception as e:
print(f'Error message {e}')
if __name__ == "__main__":
create_topic(TOPIC_NAME)
producer_data_in_parallel(3)