From c34c70065eb86882accdb5c4b5c2963c4b5a66fd Mon Sep 17 00:00:00 2001 From: PooyaY <59162706+pooyay@users.noreply.github.com> Date: Sun, 11 Feb 2024 17:12:59 +0330 Subject: [PATCH] Metrics datanode (#30) * basic metrics for datanode * basic metrics * basic metrics * pull/push per sec * throughput added * seems done --- datanode/src/datanode_server.py | 64 +++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/datanode/src/datanode_server.py b/datanode/src/datanode_server.py index 4a9d646..3983d06 100644 --- a/datanode/src/datanode_server.py +++ b/datanode/src/datanode_server.py @@ -10,6 +10,55 @@ from shared_partition import clear_path from loguru import logger +from prometheus_client import Counter, Gauge, Summary, Histogram, generate_latest, REGISTRY, start_http_server +import os +import time + +disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) +disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) +message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) +push_per_sec = Histogram('push_per_sec', 'Number of pushes in second', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) +pull_per_sec = Histogram('pull_per_sec', 'Number of pulls in second', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) +throughput = Histogram('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) + + +# DECORATORS{ +def inc_message_count(func): + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + message_count.labels(provider=ConfigManager.get_prop('datanode_name')).inc() + return result + return wrapper + + +def dec_message_count(func): + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + message_count.labels(provider=ConfigManager.get_prop('datanode_name')).dec() + return result + return wrapper + + +def set_message_count(func): + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + message_count.labels(provider=ConfigManager.get_prop('datanode_name')).set(result) + return result + return wrapper + + +def get_disk_info_decorator(func): + def wrapper(): + result = func() + path = ConfigManager.get_prop('partition_home_path') + st = os.statvfs(path) + total = st.f_blocks * st.f_frsize + used = (st.f_blocks - st.f_bfree) * st.f_frsize + disk_total_size.labels(provider=ConfigManager.get_prop('datanode_name')).set(total) + disk_total_size.labels(provider=ConfigManager.get_prop('datanode_name')).set(used) + return result + return wrapper + class DataNode(datanode_pb2_grpc.DataNodeServicer): def __init__(self, partition_count=1, home_path='datanode/server/'): @@ -19,19 +68,28 @@ def __init__(self, partition_count=1, home_path='datanode/server/'): self.replica = SharedPartitions(partition_count, home_path=home_path + '/replica/') + @inc_message_count def Push(self, request, context): + throughput.observe(1) + start_time = time.time() logger.info(f"received a push message: {request.message}") if request.is_replica: self.replica.push(request.message) else: self.shared_partition.push(request.message) + end_time = time.time() + push_per_sec.observe(end_time - start_time) return empty_pb2.Empty() def Pull(self, request, context): try: + throughput.observe(1) + start_time = time.time() logger.info(f"received a pull message: {request}") message = self.shared_partition.pull() response = datanode_pb2.PullResponse(message=message) + end_time = time.time() + push_per_sec.observe(end_time - start_time) return response except grpc.RpcError as e: logger.exception(e) @@ -77,8 +135,10 @@ def PurgeReplicaData(self, request, context): home_path=self.home_path + '/replica/') return empty_pb2.Empty() + @dec_message_count def AcknowledgePull(self, request, context): try: + throughput.observe(1) key = request.key logger.info(f"received an acknowledge message: {key}") if request.is_replica: @@ -95,6 +155,8 @@ def IsHealthy(self, request, context): except grpc.RpcError as e: logger.exception(f"Error in acknowledging. {e}") + @get_disk_info_decorator + @set_message_count def GetRemainingMessagesCount(self, request, context): try: count = self.shared_partition.get_remaining_messages_count() @@ -111,6 +173,8 @@ def push_to_partition(partition_index: int, def serve(): + start_http_server(9000) + port = ConfigManager.get_prop('server_port') partitions_count = int(ConfigManager.get_prop('partition_count')) home_path = ConfigManager.get_prop('partition_home_path')