Skip to content

Commit

Permalink
Refactor metrics & add new ones
Browse files Browse the repository at this point in the history
  • Loading branch information
kysre committed Feb 11, 2024
1 parent cc346fd commit 4e3d21a
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions datanode/src/datanode_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-push_per_sec = Histogram('push_per_sec', 'Number of pushes in second', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-pull_per_sec = Histogram('pull_per_sec', 'Number of pulls in second', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-throughput = Histogram('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])


def inc_message_count(func):
Expand Down Expand Up @@ -67,26 +70,26 @@ def __init__(self, partition_count=1, home_path='datanode/server/'):

@inc_message_count
def Push(self, request, context):
-throughput.observe(1)
+push_throughput.observe(1)
start_time = time.time()
logger.info(f"received a push message: {request.message}")
if request.is_replica:
self.replica.push(request.message)
else:
self.shared_partition.push(request.message)
end_time = time.time()
-push_per_sec.observe(end_time - start_time)
+push_latency.observe(end_time - start_time)
return empty_pb2.Empty()

def Pull(self, request, context):
try:
-throughput.observe(1)
+pull_throughput.observe(1)
start_time = time.time()
logger.info(f"received a pull message: {request}")
message = self.shared_partition.pull()
response = datanode_pb2.PullResponse(message=message)
end_time = time.time()
-push_per_sec.observe(end_time - start_time)
+pull_latency.observe(end_time - start_time)
return response
except grpc.RpcError as e:
logger.exception(e)
Expand Down Expand Up @@ -135,13 +138,16 @@ def PurgeReplicaData(self, request, context):
@dec_message_count
def AcknowledgePull(self, request, context):
try:
-throughput.observe(1)
+ack_throughput.observe(1)
+start_time = time.time()
key = request.key
logger.info(f"received an acknowledge message: {key}")
if request.is_replica:
self.replica.acknowledge(key)
else:
self.shared_partition.acknowledge(key)
+end_time = time.time()
+ack_latency.observe(end_time - start_time)
return empty_pb2.Empty()
except grpc.RpcError as e:
logger.exception(f"Error in acknowledging. {e}")
Expand Down

0 comments on commit 4e3d21a

Please sign in to comment.