Skip to content

Commit

Permalink
Add leader replica support to client_py
Browse files Browse the repository at this point in the history
  • Loading branch information
kysre committed Feb 10, 2024
1 parent 5567dfd commit 2524f1b
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions client_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
from threading import Thread
import grpc
# import google.protobuf.empty_pb2 as empty_pb2
from google.protobuf import empty_pb2 as _empty_pb2
from concurrent.futures import ThreadPoolExecutor
from client_py import queue_pb2_grpc
Expand All @@ -13,7 +12,8 @@

class QueueClient:
stub = None
HOST, PORT = "localhost", "8000"
HOST = "localhost"
PORT, REPLICA_PORT = "8000", "8001"
SUBSCRIBE_WORKERS = 3
SUBSCRIBE_SLEEP_TIMEOUT = 2

Expand All @@ -24,14 +24,24 @@ def get_stub(cls, host: str, port: str):
cls.stub = queue_pb2_grpc.QueueStub(channel)
return cls.stub

@classmethod
def get_replica_stub(cls, host: str, port: str):
if cls.replica_stub is None:
channel = grpc.insecure_channel(f"{host}:{port}")
cls.replica_stub = queue_pb2_grpc.QueueStub(channel)
return cls.replica_stub

def push(self, key: str, value: bytes):
try:
stub = self.get_stub(self.HOST, self.PORT)

stub.Push(queue_pb2.PushRequest(key=key, value=value))

except grpc.RpcError as e:
print(f"Error in pushing: {e}.")
try:
stub = self.get_replica_stub(self.HOST, self.REPLICA_PORT)
stub.Push(queue_pb2.PushRequest(key=key, value=value))
except grpc.RpcError as e:
pass

def pull(self) -> (str, bytes):
try:
Expand All @@ -40,16 +50,26 @@ def pull(self) -> (str, bytes):
self.ack(response.key)
return response.key, response.value
except grpc.RpcError as e:
print(f"Error in pulling: {e}.")
try:
stub = self.get_replica_stub(self.HOST, self.REPLICA_PORT)
response = stub.Pull(_empty_pb2.Empty())
self.ack(response.key)
return response.key, response.value
except grpc.RpcError as e:
return '', None

def ack(self, acknowledgement: str):
try:
stub = self.get_stub(self.HOST, self.PORT)
stub.AcknowledgePull(queue_pb2.AcknowledgePullRequest(key=acknowledgement))
return None
except grpc.RpcError as e:
print(f"Error in acknowledgement: {e}")
return False
try:
stub = self.get_replica_stub(self.HOST, self.REPLICA_PORT)
stub.AcknowledgePull(queue_pb2.AcknowledgePullRequest(key=acknowledgement))
return None
except grpc.RpcError as e:
return None

def subscribe(self, f):
thread = Thread(target=self.run_subscribe, args=(f,))
Expand All @@ -62,7 +82,7 @@ def run_subscribe(self, f):
with ThreadPoolExecutor(max_workers=QueueClient.SUBSCRIBE_WORKERS) as executer:
for _ in range(QueueClient.SUBSCRIBE_WORKERS):
pull_response = self.pull()
if pull_response is not None and pull_response is not False:
if pull_response is not None and pull_response is not False and pull_response[0] != '':
futures.append(executer.submit(f, pull_response[0], pull_response[1]))
time.sleep(QueueClient.SUBSCRIBE_SLEEP_TIMEOUT)

Expand Down

0 comments on commit 2524f1b

Please sign in to comment.