Skip to content

Commit

Permalink
Metrics datanode (#30)
Browse files Browse the repository at this point in the history
* basic metrics for datanode

* basic metrics

* basic metrics

* pull/push per sec

* throughput added

* seems done
  • Loading branch information
pooyay authored Feb 11, 2024
1 parent 5cfd4c1 commit c34c700
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions datanode/src/datanode_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,55 @@
from shared_partition import clear_path
from loguru import logger

import functools
import os
import time

from prometheus_client import Counter, Gauge, Summary, Histogram, generate_latest, REGISTRY, start_http_server

# Prometheus metrics exposed by this datanode. Every metric carries a single
# "provider" label whose value is the configured datanode name.
#
# The metrics are created through the public prometheus_client API instead of
# the private ``_labelvalues`` constructor argument: a metric constructed with
# ``_labelvalues`` is treated as an already-labelled child, so it is never
# registered with the default registry (and therefore never exported) and any
# later ``.labels(...)`` call on it raises ValueError.
_PROVIDER_LABELNAMES = ["provider"]
_PROVIDER_NAME = ConfigManager.get_prop('datanode_name')

# Gauges stay labelled parents; callers pick the child with
# ``.labels(provider=...)`` before calling set()/inc()/dec().
disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=_PROVIDER_LABELNAMES)
disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=_PROVIDER_LABELNAMES)
message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=_PROVIDER_LABELNAMES)

# Histograms are observed directly (``push_per_sec.observe(...)``), so the
# provider child is bound once here; the parent is still registered/exported.
push_per_sec = Histogram(
    'push_per_sec', 'Number of pushes in second', labelnames=_PROVIDER_LABELNAMES,
).labels(provider=_PROVIDER_NAME)
pull_per_sec = Histogram(
    'pull_per_sec', 'Number of pulls in second', labelnames=_PROVIDER_LABELNAMES,
).labels(provider=_PROVIDER_NAME)
# Previously declared under the name 'disk_total_size' (copy/paste), which
# clashes with the gauge of the same name once both are registered.
throughput = Histogram(
    'throughput', 'Requests handled by datanode', labelnames=_PROVIDER_LABELNAMES,
).labels(provider=_PROVIDER_NAME)


# DECORATORS{
def inc_message_count(func):
    """Decorate ``func`` so that every successful call bumps ``message_count``.

    The wrapped callable is invoked first with the caller's arguments; only
    when it returns (i.e. does not raise) is the ``message_count`` gauge for
    this datanode's ``provider`` label incremented by one.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all arguments to ``func`` and returns its
        result unchanged.
    """
    @functools.wraps(func)  # keep the wrapped callable's name/docstring
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        message_count.labels(provider=ConfigManager.get_prop('datanode_name')).inc()
        return result
    return wrapper


def dec_message_count(func):
    """Decorate ``func`` so that every successful call lowers ``message_count``.

    The wrapped callable is invoked first with the caller's arguments; only
    when it returns (i.e. does not raise) is the ``message_count`` gauge for
    this datanode's ``provider`` label decremented by one.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all arguments to ``func`` and returns its
        result unchanged.
    """
    @functools.wraps(func)  # keep the wrapped callable's name/docstring
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        message_count.labels(provider=ConfigManager.get_prop('datanode_name')).dec()
        return result
    return wrapper


def set_message_count(func):
    """Decorate ``func`` so that its return value is stored in ``message_count``.

    The wrapped callable is invoked with the caller's arguments and its result
    is written to the ``message_count`` gauge for this datanode's ``provider``
    label, then returned unchanged.

    NOTE(review): ``Gauge.set`` needs a numeric value - confirm that every
    callable decorated with this returns a plain number rather than a
    response message object.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all arguments to ``func`` and returns its
        result unchanged.
    """
    @functools.wraps(func)  # keep the wrapped callable's name/docstring
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        message_count.labels(provider=ConfigManager.get_prop('datanode_name')).set(result)
        return result
    return wrapper


def get_disk_info_decorator(func):
    """Decorate ``func`` so that each successful call refreshes the disk gauges.

    After the wrapped callable returns, the filesystem holding
    ``partition_home_path`` is inspected with ``os.statvfs`` and the
    ``disk_total_size`` / ``disk_used_size`` gauges for this datanode's
    ``provider`` label are updated (sizes in bytes: block counts multiplied
    by the fragment size).

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that forwards all arguments to ``func`` and returns its
        result unchanged.
    """
    @functools.wraps(func)  # keep the wrapped callable's name/docstring
    def wrapper(*args, **kwargs):
        # Forward the caller's arguments: the wrapper is applied to methods
        # that receive (self, request, context).
        result = func(*args, **kwargs)
        provider = ConfigManager.get_prop('datanode_name')
        st = os.statvfs(ConfigManager.get_prop('partition_home_path'))
        total = st.f_blocks * st.f_frsize
        used = (st.f_blocks - st.f_bfree) * st.f_frsize
        disk_total_size.labels(provider=provider).set(total)
        # Used space goes to its own gauge instead of overwriting the total.
        disk_used_size.labels(provider=provider).set(used)
        return result
    return wrapper


class DataNode(datanode_pb2_grpc.DataNodeServicer):
def __init__(self, partition_count=1, home_path='datanode/server/'):
Expand All @@ -19,19 +68,28 @@ def __init__(self, partition_count=1, home_path='datanode/server/'):
self.replica = SharedPartitions(partition_count, home_path=home_path + '/replica/')


@inc_message_count
def Push(self, request, context):
    """Handle a Push RPC by storing ``request.message``.

    Records one ``throughput`` observation, logs the message, pushes it to
    ``self.replica`` when ``request.is_replica`` is truthy and to
    ``self.shared_partition`` otherwise, then observes the elapsed
    wall-clock seconds in ``push_per_sec``.

    Args:
        request: Incoming message; ``message`` and ``is_replica`` are read.
        context: gRPC call context (unused here).

    Returns:
        An ``empty_pb2.Empty`` instance.
    """
    throughput.observe(1)
    started_at = time.time()
    logger.info(f"received a push message: {request.message}")
    # Replica traffic goes to the replica store, everything else to the
    # primary partitions.
    target = self.replica if request.is_replica else self.shared_partition
    target.push(request.message)
    finished_at = time.time()
    push_per_sec.observe(finished_at - started_at)
    return empty_pb2.Empty()

def Pull(self, request, context):
    """Handle a Pull RPC by returning the next message.

    Records one ``throughput`` observation, logs the request, pulls a message
    from ``self.shared_partition``, wraps it in a ``PullResponse`` and
    observes the elapsed wall-clock seconds in ``pull_per_sec``.

    Args:
        request: Incoming pull request (only logged here).
        context: gRPC call context (unused here).

    Returns:
        A ``datanode_pb2.PullResponse`` carrying the pulled message. When a
        ``grpc.RpcError`` is raised the error is logged and no response is
        returned from the visible handler.
    """
    try:
        throughput.observe(1)
        start_time = time.time()
        logger.info(f"received a pull message: {request}")
        message = self.shared_partition.pull()
        response = datanode_pb2.PullResponse(message=message)
        end_time = time.time()
        # Pull latency belongs in the pull histogram, not push_per_sec.
        pull_per_sec.observe(end_time - start_time)
        return response
    except grpc.RpcError as e:
        logger.exception(e)
Expand Down Expand Up @@ -77,8 +135,10 @@ def PurgeReplicaData(self, request, context):
home_path=self.home_path + '/replica/')
return empty_pb2.Empty()

@dec_message_count
def AcknowledgePull(self, request, context):
try:
throughput.observe(1)
key = request.key
logger.info(f"received an acknowledge message: {key}")
if request.is_replica:
Expand All @@ -95,6 +155,8 @@ def IsHealthy(self, request, context):
except grpc.RpcError as e:
logger.exception(f"Error in acknowledging. {e}")

@get_disk_info_decorator
@set_message_count
def GetRemainingMessagesCount(self, request, context):
try:
count = self.shared_partition.get_remaining_messages_count()
Expand All @@ -111,6 +173,8 @@ def push_to_partition(partition_index: int,


def serve():
start_http_server(9000)

port = ConfigManager.get_prop('server_port')
partitions_count = int(ConfigManager.get_prop('partition_count'))
home_path = ConfigManager.get_prop('partition_home_path')
Expand Down

0 comments on commit c34c700

Please sign in to comment.