From 8d774f497d40e46c81d47b97d8f26734b4259019 Mon Sep 17 00:00:00 2001 From: chaitanya Date: Mon, 7 Mar 2022 11:17:19 +0530 Subject: [PATCH 01/16] Added partial Beanstalk implementation --- py_queue_factory/abstract_queue.py | 22 +++++++++++ py_queue_factory/beanstalk_queue.py | 59 +++++++++++++++++++++++++++++ py_queue_factory/queue_factory.py | 15 ++++++-- py_queue_factory/sqs_queue.py | 24 +----------- 4 files changed, 94 insertions(+), 26 deletions(-) create mode 100644 py_queue_factory/beanstalk_queue.py diff --git a/py_queue_factory/abstract_queue.py b/py_queue_factory/abstract_queue.py index 449337f..2a95150 100644 --- a/py_queue_factory/abstract_queue.py +++ b/py_queue_factory/abstract_queue.py @@ -1,4 +1,6 @@ import copy +import json +import base64 from abc import ABC, abstractmethod import urllib.parse as url_parse @@ -87,3 +89,23 @@ def get_queue_name(self): self.subdomain, self.queue_name) return self.queue_prefix + queue_name_with_suffix + + @staticmethod + def encode_mesage(message_body, encoding): + if encoding == 'json': + message_body = json.dumps(message_body) + elif encoding == 'base64': + json_message = json.dumps(message_body).encode('utf-8') + message_body = base64.b64encode(json_message).decode('utf-8') + + return message_body + + @staticmethod + def decode_message(message_body, encoding): + if encoding == 'json': + message_body = json.loads(message_body) + elif encoding == 'base64': + message_body = base64.b64decode(message_body.encode('utf-8')) + message_body = json.loads(message_body.decode('utf-8')) + + return message_body diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py new file mode 100644 index 0000000..4acba19 --- /dev/null +++ b/py_queue_factory/beanstalk_queue.py @@ -0,0 +1,59 @@ +import json +import base64 +import urllib.parse as url_parse + +import beanstalkc +from beanstalkc import Job + +from . import AbstractQueue, QueueMessage + + +class Beanstalk(AbstractQueue): + + BEANSTALK_MAX_VISIBILITY_TIMEOUT = "30" + + def __init__(self, uri, host_url, subdomain, default_port=11300): + parts = url_parse(uri) + host = parts.hostname + port = parts.port if parts.port else default_port + self.set_host_url(host_url).set_subdomain(subdomain) + path_parts = list(filter(None, parts.path.split('/'))) + self.queue_prefix = "/".join(path_parts) if path_parts else '' + self.beanstalk_client = beanstalkc.Connection(host=host, port=port) + + def set_default_queue(self, queue_name): + self.queue_name = self.queue_prefix + queue_name + self.beanstalk_client.watch(self.queue_name) + + def do_send_message(self, message, delay, attempt=1): + if delay > 900: + delay = 900 + try: + message_body = self.encode_mesage(message.get_body(), self.encoding) + self.beanstalk_client.use(self.get_queue_url()) + respone = self.beanstalk_client.put(message_body, delay=delay) + message.set_id(respone) + except: + if attempt < 3: + attempt += 1 + self.do_send_message(message, delay, attempt) + else: + raise Exception('Could not send message') + + def delete_message(self, message): + job = Job(self.beanstalk_client, message.get_id(), queue_job.get_body()) + job.delete() + + def receive_message(self): + result = self.beanstalk_client.reserve(int(self.visibility_timeout)) + message = QueueMessage(result.jid, result.body) + return message + + def get_queue_url(self): + return url_parse.urljoin(self.scheme + '://', 'beanstalkd', self.get_queue_name()) + + def validate_visibility_timeout(self): + if self.visibility_timeout > self.BEANSTALK_MAX_VISIBILITY_TIMEOUT: + raise Exception(f'visibility_timeout range 0 to ' + f'{self.SQS_MAX_VISIBILITY_TIMEOUT}, but received' + f' {self.visibility_timeout}') diff --git a/py_queue_factory/queue_factory.py b/py_queue_factory/queue_factory.py index fd0737e..d23dc7a 100644 --- a/py_queue_factory/queue_factory.py +++ b/py_queue_factory/queue_factory.py @@ -1,19 +1,26 @@ import urllib.parse as url_parse -from . import Sqs, SqsLocal +from . import Sqs, SqsLocal, Beanstalk + class QueueFactory: @staticmethod - def get_queue(queue_uri, host_url, subdomain): + def get_queue(queue_uri, host_url, subdomain, port=None): parts = url_parse.urlparse(queue_uri) + host = parts.hostname + host_parts = host.split('.', 2) if parts.scheme == 'https': - host = parts.hostname - host_parts = host.split('.', 2) if host_parts[0] == 'sqs' and host_parts[2] == 'amazonaws.com': return Sqs(queue_uri, host_url, subdomain) elif host_parts[0] == 'sqs' and host_parts[2] == 'awslocal': return SqsLocal(queue_uri, host_url, subdomain) else: raise Exception('Invalid Sqs URI') + elif parts.scheme == 'beanstalk': + if host_parts[0] == 'beanstalkd': + return Beanstalk(queue_uri, host_url, subdomain, port) \ + if port else Beanstalk(queue_uri, host_url, subdomain) + else: + raise Exception('Invalid Beanstalk URI') else: raise Exception('Unsupported URI scheme') diff --git a/py_queue_factory/sqs_queue.py b/py_queue_factory/sqs_queue.py index 87581e4..b3d32ff 100644 --- a/py_queue_factory/sqs_queue.py +++ b/py_queue_factory/sqs_queue.py @@ -1,6 +1,4 @@ import boto3 -import json -import base64 import urllib.parse as url_parse from . import AbstractQueue, QueueMessage @@ -43,7 +41,7 @@ def do_send_message(self, message, delay, attempt=1): if delay > 900: delay = 900 try: - message_body = self.encode_mesage(message.get_body()) + message_body = self.encode_mesage(message.get_body(), self.encoding) respone = self.sqs_client.send_message( QueueUrl=self.get_queue_url(), MessageBody=message_body, @@ -91,7 +89,7 @@ def receive_message(self, attribute_names=[]): ) if 'Messages' in result: data = result['Messages'][0] - message_body = self.decode_message(data['Body']) + message_body = self.decode_message(data['Body'], self.encoding) message = QueueMessage(message_body, data['MessageId']) message.set_receipt_handle(data['ReceiptHandle']) message.set_attributes(data.get('Attributes', {})) @@ -100,24 +98,6 @@ def receive_message(self, attribute_names=[]): self.create_queue(self.get_queue_name()) return message - def encode_mesage(self, message_body): - if self.encoding == 'json': - message_body = json.dumps(message_body) - elif self.encoding == 'base64': - json_message = json.dumps(message_body).encode('utf-8') - message_body = base64.b64encode(json_message).decode('utf-8') - - return message_body - - def decode_message(self, message_body): - if self.encoding == 'json': - message_body = json.loads(message_body) - elif self.encoding == 'base64': - message_body = base64.b64decode(message_body.encode('utf-8')) - message_body = json.loads(message_body.decode('utf-8')) - - return message_body - def change_message_visibility(self, message, visibility_timeout): if visibility_timeout > self.SQS_MAX_VISIBILITY_TIMEOUT: visibility_timeout = self.SQS_MAX_VISIBILITY_TIMEOUT From e7ad8dce74314ebe06cd76e67212f97e5ec6f12c Mon Sep 17 00:00:00 2001 From: chaitanya Date: Mon, 7 Mar 2022 12:00:44 +0530 Subject: [PATCH 02/16] Added abstract method to child class --- py_queue_factory/beanstalk_queue.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index 4acba19..82b9529 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -52,6 +52,9 @@ def receive_message(self): def get_queue_url(self): return url_parse.urljoin(self.scheme + '://', 'beanstalkd', self.get_queue_name()) + def change_message_visibility(self, message, visibility_timeout): + pass + def validate_visibility_timeout(self): if self.visibility_timeout > self.BEANSTALK_MAX_VISIBILITY_TIMEOUT: raise Exception(f'visibility_timeout range 0 to ' From d7b1974687aec1052a677bacb6d6cbe5e3a5a7d4 Mon Sep 17 00:00:00 2001 From: chaitanya Date: Mon, 7 Mar 2022 13:39:39 +0530 Subject: [PATCH 03/16] Added dependency --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index d559dc9..8458e24 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,7 @@ install_requires=[ 'boto3>=1.7.*', ], + dependency_links=['https://github.com/practo/beanstalkc.git@v2.0.0'], classifiers=[ 'Development Status :: 3 - Alpha', 'Intended Audience :: Developers', From 715ffd08d49d799b4073d1abcf741987528fa767 Mon Sep 17 00:00:00 2001 From: chaitanya Date: Mon, 7 Mar 2022 15:04:24 +0530 Subject: [PATCH 04/16] Added dependency --- setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 8458e24..ada9772 100644 --- a/setup.py +++ b/setup.py @@ -16,9 +16,9 @@ url='https://github.com/practo/py-queue-factory', packages=setuptools.find_packages(), install_requires=[ - 'boto3>=1.7.*', - ], - dependency_links=['https://github.com/practo/beanstalkc.git@v2.0.0'], + 'boto3>=1.7.*', + 'beanstalkc3 @git+https://github.com/practo/beanstalkc.git@v2.0.0', + ], classifiers=[ 'Development Status :: 3 - Alpha', 'Intended Audience :: Developers', From b5466a73447045c04dcaeafe76f347f637584baa Mon Sep 17 00:00:00 2001 From: chaitanya Date: Mon, 7 Mar 2022 15:22:47 +0530 Subject: [PATCH 05/16] Minor bugfixes --- .idea/workspace.xml | 40 +++++++++++++++++++++++++++++ py_queue_factory/__init__.py | 1 + py_queue_factory/beanstalk_queue.py | 16 +++++------- 3 files changed, 48 insertions(+), 9 deletions(-) create mode 100644 .idea/workspace.xml diff --git a/.idea/workspace.xml b/.idea/workspace.xml new file mode 100644 index 0000000..3d83d83 --- /dev/null +++ b/.idea/workspace.xml @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + 1646645779289 + + + + \ No newline at end of file diff --git a/py_queue_factory/__init__.py b/py_queue_factory/__init__.py index 091acfe..80f4f98 100644 --- a/py_queue_factory/__init__.py +++ b/py_queue_factory/__init__.py @@ -3,3 +3,4 @@ from .sqs_queue import * from .sqs_local import * from .queue_factory import * +from .beanstalk_queue import * diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index 82b9529..98b4267 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -1,5 +1,3 @@ -import json -import base64 import urllib.parse as url_parse import beanstalkc @@ -15,16 +13,13 @@ class Beanstalk(AbstractQueue): def __init__(self, uri, host_url, subdomain, default_port=11300): parts = url_parse(uri) host = parts.hostname + self.scheme = parts.scheme port = parts.port if parts.port else default_port self.set_host_url(host_url).set_subdomain(subdomain) path_parts = list(filter(None, parts.path.split('/'))) self.queue_prefix = "/".join(path_parts) if path_parts else '' self.beanstalk_client = beanstalkc.Connection(host=host, port=port) - def set_default_queue(self, queue_name): - self.queue_name = self.queue_prefix + queue_name - self.beanstalk_client.watch(self.queue_name) - def do_send_message(self, message, delay, attempt=1): if delay > 900: delay = 900 @@ -41,16 +36,19 @@ def do_send_message(self, message, delay, attempt=1): raise Exception('Could not send message') def delete_message(self, message): - job = Job(self.beanstalk_client, message.get_id(), queue_job.get_body()) + job = Job(self.beanstalk_client, message.get_id(), message.get_body()) job.delete() def receive_message(self): + self.beanstalk_client.watch(self.queue_name) result = self.beanstalk_client.reserve(int(self.visibility_timeout)) message = QueueMessage(result.jid, result.body) + return message def get_queue_url(self): - return url_parse.urljoin(self.scheme + '://', 'beanstalkd', self.get_queue_name()) + return url_parse.urljoin(self.scheme + '://', 'beanstalkd', + self.get_queue_name()) def change_message_visibility(self, message, visibility_timeout): pass @@ -58,5 +56,5 @@ def change_message_visibility(self, message, visibility_timeout): def validate_visibility_timeout(self): if self.visibility_timeout > self.BEANSTALK_MAX_VISIBILITY_TIMEOUT: raise Exception(f'visibility_timeout range 0 to ' - f'{self.SQS_MAX_VISIBILITY_TIMEOUT}, but received' + f'{self.BEANSTALK_MAX_VISIBILITY_TIMEOUT}, but received' f' {self.visibility_timeout}') From 0af0c8522822c382694eb458ebffeb483a98fc4f Mon Sep 17 00:00:00 2001 From: chaitanya Date: Mon, 7 Mar 2022 15:24:10 +0530 Subject: [PATCH 06/16] minor change --- .idea/workspace.xml | 40 ---------------------------------------- 1 file changed, 40 deletions(-) delete mode 100644 .idea/workspace.xml diff --git a/.idea/workspace.xml b/.idea/workspace.xml deleted file mode 100644 index 3d83d83..0000000 --- a/.idea/workspace.xml +++ /dev/null @@ -1,40 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - 1646645779289 - - - - \ No newline at end of file From 8ece23847825a7986f02406f494e8320dc9b8d9b Mon Sep 17 00:00:00 2001 From: chaitanya Date: Mon, 7 Mar 2022 16:13:16 +0530 Subject: [PATCH 07/16] Fixed timeout --- py_queue_factory/beanstalk_queue.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index 98b4267..e6e83cd 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -8,7 +8,8 @@ class Beanstalk(AbstractQueue): - BEANSTALK_MAX_VISIBILITY_TIMEOUT = "30" + BEANSTALK_MAX_VISIBILITY_TIMEOUT = 60 * 60 * 12 # 12 hours + BEANSTALK_RECEIVE_MESSAGE_WAIT_TIME = 30 # 30 seconds def __init__(self, uri, host_url, subdomain, default_port=11300): parts = url_parse(uri) @@ -26,7 +27,7 @@ def do_send_message(self, message, delay, attempt=1): try: message_body = self.encode_mesage(message.get_body(), self.encoding) self.beanstalk_client.use(self.get_queue_url()) - respone = self.beanstalk_client.put(message_body, delay=delay) + respone = self.beanstalk_client.put(message_body, delay=delay, ttr=self.visibility_timeout) message.set_id(respone) except: if attempt < 3: @@ -41,7 +42,7 @@ def delete_message(self, message): def receive_message(self): self.beanstalk_client.watch(self.queue_name) - result = self.beanstalk_client.reserve(int(self.visibility_timeout)) + result = self.beanstalk_client.reserve(self.BEANSTALK_RECEIVE_MESSAGE_WAIT_TIME) message = QueueMessage(result.jid, result.body) return message From 30d449b93de7603bc49b3d2c713d9a44950cddd1 Mon Sep 17 00:00:00 2001 From: chaitanya Date: Tue, 8 Mar 2022 23:57:43 +0530 Subject: [PATCH 08/16] Minor bug Fix --- .DS_Store | Bin 0 -> 6148 bytes py_queue_factory.egg-info/PKG-INFO | 95 ++++++++++++++++++ py_queue_factory.egg-info/SOURCES.txt | 14 +++ .../dependency_links.txt | 1 + py_queue_factory.egg-info/requires.txt | 2 + py_queue_factory.egg-info/top_level.txt | 1 + py_queue_factory/__init__.py | 2 +- py_queue_factory/beanstalk_queue.py | 4 +- 8 files changed, 116 insertions(+), 3 deletions(-) create mode 100644 .DS_Store create mode 100644 py_queue_factory.egg-info/PKG-INFO create mode 100644 py_queue_factory.egg-info/SOURCES.txt create mode 100644 py_queue_factory.egg-info/dependency_links.txt create mode 100644 py_queue_factory.egg-info/requires.txt create mode 100644 py_queue_factory.egg-info/top_level.txt diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..30c7abb7158be997f7633db287e9595146e04491 GIT binary patch literal 6148 zcmeHKOHKko5Pc03gV7k4xSJE0-~r5t;|Vwb3MgX0K?5Z0<_w;|jZ4=qTp7K9Uc*cH zs=LLWfWM6~rYfm^Jyl)P)o&(KQvgh3(yaqk02QiWX_dng5x?kyl)Pn~XyhJ)UcK4L zhNFd*YWS-R$g_*Eg&z7C7oMLf-G#nkmZt4N)~2med+glry*%s|{oPOfqm$s>C|@y@ zFU$xtv~Z3Ij;ywDj6-xVMurQ_3i)9B^gA(Op7HI@<%-Mm%e{u#d;UL(E1V0toGZeV z=Q+bP%s=Y4HbP&R=kxXEQ}o8Cj}7x#<`4IcZtT9Yw~8{L3@8KNVSqJTq_XBvM`b`6 zPzL4<$oCi_=7{eLk?uap60;9oIdqO_Sd zcqCt27Y--2Hl`j>MPyy>a2~>mEyY|brMO15LVGL|V&XA(NDD=O1S}0YC%Q BcSZmJ literal 0 HcmV?d00001 diff --git a/py_queue_factory.egg-info/PKG-INFO b/py_queue_factory.egg-info/PKG-INFO new file mode 100644 index 0000000..604eb7e --- /dev/null +++ b/py_queue_factory.egg-info/PKG-INFO @@ -0,0 +1,95 @@ +Metadata-Version: 2.1 +Name: py-queue-factory +Version: 0.0.1 +Summary: Queue factory to work with multiple types of queue +Home-page: https://github.com/practo/py-queue-factory +Author: Anujith Singh +Author-email: anujith.singh@gmail.com +License: UNKNOWN +Project-URL: Documentation, https://github.com/practo/py-queue-factory/wiki +Project-URL: Source, https://github.com/practo/py-queue-factory +Project-URL: Tracker, https://github.com/practo/py-queue-factory/issues +Keywords: queue factory +Platform: UNKNOWN +Classifier: Development Status :: 3 - Alpha +Classifier: Intended Audience :: Developers +Classifier: Natural Language :: English +Classifier: Operating System :: OS Independent +Classifier: Programming Language :: Python :: 3 +Description-Content-Type: text/markdown + +# Python Queue Factory +py-queue-factory is a python library that takes care of creating required queue instances so that you only have to worry about actually using the queue + +## Quick Links +* [Installation](https://github.com/practo/py-queue-factory#installation) +* [Usage](https://github.com/practo/py-queue-factory#usage) +* [Features](https://github.com/practo/py-queue-factory#features) +* [Queue URI Examples](https://github.com/practo/py-queue-factory#queue-uri-examples) +* [Debugging](https://github.com/practo/py-queue-factory#debugging) + +## Installation +```python +pip install git+https://github.com/practo/py-queue-factory.git@ +or +pip install git+https://github.com/practo/py-queue-factory.git@ +``` + +## Usage +[Refere this for example usage](https://github.com/practo/py-queue-factory/blob/master/example.py) + +Params required `queue_uri`, `host_url` and `subdomain` +`queue_uri` is used to decide what type of queue to create (SQS, SQS Local, Beanstalk) +`host_url` and `subdomain` is used to decide the actual queue name (staging name/latest is suffixed to queue name) + +If queue prefix is `prod-subscriptions-` +queue name is `random-queue` +and host url is `https://subscriptions-stag.practodev.com` +actual queue name would be `prod-subscriptions-random-queue-stag` + +## Features +#### SQS +- [x] Send Message +- [x] Receive Message +- [x] Delete Message +- [x] Create queue if not exists +- [x] Configurable visibility timeout +- [x] Configurable encoding (json/base64) +- [ ] SQS Spooling +#### SQS Local +- [x] Send Message +- [x] Receive Message +- [x] Delete Message +- [x] Create queue if not exists +- [x] Configurable visibility timeout +- [x] Configurable encoding (json/base64) +- [ ] SQS Spooling (Don't know if spool-consumer supports aws local) +#### Beanstalk +- [ ] Send Message +- [ ] Receive Message +- [ ] Delete Message +- [ ] Create queue if not exists + +## Queue URI Examples: +```python +# Format for AWS SQS queue +QUEUE_URI = 'https://:@sqs..amazonaws.com//' + +# Format for AWSLocal/localstack SQS queue +QUEUE_URI = 'https://anything:anything@sqs..awslocal://' + +#Example AWS local SQS queue uri +QUEUE_URI = 'https://anything:anything@sqs.ap-south-east-1.awslocal:4576/1/prod-subscriptions-' + +#Example AWS SQS queue uri +QUEUE_URI = 'https://AKIAJHY5ABCPXF4YXXYZ:kdsjhfkjsdksdfkdsjnckjsdnkfjdsdkfjndskjf@sqs.ap-south-1.amazonaws.com/961234512345/prod-subscriptions-' +``` + +## Debugging +For debugging you can clone this repo locally and install it from that location +```python +pip install -e +``` +Note: Remove any previously installed version of this library + + diff --git a/py_queue_factory.egg-info/SOURCES.txt b/py_queue_factory.egg-info/SOURCES.txt new file mode 100644 index 0000000..a7f7a6a --- /dev/null +++ b/py_queue_factory.egg-info/SOURCES.txt @@ -0,0 +1,14 @@ +README.md +setup.py +py_queue_factory/__init__.py +py_queue_factory/abstract_queue.py +py_queue_factory/beanstalk_queue.py +py_queue_factory/queue_factory.py +py_queue_factory/queue_message.py +py_queue_factory/sqs_local.py +py_queue_factory/sqs_queue.py +py_queue_factory.egg-info/PKG-INFO +py_queue_factory.egg-info/SOURCES.txt +py_queue_factory.egg-info/dependency_links.txt +py_queue_factory.egg-info/requires.txt +py_queue_factory.egg-info/top_level.txt \ No newline at end of file diff --git a/py_queue_factory.egg-info/dependency_links.txt b/py_queue_factory.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/py_queue_factory.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/py_queue_factory.egg-info/requires.txt b/py_queue_factory.egg-info/requires.txt new file mode 100644 index 0000000..48cec34 --- /dev/null +++ b/py_queue_factory.egg-info/requires.txt @@ -0,0 +1,2 @@ +boto3>=1.7.* +beanstalkc3@ git+https://github.com/practo/beanstalkc.git@v2.0.0 diff --git a/py_queue_factory.egg-info/top_level.txt b/py_queue_factory.egg-info/top_level.txt new file mode 100644 index 0000000..c77b9c9 --- /dev/null +++ b/py_queue_factory.egg-info/top_level.txt @@ -0,0 +1 @@ +py_queue_factory diff --git a/py_queue_factory/__init__.py b/py_queue_factory/__init__.py index 80f4f98..ccf16a6 100644 --- a/py_queue_factory/__init__.py +++ b/py_queue_factory/__init__.py @@ -2,5 +2,5 @@ from .abstract_queue import * from .sqs_queue import * from .sqs_local import * -from .queue_factory import * from .beanstalk_queue import * +from .queue_factory import * diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index e6e83cd..35fc9a9 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -1,7 +1,7 @@ import urllib.parse as url_parse -import beanstalkc -from beanstalkc import Job +import beanstalkc3 +from beanstalkc3 import Job from . import AbstractQueue, QueueMessage From c94de5ee8966fa063a3d76335cec922e81e3bc55 Mon Sep 17 00:00:00 2001 From: chaitanya Date: Tue, 8 Mar 2022 23:58:32 +0530 Subject: [PATCH 09/16] removed pip package --- py_queue_factory.egg-info/PKG-INFO | 95 ------------------- py_queue_factory.egg-info/SOURCES.txt | 14 --- .../dependency_links.txt | 1 - py_queue_factory.egg-info/requires.txt | 2 - py_queue_factory.egg-info/top_level.txt | 1 - 5 files changed, 113 deletions(-) delete mode 100644 py_queue_factory.egg-info/PKG-INFO delete mode 100644 py_queue_factory.egg-info/SOURCES.txt delete mode 100644 py_queue_factory.egg-info/dependency_links.txt delete mode 100644 py_queue_factory.egg-info/requires.txt delete mode 100644 py_queue_factory.egg-info/top_level.txt diff --git a/py_queue_factory.egg-info/PKG-INFO b/py_queue_factory.egg-info/PKG-INFO deleted file mode 100644 index 604eb7e..0000000 --- a/py_queue_factory.egg-info/PKG-INFO +++ /dev/null @@ -1,95 +0,0 @@ -Metadata-Version: 2.1 -Name: py-queue-factory -Version: 0.0.1 -Summary: Queue factory to work with multiple types of queue -Home-page: https://github.com/practo/py-queue-factory -Author: Anujith Singh -Author-email: anujith.singh@gmail.com -License: UNKNOWN -Project-URL: Documentation, https://github.com/practo/py-queue-factory/wiki -Project-URL: Source, https://github.com/practo/py-queue-factory -Project-URL: Tracker, https://github.com/practo/py-queue-factory/issues -Keywords: queue factory -Platform: UNKNOWN -Classifier: Development Status :: 3 - Alpha -Classifier: Intended Audience :: Developers -Classifier: Natural Language :: English -Classifier: Operating System :: OS Independent -Classifier: Programming Language :: Python :: 3 -Description-Content-Type: text/markdown - -# Python Queue Factory -py-queue-factory is a python library that takes care of creating required queue instances so that you only have to worry about actually using the queue - -## Quick Links -* [Installation](https://github.com/practo/py-queue-factory#installation) -* [Usage](https://github.com/practo/py-queue-factory#usage) -* [Features](https://github.com/practo/py-queue-factory#features) -* [Queue URI Examples](https://github.com/practo/py-queue-factory#queue-uri-examples) -* [Debugging](https://github.com/practo/py-queue-factory#debugging) - -## Installation -```python -pip install git+https://github.com/practo/py-queue-factory.git@ -or -pip install git+https://github.com/practo/py-queue-factory.git@ -``` - -## Usage -[Refere this for example usage](https://github.com/practo/py-queue-factory/blob/master/example.py) - -Params required `queue_uri`, `host_url` and `subdomain` -`queue_uri` is used to decide what type of queue to create (SQS, SQS Local, Beanstalk) -`host_url` and `subdomain` is used to decide the actual queue name (staging name/latest is suffixed to queue name) - -If queue prefix is `prod-subscriptions-` -queue name is `random-queue` -and host url is `https://subscriptions-stag.practodev.com` -actual queue name would be `prod-subscriptions-random-queue-stag` - -## Features -#### SQS -- [x] Send Message -- [x] Receive Message -- [x] Delete Message -- [x] Create queue if not exists -- [x] Configurable visibility timeout -- [x] Configurable encoding (json/base64) -- [ ] SQS Spooling -#### SQS Local -- [x] Send Message -- [x] Receive Message -- [x] Delete Message -- [x] Create queue if not exists -- [x] Configurable visibility timeout -- [x] Configurable encoding (json/base64) -- [ ] SQS Spooling (Don't know if spool-consumer supports aws local) -#### Beanstalk -- [ ] Send Message -- [ ] Receive Message -- [ ] Delete Message -- [ ] Create queue if not exists - -## Queue URI Examples: -```python -# Format for AWS SQS queue -QUEUE_URI = 'https://:@sqs..amazonaws.com//' - -# Format for AWSLocal/localstack SQS queue -QUEUE_URI = 'https://anything:anything@sqs..awslocal://' - -#Example AWS local SQS queue uri -QUEUE_URI = 'https://anything:anything@sqs.ap-south-east-1.awslocal:4576/1/prod-subscriptions-' - -#Example AWS SQS queue uri -QUEUE_URI = 'https://AKIAJHY5ABCPXF4YXXYZ:kdsjhfkjsdksdfkdsjnckjsdnkfjdsdkfjndskjf@sqs.ap-south-1.amazonaws.com/961234512345/prod-subscriptions-' -``` - -## Debugging -For debugging you can clone this repo locally and install it from that location -```python -pip install -e -``` -Note: Remove any previously installed version of this library - - diff --git a/py_queue_factory.egg-info/SOURCES.txt b/py_queue_factory.egg-info/SOURCES.txt deleted file mode 100644 index a7f7a6a..0000000 --- a/py_queue_factory.egg-info/SOURCES.txt +++ /dev/null @@ -1,14 +0,0 @@ -README.md -setup.py -py_queue_factory/__init__.py -py_queue_factory/abstract_queue.py -py_queue_factory/beanstalk_queue.py -py_queue_factory/queue_factory.py -py_queue_factory/queue_message.py -py_queue_factory/sqs_local.py -py_queue_factory/sqs_queue.py -py_queue_factory.egg-info/PKG-INFO -py_queue_factory.egg-info/SOURCES.txt -py_queue_factory.egg-info/dependency_links.txt -py_queue_factory.egg-info/requires.txt -py_queue_factory.egg-info/top_level.txt \ No newline at end of file diff --git a/py_queue_factory.egg-info/dependency_links.txt b/py_queue_factory.egg-info/dependency_links.txt deleted file mode 100644 index 8b13789..0000000 --- a/py_queue_factory.egg-info/dependency_links.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/py_queue_factory.egg-info/requires.txt b/py_queue_factory.egg-info/requires.txt deleted file mode 100644 index 48cec34..0000000 --- a/py_queue_factory.egg-info/requires.txt +++ /dev/null @@ -1,2 +0,0 @@ -boto3>=1.7.* -beanstalkc3@ git+https://github.com/practo/beanstalkc.git@v2.0.0 diff --git a/py_queue_factory.egg-info/top_level.txt b/py_queue_factory.egg-info/top_level.txt deleted file mode 100644 index c77b9c9..0000000 --- a/py_queue_factory.egg-info/top_level.txt +++ /dev/null @@ -1 +0,0 @@ -py_queue_factory From 64077aa6a40ee4ba119e59febfc8d84209eacdd7 Mon Sep 17 00:00:00 2001 From: chaitanya Date: Wed, 9 Mar 2022 00:05:33 +0530 Subject: [PATCH 10/16] Minor Fix --- py_queue_factory/beanstalk_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index 35fc9a9..a753c91 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -19,7 +19,7 @@ def __init__(self, uri, host_url, subdomain, default_port=11300): self.set_host_url(host_url).set_subdomain(subdomain) path_parts = list(filter(None, parts.path.split('/'))) self.queue_prefix = "/".join(path_parts) if path_parts else '' - self.beanstalk_client = beanstalkc.Connection(host=host, port=port) + self.beanstalk_client = beanstalkc3.Connection(host=host, port=port) def do_send_message(self, message, delay, attempt=1): if delay > 900: From 49b14d44edcae8cbe085d9c88f34fad59e7c2c7d Mon Sep 17 00:00:00 2001 From: chaitanya Date: Wed, 9 Mar 2022 01:24:53 +0530 Subject: [PATCH 11/16] Fixed minor bug --- py_queue_factory/beanstalk_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index a753c91..1292baa 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -12,7 +12,7 @@ class Beanstalk(AbstractQueue): BEANSTALK_RECEIVE_MESSAGE_WAIT_TIME = 30 # 30 seconds def __init__(self, uri, host_url, subdomain, default_port=11300): - parts = url_parse(uri) + parts = url_parse.urlparse(uri) host = parts.hostname self.scheme = parts.scheme port = parts.port if parts.port else default_port From fb0abd43ea352fc94e922ec4a8733f3a976c8d1a Mon Sep 17 00:00:00 2001 From: chaitanya Date: Wed, 9 Mar 2022 07:19:09 +0530 Subject: [PATCH 12/16] Minor fix for static method invocation --- .DS_Store | Bin 6148 -> 0 bytes py_queue_factory/beanstalk_queue.py | 6 +++--- py_queue_factory/sqs_queue.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) delete mode 100644 .DS_Store diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 30c7abb7158be997f7633db287e9595146e04491..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6148 zcmeHKOHKko5Pc03gV7k4xSJE0-~r5t;|Vwb3MgX0K?5Z0<_w;|jZ4=qTp7K9Uc*cH zs=LLWfWM6~rYfm^Jyl)P)o&(KQvgh3(yaqk02QiWX_dng5x?kyl)Pn~XyhJ)UcK4L zhNFd*YWS-R$g_*Eg&z7C7oMLf-G#nkmZt4N)~2med+glry*%s|{oPOfqm$s>C|@y@ zFU$xtv~Z3Ij;ywDj6-xVMurQ_3i)9B^gA(Op7HI@<%-Mm%e{u#d;UL(E1V0toGZeV z=Q+bP%s=Y4HbP&R=kxXEQ}o8Cj}7x#<`4IcZtT9Yw~8{L3@8KNVSqJTq_XBvM`b`6 zPzL4<$oCi_=7{eLk?uap60;9oIdqO_Sd zcqCt27Y--2Hl`j>MPyy>a2~>mEyY|brMO15LVGL|V&XA(NDD=O1S}0YC%Q BcSZmJ diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index 1292baa..99d77fa 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -25,7 +25,7 @@ def do_send_message(self, message, delay, attempt=1): if delay > 900: delay = 900 try: - message_body = self.encode_mesage(message.get_body(), self.encoding) + message_body = AbstractQueue.encode_mesage(message.get_body(), self.encoding) self.beanstalk_client.use(self.get_queue_url()) respone = self.beanstalk_client.put(message_body, delay=delay, ttr=self.visibility_timeout) message.set_id(respone) @@ -41,9 +41,9 @@ def delete_message(self, message): job.delete() def receive_message(self): - self.beanstalk_client.watch(self.queue_name) + self.beanstalk_client.watch(self.get_queue_url()) result = self.beanstalk_client.reserve(self.BEANSTALK_RECEIVE_MESSAGE_WAIT_TIME) - message = QueueMessage(result.jid, result.body) + message = QueueMessage(result.jid, AbstractQueue.decode_message(result.body, self.encoding)) return message diff --git a/py_queue_factory/sqs_queue.py b/py_queue_factory/sqs_queue.py index b3d32ff..576854f 100644 --- a/py_queue_factory/sqs_queue.py +++ b/py_queue_factory/sqs_queue.py @@ -41,7 +41,7 @@ def do_send_message(self, message, delay, attempt=1): if delay > 900: delay = 900 try: - message_body = self.encode_mesage(message.get_body(), self.encoding) + message_body = AbstractQueue.encode_mesage(message.get_body(), self.encoding) respone = self.sqs_client.send_message( QueueUrl=self.get_queue_url(), MessageBody=message_body, @@ -89,7 +89,7 @@ def receive_message(self, attribute_names=[]): ) if 'Messages' in result: data = result['Messages'][0] - message_body = self.decode_message(data['Body'], self.encoding) + message_body = AbstractQueue.decode_message(data['Body'], self.encoding) message = QueueMessage(message_body, data['MessageId']) message.set_receipt_handle(data['ReceiptHandle']) message.set_attributes(data.get('Attributes', {})) From d60038652fb7007356c9d7fc428a8d77b47628fc Mon Sep 17 00:00:00 2001 From: chaitanya Date: Wed, 9 Mar 2022 07:47:19 +0530 Subject: [PATCH 13/16] Added message return for debug --- py_queue_factory/beanstalk_queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index 99d77fa..82239b6 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -29,6 +29,8 @@ def do_send_message(self, message, delay, attempt=1): self.beanstalk_client.use(self.get_queue_url()) respone = self.beanstalk_client.put(message_body, delay=delay, ttr=self.visibility_timeout) message.set_id(respone) + + return message except: if attempt < 3: attempt += 1 From 882d3ea343dc98474c90260608b45518cde7d373 Mon Sep 17 00:00:00 2001 From: chaitanya Date: Wed, 9 Mar 2022 09:22:31 +0530 Subject: [PATCH 14/16] added debug --- py_queue_factory/abstract_queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/py_queue_factory/abstract_queue.py b/py_queue_factory/abstract_queue.py index 2a95150..56a7e73 100644 --- a/py_queue_factory/abstract_queue.py +++ b/py_queue_factory/abstract_queue.py @@ -26,6 +26,8 @@ def send_message(self, message, delay=0): message = QueueMessage(message) self.do_send_message(message, delay) + return message + @abstractmethod def do_send_message(message, delay): pass From 79e10ae17e9cd70e5296db5d983d9f4fe99290ec Mon Sep 17 00:00:00 2001 From: chaitanya Date: Wed, 9 Mar 2022 10:41:14 +0530 Subject: [PATCH 15/16] Minor fix --- py_queue_factory/beanstalk_queue.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index 82239b6..cfb284d 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -1,3 +1,4 @@ +import os import urllib.parse as url_parse import beanstalkc3 @@ -26,7 +27,7 @@ def do_send_message(self, message, delay, attempt=1): delay = 900 try: message_body = AbstractQueue.encode_mesage(message.get_body(), self.encoding) - self.beanstalk_client.use(self.get_queue_url()) + self.beanstalk_client.use(self.get_queue_name()) respone = self.beanstalk_client.put(message_body, delay=delay, ttr=self.visibility_timeout) message.set_id(respone) @@ -43,15 +44,14 @@ def delete_message(self, message): job.delete() def receive_message(self): - self.beanstalk_client.watch(self.get_queue_url()) + self.beanstalk_client.watch(self.get_queue_name()) result = self.beanstalk_client.reserve(self.BEANSTALK_RECEIVE_MESSAGE_WAIT_TIME) message = QueueMessage(result.jid, AbstractQueue.decode_message(result.body, self.encoding)) return message def get_queue_url(self): - return url_parse.urljoin(self.scheme + '://', 'beanstalkd', - self.get_queue_name()) + return os.path.join(self.scheme + '://', 'beanstalkd', self.get_queue_name()) def change_message_visibility(self, message, visibility_timeout): pass From 52b5a15defde42063479ec975139be08a668459d Mon Sep 17 00:00:00 2001 From: chaitanya Date: Wed, 9 Mar 2022 19:54:46 +0530 Subject: [PATCH 16/16] Minor Refactor --- README.md | 8 ++++---- py_queue_factory/abstract_queue.py | 7 +++---- py_queue_factory/beanstalk_queue.py | 2 -- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index cf8de75..28e7661 100644 --- a/README.md +++ b/README.md @@ -45,10 +45,10 @@ actual queue name would be `prod-subscriptions-random-queue-stag` - [x] Configurable encoding (json/base64) - [ ] SQS Spooling (Don't know if spool-consumer supports aws local) #### Beanstalk -- [ ] Send Message -- [ ] Receive Message -- [ ] Delete Message -- [ ] Create queue if not exists +- [x] Send Message +- [x] Receive Message +- [x] Delete Message +- [x] Create queue if not exists ## Queue URI Examples: ```python diff --git a/py_queue_factory/abstract_queue.py b/py_queue_factory/abstract_queue.py index 56a7e73..7cda1bd 100644 --- a/py_queue_factory/abstract_queue.py +++ b/py_queue_factory/abstract_queue.py @@ -20,13 +20,12 @@ class AbstractQueue(ABC): DEFAULT_ENCODING = 'base64' VALID_ENCODING = ['json', 'base64'] - def send_message(self, message, delay=0): + def send_message(self, message, delay=0, debug=False): if not isinstance(message, QueueMessage): - # message = self.handle_cid(message) message = QueueMessage(message) self.do_send_message(message, delay) - - return message + if debug: + return message @abstractmethod def do_send_message(message, delay): diff --git a/py_queue_factory/beanstalk_queue.py b/py_queue_factory/beanstalk_queue.py index cfb284d..b8725ae 100644 --- a/py_queue_factory/beanstalk_queue.py +++ b/py_queue_factory/beanstalk_queue.py @@ -30,8 +30,6 @@ def do_send_message(self, message, delay, attempt=1): self.beanstalk_client.use(self.get_queue_name()) respone = self.beanstalk_client.put(message_body, delay=delay, ttr=self.visibility_timeout) message.set_id(respone) - - return message except: if attempt < 3: attempt += 1