From 3444d8dfb3fbc3ba0c8949fbf464e2fcbb11d17c Mon Sep 17 00:00:00 2001 From: johnny-emp Date: Tue, 6 May 2025 01:03:58 -0700 Subject: [PATCH 1/3] run coroutine --- src/emp_hooks/handlers/sqs_hooks.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/emp_hooks/handlers/sqs_hooks.py b/src/emp_hooks/handlers/sqs_hooks.py index 3ee9c73..a4e99ca 100644 --- a/src/emp_hooks/handlers/sqs_hooks.py +++ b/src/emp_hooks/handlers/sqs_hooks.py @@ -1,3 +1,4 @@ +import asyncio import json import os import threading @@ -62,6 +63,8 @@ def _run(self, visibility_timeout: int = 30, loop_interval: int = 5): query = body["query"] if query in self.hooks: do_delete: bool = self.hooks[query](body) + if asyncio.iscoroutinefunction(self.hooks[query]): + asyncio.run(self.hooks[query](body)) if do_delete: message.delete() time.sleep(loop_interval) From 525e2fdb24f8d6f4125fcfb82a1919995ce3b6ad Mon Sep 17 00:00:00 2001 From: johnny-emp Date: Tue, 6 May 2025 01:04:10 -0700 Subject: [PATCH 2/3] bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 36de397..211f4e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "emp_hooks" -version = "0.0.10" +version = "0.0.11" description = "emp hooks" authors = [ {name = "johnny-emp",email = "johnny@empyrealsdk.com"} From ca70670e67f58d010f06e5ec60b920ff198283b4 Mon Sep 17 00:00:00 2001 From: johnny-emp Date: Tue, 6 May 2025 01:09:20 -0700 Subject: [PATCH 3/3] improved async --- .vscode/settings.json | 10 ++++++---- pyproject.toml | 4 +++- src/emp_hooks/handlers/sqs_hooks.py | 26 +++++++++++++++++++------- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index a6cee73..8e49708 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,14 +6,16 @@ "files.insertFinalNewline": true, "[python]": { "editor.formatOnSave": true, + "editor.defaultFormatter": "charliermarsh.ruff", "editor.codeActionsOnSave": { - "source.organizeImports": "explicit" + "source.organizeImports.ruff": "explicit", + "source.fixAll.ruff": "explicit" } }, "python.analysis.extraPaths": [ "src/" ], - "python.linting.flake8Args": [ - "--config=pyproject.toml" - ] + "ruff.enable": true, + "ruff.organizeImports": true, + "ruff.fixAll": true } diff --git a/pyproject.toml b/pyproject.toml index 211f4e6..f26f59c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,8 +31,10 @@ pytest = "^8.3.4" pytest-asyncio = "^0.25.3" ipython = "^8.32.0" -[tool.ruff] +[tool.ruff.lint] ignore = ["E704"] + +[tool.ruff] src-paths = ["./src"] line-length = 120 diff --git a/src/emp_hooks/handlers/sqs_hooks.py b/src/emp_hooks/handlers/sqs_hooks.py index a4e99ca..055264d 100644 --- a/src/emp_hooks/handlers/sqs_hooks.py +++ b/src/emp_hooks/handlers/sqs_hooks.py @@ -19,6 +19,7 @@ class SQSHooks(Hook): running: bool = Field(default=False) _thread: threading.Thread | None = PrivateAttr(default=None) + _loop: asyncio.AbstractEventLoop | None = PrivateAttr(default=None) model_config = ConfigDict(arbitrary_types_allowed=True) @@ -37,10 +38,11 @@ def run( return self.running = True + self._loop = asyncio.new_event_loop() self._thread = threading.Thread( - target=self._run, + target=self._run_loop, args=(visibility_timeout, loop_interval), - daemon=daemon, # keep the program running until the hook is stopped + daemon=daemon, ) self._thread.start() @@ -49,7 +51,12 @@ def stop(self, timeout: int = 5): if self._thread: self._thread.join(timeout) - def _run(self, visibility_timeout: int = 30, loop_interval: int = 5): + def _run_loop(self, visibility_timeout: int = 30, loop_interval: int = 5): + asyncio.set_event_loop(self._loop) + assert self._loop is not None + self._loop.run_until_complete(self._run(visibility_timeout, loop_interval)) + + async def _run(self, visibility_timeout: int = 30, loop_interval: int = 5): if not self.queue: self.queue = SQSQueue(name=os.environ["AWS_SQS_QUEUE_NAME"]) @@ -62,12 +69,17 @@ def _run(self, visibility_timeout: int = 30, loop_interval: int = 5): body = json.loads(message.body) query = body["query"] if query in self.hooks: - do_delete: bool = self.hooks[query](body) - if asyncio.iscoroutinefunction(self.hooks[query]): - asyncio.run(self.hooks[query](body)) + func = self.hooks[query] + + do_delete: bool + if asyncio.iscoroutinefunction(func): + do_delete = await func(body) + else: + do_delete = func(body) + if do_delete: message.delete() - time.sleep(loop_interval) + await asyncio.sleep(loop_interval) sqs_hooks: SQSHooks = SQSHooks()