Skip to content

Commit

Permalink
Test datanode metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
kysre committed Feb 11, 2024
1 parent 414a26c commit fa9c936
Showing 1 changed file with 62 additions and 19 deletions.
81 changes: 62 additions & 19 deletions datanode/src/datanode_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
from shared_partition import clear_path
from loguru import logger

from prometheus_client import Counter, Gauge, Summary, Histogram, generate_latest, REGISTRY, start_http_server
from prometheus_client import Gauge, Histogram, start_http_server
import os
import time


# disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
Expand Down Expand Up @@ -59,25 +60,35 @@


class DataNode(datanode_pb2_grpc.DataNodeServicer):
def __init__(self, partition_count=1, home_path='datanode/server/'):
def __init__(
self,
disk_total_size,
disk_used_size,
push_latency,
pull_latency,
ack_latency,
push_throughput,
pull_throughput,
ack_throughput,
partition_count=1,
home_path='datanode/server/',
):
self.home_path = home_path
self.partition_count = partition_count
self.shared_partition = SharedPartitions(partition_count, home_path=home_path + '/main/')
self.replica = SharedPartitions(partition_count, home_path=home_path + '/replica/')

# Start metrics server
start_http_server(9000)
self.disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])


# @inc_message_count

self.disk_total_size = disk_total_size
self.disk_used_size = disk_used_size
self.push_latency = push_latency
self.pull_latency = pull_latency
self.ack_latency = ack_latency
self.push_throughput = push_throughput
self.pull_throughput = pull_throughput
self.ack_throughput = ack_throughput

# @inc_message_count

def Push(self, request, context):
self.push_throughput.observe(1)
start_time = time.time()
Expand All @@ -88,6 +99,9 @@ def Push(self, request, context):
self.shared_partition.push(request.message)
end_time = time.time()
self.push_latency.observe(end_time - start_time)
from prometheus_client.registry import REGISTRY
logger.info("TEST prom registry")
logger.info(list(REGISTRY.collect()))
return empty_pb2.Empty()

def Pull(self, request, context):
Expand Down Expand Up @@ -176,7 +190,7 @@ def GetRemainingMessagesCount(self, request, context):
return res
except grpc.RpcError as e:
logger.exception(f"Error in getting remaining messages count: {e}")

def submit_disk_metrics(self):
path = ConfigManager.get_prop('partition_home_path')
st = os.statvfs(path)
Expand All @@ -194,7 +208,22 @@ def push_to_partition(partition_index: int,

def serve():
# Start metrics server
# start_http_server(9000)
from prometheus_client.registry import REGISTRY
start_http_server(9000, registry=REGISTRY)
disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"],
_labelvalues=[ConfigManager.get_prop('datanode_name')], registry=REGISTRY)
disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"],
_labelvalues=[ConfigManager.get_prop('datanode_name')], registry=REGISTRY)
push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"],
_labelvalues=[ConfigManager.get_prop('datanode_name')], registry=REGISTRY)
pull_latency = Histogram('pull_latency', 'Pull requests latency', registry=REGISTRY)
ack_latency = Histogram('ack_latency', 'Ack requests latency')
push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"],
_labelvalues=[ConfigManager.get_prop('datanode_name')], registry=REGISTRY)
pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"],
_labelvalues=[ConfigManager.get_prop('datanode_name')], registry=REGISTRY)
ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"],
_labelvalues=[ConfigManager.get_prop('datanode_name')], registry=REGISTRY)

port = ConfigManager.get_prop('server_port')
partitions_count = int(ConfigManager.get_prop('partition_count'))
Expand All @@ -206,7 +235,21 @@ def serve():
datanode_name = ConfigManager.get_prop('datanode_name')

server = grpc.server(futures.ThreadPoolExecutor(max_workers=50))
datanode_pb2_grpc.add_DataNodeServicer_to_server(DataNode(partitions_count, home_path), server)

datanode = DataNode(
disk_total_size,
disk_used_size,
push_latency,
pull_latency,
ack_latency,
push_throughput,
pull_throughput,
ack_throughput,
partitions_count,
home_path,
)

datanode_pb2_grpc.add_DataNodeServicer_to_server(datanode, server)

server.add_insecure_port('[::]:' + port)
server.start()
Expand Down

0 comments on commit fa9c936

Please sign in to comment.