Skip to content

Commit

Permalink
Datanode fault tolerance (#26)
Browse files Browse the repository at this point in the history
* Add write replica. Add write partition.

* Add client subscribe.

---------

Co-authored-by: maziar <[email protected]>
  • Loading branch information
kysre and tandalalam authored Feb 8, 2024
1 parent 3c49156 commit 4560f28
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 34 deletions.
48 changes: 20 additions & 28 deletions client_py/client.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import time
from typing import List
import asyncio
import logging

from threading import Thread
import grpc
# import google.protobuf.empty_pb2 as empty_pb2
from google.protobuf import empty_pb2 as _empty_pb2

from concurrent.futures import ThreadPoolExecutor
from client_py import queue_pb2_grpc
from client_py import queue_pb2


class QueueClient:
stub = None
HOST, PORT = "localhost", "8000"
SUBSCRIBE_WORKERS = 3
SUBSCRIBE_SLEEP_TIMEOUT = 2

@classmethod
def get_stub(cls, host: str, port: str):
Expand All @@ -33,7 +36,7 @@ def push(self, key: str, value: bytes):
def pull(self) -> (str, bytes):
try:
stub = self.get_stub(self.HOST, self.PORT)
response = stub.Pull(f())
response = stub.Pull(_empty_pb2.Empty())
self.ack(response.key)
return response.key, response.value
except grpc.RpcError as e:
Expand All @@ -48,33 +51,22 @@ def ack(self, acknowledgement: str):
print(f"Error in acknowledgement: {e}")
return False

async def non_blocking_pull(self):
try:
stub = self.get_stub(self.HOST, self.PORT)
response = stub.Pull(f())
await self.non_blocking_ack(response.key)
return response
except grpc.RpcError as e:
print(f"Error in pulling: {e}.")

async def non_blocking_ack(self, acknowledgement: str):
try:
stub = self.get_stub(self.HOST, self.PORT)
ack_request = queue_pb2.AcknowledgePullRequest(key=acknowledgement)
stub.AcknowledgePull(ack_request)
return True
except grpc.RpcError as e:
print(f"Error in acknowledgement: {e}")
return False
def subscribe(self, f):
thread = Thread(target=self.run_subscribe, args=(f,))
thread.start()

async def subscribe(self):
def run_subscribe(self, f):
try:
futures = []
while True:
await self.non_blocking_pull()
except grpc.RpcError as e:
print(f"Error in pulling: {e}.")
pass
with ThreadPoolExecutor(max_workers=QueueClient.SUBSCRIBE_WORKERS) as executer:
for _ in range(QueueClient.SUBSCRIBE_WORKERS):
pull_response = self.pull()
if pull_response is not None and pull_response is not False:
futures.append(executer.submit(f, pull_response[0], pull_response[1]))
time.sleep(QueueClient.SUBSCRIBE_SLEEP_TIMEOUT)

_ = [future.result() for future in futures]

def f():
return _empty_pb2.Empty()
except grpc.RpcError as e:
print(f"Error in pulling: {e}.")
2 changes: 1 addition & 1 deletion client_py/datanode-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def AcknowledgePull(self, request, context):


def serve():
port = "8888"
port = "8000"
host = "localhost"

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Expand Down
27 changes: 26 additions & 1 deletion client_py/test_client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
import json
import time
from unittest import TestCase
from client_py.client import QueueClient
from concurrent import futures
import asyncio
import os


def sample_push_pull(client: QueueClient):
asyncio.run(client.push("test key", [b'test value']))
return asyncio.run(client.pull()).value


def store_key_value(key, value):
print(key, value)
print(os.path)
path = '/tmp/test/'
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(f'{path}/test.json', 'w') as file:
file.write(json.dumps(dict(key=key, value=value.decode())))


class TestQueueClient(TestCase):
def setUp(self):
self.client = QueueClient()
Expand Down Expand Up @@ -45,4 +57,17 @@ def test_concurrent_push_without_order(self):
self.assertListEqual(res, [[b'test value'], [b'test value'], [b'test value']])

def test_subscribe(self):
self.fail()
client_2 = QueueClient()
self.client.subscribe(f=store_key_value)

client_2.push("test key", b'test value')

time.sleep(5)

with open('/tmp/test/test.json') as file:
json_string = file.read()
json_dict = json.loads(json_string)

print(json_dict['key'])

self.assertEqual(json_dict['key'], 'test key')
38 changes: 36 additions & 2 deletions datanode/src/datanode_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

class DataNode(datanode_pb2_grpc.DataNodeServicer):
def __init__(self, partition_count=1, home_path='datanode/server/'):
self.shared_partition = SharedPartitions(partition_count, home_path=home_path)
self.shared_partition = SharedPartitions(partition_count, home_path=home_path + 'main/')
self.replica = SharedPartitions(partition_count, home_path=home_path + 'replica/')

def Push(self, request, context):
logger.info(f"received a push message: {request.message}")
self.shared_partition.push(request.message)
if request.is_replica:
self.replica.push(request.message)
else:
self.shared_partition.push(request.message)
return empty_pb2.Empty()

def Pull(self, request, context):
Expand All @@ -31,6 +35,30 @@ def Pull(self, request, context):
except Exception as e:
logger.exception(e)

def WritePartition(self, request, context):
try:
logger.info(f"received partition write message for partition: {request.partition_index}")
partition_messages = request.partition_messages
partition_index = request.partition_index
if request.is_replica:
for message in partition_messages:
push_to_partition(partition_index, self.replica, message)
else:
for message in partition_messages:
push_to_partition(partition_index, self.shared_partition, message)
except grpc.RpcError as e:
logger.exception(e)
except Exception as e:
logger.exception(e)

def ReadPartition(self, request, context):
try:
pass
except grpc.RpcError as e:
logger.exception(e)
except Exception as e:
logger.exception(e)

def AcknowledgePull(self, request, context):
try:
key = request.key
Expand All @@ -55,6 +83,12 @@ def GetRemainingMessagesCount(self, request, context):
logger.exception(f"Error in getting remaining messages count: {e}")


def push_to_partition(partition_index: int,
shared_partition: SharedPartitions,
partition_message):
shared_partition.push(partition_message, partition_index)


def serve():
port = ConfigManager.get_prop('server_port')
partitions_count = int(ConfigManager.get_prop('partition_count'))
Expand Down
5 changes: 3 additions & 2 deletions datanode/src/shared_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,9 @@ def get_free_partition(self):
return available_partition
raise PartitionsBusyError('There is no a free - non empty partition!')

def push(self, message):
partition_index = self.get_dest_partition(message.key)
def push(self, message, partition_index=None):
if partition_index is None:
partition_index = self.get_dest_partition(message.key)
destination_partition = self.partitions[partition_index]
destination_partition.add_message(message)

Expand Down

0 comments on commit 4560f28

Please sign in to comment.