From 4560f28249edcda52aaef232bbf98ddf34c52d61 Mon Sep 17 00:00:00 2001 From: Kasra Hajian Date: Fri, 9 Feb 2024 01:21:31 +0330 Subject: [PATCH] Datanode fault tolerance (#26) * Add write replica. Add write partition. * Add client subscribe. --------- Co-authored-by: maziar --- client_py/client.py | 48 +++++++++++++------------------- client_py/datanode-server.py | 2 +- client_py/test_client.py | 27 +++++++++++++++++- datanode/src/datanode_server.py | 38 +++++++++++++++++++++++-- datanode/src/shared_partition.py | 5 ++-- 5 files changed, 86 insertions(+), 34 deletions(-) diff --git a/client_py/client.py b/client_py/client.py index 7321714..c644a5d 100644 --- a/client_py/client.py +++ b/client_py/client.py @@ -1,11 +1,12 @@ +import time from typing import List import asyncio import logging - +from threading import Thread import grpc # import google.protobuf.empty_pb2 as empty_pb2 from google.protobuf import empty_pb2 as _empty_pb2 - +from concurrent.futures import ThreadPoolExecutor from client_py import queue_pb2_grpc from client_py import queue_pb2 @@ -13,6 +14,8 @@ class QueueClient: stub = None HOST, PORT = "localhost", "8000" + SUBSCRIBE_WORKERS = 3 + SUBSCRIBE_SLEEP_TIMEOUT = 2 @classmethod def get_stub(cls, host: str, port: str): @@ -33,7 +36,7 @@ def push(self, key: str, value: bytes): def pull(self) -> (str, bytes): try: stub = self.get_stub(self.HOST, self.PORT) - response = stub.Pull(f()) + response = stub.Pull(_empty_pb2.Empty()) self.ack(response.key) return response.key, response.value except grpc.RpcError as e: @@ -48,33 +51,22 @@ def ack(self, acknowledgement: str): print(f"Error in acknowledgement: {e}") return False - async def non_blocking_pull(self): - try: - stub = self.get_stub(self.HOST, self.PORT) - response = stub.Pull(f()) - await self.non_blocking_ack(response.key) - return response - except grpc.RpcError as e: - print(f"Error in pulling: {e}.") - - async def non_blocking_ack(self, acknowledgement: str): - try: - stub = self.get_stub(self.HOST, self.PORT) - ack_request = queue_pb2.AcknowledgePullRequest(key=acknowledgement) - stub.AcknowledgePull(ack_request) - return True - except grpc.RpcError as e: - print(f"Error in acknowledgement: {e}") - return False + def subscribe(self, f): + thread = Thread(target=self.run_subscribe, args=(f,)) + thread.start() - async def subscribe(self): + def run_subscribe(self, f): try: + futures = [] while True: - await self.non_blocking_pull() - except grpc.RpcError as e: - print(f"Error in pulling: {e}.") - pass + with ThreadPoolExecutor(max_workers=QueueClient.SUBSCRIBE_WORKERS) as executer: + for _ in range(QueueClient.SUBSCRIBE_WORKERS): + pull_response = self.pull() + if pull_response is not None and pull_response is not False: + futures.append(executer.submit(f, pull_response[0], pull_response[1])) + time.sleep(QueueClient.SUBSCRIBE_SLEEP_TIMEOUT) + _ = [future.result() for future in futures] -def f(): - return _empty_pb2.Empty() + except grpc.RpcError as e: + print(f"Error in pulling: {e}.") diff --git a/client_py/datanode-server.py b/client_py/datanode-server.py index 0e675b2..0c0451b 100644 --- a/client_py/datanode-server.py +++ b/client_py/datanode-server.py @@ -38,7 +38,7 @@ def AcknowledgePull(self, request, context): def serve(): - port = "8888" + port = "8000" host = "localhost" server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) diff --git a/client_py/test_client.py b/client_py/test_client.py index b4a8092..0a6a033 100644 --- a/client_py/test_client.py +++ b/client_py/test_client.py @@ -1,7 +1,10 @@ +import json +import time from unittest import TestCase from client_py.client import QueueClient from concurrent import futures import asyncio +import os def sample_push_pull(client: QueueClient): @@ -9,6 +12,15 @@ def sample_push_pull(client: QueueClient): return asyncio.run(client.pull()).value +def store_key_value(key, value): + print(key, value) + print(os.path) + path = '/tmp/test/' + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(f'{path}/test.json', 'w') as file: + file.write(json.dumps(dict(key=key, value=value.decode()))) + + class TestQueueClient(TestCase): def setUp(self): self.client = QueueClient() @@ -45,4 +57,17 @@ def test_concurrent_push_without_order(self): self.assertListEqual(res, [[b'test value'], [b'test value'], [b'test value']]) def test_subscribe(self): - self.fail() + client_2 = QueueClient() + self.client.subscribe(f=store_key_value) + + client_2.push("test key", b'test value') + + time.sleep(5) + + with open('/tmp/test/test.json') as file: + json_string = file.read() + json_dict = json.loads(json_string) + + print(json_dict['key']) + + self.assertEqual(json_dict['key'], 'test key') \ No newline at end of file diff --git a/datanode/src/datanode_server.py b/datanode/src/datanode_server.py index 2b7aa19..fcc24f6 100644 --- a/datanode/src/datanode_server.py +++ b/datanode/src/datanode_server.py @@ -13,11 +13,15 @@ class DataNode(datanode_pb2_grpc.DataNodeServicer): def __init__(self, partition_count=1, home_path='datanode/server/'): - self.shared_partition = SharedPartitions(partition_count, home_path=home_path) + self.shared_partition = SharedPartitions(partition_count, home_path=home_path + 'main/') + self.replica = SharedPartitions(partition_count, home_path=home_path + 'replica/') def Push(self, request, context): logger.info(f"received a push message: {request.message}") - self.shared_partition.push(request.message) + if request.is_replica: + self.replica.push(request.message) + else: + self.shared_partition.push(request.message) return empty_pb2.Empty() def Pull(self, request, context): @@ -31,6 +35,30 @@ def Pull(self, request, context): except Exception as e: logger.exception(e) + def WritePartition(self, request, context): + try: + logger.info(f"received partition write message for partition: {request.partition_index}") + partition_messages = request.partition_messages + partition_index = request.partition_index + if request.is_replica: + for message in partition_messages: + push_to_partition(partition_index, self.replica, message) + else: + for message in partition_messages: + push_to_partition(partition_index, self.shared_partition, message) + except grpc.RpcError as e: + logger.exception(e) + except Exception as e: + logger.exception(e) + + def ReadPartition(self, request, context): + try: + pass + except grpc.RpcError as e: + logger.exception(e) + except Exception as e: + logger.exception(e) + def AcknowledgePull(self, request, context): try: key = request.key @@ -55,6 +83,12 @@ def GetRemainingMessagesCount(self, request, context): logger.exception(f"Error in getting remaining messages count: {e}") +def push_to_partition(partition_index: int, + shared_partition: SharedPartitions, + partition_message): + shared_partition.push(partition_message, partition_index) + + def serve(): port = ConfigManager.get_prop('server_port') partitions_count = int(ConfigManager.get_prop('partition_count')) diff --git a/datanode/src/shared_partition.py b/datanode/src/shared_partition.py index 7b667ff..28b0070 100644 --- a/datanode/src/shared_partition.py +++ b/datanode/src/shared_partition.py @@ -160,8 +160,9 @@ def get_free_partition(self): return available_partition raise PartitionsBusyError('There is no a free - non empty partition!') - def push(self, message): - partition_index = self.get_dest_partition(message.key) + def push(self, message, partition_index=None): + if partition_index is None: + partition_index = self.get_dest_partition(message.key) destination_partition = self.partitions[partition_index] destination_partition.add_message(message)