Week-2 Deliverable with Early Base Developments #4


Merged
merged 1 commit on Nov 10, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
__pycache__
Binary file added Week2 experimental(working)/Kafka Output.png
Binary file added Week2 experimental(working)/Logs_generating.png
28 changes: 28 additions & 0 deletions Week2 experimental(working)/fluent.conf
@@ -0,0 +1,28 @@
# fluent.conf
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>

<match microservice.**>
  @type kafka2

  # Kafka configuration
  brokers localhost:9092
  topic_key topic
  default_topic microservice_logs

  # Buffer configuration
  <buffer>
    @type memory
    chunk_limit_size 1m
    queue_limit_length 128
    flush_interval 1s
  </buffer>

  # Data format
  <format>
    @type json
  </format>
</match>
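
A quick way to sanity-check this config is to emit one test event at the forward source and watch it arrive on the default Kafka topic. A minimal sketch, assuming the fluent-logger package is installed and Fluentd is running with the config above (the script name and payload are illustrative, not part of this PR):

# fluentd_smoke_test.py (hypothetical, not part of this PR)
from fluent import sender

# The 'microservice' tag prefix matches the <match microservice.**> block above
test_sender = sender.FluentSender('microservice', host='localhost', port=24224)

# Final tag is 'microservice.test'; since the event has no 'topic' field,
# out_kafka2 should route it to default_topic (microservice_logs)
if not test_sender.emit('test', {'message': 'hello from the smoke test'}):
    print(test_sender.last_error)

test_sender.close()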
71 changes: 71 additions & 0 deletions Week2 experimental(working)/kafka_utils.py
@@ -0,0 +1,71 @@
# kafka_utils.py
from kafka import KafkaProducer, KafkaConsumer
import json
from datetime import datetime
import threading
from typing import Callable, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class KafkaWrapper:
    def __init__(self, bootstrap_servers: str = 'localhost:9092'):
        self.bootstrap_servers = bootstrap_servers
        self.producer = None
        # Track every consumer we create so close() shuts them all down,
        # not just the most recently started one
        self.consumers = []
        self._setup_producer()

    def _setup_producer(self):
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=self.bootstrap_servers,
                value_serializer=lambda v: json.dumps(v).encode('utf-8')
            )
            logger.info("Kafka producer initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Kafka producer: {e}")
            raise

    def send_message(self, topic: str, message: dict):
        try:
            future = self.producer.send(topic, value=message)
            future.get(timeout=10)  # Block until the broker acknowledges the message
            logger.debug(f"Message sent to topic {topic}: {message}")
        except Exception as e:
            logger.error(f"Failed to send message to Kafka: {e}")
            raise

    def start_consumer(self, topic: str, message_handler: Callable,
                       group_id: Optional[str] = None):
        def consume():
            try:
                consumer = KafkaConsumer(
                    topic,
                    bootstrap_servers=self.bootstrap_servers,
                    group_id=group_id or f'group-{datetime.now().timestamp()}',
                    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                    auto_offset_reset='latest'
                )
                self.consumers.append(consumer)

                logger.info(f"Started consuming from topic: {topic}")
                for message in consumer:
                    try:
                        message_handler(message.value)
                    except Exception as e:
                        logger.error(f"Error processing message: {e}")

            except Exception as e:
                logger.error(f"Consumer error: {e}")
                raise

        # Run the consumer loop in a daemon thread so it never blocks shutdown
        consumer_thread = threading.Thread(target=consume, daemon=True)
        consumer_thread.start()
        return consumer_thread

    def close(self):
        if self.producer:
            self.producer.close()
        for consumer in self.consumers:
            consumer.close()
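
For reference, a producer/consumer round-trip through KafkaWrapper can be exercised directly. A minimal sketch, assuming a broker on localhost:9092 (the script name, topic name demo_topic, and group id demo-group are illustrative, not part of this PR):

# kafka_wrapper_demo.py (hypothetical, not part of this PR)
import time
from kafka_utils import KafkaWrapper

kafka = KafkaWrapper()

# Print every message that arrives on the topic
kafka.start_consumer('demo_topic', lambda msg: print(f"received: {msg}"), 'demo-group')

# Give the consumer a moment to join the group; with auto_offset_reset='latest'
# a message produced before partition assignment would be missed (sketch-level timing)
time.sleep(2)

kafka.send_message('demo_topic', {'event': 'ping'})

time.sleep(2)  # let the consumer print the message before shutting down
kafka.close()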
90 changes: 90 additions & 0 deletions Week2 experimental(working)/log_consumer.py
@@ -0,0 +1,90 @@
# log_consumer.py
from kafka_utils import KafkaWrapper
import logging
import time
from colorama import Fore, Style, init
from datetime import datetime

# Initialize colorama and logging
init(autoreset=True)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LogConsumer:
    def __init__(self):
        self.kafka = KafkaWrapper()

    def format_timestamp(self, timestamp_str):
        """Format an ISO timestamp into a more readable form."""
        try:
            dt = datetime.fromisoformat(timestamp_str)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError):
            return timestamp_str

    def handle_log(self, message):
        """Handle incoming log messages."""
        log_level = message.get('log_level', 'UNKNOWN')
        color = {
            'INFO': Fore.GREEN,
            'WARN': Fore.YELLOW,
            'ERROR': Fore.RED
        }.get(log_level, Fore.WHITE)

        timestamp = self.format_timestamp(message.get('timestamp', ''))
        service_name = message.get('service_name', 'Unknown')
        node_id = message.get('node_id', 'Unknown')[:8]  # Show first 8 chars of node_id
        msg = message.get('message', '')

        # Add extra details for WARN and ERROR logs
        extra_info = ""
        if log_level == 'WARN':
            response_time = message.get('response_time_ms', '')
            threshold = message.get('threshold_limit_ms', '')
            if response_time and threshold:
                extra_info = f" [Response: {response_time}ms, Threshold: {threshold}ms]"
        elif log_level == 'ERROR':
            error_details = message.get('error_details', {})
            if error_details:
                error_code = error_details.get('error_code', '')
                error_msg = error_details.get('error_message', '')
                extra_info = f" [Code: {error_code}, Details: {error_msg}]"

        print(f"{color}[{timestamp}] [{log_level}] {service_name} ({node_id}): {msg}{extra_info}{Style.RESET_ALL}")

    def handle_heartbeat(self, message):
        """Handle incoming heartbeat messages."""
        timestamp = self.format_timestamp(message.get('timestamp', ''))
        service_name = message.get('service_name', 'Unknown')
        node_id = message.get('node_id', 'Unknown')[:8]
        status = message.get('status', 'UNKNOWN')

        color = Fore.GREEN if status == 'UP' else Fore.RED
        print(f"{color}[{timestamp}] [HEARTBEAT] {service_name} ({node_id}): Status: {status}{Style.RESET_ALL}")

    def handle_registration(self, message):
        """Handle incoming registration messages."""
        timestamp = self.format_timestamp(message.get('timestamp', ''))
        service_name = message.get('service_name', 'Unknown')
        node_id = message.get('node_id', 'Unknown')[:8]

        print(f"{Fore.MAGENTA}[{timestamp}] [REGISTRATION] New service registered: {service_name} ({node_id}){Style.RESET_ALL}")

    def start(self):
        """Start consuming messages from all topics."""
        self.kafka.start_consumer('microservice_logs', self.handle_log, 'log-consumer')
        self.kafka.start_consumer('microservice_heartbeats', self.handle_heartbeat, 'heartbeat-consumer')
        self.kafka.start_consumer('microservice_registration', self.handle_registration, 'registration-consumer')

        logger.info("Started consuming messages from all topics")

        # Keep the main thread alive without busy-waiting
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Shutting down consumers...")
            self.kafka.close()

if __name__ == "__main__":
    consumer = LogConsumer()
    consumer.start()
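
Since the handlers are plain methods, they can be exercised without a running broker. A minimal sketch that feeds handle_log a fabricated message (the script name and every field value are invented for illustration; LogConsumer.__new__ is used to skip __init__ so no Kafka connection is attempted):

# handler_demo.py (hypothetical, not part of this PR)
from datetime import datetime
from log_consumer import LogConsumer

# Bypass __init__ so no KafkaWrapper (and thus no broker connection) is created
consumer = LogConsumer.__new__(LogConsumer)

# Fabricated ERROR message in the shape produced by node.py
consumer.handle_log({
    'log_level': 'ERROR',
    'timestamp': datetime.now().isoformat(),
    'service_name': 'PaymentService',
    'node_id': 'abcd1234-0000-0000-0000-000000000000',
    'message': 'This is an error log.',
    'error_details': {'error_code': '500', 'error_message': 'Internal Server Error'},
})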
154 changes: 154 additions & 0 deletions Week2 experimental(working)/node.py
@@ -0,0 +1,154 @@
# node.py
import uuid
import time
import threading
import random
import json
from datetime import datetime
from colorama import Fore, Style, init
from threading import Lock
from fluent import sender
from kafka_utils import KafkaWrapper

# Initialize colorama for colored terminal output
init(autoreset=True)

# Global lock for synchronized printing
print_lock = Lock()

class Node:
    # Colors for message types
    message_colors = {
        "registration": Fore.CYAN,
        "heartbeat": Fore.RED,
        "log": Fore.GREEN
    }
    # Colors for keys and values within messages
    key_color = Fore.LIGHTMAGENTA_EX
    value_color = Fore.LIGHTYELLOW_EX

    def __init__(self, service_name):
        self.node_id = str(uuid.uuid4())
        self.service_name = service_name
        self.status = "UP"

        # Initialize Fluentd sender
        self.fluent_sender = sender.FluentSender(
            'microservice',
            host='localhost',
            port=24224
        )

        # Initialize Kafka wrapper
        self.kafka = KafkaWrapper()

        self.register_node()

    def format_message(self, message):
        """Format a JSON message so keys and values are colored distinctly."""
        formatted_message = ""
        for key, value in message.items():
            formatted_message += f"{Node.key_color}{key}{Style.RESET_ALL}: {Node.value_color}{value}{Style.RESET_ALL}, "
        return "{" + formatted_message.rstrip(", ") + "}"

    def print_message(self, message_type, message_content):
        with print_lock:
            color = Node.message_colors[message_type]
            formatted_content = self.format_message(json.loads(message_content))
            print(color + f"{message_type.capitalize()}:" + Style.RESET_ALL, formatted_content)

    def send_to_fluentd(self, tag, message):
        """Send a message to Fluentd (emit() signals failure by returning False)."""
        if not self.fluent_sender.emit(tag, message):
            print(f"Error sending to Fluentd: {self.fluent_sender.last_error}")

    def send_to_kafka(self, topic, message):
        """Send a message to Kafka."""
        try:
            self.kafka.send_message(topic, message)
        except Exception as e:
            print(f"Error sending to Kafka: {e}")

    def register_node(self):
        registration_message = {
            "node_id": self.node_id,
            "message_type": "REGISTRATION",
            "service_name": self.service_name,
            "timestamp": datetime.now().isoformat()
        }

        # Send to the console and to both message brokers
        self.print_message("registration", json.dumps(registration_message))
        self.send_to_fluentd('registration', registration_message)
        self.send_to_kafka('microservice_registration', registration_message)

    def generate_log(self, log_level, message, extra_data=None):
        log_message = {
            "log_id": str(uuid.uuid4()),
            "node_id": self.node_id,
            "log_level": log_level,
            "message_type": "LOG",
            "message": message,
            "service_name": self.service_name,
            "timestamp": datetime.now().isoformat()
        }
        if extra_data:
            log_message.update(extra_data)

        # Send to the console and to both message brokers
        self.print_message("log", json.dumps(log_message))
        self.send_to_fluentd(f'log.{log_level.lower()}', log_message)
        self.send_to_kafka('microservice_logs', log_message)

    def send_heartbeat(self):
        heartbeat_message = {
            "node_id": self.node_id,
            "message_type": "HEARTBEAT",
            "service_name": self.service_name,
            "status": self.status,
            "timestamp": datetime.now().isoformat()
        }

        # Send to the console and to both message brokers
        self.print_message("heartbeat", json.dumps(heartbeat_message))
        self.send_to_fluentd('heartbeat', heartbeat_message)
        self.send_to_kafka('microservice_heartbeats', heartbeat_message)

    def start_heartbeat(self, interval=5):
        def heartbeat():
            while self.status == "UP":
                self.send_heartbeat()
                time.sleep(interval)
        # Daemon thread so the process can exit cleanly on Ctrl-C
        threading.Thread(target=heartbeat, daemon=True).start()

    def start_log_generation(self, interval=3):
        def generate_logs():
            log_levels = ["INFO", "WARN", "ERROR"]
            while self.status == "UP":
                log_level = random.choice(log_levels)
                if log_level == "INFO":
                    self.generate_log("INFO", "This is an info log.")
                elif log_level == "WARN":
                    self.generate_log("WARN", "This is a warning log.", {
                        "response_time_ms": random.randint(100, 500),
                        "threshold_limit_ms": 300
                    })
                elif log_level == "ERROR":
                    self.generate_log("ERROR", "This is an error log.", {
                        "error_details": {
                            "error_code": "500",
                            "error_message": "Internal Server Error"
                        }
                    })
                time.sleep(interval)
        # Daemon thread so the process can exit cleanly on Ctrl-C
        threading.Thread(target=generate_logs, daemon=True).start()

    def __del__(self):
        if hasattr(self, 'fluent_sender'):
            self.fluent_sender.close()
        if hasattr(self, 'kafka'):
            self.kafka.close()
11 changes: 11 additions & 0 deletions Week2 experimental(working)/run_nodes.py
@@ -0,0 +1,11 @@
# run_nodes.py
import time
from node import Node

if __name__ == "__main__":
    services = ["PaymentService", "OrderService", "InventoryService"]
    nodes = [Node(service) for service in services]

    # Start log generation and heartbeat for each node
    for node in nodes:
        node.start_heartbeat()
        node.start_log_generation()

    # The worker threads are daemons, so keep the main thread alive;
    # Ctrl-C stops the loops by flipping each node's status
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        for node in nodes:
            node.status = "DOWN"