Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
"files.insertFinalNewline": true,
"[python]": {
"editor.formatOnSave": true,
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
"source.organizeImports.ruff": "explicit",
"source.fixAll.ruff": "explicit"
}
},
"python.analysis.extraPaths": [
"src/"
],
"python.linting.flake8Args": [
"--config=pyproject.toml"
]
"ruff.enable": true,
"ruff.organizeImports": true,
"ruff.fixAll": true
}
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "emp_hooks"
version = "0.0.10"
version = "0.0.11"
description = "emp hooks"
authors = [
{name = "johnny-emp",email = "johnny@empyrealsdk.com"}
Expand Down Expand Up @@ -31,8 +31,10 @@ pytest = "^8.3.4"
pytest-asyncio = "^0.25.3"
ipython = "^8.32.0"

[tool.ruff]
[tool.ruff.lint]
ignore = ["E704"]

[tool.ruff]
src-paths = ["./src"]
line-length = 120

Expand Down
25 changes: 20 additions & 5 deletions src/emp_hooks/handlers/sqs_hooks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
import os
import threading
Expand All @@ -18,6 +19,7 @@ class SQSHooks(Hook):
running: bool = Field(default=False)

_thread: threading.Thread | None = PrivateAttr(default=None)
_loop: asyncio.AbstractEventLoop | None = PrivateAttr(default=None)

model_config = ConfigDict(arbitrary_types_allowed=True)

Expand All @@ -36,10 +38,11 @@ def run(
return

self.running = True
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(
target=self._run,
target=self._run_loop,
args=(visibility_timeout, loop_interval),
daemon=daemon, # keep the program running until the hook is stopped
daemon=daemon,
)
self._thread.start()

Expand All @@ -48,7 +51,12 @@ def stop(self, timeout: int = 5):
if self._thread:
self._thread.join(timeout)

def _run(self, visibility_timeout: int = 30, loop_interval: int = 5):
def _run_loop(self, visibility_timeout: int = 30, loop_interval: int = 5):
asyncio.set_event_loop(self._loop)
assert self._loop is not None
self._loop.run_until_complete(self._run(visibility_timeout, loop_interval))

async def _run(self, visibility_timeout: int = 30, loop_interval: int = 5):
if not self.queue:
self.queue = SQSQueue(name=os.environ["AWS_SQS_QUEUE_NAME"])

Expand All @@ -61,10 +69,17 @@ def _run(self, visibility_timeout: int = 30, loop_interval: int = 5):
body = json.loads(message.body)
query = body["query"]
if query in self.hooks:
do_delete: bool = self.hooks[query](body)
func = self.hooks[query]

do_delete: bool
if asyncio.iscoroutinefunction(func):
do_delete = await func(body)
else:
do_delete = func(body)

if do_delete:
message.delete()
time.sleep(loop_interval)
await asyncio.sleep(loop_interval)


sqs_hooks: SQSHooks = SQSHooks()