diff --git a/.vscode/settings.json b/.vscode/settings.json index a6cee73..8e49708 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -6,14 +6,16 @@ "files.insertFinalNewline": true, "[python]": { "editor.formatOnSave": true, + "editor.defaultFormatter": "charliermarsh.ruff", "editor.codeActionsOnSave": { - "source.organizeImports": "explicit" + "source.organizeImports.ruff": "explicit", + "source.fixAll.ruff": "explicit" } }, "python.analysis.extraPaths": [ "src/" ], - "python.linting.flake8Args": [ - "--config=pyproject.toml" - ] + "ruff.enable": true, + "ruff.organizeImports": true, + "ruff.fixAll": true } diff --git a/pyproject.toml b/pyproject.toml index 36de397..f26f59c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "emp_hooks" -version = "0.0.10" +version = "0.0.11" description = "emp hooks" authors = [ {name = "johnny-emp",email = "johnny@empyrealsdk.com"} @@ -31,8 +31,10 @@ pytest = "^8.3.4" pytest-asyncio = "^0.25.3" ipython = "^8.32.0" -[tool.ruff] +[tool.ruff.lint] ignore = ["E704"] + +[tool.ruff] src-paths = ["./src"] line-length = 120 diff --git a/src/emp_hooks/handlers/sqs_hooks.py b/src/emp_hooks/handlers/sqs_hooks.py index 3ee9c73..055264d 100644 --- a/src/emp_hooks/handlers/sqs_hooks.py +++ b/src/emp_hooks/handlers/sqs_hooks.py @@ -1,3 +1,4 @@ +import asyncio import json import os import threading @@ -18,6 +19,7 @@ class SQSHooks(Hook): running: bool = Field(default=False) _thread: threading.Thread | None = PrivateAttr(default=None) + _loop: asyncio.AbstractEventLoop | None = PrivateAttr(default=None) model_config = ConfigDict(arbitrary_types_allowed=True) @@ -36,10 +38,11 @@ def run( return self.running = True + self._loop = asyncio.new_event_loop() self._thread = threading.Thread( - target=self._run, + target=self._run_loop, args=(visibility_timeout, loop_interval), - daemon=daemon, # keep the program running until the hook is stopped + daemon=daemon, ) self._thread.start() @@ -48,7 +51,12 @@ def stop(self, timeout: int = 5): if self._thread: self._thread.join(timeout) - def _run(self, visibility_timeout: int = 30, loop_interval: int = 5): + def _run_loop(self, visibility_timeout: int = 30, loop_interval: int = 5): + asyncio.set_event_loop(self._loop) + assert self._loop is not None + self._loop.run_until_complete(self._run(visibility_timeout, loop_interval)) + + async def _run(self, visibility_timeout: int = 30, loop_interval: int = 5): if not self.queue: self.queue = SQSQueue(name=os.environ["AWS_SQS_QUEUE_NAME"]) @@ -61,10 +69,17 @@ def _run(self, visibility_timeout: int = 30, loop_interval: int = 5): body = json.loads(message.body) query = body["query"] if query in self.hooks: - do_delete: bool = self.hooks[query](body) + func = self.hooks[query] + + do_delete: bool + if asyncio.iscoroutinefunction(func): + do_delete = await func(body) + else: + do_delete = func(body) + if do_delete: message.delete() - time.sleep(loop_interval) + await asyncio.sleep(loop_interval) sqs_hooks: SQSHooks = SQSHooks()