From 9336561b011f127b9ad67c00c5c1faed23840ab6 Mon Sep 17 00:00:00 2001
From: kasra
Date: Sun, 11 Feb 2024 18:11:03 +0330
Subject: [PATCH] Test init metrics in DataNode class

---
Review notes (text between the "---" line and the diff is not part of the
commit message):
 * submit_disk_metrics() now reports the used byte count through
   disk_used_size. The previous revision called disk_total_size.set() twice,
   so the total was overwritten and disk_used_size was never updated.
 * Line breaks, indentation and blank context lines were reconstructed from
   the hunk line counts; run `git apply --check` before applying.
 * NOTE(review): `_labelvalues` is an underscore-prefixed (private) argument
   of prometheus_client metrics; confirm these metrics are actually exported,
   or switch to the public `.labels(provider=...)` API.

 datanode/Dockerfile             |   1 +
 datanode/src/datanode_server.py | 121 ++++++++++++++++++--------------
 2 files changed, 70 insertions(+), 52 deletions(-)

diff --git a/datanode/Dockerfile b/datanode/Dockerfile
index bcfb8e2..bbe690d 100644
--- a/datanode/Dockerfile
+++ b/datanode/Dockerfile
@@ -8,6 +8,7 @@ RUN pip install -r requirements.txt
 
 COPY /datanode/src src/
 EXPOSE 8000
+EXPOSE 9000
 
 WORKDIR /usr/src/app/src
 
diff --git a/datanode/src/datanode_server.py b/datanode/src/datanode_server.py
index 83779f2..c39f68b 100644
--- a/datanode/src/datanode_server.py
+++ b/datanode/src/datanode_server.py
@@ -14,48 +14,48 @@
 import os
 import time
 
-disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-
-
-def inc_message_count(func):
-    def wrapper(*args, **kwargs):
-        result = func(*args, **kwargs)
-        message_count.inc()
-        return result
-    return wrapper
-
-
-def dec_message_count(func):
-    def wrapper(*args, **kwargs):
-        result = func(*args, **kwargs)
-        message_count.dec()
-        return result
-    return wrapper
-
-
-def set_message_count(func):
-    def wrapper(*args, **kwargs):
-        result = func(*args, **kwargs)
-        message_count.set(result.remaining_messages_count)
-        return result
-    return wrapper
-
-
-def submit_disk_metrics():
-    path = ConfigManager.get_prop('partition_home_path')
-    st = os.statvfs(path)
-    total = st.f_blocks * st.f_frsize
-    used = (st.f_blocks - st.f_bfree) * st.f_frsize
-    disk_total_size.set(total)
-    disk_total_size.set(used)
+# disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+# disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+# message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+# push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+# pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+# ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+# push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+# pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+# ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+
+
+# def inc_message_count(func):
+#     def wrapper(*args, **kwargs):
+#         result = func(*args, **kwargs)
+#         message_count.inc()
+#         return result
+#     return wrapper
+
+
+# def dec_message_count(func):
+#     def wrapper(*args, **kwargs):
+#         result = func(*args, **kwargs)
+#         message_count.dec()
+#         return result
+#     return wrapper
+
+
+# def set_message_count(func):
+#     def wrapper(*args, **kwargs):
+#         result = func(*args, **kwargs)
+#         message_count.set(result.remaining_messages_count)
+#         return result
+#     return wrapper
+
+
+# def submit_disk_metrics():
+#     path = ConfigManager.get_prop('partition_home_path')
+#     st = os.statvfs(path)
+#     total = st.f_blocks * st.f_frsize
+#     used = (st.f_blocks - st.f_bfree) * st.f_frsize
+#     disk_total_size.set(total)
+#     disk_total_size.set(used)
 
 
 class DataNode(datanode_pb2_grpc.DataNodeServicer):
@@ -65,10 +65,19 @@ def __init__(self, partition_count=1, home_path='datanode/server/'):
 
         self.shared_partition = SharedPartitions(partition_count, home_path=home_path + '/main/')
         self.replica = SharedPartitions(partition_count, home_path=home_path + '/replica/')
+        self.disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+        self.disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+        self.push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+        self.pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+        self.ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+        self.push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+        self.pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+        self.ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
 
-    @inc_message_count
+
+    # @inc_message_count
     def Push(self, request, context):
-        push_throughput.observe(1)
+        self.push_throughput.observe(1)
         start_time = time.time()
         logger.info(f"received a push message: {request.message}")
         if request.is_replica:
@@ -76,18 +85,18 @@ def Push(self, request, context):
         else:
             self.shared_partition.push(request.message)
         end_time = time.time()
-        push_latency.observe(end_time - start_time)
+        self.push_latency.observe(end_time - start_time)
         return empty_pb2.Empty()
 
     def Pull(self, request, context):
         try:
-            pull_throughput.observe(1)
+            self.pull_throughput.observe(1)
             start_time = time.time()
             logger.info(f"received a pull message: {request}")
             message = self.shared_partition.pull()
             response = datanode_pb2.PullResponse(message=message)
             end_time = time.time()
-            pull_latency.observe(end_time - start_time)
+            self.pull_latency.observe(end_time - start_time)
             return response
         except grpc.RpcError as e:
             logger.exception(e)
@@ -133,10 +142,10 @@ def PurgeReplicaData(self, request, context):
                                         home_path=self.home_path + '/replica/')
         return empty_pb2.Empty()
 
-    @dec_message_count
+    # @dec_message_count
     def AcknowledgePull(self, request, context):
         try:
-            ack_throughput.observe(1)
+            self.ack_throughput.observe(1)
             start_time = time.time()
             key = request.key
             logger.info(f"received an acknowledge message: {key}")
@@ -145,7 +154,7 @@ def AcknowledgePull(self, request, context):
             else:
                 self.shared_partition.acknowledge(key)
             end_time = time.time()
-            ack_latency.observe(end_time - start_time)
+            self.ack_latency.observe(end_time - start_time)
             return empty_pb2.Empty()
         except grpc.RpcError as e:
             logger.exception(f"Error in acknowledging. {e}")
@@ -156,15 +165,23 @@ def IsHealthy(self, request, context):
         except grpc.RpcError as e:
             logger.exception(f"Error in acknowledging. {e}")
 
-    @set_message_count
+    # @set_message_count
     def GetRemainingMessagesCount(self, request, context):
-        submit_disk_metrics()
+        self.submit_disk_metrics()
         try:
             count = self.shared_partition.get_remaining_messages_count()
             res = datanode_pb2.GetRemainingMessagesCountResponse(remaining_messages_count=count)
             return res
         except grpc.RpcError as e:
             logger.exception(f"Error in getting remaining messages count: {e}")
+
+    def submit_disk_metrics(self):
+        path = ConfigManager.get_prop('partition_home_path')
+        st = os.statvfs(path)
+        total = st.f_blocks * st.f_frsize
+        used = (st.f_blocks - st.f_bfree) * st.f_frsize
+        self.disk_total_size.set(total)
+        self.disk_used_size.set(used)
 
 
 def push_to_partition(partition_index: int,