Test init metrics in DataNode class
kysre committed Feb 11, 2024
1 parent 90b3ab1 commit 9336561
Showing 2 changed files with 70 additions and 52 deletions.
1 change: 1 addition & 0 deletions datanode/Dockerfile
@@ -8,6 +8,7 @@ RUN pip install -r requirements.txt
COPY /datanode/src src/

EXPOSE 8000
EXPOSE 9000

WORKDIR /usr/src/app/src

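The new EXPOSE 9000 line is presumably the scrape port for the Prometheus metrics that this commit moves into the DataNode class; the diff itself does not show where that endpoint is started. Below is a minimal sketch of how it could be served with prometheus_client, assuming port 9000 really is the metrics port; serve_metrics and METRICS_PORT are hypothetical names, not code from this repository.

from prometheus_client import start_http_server

METRICS_PORT = 9000  # assumed to correspond to the new EXPOSE 9000 line


def serve_metrics() -> None:
    # Hypothetical helper, not part of this commit: start_http_server spawns a
    # daemon thread that serves /metrics from the default registry on this port.
    start_http_server(METRICS_PORT)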
121 changes: 69 additions & 52 deletions datanode/src/datanode_server.py
@@ -14,48 +14,48 @@
import os
import time

disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])


def inc_message_count(func):
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
message_count.inc()
return result
return wrapper


def dec_message_count(func):
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
message_count.dec()
return result
return wrapper


def set_message_count(func):
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
message_count.set(result.remaining_messages_count)
return result
return wrapper


def submit_disk_metrics():
path = ConfigManager.get_prop('partition_home_path')
st = os.statvfs(path)
total = st.f_blocks * st.f_frsize
used = (st.f_blocks - st.f_bfree) * st.f_frsize
disk_total_size.set(total)
disk_total_size.set(used)
# disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
# ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])


# def inc_message_count(func):
# def wrapper(*args, **kwargs):
# result = func(*args, **kwargs)
# message_count.inc()
# return result
# return wrapper


# def dec_message_count(func):
# def wrapper(*args, **kwargs):
# result = func(*args, **kwargs)
# message_count.dec()
# return result
# return wrapper


# def set_message_count(func):
# def wrapper(*args, **kwargs):
# result = func(*args, **kwargs)
# message_count.set(result.remaining_messages_count)
# return result
# return wrapper


# def submit_disk_metrics():
# path = ConfigManager.get_prop('partition_home_path')
# st = os.statvfs(path)
# total = st.f_blocks * st.f_frsize
# used = (st.f_blocks - st.f_bfree) * st.f_frsize
# disk_total_size.set(total)
# disk_total_size.set(used)


class DataNode(datanode_pb2_grpc.DataNodeServicer):
@@ -65,29 +65,38 @@ def __init__(self, partition_count=1, home_path='datanode/server/'):
self.shared_partition = SharedPartitions(partition_count, home_path=home_path + '/main/')
self.replica = SharedPartitions(partition_count, home_path=home_path + '/replica/')

self.disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.push_latency = Histogram('push_latency', 'Push requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.pull_latency = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.ack_latency = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.push_throughput = Histogram('push_throughput', 'Push throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.pull_throughput = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
self.ack_throughput = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])

@inc_message_count

# @inc_message_count
def Push(self, request, context):
push_throughput.observe(1)
self.push_throughput.observe(1)
start_time = time.time()
logger.info(f"received a push message: {request.message}")
if request.is_replica:
self.replica.push(request.message)
else:
self.shared_partition.push(request.message)
end_time = time.time()
push_latency.observe(end_time - start_time)
self.push_latency.observe(end_time - start_time)
return empty_pb2.Empty()

def Pull(self, request, context):
try:
pull_throughput.observe(1)
self.pull_throughput.observe(1)
start_time = time.time()
logger.info(f"received a pull message: {request}")
message = self.shared_partition.pull()
response = datanode_pb2.PullResponse(message=message)
end_time = time.time()
pull_latency.observe(end_time - start_time)
self.pull_latency.observe(end_time - start_time)
return response
except grpc.RpcError as e:
logger.exception(e)
@@ -133,10 +142,10 @@ def PurgeReplicaData(self, request, context):
home_path=self.home_path + '/replica/')
return empty_pb2.Empty()

@dec_message_count
# @dec_message_count
def AcknowledgePull(self, request, context):
try:
ack_throughput.observe(1)
self.ack_throughput.observe(1)
start_time = time.time()
key = request.key
logger.info(f"received an acknowledge message: {key}")
@@ -145,7 +154,7 @@ else:
else:
self.shared_partition.acknowledge(key)
end_time = time.time()
ack_latency.observe(end_time - start_time)
self.ack_latency.observe(end_time - start_time)
return empty_pb2.Empty()
except grpc.RpcError as e:
logger.exception(f"Error in acknowledging. {e}")
@@ -156,15 +165,23 @@ def IsHealthy(self, request, context):
except grpc.RpcError as e:
logger.exception(f"Error in acknowledging. {e}")

@set_message_count
# @set_message_count
def GetRemainingMessagesCount(self, request, context):
submit_disk_metrics()
self.submit_disk_metrics()
try:
count = self.shared_partition.get_remaining_messages_count()
res = datanode_pb2.GetRemainingMessagesCountResponse(remaining_messages_count=count)
return res
except grpc.RpcError as e:
logger.exception(f"Error in getting remaining messages count: {e}")

def submit_disk_metrics(self):
path = ConfigManager.get_prop('partition_home_path')
st = os.statvfs(path)
total = st.f_blocks * st.f_frsize
used = (st.f_blocks - st.f_bfree) * st.f_frsize
self.disk_total_size.set(total)
        self.disk_used_size.set(used)


def push_to_partition(partition_index: int,
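A note on the approach shown above: creating Gauge and Histogram objects inside DataNode.__init__ registers them with prometheus_client's default registry, so constructing a second DataNode in the same process would raise a duplicated-timeseries error, and _labelvalues is an underscore-prefixed internal constructor argument rather than part of the documented API. The sketch below, which is not part of this commit, shows the conventional alternative: declare the collectors once at module level and bind the per-node "provider" label with .labels(). The DataNodeMetrics wrapper and its usage are hypothetical.

from prometheus_client import Gauge, Histogram

DISK_TOTAL_SIZE = Gauge('disk_total_size', 'Total size of disk', labelnames=['provider'])
DISK_USED_SIZE = Gauge('disk_used_size', 'Used size of disk', labelnames=['provider'])
PUSH_LATENCY = Histogram('push_latency', 'Push requests latency', labelnames=['provider'])


class DataNodeMetrics:
    # Hypothetical thin wrapper: pre-binds the provider label so callers can
    # write metrics.disk_used_size.set(...) much like the class above does.
    def __init__(self, provider: str):
        self.disk_total_size = DISK_TOTAL_SIZE.labels(provider=provider)
        self.disk_used_size = DISK_USED_SIZE.labels(provider=provider)
        self.push_latency = PUSH_LATENCY.labels(provider=provider)


# Assumed usage, mirroring the existing config lookup:
# metrics = DataNodeMetrics(ConfigManager.get_prop('datanode_name'))
# metrics.disk_used_size.set(used)

Whether module-level or per-instance registration is preferable depends on whether more than one DataNode is ever created per process; the commented-out module-level block in the diff suggests both options were being tested, consistent with the commit title.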
