Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Debug clean partition to hande directory doesn't exist. #42

Merged
merged 2 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions .github/workflows/go_test.yaml → .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
name: Test
on: [push]
on: [ push ]

jobs:
Go:
name: Leader Test
runs-on: ubuntu-latest
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Download Go
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.21
id: go

- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Get dependencies
- name: Install dependencies
working-directory: ./leader
run: |
go get -v -t -d ./...
Expand All @@ -32,3 +31,25 @@ jobs:
GOPROXY: "https://proxy.golang.org"
working-directory: ./leader
run: go test -v ./...

Python:
name: Datanode Test
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'

- name: Install dependencies
working-directory: ./datanode
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt

- name: Test
working-directory: ./datanode/src
run: python -m unittest unit_tests.py
30 changes: 16 additions & 14 deletions datanode/datanode_test/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,32 @@ def test_multiple_push_pull(self):

self.assertEqual(value, value_1)


def test_concurrent_push_without_order(self):
for _ in range(50):

_futures = []
_futures = []

num_threads = 3
num_threads = 3

res = []
res = []

with futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
with futures.ThreadPoolExecutor(max_workers=num_threads) as executor:

for i in range(num_threads):
_futures.append(executor.submit(sample_push_pull, self.client))
for i in range(num_threads):
_futures.append(executor.submit(sample_push_pull, self.client))

for pulled_value in futures.as_completed(_futures):
try:
res.append(pulled_value.result())
except Exception as e:
print(f'Exception: {e}')
for pulled_value in futures.as_completed(_futures):
try:
res.append(pulled_value.result())
except Exception as e:
print(f'Exception: {e}')

real, expected = [r[0] for r in res], [r[1] for r in res]
real, expected = [r[0] for r in res], [r[1] for r in res]

print(f'real: {real}, expected: {expected}')
print(f'real: {real}, expected: {expected}')

self.assertSetEqual(set(real), set(expected))
self.assertSetEqual(set(real), set(expected))

def test_read_partition(self):
client = self.client
Expand Down
44 changes: 25 additions & 19 deletions datanode/src/shared_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,27 @@


def clear_path(path):
for filename in os.listdir(path):
file_path = os.path.join(path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
logger.exception('Failed to delete %s. Reason: %s' % (file_path, e))
try:
for filename in os.listdir(path):
file_path = os.path.join(path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
logger.exception('Failed to delete %s. Reason: %s' % (file_path, e))
except FileNotFoundError:
pass # No need for clearing the path, because the path is not exist.


class Message:
encoding_method = ConfigManager.get_prop('encoding_method')

def __init__(self, message: datanode_pb2.QueueMessage, file_address):
def __init__(self, message: datanode_pb2.QueueMessage, file_address, read_write_lock):
self.key = message.key
self.message_status = MessagesStatus.NOT_SENT
self.read_write_lock = read_write_lock
self.file_address = file_address
self.index = self.write_file_system(message.value)

Expand Down Expand Up @@ -60,22 +64,24 @@ def set_message_free(self):
self.write_file_system(b'')

def write_file_system(self, value):
count_lines = 0
with open(self.file_address, 'r') as file_system:
for line in file_system:
count_lines += 1
with self.read_write_lock:
count_lines = 0
with open(self.file_address, 'r') as file_system:
for line in file_system:
count_lines += 1

with open(self.file_address, 'a') as file_system:
value = value.decode(Message.encoding_method)
file_system.write(json.dumps(dict(key=self.key, value=value, status=self.message_status.value)) + '\n')
with open(self.file_address, 'a') as file_system:
value = value.decode(Message.encoding_method)
file_system.write(json.dumps(dict(key=self.key, value=value, status=self.message_status.value)) + '\n')

return count_lines
return count_lines


class Partition:
def __init__(self, path):
self.messages = []
self.dir = path
self.messages_lock = threading.Lock()

self.last_pull = {'message': None,
'time': None}
Expand Down Expand Up @@ -115,7 +121,7 @@ def remove_message(self, key):
message.sent_message()

def add_message(self, message: datanode_pb2.QueueMessage):
self.messages.append(Message(message, self.dir))
self.messages.append(Message(message, self.dir, self.messages_lock))

def get_remaining_message_count(self) -> int:
return len(self.messages)
Expand Down
70 changes: 70 additions & 0 deletions datanode/src/unit_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import threading
from unittest import TestCase
from shared_partition import Message, Partition
from configs.utils import MessagesStatus
from protos import datanode_pb2
import random
import string
import os


def random_string():
return ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))


class MessagesTests(TestCase):
def test_get_message(self):
path = 'test.msq'
with open(path, 'a') as file:
file.write('')
queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8'))
message = Message(queue_message, path, threading.Lock())
self.assertEqual(message.get_message(), queue_message)
os.remove(path)


class PartitionsTests(TestCase):

def setUp(self):
self.path = './test.msq'
self.test_partition = Partition(self.path)

def test_add_messages(self):
queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8'))
self.test_partition.add_message(queue_message)
self.assertEqual(self.test_partition.messages[0].get_message(), queue_message)

def test_get_messages(self):
queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8'))
self.test_partition.add_message(queue_message)
self.assertEqual(self.test_partition.get_messages(), queue_message)

def test_pending_state(self):
queue_message = datanode_pb2.QueueMessage(key=random_string(), value=random_string().encode('utf-8'))
self.test_partition.add_message(queue_message)
self.test_partition.get_messages()
self.assertEqual(self.test_partition.messages[0].message_status,
MessagesStatus.PENDING)

def test_removing_messages(self):
key, value = random_string(), random_string().encode('utf-8')
queue_message = datanode_pb2.QueueMessage(key=key, value=value)
self.test_partition.add_message(queue_message)
message = self.test_partition.messages[0]
self.test_partition.get_messages()
self.test_partition.remove_message(key)

self.assertEqual(message.message_status, MessagesStatus.SENT)
self.assertListEqual(self.test_partition.messages, [])

def test_message_generation(self):
messages = [datanode_pb2.QueueMessage(key=random_string(),
value=random_string().encode('utf-8'))
for _ in range(10)]
for message in messages:
self.test_partition.add_message(message)

self.assertListEqual(list(self.test_partition.message_generator()), messages)

def tearDown(self):
os.remove(self.path)
Loading