From e6aff50101cf60318471371f7546ddfa21e9380b Mon Sep 17 00:00:00 2001 From: tandalalam <76206300+tandalalam@users.noreply.github.com> Date: Mon, 12 Feb 2024 23:08:26 +0330 Subject: [PATCH] Debug clean partition to handle a directory that doesn't exist. (#42) * Debug clean partition to handle a directory that doesn't exist. Debug concurrent push, pull to datanode (Write lock). Add unittest. * Add python test github action --------- Co-authored-by: kasra --- .github/workflows/{go_test.yaml => test.yaml} | 33 +++++++-- datanode/datanode_test/integration_tests.py | 30 ++++---- datanode/src/shared_partition.py | 44 +++++++----- datanode/src/unit_tests.py | 70 +++++++++++++++++++ 4 files changed, 138 insertions(+), 39 deletions(-) rename .github/workflows/{go_test.yaml => test.yaml} (52%) create mode 100644 datanode/src/unit_tests.py diff --git a/.github/workflows/go_test.yaml b/.github/workflows/test.yaml similarity index 52% rename from .github/workflows/go_test.yaml rename to .github/workflows/test.yaml index 6e0a63c..8f82689 100644 --- a/.github/workflows/go_test.yaml +++ b/.github/workflows/test.yaml @@ -1,22 +1,21 @@ name: Test -on: [push] +on: [ push ] jobs: Go: name: Leader Test runs-on: ubuntu-latest steps: + - name: Check out code into the Go module directory + uses: actions/checkout@v2 - - name: Download Go + - name: Set up Go uses: actions/setup-go@v2 with: go-version: 1.21 id: go - - name: Check out code into the Go module directory - uses: actions/checkout@v2 - - - name: Get dependencies + - name: Install dependencies working-directory: ./leader run: | go get -v -t -d ./... @@ -32,3 +31,25 @@ jobs: GOPROXY: "https://proxy.golang.org" working-directory: ./leader run: go test -v ./... 
+ + Python: + name: Datanode Test + runs-on: ubuntu-latest + steps: + - name: Check out code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install dependencies + working-directory: ./datanode + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Test + working-directory: ./datanode/src + run: python -m unittest unit_tests.py diff --git a/datanode/datanode_test/integration_tests.py b/datanode/datanode_test/integration_tests.py index 65d586f..71facfe 100644 --- a/datanode/datanode_test/integration_tests.py +++ b/datanode/datanode_test/integration_tests.py @@ -65,30 +65,32 @@ def test_multiple_push_pull(self): self.assertEqual(value, value_1) + def test_concurrent_push_without_order(self): + for _ in range(50): - _futures = [] + _futures = [] - num_threads = 3 + num_threads = 3 - res = [] + res = [] - with futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + with futures.ThreadPoolExecutor(max_workers=num_threads) as executor: - for i in range(num_threads): - _futures.append(executor.submit(sample_push_pull, self.client)) + for i in range(num_threads): + _futures.append(executor.submit(sample_push_pull, self.client)) - for pulled_value in futures.as_completed(_futures): - try: - res.append(pulled_value.result()) - except Exception as e: - print(f'Exception: {e}') + for pulled_value in futures.as_completed(_futures): + try: + res.append(pulled_value.result()) + except Exception as e: + print(f'Exception: {e}') - real, expected = [r[0] for r in res], [r[1] for r in res] + real, expected = [r[0] for r in res], [r[1] for r in res] - print(f'real: {real}, expected: {expected}') + print(f'real: {real}, expected: {expected}') - self.assertSetEqual(set(real), set(expected)) + self.assertSetEqual(set(real), set(expected)) def test_read_partition(self): client = self.client diff --git a/datanode/src/shared_partition.py 
b/datanode/src/shared_partition.py index 27cc9aa..42655c8 100644 --- a/datanode/src/shared_partition.py +++ b/datanode/src/shared_partition.py @@ -11,23 +11,27 @@ def clear_path(path): - for filename in os.listdir(path): - file_path = os.path.join(path, filename) - try: - if os.path.isfile(file_path) or os.path.islink(file_path): - os.unlink(file_path) - elif os.path.isdir(file_path): - shutil.rmtree(file_path) - except Exception as e: - logger.exception('Failed to delete %s. Reason: %s' % (file_path, e)) + try: + for filename in os.listdir(path): + file_path = os.path.join(path, filename) + try: + if os.path.isfile(file_path) or os.path.islink(file_path): + os.unlink(file_path) + elif os.path.isdir(file_path): + shutil.rmtree(file_path) + except Exception as e: + logger.exception('Failed to delete %s. Reason: %s' % (file_path, e)) + except FileNotFoundError: + pass # Nothing to clear: the path does not exist. class Message: encoding_method = ConfigManager.get_prop('encoding_method') - def __init__(self, message: datanode_pb2.QueueMessage, file_address): + def __init__(self, message: datanode_pb2.QueueMessage, file_address, read_write_lock): self.key = message.key self.message_status = MessagesStatus.NOT_SENT + self.read_write_lock = read_write_lock self.file_address = file_address self.index = self.write_file_system(message.value) @@ -60,22 +64,24 @@ def set_message_free(self): self.write_file_system(b'') def write_file_system(self, value): - count_lines = 0 - with open(self.file_address, 'r') as file_system: - for line in file_system: - count_lines += 1 + with self.read_write_lock: + count_lines = 0 + with open(self.file_address, 'r') as file_system: + for line in file_system: + count_lines += 1 - with open(self.file_address, 'a') as 
file_system: + value = value.decode(Message.encoding_method) + file_system.write(json.dumps(dict(key=self.key, value=value, status=self.message_status.value)) + '\n') - return count_lines + return count_lines class Partition: def __init__(self, path): self.messages = [] self.dir = path + self.messages_lock = threading.Lock() self.last_pull = {'message': None, 'time': None} @@ -115,7 +121,7 @@ def remove_message(self, key): message.sent_message() def add_message(self, message: datanode_pb2.QueueMessage): - self.messages.append(Message(message, self.dir)) + self.messages.append(Message(message, self.dir, self.messages_lock)) def get_remaining_message_count(self) -> int: return len(self.messages) diff --git a/datanode/src/unit_tests.py b/datanode/src/unit_tests.py new file mode 100644 index 0000000..7a17fae --- /dev/null +++ b/datanode/src/unit_tests.py @@ -0,0 +1,70 @@ +import threading +from unittest import TestCase +from shared_partition import Message, Partition +from configs.utils import MessagesStatus +from protos import datanode_pb2 +import random +import string +import os + + +def random_string(): + return ''.join(random.choices(string.ascii_uppercase + string.digits, k=10)) + + +class MessagesTests(TestCase): + def test_get_message(self): + path = 'test.msq' + with open(path, 'a') as file: + file.write('') + queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8')) + message = Message(queue_message, path, threading.Lock()) + self.assertEqual(message.get_message(), queue_message) + os.remove(path) + + +class PartitionsTests(TestCase): + + def setUp(self): + self.path = './test.msq' + self.test_partition = Partition(self.path) + + def test_add_messages(self): + queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8')) + self.test_partition.add_message(queue_message) + self.assertEqual(self.test_partition.messages[0].get_message(), queue_message) + + def 
test_get_messages(self): + queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8')) + self.test_partition.add_message(queue_message) + self.assertEqual(self.test_partition.get_messages(), queue_message) + + def test_pending_state(self): + queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8')) + self.test_partition.add_message(queue_message) + self.test_partition.get_messages() + self.assertEqual(self.test_partition.messages[0].message_status, + MessagesStatus.PENDING) + + def test_removing_messages(self): + key, value = random_string(), random_string().encode('utf-8') + queue_message = datanode_pb2.QueueMessage(key=key, value=value) + self.test_partition.add_message(queue_message) + message = self.test_partition.messages[0] + self.test_partition.get_messages() + self.test_partition.remove_message(key) + + self.assertEqual(message.message_status, MessagesStatus.SENT) + self.assertListEqual(self.test_partition.messages, []) + + def test_message_generation(self): + messages = [datanode_pb2.QueueMessage(key=random_string(), + value=random_string().encode('utf-8')) + for _ in range(10)] + for message in messages: + self.test_partition.add_message(message) + + self.assertListEqual(list(self.test_partition.message_generator()), messages) + + def tearDown(self): + os.remove(self.path)