diff --git a/datanode/src/datanode_server.py b/datanode/src/datanode_server.py index 70305e2..8149610 100644 --- a/datanode/src/datanode_server.py +++ b/datanode/src/datanode_server.py @@ -14,6 +14,7 @@ import os import time + # disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) # disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) # message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) @@ -59,25 +60,35 @@ class DataNode(datanode_pb2_grpc.DataNodeServicer): - def __init__(self, partition_count=1, home_path='datanode/server/'): + def __init__( + self, + disk_total_size, + disk_used_size, + push_latency, + pull_latency, + ack_latency, + push_throughput, + pull_throughput, + ack_throughput, + partition_count=1, + home_path='datanode/server/', + ): self.home_path = home_path self.partition_count = partition_count self.shared_partition = SharedPartitions(partition_count, home_path=home_path + '/main/') self.replica = SharedPartitions(partition_count, home_path=home_path + '/replica/') - - # Start metrics server - start_http_server(9000) - self.disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) - self.disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) - self.push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) - self.pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) - self.ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) - self.push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) - self.pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) - self.ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')]) - - - # @inc_message_count + + self.disk_total_size = disk_total_size + self.disk_used_size = disk_used_size + self.push_latency = push_latency + self.pull_latency = pull_latency + self.ack_latency = ack_latency + self.push_throughput = push_throughput + self.pull_throughput = pull_throughput + self.ack_throughput = ack_throughput + + # @inc_message_count + def Push(self, request, context): self.push_throughput.observe(1) start_time = time.time() @@ -88,6 +99,9 @@ def Push(self, request, context): self.shared_partition.push(request.message) end_time = time.time() self.push_latency.observe(end_time - start_time) + from prometheus.registry import REGISTRY + logger.info("TEST prom registry") + logger.info(list(REGISTRY.collect())) return empty_pb2.Empty() def Pull(self, request, context): @@ -176,7 +190,7 @@ def GetRemainingMessagesCount(self, request, context): return res except grpc.RpcError as e: logger.exception(f"Error in getting remaining messages count: {e}") - + def submit_disk_metrics(self): path = ConfigManager.get_prop('partition_home_path') st = os.statvfs(path) @@ -194,7 +208,23 @@ def push_to_partition(partition_index: int, def serve(): # Start metrics server - # start_http_server(9000) + start_http_server(9000) + disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], + _labelvalues=[ConfigManager.get_prop('datanode_name')]) + disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], + _labelvalues=[ConfigManager.get_prop('datanode_name')]) + push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], + _labelvalues=[ConfigManager.get_prop('datanode_name')]) + pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], + _labelvalues=[ConfigManager.get_prop('datanode_name')]) + ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], + _labelvalues=[ConfigManager.get_prop('datanode_name')]) + push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], + _labelvalues=[ConfigManager.get_prop('datanode_name')]) + pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], + _labelvalues=[ConfigManager.get_prop('datanode_name')]) + ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], + _labelvalues=[ConfigManager.get_prop('datanode_name')]) port = ConfigManager.get_prop('server_port') partitions_count = int(ConfigManager.get_prop('partition_count')) @@ -206,7 +236,21 @@ def serve(): datanode_name = ConfigManager.get_prop('datanode_name') server = grpc.server(futures.ThreadPoolExecutor(max_workers=50)) - datanode_pb2_grpc.add_DataNodeServicer_to_server(DataNode(partitions_count, home_path), server) + + datanode = DataNode( + disk_total_size, + disk_used_size, + push_latency, + pull_latency, + ack_latency, + push_throughput, + pull_throughput, + ack_throughput, + partitions_count, + home_path, + ) + + datanode_pb2_grpc.add_DataNodeServicer_to_server(datanode, server) server.add_insecure_port('[::]:' + port) server.start()