Skip to content

Commit

Permalink
datanode metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pooyay committed Feb 10, 2024
1 parent 4560f28 commit 31dbcf4
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions datanode/src/datanode_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,43 @@
from shared_partition import clear_path
from loguru import logger

from prometheus_client import Counter, Gauge, Summary, Histogram, generate_latest, REGISTRY

disk_total_size = Gauge('disk_total_size', 'Total size of disk')
disk_used_size = Gauge('disk_used_size', 'Used size of disk')
message_count = Gauge('message_count', 'Number of messages in datanode')
push_per_sec = Histogram('push_per_sec', 'Number of pushes in second')
pull_per_sec = Histogram('pull_per_sec', 'Number of pulls in second')


# DECORATORS{
def inc_message_count(func):
def wrapper(*args, **kwargs):
message_count.inc()
func(*args, **kwargs)
return wrapper()


def dec_message_count(func):
def wrapper(*args, **kwargs):
message_count.dec()
func(*args, **kwargs)
return wrapper()


def set_message_count(func):
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
message_count.set(result)
return wrapper()


class DataNode(datanode_pb2_grpc.DataNodeServicer):
def __init__(self, partition_count=1, home_path='datanode/server/'):
self.shared_partition = SharedPartitions(partition_count, home_path=home_path + 'main/')
self.replica = SharedPartitions(partition_count, home_path=home_path + 'replica/')

@inc_message_count
def Push(self, request, context):
logger.info(f"received a push message: {request.message}")
if request.is_replica:
Expand Down Expand Up @@ -59,6 +90,7 @@ def ReadPartition(self, request, context):
except Exception as e:
logger.exception(e)

@dec_message_count
def AcknowledgePull(self, request, context):
try:
key = request.key
Expand All @@ -74,6 +106,7 @@ def IsHealthy(self, request, context):
except grpc.RpcError as e:
logger.exception(f"Error in acknowledging. {e}")

@set_message_count
def GetRemainingMessagesCount(self, request, context):
try:
count = self.shared_partition.get_remaining_messages_count()
Expand Down

0 comments on commit 31dbcf4

Please sign in to comment.