Skip to content

Commit ee01b18

Browse files
committed
manually ack instead of auto ack persistent messages
Signed-off-by: Lance Drane <[email protected]>
1 parent aef4508 commit ee01b18

File tree

2 files changed

+47
-15
lines changed

2 files changed

+47
-15
lines changed

src/intersect_sdk/_internal/control_plane/brokers/amqp_client.py

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ def disconnect(self) -> None:
146146
self._connection.close()
147147

148148
if self._thread:
149-
self._thread.join()
149+
# If gracefully shutting down, we should finish up the current job.
150+
self._thread.join(5 if self.considered_unrecoverable() else None)
150151
self._thread = None
151152

152153
def is_connected(self) -> bool:
@@ -163,7 +164,7 @@ def is_connected(self) -> bool:
163164
def considered_unrecoverable(self) -> bool:
164165
return self._unrecoverable
165166

166-
def publish(self, topic: str, payload: bytes, persist: bool) -> None: # noqa: ARG002 (TODO handle persistence)
167+
def publish(self, topic: str, payload: bytes, persist: bool) -> None:
167168
"""Publish the given message.
168169
169170
Publish payload with the pre-existing connection (via connect()) on topic.
@@ -180,9 +181,9 @@ def publish(self, topic: str, payload: bytes, persist: bool) -> None: # noqa: A
180181
body=payload,
181182
properties=pika.BasicProperties(
182183
content_type='text/plain',
183-
# delivery_mode=pika.delivery_mode.DeliveryMode.Persistent
184-
# if persist
185-
# else pika.delivery_mode.DeliveryMode.Transient,
184+
delivery_mode=pika.delivery_mode.DeliveryMode.Persistent
185+
if persist
186+
else pika.delivery_mode.DeliveryMode.Transient,
186187
# expiration=None if persist else '8640000',
187188
),
188189
)
@@ -367,7 +368,9 @@ def _create_queue(self, channel: Channel, topic: str, persist: bool) -> None:
367368
If True, this queue will persist forever, even on application or broker shutdown, and we need a persistent name.
368369
If False, we will generate a temporary queue using the broker's naming scheme.
369370
"""
370-
cb = functools.partial(self._on_queue_declareok, channel=channel, topic=topic)
371+
cb = functools.partial(
372+
self._on_queue_declareok, channel=channel, topic=topic, persist=persist
373+
)
371374
channel.queue_declare(
372375
queue=_get_queue_name(topic)
373376
if persist
@@ -377,7 +380,9 @@ def _create_queue(self, channel: Channel, topic: str, persist: bool) -> None:
377380
callback=cb,
378381
)
379382

380-
def _on_queue_declareok(self, frame: Frame, channel: Channel, topic: str) -> None:
383+
def _on_queue_declareok(
384+
self, frame: Frame, channel: Channel, topic: str, persist: bool
385+
) -> None:
381386
"""Begins listening on the given queue.
382387
383388
Used as a listener on queue declaration.
@@ -386,10 +391,15 @@ def _on_queue_declareok(self, frame: Frame, channel: Channel, topic: str) -> Non
386391
frame: Response from the queue declare we sent to the AMQP broker. We get the queue name from this.
387392
channel: The Channel being instantiated.
388393
topic: The string name for the Channel on the broker.
394+
persist: Whether or not our queue should persist on either broker or application shutdown.
389395
"""
390396
queue_name = frame.method.queue
391397
cb = functools.partial(
392-
self._on_queue_bindok, channel=channel, topic=topic, queue_name=queue_name
398+
self._on_queue_bindok,
399+
channel=channel,
400+
topic=topic,
401+
queue_name=queue_name,
402+
persist=persist,
393403
)
394404
channel.queue_bind(
395405
queue=queue_name,
@@ -399,7 +409,12 @@ def _on_queue_declareok(self, frame: Frame, channel: Channel, topic: str) -> Non
399409
)
400410

401411
def _on_queue_bindok(
402-
self, _unused_frame: Frame, channel: Channel, topic: str, queue_name: str
412+
self,
413+
_unused_frame: Frame,
414+
channel: Channel,
415+
topic: str,
416+
queue_name: str,
417+
persist: bool,
403418
) -> None:
404419
"""Consumes a message from the given channel.
405420
@@ -410,12 +425,14 @@ def _on_queue_bindok(
410425
channel: The Channel being instantiated.
411426
topic: Name of the topic on the broker.
412427
queue_name: The name of the queue on the AMQP broker.
428+
persist: Whether or not our queue should persist on either broker or application shutdown.
413429
"""
414430
cb = functools.partial(self._on_consume_ok, topic=topic)
431+
message_cb = functools.partial(self._consume_message, persist=persist)
415432
consumer_tag = channel.basic_consume(
416433
queue=queue_name,
417-
auto_ack=True,
418-
on_message_callback=self._consume_message,
434+
auto_ack=not persist, # persistent messages should be manually acked and we have no reason to NACK a message for now
435+
on_message_callback=message_cb,
419436
callback=cb,
420437
)
421438
self._topics_to_consumer_tags[topic] = consumer_tag
@@ -433,23 +450,32 @@ def _on_consume_ok(self, _unused_frame: Frame, topic: str) -> None:
433450

434451
def _consume_message(
435452
self,
436-
_unused_channel: Channel,
453+
channel: Channel,
437454
basic_deliver: Basic.Deliver,
438455
_properties: BasicProperties,
439456
body: bytes,
457+
persist: bool,
440458
) -> None:
441-
"""Handles incoming messages.
459+
"""Handles incoming messages and acknowledges them ONLY after code executes on the domain side.
442460
443461
Looks up all handlers for the topic and delegates message handling to them.
462+
The handlers comprise the Service/Client logic, which includes all domain science logic.
444463
445464
Args:
446-
_unused_channel: The AMQP channel the message was received on. Ignored
465+
channel: The AMQP channel the message was received on. Used to manually acknowledge messages.
447466
basic_deliver: Contains internal AMQP delivery information - i.e. the routing key.
448467
_properties: Object from the AMQP call. Ignored.
449468
body: the AMQP message to be handled.
469+
persist: Whether or not our queue should persist on either broker or application shutdown.
450470
"""
451471
tth_key = _amqp_2_hierarchy(basic_deliver.routing_key)
452472
topic_handler = self._topics_to_handlers().get(tth_key)
453473
if topic_handler:
454474
for cb in topic_handler.callbacks:
455475
cb(body)
476+
# With persistent messages, we only acknowledge the message AFTER we are done processing
477+
# (this removes the message from the broker queue)
478+
# this allows us to retry a message if the broker OR this application goes down
479+
# We currently never NACK or reject a message because in INTERSECT, applications currently never "share" a queue.
480+
if persist:
481+
channel.basic_ack(basic_deliver.delivery_tag)

src/intersect_sdk/service.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,13 @@ def shutdown(self, reason: str | None = None) -> Self:
249249
self._status_thread.join()
250250
self._status_thread = None
251251

252-
self._send_lifecycle_message(lifecycle_type=LifecycleType.SHUTDOWN, payload=reason)
252+
try:
253+
self._send_lifecycle_message(lifecycle_type=LifecycleType.SHUTDOWN, payload=reason)
254+
except Exception as e: # noqa: BLE001 (this could fail on numerous protocols)
255+
logger.error(
256+
'Could not send shutdown message, INTERSECT Core will eventually assume this Service has shutdown.'
257+
)
258+
logger.debug(e)
253259

254260
self._control_plane_manager.disconnect()
255261

0 commit comments

Comments
 (0)