Skip to content

Commit

Permalink
Merge branch 'main' into datanode-integeration-test
Browse files Browse the repository at this point in the history
  • Loading branch information
tandalalam authored Feb 10, 2024
2 parents dd6b293 + d3c7179 commit f615ce6
Show file tree
Hide file tree
Showing 36 changed files with 754 additions and 164 deletions.
70 changes: 67 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,76 @@

This is an MVP of a message queueing system.

## How to test locally?
## Architecture

Test with docker compose:
The overall design of the system is shown in the diagram below:

![Design Graph](docs/design_graph.png)

## How to build locally

The datanode is implemented in Python and does not require a build step.
The leader is implemented in Golang; build it with its `Makefile`.
First `cd` into `./leader`:

```bash
cd leader
make leader
```

## How to run locally

This project is containerized.
Start the services defined in `test.docker-compose.yaml` (using the `latest` image tag) with Docker Compose:

```bash
docker compose -f test.docker-compose.yaml up -d
docker compose -f test.docker-compose.yaml logs -f
docker compose -f test.docker-compose.yaml down -d
docker compose -f test.docker-compose.yaml down
```

## Clients

Clients are implemented in Python and Golang. Import the one for your language and use it as shown below.

Python:

```
from client_py.client import QueueClient

if __name__ == '__main__':
    # QueueClient talks to the HOST/PORT configured as class attributes.
    c = QueueClient()
    c.push('test_key', b'test_value')
    k, v = c.pull()
    print(f'key={k}, value={v}')
    # subscribe() hands every pulled (key, value) pair to the callback
    # from a background thread.
    c.subscribe(lambda k, v: print(f'key={k}, value={v}'))
```

Golang:

```
import (
"fmt"
"github.com/kysre/TurtleMQ/client_go"
)
func testFunc(k string, v []byte) {
fmt.Printf("key=%s, value=%b", k, v)
}
func main() {
c := client_go.GetQueueClient()
c.Push("test_key", []byte("test_value"))
k, v := c.Pull()
fmt.Printf("key=%s, value=%b", k, v)
c.Subscribe(testFunc)
}
```
9 changes: 9 additions & 0 deletions client_go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,22 @@ func (c *queueClient) Pull() (string, []byte) {
fmt.Print(err)
return "", nil
}
defer c.acknowledgePull(ctx, res.GetKey())
return res.GetKey(), res.GetValue()
}

// Subscribe starts runSubscribe with the given callback in a new goroutine
// and returns immediately without waiting for it.
func (c *queueClient) Subscribe(function SubscribeFunction) {
go c.runSubscribe(function)
}

// acknowledgePull sends an AcknowledgePull request for key through c.client.
// A failed call is only printed; no error is returned to the caller.
func (c *queueClient) acknowledgePull(ctx context.Context, key string) {
	ackRequest := &queue.AcknowledgePullRequest{Key: key}
	if _, ackErr := c.client.AcknowledgePull(ctx, ackRequest); ackErr != nil {
		fmt.Print(ackErr)
	}
}

func (c *queueClient) runSubscribe(function SubscribeFunction) {
tickerPeriod := time.Duration(1) * time.Second
ticker := time.NewTicker(tickerPeriod)
Expand Down
48 changes: 20 additions & 28 deletions client_py/client.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import time
from typing import List
import asyncio
import logging

from threading import Thread
import grpc
# import google.protobuf.empty_pb2 as empty_pb2
from google.protobuf import empty_pb2 as _empty_pb2

from concurrent.futures import ThreadPoolExecutor
from client_py import queue_pb2_grpc
from client_py import queue_pb2


class QueueClient:
stub = None
HOST, PORT = "localhost", "8000"
SUBSCRIBE_WORKERS = 3
SUBSCRIBE_SLEEP_TIMEOUT = 2

@classmethod
def get_stub(cls, host: str, port: str):
Expand All @@ -33,7 +36,7 @@ def push(self, key: str, value: bytes):
def pull(self) -> (str, bytes):
try:
stub = self.get_stub(self.HOST, self.PORT)
response = stub.Pull(f())
response = stub.Pull(_empty_pb2.Empty())
self.ack(response.key)
return response.key, response.value
except grpc.RpcError as e:
Expand All @@ -48,33 +51,22 @@ def ack(self, acknowledgement: str):
print(f"Error in acknowledgement: {e}")
return False

async def non_blocking_pull(self):
try:
stub = self.get_stub(self.HOST, self.PORT)
response = stub.Pull(f())
await self.non_blocking_ack(response.key)
return response
except grpc.RpcError as e:
print(f"Error in pulling: {e}.")

async def non_blocking_ack(self, acknowledgement: str):
try:
stub = self.get_stub(self.HOST, self.PORT)
ack_request = queue_pb2.AcknowledgePullRequest(key=acknowledgement)
stub.AcknowledgePull(ack_request)
return True
except grpc.RpcError as e:
print(f"Error in acknowledgement: {e}")
return False
def subscribe(self, f):
    """Start ``run_subscribe(f)`` on a new thread and return immediately.

    The thread object is neither stored nor returned.
    """
    Thread(target=self.run_subscribe, args=(f,)).start()

async def subscribe(self):
def run_subscribe(self, f):
try:
futures = []
while True:
await self.non_blocking_pull()
except grpc.RpcError as e:
print(f"Error in pulling: {e}.")
pass
with ThreadPoolExecutor(max_workers=QueueClient.SUBSCRIBE_WORKERS) as executer:
for _ in range(QueueClient.SUBSCRIBE_WORKERS):
pull_response = self.pull()
if pull_response is not None and pull_response is not False:
futures.append(executer.submit(f, pull_response[0], pull_response[1]))
time.sleep(QueueClient.SUBSCRIBE_SLEEP_TIMEOUT)

_ = [future.result() for future in futures]

def f():
return _empty_pb2.Empty()
except grpc.RpcError as e:
print(f"Error in pulling: {e}.")
2 changes: 1 addition & 1 deletion client_py/datanode-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def AcknowledgePull(self, request, context):


def serve():
port = "8888"
port = "8000"
host = "localhost"

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Expand Down
27 changes: 26 additions & 1 deletion client_py/test_client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
import json
import time
from unittest import TestCase
from client_py.client import QueueClient
from concurrent import futures
import asyncio
import os


def sample_push_pull(client: QueueClient):
    """Push one fixed key/value through ``client`` and return the pulled ``.value``."""
    # NOTE(review): asyncio.run only accepts coroutines, while the
    # QueueClient.push/pull visible in this change look synchronous and pull
    # returns a (key, value) tuple - confirm this helper still works.
    push_call = client.push("test key", [b'test value'])
    asyncio.run(push_call)
    pulled = asyncio.run(client.pull())
    return pulled.value


def store_key_value(key, value):
    """Persist one message as JSON so a test can inspect it afterwards.

    Writes ``{"key": key, "value": <decoded value>}`` to
    ``/tmp/test/test.json``, creating the directory when it is missing and
    overwriting any previous file.

    Args:
        key: Message key.
        value: Message payload as bytes; decoded with the default codec
            before being serialised.
    """
    print(key, value)
    directory = '/tmp/test'
    os.makedirs(directory, exist_ok=True)
    # os.path.join keeps the target path free of a doubled slash.
    target = os.path.join(directory, 'test.json')
    with open(target, 'w', encoding='utf-8') as file:
        json.dump({'key': key, 'value': value.decode()}, file)


class TestQueueClient(TestCase):
def setUp(self):
self.client = QueueClient()
Expand Down Expand Up @@ -45,4 +57,17 @@ def test_concurrent_push_without_order(self):
self.assertListEqual(res, [[b'test value'], [b'test value'], [b'test value']])

def test_subscribe(self):
self.fail()
client_2 = QueueClient()
self.client.subscribe(f=store_key_value)

client_2.push("test key", b'test value')

time.sleep(5)

with open('/tmp/test/test.json') as file:
json_string = file.read()
json_dict = json.loads(json_string)

print(json_dict['key'])

self.assertEqual(json_dict['key'], 'test key')
2 changes: 1 addition & 1 deletion datanode/datanode_test/datanode_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,4 @@ def purge_main(self):
stub = self.get_stub()
stub.PurgeMainData(empty_pb2.Empty())
except grpc.RpcError as e:
print(f"Error in PurgeMain: {e}.")
print(f"Error in PurgeMain: {e}.")
5 changes: 4 additions & 1 deletion datanode/datanode_test/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def random_string():


class TestQueueClient(TestCase):

def setUp(self):
self.client = QueueClient()
self.client.purge_main()
Expand Down Expand Up @@ -71,7 +72,8 @@ def test_multiple_push_pull(self):
self.assertEqual(value, value_1)

def test_concurrent_push_without_order(self):
for _ in range(100):

for _ in range(100):

self.client.purge_main()

Expand Down Expand Up @@ -206,6 +208,7 @@ def test_push_ack_replica(self):
for key, value in zip(push_keys, push_values) if
hash_function(key, 4) == 0]


res = client.read_partition(partition_id=0, is_replica=True)
res = [(r.key, r.value) for r in res]

Expand Down
37 changes: 25 additions & 12 deletions datanode/src/shared_partition.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import json
import threading
import time
import os
import shutil
import random
from protos import datanode_pb2
from configs.utils import MessagesStatus, PartitionsBusyError, PartitionStatus
from configs.configs import ConfigManager
Expand All @@ -21,8 +19,7 @@ def clear_path(path):
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print('Failed to delete %s. Reason: %s' % (file_path, e))
pass
logger.exception('Failed to delete %s. Reason: %s' % (file_path, e))


class Message:
Expand All @@ -49,8 +46,9 @@ def get_message(self):
key, decoded_value = message['key'], message['value']

encoded_value = decoded_value.encode(Message.encoding_method)
messageProto = datanode_pb2.QueueMessage(key=key, value=encoded_value)
return messageProto
message_proto = datanode_pb2.QueueMessage(key=key, value=encoded_value)

return message_proto

def pend_message(self):
self.message_status = MessagesStatus.PENDING
Expand Down Expand Up @@ -109,9 +107,17 @@ def get_messages(self):

return response.get_message()

def remove_message(self):
if len(self.messages) != 0:
message = self.messages.pop(0)
def message_generator(self):
    """Lazily yield ``get_message()`` for every entry of ``self.messages``, in list order."""
    yield from (queued.get_message() for queued in self.messages)

def remove_message(self, key):
    """Remove the first queued message whose key equals ``key`` and mark it sent.

    If no queued message has that key, nothing is removed and nothing is
    marked sent.

    Args:
        key: Key of the message to drop from ``self.messages``.
    """
    for message in self.messages:
        if message.key == key:
            break
    else:
        # The loop finished without a match. Without this guard the loop
        # variable would still hold the last message, and that unrelated
        # message would be removed.
        return
    self.messages.remove(message)
    message.sent_message()

def add_message(self, message: datanode_pb2.QueueMessage):
Expand All @@ -126,12 +132,13 @@ class SharedPartitions:

def __init__(self, partition_count, home_path, cleaner=True):
self.partition_lock = threading.Lock()
self.write_lock = threading.Lock()

assert type(partition_count) is int
self.partition_count = partition_count

self.partitions_status = partition_count * [PartitionStatus.FREE]
self.partitions = [Partition(path=f'{home_path}/partition_{i + 1}.tmq') for i in range(partition_count)]
self.partitions = [Partition(path=f'{home_path}/partition_{i}.tmq') for i in range(partition_count)]

if cleaner:
self.cleaner = threading.Thread(target=clean_partitions, args=[self])
Expand All @@ -153,6 +160,10 @@ def pull(self):

return self.partitions[partition_index].get_messages()

def read_partition_non_blocking(self, partition_index):
    """Return the message generator of the partition at ``partition_index``.

    This method itself takes no lock and does not touch ``partitions_status``.
    """
    return self.partitions[partition_index].message_generator()

def get_free_partition(self):
with self.partition_lock:
available_partitions = []
Expand All @@ -170,13 +181,15 @@ def push(self, message, partition_index=None):
if partition_index is None:
partition_index = self.get_dest_partition(message.key)
destination_partition = self.partitions[partition_index]
destination_partition.add_message(message)
# to avoid same index for multiple push request in messages
with self.write_lock:
destination_partition.add_message(message)

def acknowledge(self, key):
partition_index = self.get_dest_partition(key)
with self.partition_lock:
self.partitions[partition_index].remove_message(key)
self.partitions_status[partition_index] = PartitionStatus.FREE
self.partitions[partition_index].remove_message()

def get_dest_partition(self, key):
    """Return the partition index for ``key``: ``hash_function(key, self.partition_count)``."""
    partition_total = self.partition_count
    return hash_function(key, partition_total)
Expand Down
Binary file added docs/design_graph.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 9 additions & 0 deletions grafana/datasource.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: 1

datasources:
- name: Prometheus
type: prometheus
url: http://prometheus:9090
isDefault: true
access: proxy
editable: true
2 changes: 1 addition & 1 deletion leader/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ clean: ## to remove generated files
-rm -rf postviewd
-find . -type d -name mocks -exec rm -rf \{} +

leader: $(SRCS) $(PBS) | generate ## Compile leader daemon
leader:
go build -o $@ leader ./cmd/$@

docker: ## to build docker image
Expand Down
Loading

0 comments on commit f615ce6

Please sign in to comment.