Skip to content

Commit

Permalink
Debug clean partition to hande directory doesn't exist. (#42)
Browse files Browse the repository at this point in the history
* Debug clean partition to hande directory doesn't exist. Debug concurrent push, pull to datanode (Write lock). Add unittest.

* Add python test github action

---------

Co-authored-by: kasra <[email protected]>
  • Loading branch information
tandalalam and kysre authored Feb 12, 2024
1 parent 0ccc791 commit e6aff50
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 39 deletions.
33 changes: 27 additions & 6 deletions .github/workflows/go_test.yaml → .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
name: Test
on: [push]
on: [ push ]

jobs:
Go:
name: Leader Test
runs-on: ubuntu-latest
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Download Go
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.21
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Get dependencies
- name: Install dependencies
working-directory: ./leader
run: |
go get -v -t -d ./...
Expand All @@ -32,3 +31,25 @@ jobs:
GOPROXY: "https://proxy.golang.org"
working-directory: ./leader
run: go test -v ./...

Python:
name: Datanode Test
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'

- name: Install dependencies
working-directory: ./datanode
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Test
working-directory: ./datanode/src
run: python -m unittest unit_tests.py
30 changes: 16 additions & 14 deletions datanode/datanode_test/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,32 @@ def test_multiple_push_pull(self):

self.assertEqual(value, value_1)


def test_concurrent_push_without_order(self):
for _ in range(50):

_futures = []
_futures = []

num_threads = 3
num_threads = 3

res = []
res = []

with futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
with futures.ThreadPoolExecutor(max_workers=num_threads) as executor:

for i in range(num_threads):
_futures.append(executor.submit(sample_push_pull, self.client))
for i in range(num_threads):
_futures.append(executor.submit(sample_push_pull, self.client))

for pulled_value in futures.as_completed(_futures):
try:
res.append(pulled_value.result())
except Exception as e:
print(f'Exception: {e}')
for pulled_value in futures.as_completed(_futures):
try:
res.append(pulled_value.result())
except Exception as e:
print(f'Exception: {e}')

real, expected = [r[0] for r in res], [r[1] for r in res]
real, expected = [r[0] for r in res], [r[1] for r in res]

print(f'real: {real}, expected: {expected}')
print(f'real: {real}, expected: {expected}')

self.assertSetEqual(set(real), set(expected))
self.assertSetEqual(set(real), set(expected))

def test_read_partition(self):
client = self.client
Expand Down
44 changes: 25 additions & 19 deletions datanode/src/shared_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,27 @@


def clear_path(path):
for filename in os.listdir(path):
file_path = os.path.join(path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
logger.exception('Failed to delete %s. Reason: %s' % (file_path, e))
try:
for filename in os.listdir(path):
file_path = os.path.join(path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
logger.exception('Failed to delete %s. Reason: %s' % (file_path, e))
except FileNotFoundError:
pass # No need for clearing the path, because the path is not exist.


class Message:
encoding_method = ConfigManager.get_prop('encoding_method')

def __init__(self, message: datanode_pb2.QueueMessage, file_address):
def __init__(self, message: datanode_pb2.QueueMessage, file_address, read_write_lock):
self.key = message.key
self.message_status = MessagesStatus.NOT_SENT
self.read_write_lock = read_write_lock
self.file_address = file_address
self.index = self.write_file_system(message.value)

Expand Down Expand Up @@ -60,22 +64,24 @@ def set_message_free(self):
self.write_file_system(b'')

def write_file_system(self, value):
count_lines = 0
with open(self.file_address, 'r') as file_system:
for line in file_system:
count_lines += 1
with self.read_write_lock:
count_lines = 0
with open(self.file_address, 'r') as file_system:
for line in file_system:
count_lines += 1

with open(self.file_address, 'a') as file_system:
value = value.decode(Message.encoding_method)
file_system.write(json.dumps(dict(key=self.key, value=value, status=self.message_status.value)) + '\n')
with open(self.file_address, 'a') as file_system:
value = value.decode(Message.encoding_method)
file_system.write(json.dumps(dict(key=self.key, value=value, status=self.message_status.value)) + '\n')

return count_lines
return count_lines


class Partition:
def __init__(self, path):
self.messages = []
self.dir = path
self.messages_lock = threading.Lock()

self.last_pull = {'message': None,
'time': None}
Expand Down Expand Up @@ -115,7 +121,7 @@ def remove_message(self, key):
message.sent_message()

def add_message(self, message: datanode_pb2.QueueMessage):
self.messages.append(Message(message, self.dir))
self.messages.append(Message(message, self.dir, self.messages_lock))

def get_remaining_message_count(self) -> int:
return len(self.messages)
Expand Down
70 changes: 70 additions & 0 deletions datanode/src/unit_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import threading
from unittest import TestCase
from shared_partition import Message, Partition
from configs.utils import MessagesStatus
from protos import datanode_pb2
import random
import string
import os


def random_string():
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))


class MessagesTests(TestCase):
def test_get_message(self):
path = 'test.msq'
with open(path, 'a') as file:
file.write('')
queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8'))
message = Message(queue_message, path, threading.Lock())
self.assertEqual(message.get_message(), queue_message)
os.remove(path)


class PartitionsTests(TestCase):

def setUp(self):
self.path = './test.msq'
self.test_partition = Partition(self.path)

def test_add_messages(self):
queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8'))
self.test_partition.add_message(queue_message)
self.assertEqual(self.test_partition.messages[0].get_message(), queue_message)

def test_get_messages(self):
queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8'))
self.test_partition.add_message(queue_message)
self.assertEqual(self.test_partition.get_messages(), queue_message)

def test_pending_state(self):
queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8'))
self.test_partition.add_message(queue_message)
self.test_partition.get_messages()
self.assertEqual(self.test_partition.messages[0].message_status,
MessagesStatus.PENDING)

def test_removing_messages(self):
key, value = random_string(), random_string().encode('utf-8')
queue_message = datanode_pb2.QueueMessage(key=key, value=value)
self.test_partition.add_message(queue_message)
message = self.test_partition.messages[0]
self.test_partition.get_messages()
self.test_partition.remove_message(key)

self.assertEqual(message.message_status, MessagesStatus.SENT)
self.assertListEqual(self.test_partition.messages, [])

def test_message_generation(self):
messages = [datanode_pb2.QueueMessage(key=random_string(),
value=random_string().encode('utf-8'))
for _ in range(10)]
for message in messages:
self.test_partition.add_message(message)

self.assertListEqual(list(self.test_partition.message_generator()), messages)

def tearDown(self):
os.remove(self.path)

0 comments on commit e6aff50

Please sign in to comment.