Skip to content

Commit

Permalink
FIX: Wait for event emission future by default
Browse files Browse the repository at this point in the history
skipci
  • Loading branch information
cortadocodes committed Feb 6, 2025
1 parent cdc0c0c commit 9a86bce
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion octue/cloud/pub_sub/logging.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import re


ANSI_ESCAPE_SEQUENCES_PATTERN = r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])"


Expand Down Expand Up @@ -66,6 +65,7 @@ def emit(self, record):
originator_question_uuid=self.originator_question_uuid,
# The sender type is repeated here as a string to avoid a circular import.
attributes={"sender_type": "CHILD"},
wait=False,
)

except Exception: # noqa
Expand Down
17 changes: 10 additions & 7 deletions octue/cloud/pub_sub/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,14 @@ def answer(self, question, heartbeat_interval=120, timeout=30):
if analysis.output_manifest is not None:
result["output_manifest"] = analysis.output_manifest.to_primitive()

future = self._emit_event(
self._emit_event(
event=result,
recipient=parent,
attributes={"sender_type": CHILD_SENDER_TYPE},
timeout=timeout,
**routing_metadata,
)

# Await successful publishing of the result.
future.result()

heartbeater.cancel()
logger.info("%r answered question %r.", self, question_uuid)

Expand Down Expand Up @@ -540,6 +537,7 @@ def _emit_event(
recipient,
retry_count,
attributes=None,
wait=True,
timeout=30,
):
"""Emit a JSON-serialised event as a Pub/Sub message to the services topic with optional message attributes.
Expand All @@ -566,6 +564,7 @@ def _emit_event(
:param str recipient: the SRUID of the service the event is intended for
:param int retry_count: the retry count of the question (this is zero if it's the first attempt at the question)
:param dict|None attributes: key-value pairs to attach to the event - the values must be strings or bytes
:param bool wait: if `True`, wait for the result of the publishing future before continuing execution (this is important if the python process ends promptly after the event is emitted instead of being part of a prolonged stream as the publishing may not complete and the event won't actually be emitted)
:param int|float timeout: the timeout for sending the event in seconds
:return google.cloud.pubsub_v1.publisher.futures.Future:
"""
Expand Down Expand Up @@ -603,6 +602,9 @@ def _emit_event(
**converted_attributes,
)

if wait:
future.result()

return future

def _send_question(
Expand Down Expand Up @@ -645,7 +647,7 @@ def _send_question(
input_manifest.use_signed_urls_for_datasets()
question["input_manifest"] = input_manifest.to_primitive()

future = self._emit_event(
self._emit_event(
event=question,
question_uuid=question_uuid,
parent_question_uuid=parent_question_uuid,
Expand All @@ -665,8 +667,6 @@ def _send_question(
timeout=timeout,
)

# Await successful publishing of the question.
future.result()
logger.info("%r asked a question %r to service %r.", self, question_uuid, recipient)

def _send_delivery_acknowledgment(
Expand Down Expand Up @@ -701,6 +701,7 @@ def _send_delivery_acknowledgment(
recipient=parent,
retry_count=retry_count,
attributes={"sender_type": CHILD_SENDER_TYPE},
wait=False,
)

logger.info("%r acknowledged receipt of question %r.", self, question_uuid)
Expand Down Expand Up @@ -738,6 +739,7 @@ def _send_heartbeat(
retry_count=retry_count,
attributes={"sender_type": CHILD_SENDER_TYPE},
timeout=timeout,
wait=False,
)

logger.debug("Heartbeat sent by %r.", self)
Expand Down Expand Up @@ -776,6 +778,7 @@ def _send_monitor_message(
retry_count=retry_count,
timeout=timeout,
attributes={"sender_type": CHILD_SENDER_TYPE},
wait=False,
)

logger.debug("Monitor message sent by %r.", self)
Expand Down

0 comments on commit 9a86bce

Please sign in to comment.