Add prometheus-client to datanode deps #35

Merged · 9 commits · Feb 11, 2024
1 change: 1 addition & 0 deletions datanode/requirements.txt
@@ -3,3 +3,4 @@ grpcio-tools==1.60.0
 python-decouple
 loguru==0.7.2
 protobuf==4.25.2
+prometheus-client==0.19.0
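
Review note: the pin above brings in the Gauge, Histogram, and start_http_server primitives used in datanode_server.py below. A minimal, self-contained sketch of that API (metric names here are illustrative, not the ones this PR defines):

from prometheus_client import Gauge, Histogram, start_http_server
import random
import time

# Illustrative metrics; label values are bound per call via .labels().
REQUEST_LATENCY = Histogram('request_latency', 'Request latency in seconds', labelnames=["provider"])
QUEUE_DEPTH = Gauge('queue_depth', 'Messages currently queued', labelnames=["provider"])

if __name__ == '__main__':
    start_http_server(9000)  # serves the /metrics endpoint on port 9000
    while True:
        start = time.time()
        time.sleep(random.random() / 10)  # stand-in for real work
        REQUEST_LATENCY.labels(provider='datanode_0').observe(time.time() - start)
        QUEUE_DEPTH.labels(provider='datanode_0').inc()

start_http_server runs the exporter in a background thread, so it coexists with the gRPC server started later in serve().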
66 changes: 36 additions & 30 deletions datanode/src/datanode_server.py
@@ -14,82 +14,83 @@
 import os
 import time

-disk_total_size = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-disk_used_size = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-message_count = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-push_per_sec = Histogram('push_per_sec', 'Number of pushes in second', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-pull_per_sec = Histogram('pull_per_sec', 'Number of pulls in second', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
-throughput = Histogram('disk_total_size', 'Total size of disk', labelnames=["provider"], _labelvalues=[ConfigManager.get_prop('datanode_name')])
+DISK_TOTAL_SIZE = Gauge('disk_total_size', 'Total size of disk', labelnames=["provider"])
+DISK_USED_SIZE = Gauge('disk_used_size', 'Used size of disk', labelnames=["provider"])
+MESSAGE_COUNT = Gauge('message_count', 'Number of messages in datanode', labelnames=["provider"])
+PUSH_LATENCY = Histogram('push_latency', 'Push requests latency', labelnames=["provider"])
+PULL_LATENCY = Histogram('pull_latency', 'Pull requests latency', labelnames=["provider"])
+ACK_LATENCY = Histogram('ack_latency', 'Ack requests latency', labelnames=["provider"])
+PUSH_THROUGHPUT = Histogram('push_throughput', 'Push throughput', labelnames=["provider"])
+PULL_THROUGHPUT = Histogram('pull_throughput', 'Pull throughput', labelnames=["provider"])
+ACK_THROUGHPUT = Histogram('ack_throughput', 'Ack throughput', labelnames=["provider"])


 # DECORATORS{
 def inc_message_count(func):
     def wrapper(*args, **kwargs):
         result = func(*args, **kwargs)
-        message_count.labels(provider=ConfigManager.get_prop('datanode_name')).inc()
+        MESSAGE_COUNT.labels(provider=ConfigManager.get_prop('datanode_name')).inc()
         return result

     return wrapper


 def dec_message_count(func):
     def wrapper(*args, **kwargs):
         result = func(*args, **kwargs)
-        message_count.labels(provider=ConfigManager.get_prop('datanode_name')).dec()
+        MESSAGE_COUNT.labels(provider=ConfigManager.get_prop('datanode_name')).dec()
         return result

     return wrapper


 def set_message_count(func):
     def wrapper(*args, **kwargs):
         result = func(*args, **kwargs)
-        message_count.labels(provider=ConfigManager.get_prop('datanode_name')).set(result)
+        MESSAGE_COUNT.labels(provider=ConfigManager.get_prop('datanode_name')).set(result.remaining_messages_count)
         return result

     return wrapper


-def get_disk_info_decorator(func):
-    def wrapper():
-        result = func()
-        path = ConfigManager.get_prop('partition_home_path')
-        st = os.statvfs(path)
-        total = st.f_blocks * st.f_frsize
-        used = (st.f_blocks - st.f_bfree) * st.f_frsize
-        disk_total_size.labels(provider=ConfigManager.get_prop('datanode_name')).set(total)
-        disk_total_size.labels(provider=ConfigManager.get_prop('datanode_name')).set(used)
-        return result
-    return wrapper
+def submit_disk_metrics():
+    path = ConfigManager.get_prop('partition_home_path')
+    st = os.statvfs(path)
+    total = st.f_blocks * st.f_frsize
+    used = (st.f_blocks - st.f_bfree) * st.f_frsize
+    DISK_TOTAL_SIZE.labels(provider=ConfigManager.get_prop('datanode_name')).set(total)
+    DISK_USED_SIZE.labels(provider=ConfigManager.get_prop('datanode_name')).set(used)


 class DataNode(datanode_pb2_grpc.DataNodeServicer):
-    def __init__(self, partition_count=1, home_path='datanode/server/'):
+    def __init__(self, partition_count=1, home_path='datanode/server/', metrics_provider='datanode'):
+        self.metrics_provider = metrics_provider
         self.home_path = home_path
         self.partition_count = partition_count
         self.shared_partition = SharedPartitions(partition_count, home_path=home_path + '/main/')
         self.replica = SharedPartitions(partition_count, home_path=home_path + '/replica/')


     @inc_message_count
     def Push(self, request, context):
-        throughput.observe(1)
+        PUSH_THROUGHPUT.labels(provider=self.metrics_provider).observe(1)
         start_time = time.time()
         logger.info(f"received a push message: {request.message}")
         if request.is_replica:
             self.replica.push(request.message)
         else:
             self.shared_partition.push(request.message)
         end_time = time.time()
-        push_per_sec.observe(end_time - start_time)
+        PUSH_LATENCY.labels(provider=self.metrics_provider).observe(end_time - start_time)
         return empty_pb2.Empty()

     def Pull(self, request, context):
         try:
-            throughput.observe(1)
+            PULL_THROUGHPUT.labels(provider=self.metrics_provider).observe(1)
             start_time = time.time()
             logger.info(f"received a pull message: {request}")
             message = self.shared_partition.pull()
             response = datanode_pb2.PullResponse(message=message)
             end_time = time.time()
-            push_per_sec.observe(end_time - start_time)
+            PULL_LATENCY.labels(provider=self.metrics_provider).observe(end_time - start_time)
             return response
         except grpc.RpcError as e:
             logger.exception(e)
@@ -138,13 +139,16 @@ def PurgeReplicaData(self, request, context):
     @dec_message_count
     def AcknowledgePull(self, request, context):
         try:
-            throughput.observe(1)
+            ACK_THROUGHPUT.labels(provider=self.metrics_provider).observe(1)
             start_time = time.time()
             key = request.key
             logger.info(f"received an acknowledge message: {key}")
             if request.is_replica:
                 self.replica.acknowledge(key)
             else:
                 self.shared_partition.acknowledge(key)
+            end_time = time.time()
+            ACK_LATENCY.labels(provider=self.metrics_provider).observe(end_time - start_time)
             return empty_pb2.Empty()
         except grpc.RpcError as e:
             logger.exception(f"Error in acknowledging. {e}")
@@ -155,9 +159,9 @@ def IsHealthy(self, request, context):
         except grpc.RpcError as e:
             logger.exception(f"Error in acknowledging. {e}")

-    @get_disk_info_decorator
     @set_message_count
     def GetRemainingMessagesCount(self, request, context):
+        submit_disk_metrics()
         try:
             count = self.shared_partition.get_remaining_messages_count()
             res = datanode_pb2.GetRemainingMessagesCountResponse(remaining_messages_count=count)
@@ -173,6 +177,7 @@ def push_to_partition(partition_index: int,


 def serve():
+    # Start metrics server
     start_http_server(9000)

     port = ConfigManager.get_prop('server_port')
@@ -185,7 +190,8 @@ def serve():
     datanode_name = ConfigManager.get_prop('datanode_name')

     server = grpc.server(futures.ThreadPoolExecutor(max_workers=50))
-    datanode_pb2_grpc.add_DataNodeServicer_to_server(DataNode(partitions_count, home_path), server)
+    datanode = DataNode(partitions_count, home_path, datanode_name)
+    datanode_pb2_grpc.add_DataNodeServicer_to_server(datanode, server)

     server.add_insecure_port('[::]:' + port)
     server.start()
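
Review note: the removed globals passed the private `_labelvalues` constructor argument and registered two histograms under the same name ('disk_total_size'), which prometheus-client rejects as a duplicated timeseries; the new code declares label names once and binds values at call time with `.labels(...)`. It also fixes Pull recording its latency into push_per_sec. The manual start_time/end_time bookkeeping could alternatively be delegated to the histogram itself; a hypothetical sketch, not what this PR does:

import time
from prometheus_client import Histogram

PUSH_LATENCY = Histogram('push_latency', 'Push requests latency', labelnames=["provider"])

def push(message, provider='datanode_0'):
    # .time() observes the elapsed seconds when the with-block exits
    with PUSH_LATENCY.labels(provider=provider).time():
        time.sleep(0.01)  # stand-in for shared_partition.push(message)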
2 changes: 1 addition & 1 deletion leader/cmd/tasks/leader_sync.go
@@ -38,7 +38,7 @@ func (ls *LeaderSyncer) RunLeaderSync() {
 	select {
 	case <-ticker.C:
 		if ls.handler.IsReplicaAvailable() {
-			ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 			ls.handler.AddDataNodesToReplica(ctx, ls.directory.DataNodes)
 			cancel()
 		}
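
Review note: with a fourth datanode joining the sync (see the compose change below), the 5s deadline around AddDataNodesToReplica was presumably getting tight, so it is doubled to 10s; a per-node deadline inside the handler might scale better, but that is beyond this PR.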
11 changes: 6 additions & 5 deletions prometheus/prometheus.yml
@@ -17,8 +17,9 @@ scrape_configs:
     metrics_path: /metrics
     scheme: http
     static_configs:
-      - targets:
-          - leader_0:9000
-          - leader_1:9000
-          - datanode_0:9000
-          - datanode_1:9000
+      - targets: ['leader_0:9000']
+      - targets: ['leader_1:9000']
+      - targets: ['datanode_0:9000']
+      - targets: ['datanode_1:9000']
+      - targets: ['datanode_2:9000']
+      - targets: ['datanode_3:9000']
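
Review note: one static_configs entry per target is valid, but Prometheus would equally accept the original single-entry form extended with the new nodes; an equivalent, more compact sketch (same job assumed):

static_configs:
  - targets:
      - leader_0:9000
      - leader_1:9000
      - datanode_0:9000
      - datanode_1:9000
      - datanode_2:9000
      - datanode_3:9000

Separate entries only start to matter if per-target labels are attached to each one later.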
21 changes: 21 additions & 0 deletions test.docker-compose.yaml
@@ -108,6 +108,25 @@ services:
     volumes:
       - datanode_2_vol:/var/lib/turtlemq/data/

+  datanode_3:
+    image: kysre/turtlemq:datanode-${DATANODE_IMAGE_TAG}
+    restart: unless-stopped
+    depends_on:
+      - leader_0
+      - leader_1
+    environment:
+      - HOME_PATH=/var/lib/turtlemq/data/
+      - DATANODE_NAME=datanode_3
+      - DATANODE_PORT=8000
+      - LEADER_HOST=leader_0
+      - LEADER_PORT=8888
+      - PULL_TIMEOUT=10
+      - PENDING_TIMEOUT=15
+      - CLEANER_PERIOD=3
+      - PARTITIONS_COUNT=100
+    volumes:
+      - datanode_3_vol:/var/lib/turtlemq/data/
+
 volumes:
   prom_data:
     driver: local
@@ -119,3 +138,5 @@ volumes:
     driver: local
   datanode_2_vol:
     driver: local
+  datanode_3_vol:
+    driver: local
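
Review note: the new service publishes no ports, so its metrics endpoint on 9000 is only reachable on the compose network. A hypothetical smoke test run from a container on that network (hostname and metric name assumed from this diff):

import urllib.request

def check_metrics(host='datanode_3', port=9000):
    # Fetch the Prometheus exposition text; raises on connection failure.
    body = urllib.request.urlopen(f'http://{host}:{port}/metrics', timeout=5).read()
    assert b'message_count' in body, 'expected datanode metric missing'
    return body

if __name__ == '__main__':
    print(check_metrics()[:200].decode())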