Commit cf15463

Merge pull request #4 from dheerajcl/Week2_Experimental

Week-2 Deliverable with Early Base Developments

2 parents 7b8d299 + 14ccb13

File tree

8 files changed: +355 -0 lines changed

.gitignore (+1)

__pycache__

(unnamed binary file, 451 KB; no text preview)

fluent.conf (+28)

# fluent.conf
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<match microservice.**>
  @type kafka2

  # Kafka configuration
  brokers localhost:9092
  topic_key topic
  default_topic microservice_logs

  # Buffer configuration
  <buffer>
    @type memory
    chunk_limit_size 1m
    queue_limit_length 128
    flush_interval 1s
  </buffer>

  # Data format
  <format>
    @type json
  </format>
</match>
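
For context, a minimal sketch of how a service process can emit an event into this forward source using the fluent-logger package (the same API node.py uses further down). The script name, tag label, and payload are illustrative; events tagged under the 'microservice' prefix match the <match microservice.**> block above and go to default_topic (microservice_logs) unless a 'topic' field overrides it via topic_key.

# emit_example.py (illustrative only, not part of this commit)
from fluent import sender

# Events are sent with the tag 'microservice.<label>'.
fluent_logger = sender.FluentSender('microservice', host='localhost', port=24224)
fluent_logger.emit('log.info', {'message': 'hello from a demo service'})
fluent_logger.close()
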
kafka_utils.py (+71)

# kafka_utils.py
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime
import threading
from typing import Callable, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class KafkaWrapper:
    def __init__(self, bootstrap_servers: str = 'localhost:9092'):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None
        self.consumer = None
        self._setup_producer()

    def _setup_producer(self):
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda v: json.dumps(v).encode('utf-8')
            )
            logger.info("Kafka producer initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Kafka producer: {e}")
            raise

    def send_message(self, topic: str, message: dict):
        try:
            future = self.producer.send(topic, value=message)
            future.get(timeout=10)  # Wait for message to be sent
            logger.debug(f"Message sent to topic {topic}: {message}")
        except Exception as e:
            logger.error(f"Failed to send message to Kafka: {e}")
            raise

    def start_consumer(self, topic: str, message_handler: Callable,
                       group_id: Optional[str] = None):
        def consume():
            try:
                self.consumer = KafkaConsumer(
                    topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=group_id or f'group-{datetime.now().timestamp()}',
                    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                    auto_offset_reset='latest'
                )

                logger.info(f"Started consuming from topic: {topic}")
                for message in self.consumer:
                    try:
                        message_handler(message.value)
                    except Exception as e:
                        logger.error(f"Error processing message: {e}")

            except Exception as e:
                logger.error(f"Consumer error: {e}")
                raise

        # Start consumer in a separate thread
        consumer_thread = threading.Thread(target=consume, daemon=True)
        consumer_thread.start()
        return consumer_thread

    def close(self):
        if self.producer:
            self.producer.close()
        if self.consumer:
            self.consumer.close()
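
A minimal usage sketch for this wrapper, assuming a broker is reachable on localhost:9092; the script name, topic, group id, and handler are illustrative and not part of the commit.

# kafka_wrapper_example.py (illustrative only, not part of this commit)
import time
from kafka_utils import KafkaWrapper

kafka = KafkaWrapper()  # producer connects to localhost:9092

# Start a background consumer; the handler receives each message as a dict.
kafka.start_consumer('demo_topic', lambda msg: print("got:", msg), group_id='demo-group')

time.sleep(2)  # give the consumer time to join the group before producing
kafka.send_message('demo_topic', {'hello': 'world'})

time.sleep(2)  # allow the message to be consumed before shutting down
kafka.close()
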
log_consumer.py (+90)

# log_consumer.py
from kafka_utils import KafkaWrapper
import logging
import time
from colorama import Fore, Style, init
from datetime import datetime

# Initialize colorama and logging
init(autoreset=True)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LogConsumer:
    def __init__(self):
        self.kafka = KafkaWrapper()

    def format_timestamp(self, timestamp_str):
        """Format ISO timestamp to a more readable format"""
        try:
            dt = datetime.fromisoformat(timestamp_str)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError):
            return timestamp_str

    def handle_log(self, message):
        """Handle incoming log messages"""
        log_level = message.get('log_level', 'UNKNOWN')
        color = {
            'INFO': Fore.GREEN,
            'WARN': Fore.YELLOW,
            'ERROR': Fore.RED
        }.get(log_level, Fore.WHITE)

        timestamp = self.format_timestamp(message.get('timestamp', ''))
        service_name = message.get('service_name', 'Unknown')
        node_id = message.get('node_id', 'Unknown')[:8]  # Show first 8 chars of node_id
        msg = message.get('message', '')

        # Add extra details for WARN and ERROR logs
        extra_info = ""
        if log_level == 'WARN':
            response_time = message.get('response_time_ms', '')
            threshold = message.get('threshold_limit_ms', '')
            if response_time and threshold:
                extra_info = f" [Response: {response_time}ms, Threshold: {threshold}ms]"
        elif log_level == 'ERROR':
            error_details = message.get('error_details', {})
            if error_details:
                error_code = error_details.get('error_code', '')
                error_msg = error_details.get('error_message', '')
                extra_info = f" [Code: {error_code}, Details: {error_msg}]"

        print(f"{color}[{timestamp}] [{log_level}] {service_name} ({node_id}): {msg}{extra_info}{Style.RESET_ALL}")

    def handle_heartbeat(self, message):
        """Handle incoming heartbeat messages"""
        timestamp = self.format_timestamp(message.get('timestamp', ''))
        service_name = message.get('service_name', 'Unknown')
        node_id = message.get('node_id', 'Unknown')[:8]
        status = message.get('status', 'UNKNOWN')

        color = Fore.GREEN if status == 'UP' else Fore.RED
        print(f"{color}[{timestamp}] [HEARTBEAT] {service_name} ({node_id}): Status: {status}{Style.RESET_ALL}")

    def handle_registration(self, message):
        """Handle incoming registration messages"""
        timestamp = self.format_timestamp(message.get('timestamp', ''))
        service_name = message.get('service_name', 'Unknown')
        node_id = message.get('node_id', 'Unknown')[:8]

        print(f"{Fore.MAGENTA}[{timestamp}] [REGISTRATION] New service registered: {service_name} ({node_id}){Style.RESET_ALL}")

    def start(self):
        """Start consuming messages from all topics"""
        self.kafka.start_consumer('microservice_logs', self.handle_log, 'log-consumer')
        self.kafka.start_consumer('microservice_heartbeats', self.handle_heartbeat, 'heartbeat-consumer')
        self.kafka.start_consumer('microservice_registration', self.handle_registration, 'registration-consumer')

        logger.info("Started consuming messages from all topics")

        # Keep the main thread running without busy-waiting
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Shutting down consumers...")
            self.kafka.close()

if __name__ == "__main__":
    consumer = LogConsumer()
    consumer.start()

Week2 experimental(working)/node.py (+154)

# node.py
import uuid
import time
import threading
import random
import json
from datetime import datetime
from colorama import Fore, Style, init
from threading import Lock
from fluent import sender
from kafka_utils import KafkaWrapper

# Initialize colorama for colored terminal output
init(autoreset=True)

# Global lock for synchronized printing
print_lock = Lock()

class Node:
    # Colors for message types
    message_colors = {
        "registration": Fore.CYAN,
        "heartbeat": Fore.RED,
        "Log": Fore.GREEN
    }
    # Colors for keys and values within messages
    key_color = Fore.LIGHTMAGENTA_EX
    value_color = Fore.LIGHTYELLOW_EX

    def __init__(self, service_name):
        self.node_id = str(uuid.uuid4())
        self.service_name = service_name
        self.status = "UP"

        # Initialize Fluentd sender
        self.fluent_sender = sender.FluentSender(
            'microservice',
            host='localhost',
            port=24224
        )

        # Initialize Kafka wrapper
        self.kafka = KafkaWrapper()

        self.register_node()

    def format_message(self, message):
        """
        Format JSON message to color keys and values distinctly.
        """
        formatted_message = ""
        for key, value in message.items():
            formatted_message += f"{Node.key_color}{key}{Style.RESET_ALL}: {Node.value_color}{value}{Style.RESET_ALL}, "
        return "{" + formatted_message.rstrip(", ") + "}"

    def print_message(self, message_type, message_content):
        with print_lock:
            color = Node.message_colors[message_type]
            formatted_content = self.format_message(json.loads(message_content))
            print(color + f"{message_type.capitalize()}:" + Style.RESET_ALL, formatted_content)

    def send_to_fluentd(self, tag, message):
        """Send message to Fluentd"""
        try:
            self.fluent_sender.emit(tag, message)
        except Exception as e:
            print(f"Error sending to Fluentd: {e}")

    def send_to_kafka(self, topic, message):
        """Send message to Kafka"""
        try:
            self.kafka.send_message(topic, message)
        except Exception as e:
            print(f"Error sending to Kafka: {e}")

    def register_node(self):
        registration_message = {
            "node_id": self.node_id,
            "message_type": "REGISTRATION",
            "service_name": self.service_name,
            "timestamp": datetime.now().isoformat()
        }

        # Send to both console and message brokers
        self.print_message("registration", json.dumps(registration_message))
        self.send_to_fluentd('registration', registration_message)
        self.send_to_kafka('microservice_registration', registration_message)

    def generate_log(self, log_level, message, extra_data=None):
        log_message = {
            "log_id": str(uuid.uuid4()),
            "node_id": self.node_id,
            "log_level": log_level,
            "message_type": "LOG",
            "message": message,
            "service_name": self.service_name,
            "timestamp": datetime.now().isoformat()
        }
        if extra_data:
            log_message.update(extra_data)

        # Send to both console and message brokers
        self.print_message("Log", json.dumps(log_message))
        self.send_to_fluentd(f'log.{log_level.lower()}', log_message)
        self.send_to_kafka('microservice_logs', log_message)

    def send_heartbeat(self):
        heartbeat_message = {
            "node_id": self.node_id,
            "message_type": "HEARTBEAT",
            "service_name": self.service_name,  # Added service_name
            "status": self.status,
            "timestamp": datetime.now().isoformat()
        }

        # Send to both console and message brokers
        self.print_message("heartbeat", json.dumps(heartbeat_message))
        self.send_to_fluentd('heartbeat', heartbeat_message)
        self.send_to_kafka('microservice_heartbeats', heartbeat_message)

    def start_heartbeat(self, interval=5):
        def heartbeat():
            while self.status == "UP":
                self.send_heartbeat()
                time.sleep(interval)
        threading.Thread(target=heartbeat).start()

    def start_log_generation(self, interval=3):
        def generate_logs():
            log_levels = ["INFO", "WARN", "ERROR"]
            while self.status == "UP":
                log_level = random.choice(log_levels)
                if log_level == "INFO":
                    self.generate_log("INFO", "This is an info log.")
                elif log_level == "WARN":
                    self.generate_log("WARN", "This is a warning log.", {
                        "response_time_ms": random.randint(100, 500),
                        "threshold_limit_ms": 300
                    })
                elif log_level == "ERROR":
                    self.generate_log("ERROR", "This is an error log.", {
                        "error_details": {
                            "error_code": "500",
                            "error_message": "Internal Server Error"
                        }
                    })
                time.sleep(interval)
        threading.Thread(target=generate_logs).start()

    def __del__(self):
        if hasattr(self, 'fluent_sender'):
            self.fluent_sender.close()
        if hasattr(self, 'kafka'):
            self.kafka.close()

run_nodes.py (+11)

# run_nodes.py
from node import Node

if __name__ == "__main__":
    services = ["PaymentService", "OrderService", "InventoryService"]
    nodes = [Node(service) for service in services]

    # Start log generation and heartbeat for each node
    for node in nodes:
        node.start_heartbeat()
        node.start_log_generation()
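
A possible extension (a sketch only, not part of this commit): keep the launcher's main thread alive and flip each node's status to "DOWN" on Ctrl+C, so the heartbeat and log-generation loops in node.py (which run while status == "UP") exit cleanly. The file name below is hypothetical.

# run_nodes_graceful.py (hypothetical, illustrative only)
import time
from node import Node

if __name__ == "__main__":
    services = ["PaymentService", "OrderService", "InventoryService"]
    nodes = [Node(service) for service in services]

    for node in nodes:
        node.start_heartbeat()
        node.start_log_generation()

    try:
        while True:
            time.sleep(1)  # keep the launcher process alive
    except KeyboardInterrupt:
        for node in nodes:
            node.status = "DOWN"  # background loops in node.py stop once status != "UP"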
