Skip to content

Commit 822bafe

Browse files
authored
[Messages] No blocking on end of queue (#303)
Fixed an issue where the pending message job would block on the final messages in the queue and stop processing newer messages. Once the job finishes looping over all the messages in the pending message collection, the previous implementation waits until all the message tasks finish. This causes a delay of several hours until the node finishes these tasks and is able to process newer pending messages again. Messages end up being processed, but far later than expected. The issue arises because we never remove messages from the pending queue if we fail to retrieve the associated content. The job then always has messages in the queue, causing the issue. Fixed the issue by allowing the loop to restart without waiting for messages to be processed. We now compute an individual ID for each pending message and add it to a set. The job will simply ignore any message that is already being processed, allowing newer messages to be taken into account.
1 parent b12abc9 commit 822bafe

File tree

1 file changed

+36
-7
lines changed

1 file changed

+36
-7
lines changed

src/aleph/jobs/process_pending_messages.py

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ async def process_message_job_results(
7272
finished_tasks: Set[asyncio.Task],
7373
task_message_dict: Dict[asyncio.Task, Dict],
7474
shared_stats: Dict[str, Any],
75+
processing_messages: Set[Tuple],
7576
):
7677
await process_job_results(
7778
finished_tasks,
@@ -84,6 +85,9 @@ async def process_message_job_results(
8485
shared_stats["retry_messages_job_tasks"] -= 1
8586
shared_stats["message_jobs"][message_type] -= 1
8687

88+
pending_message_id = get_pending_message_id(pending)
89+
processing_messages.remove(pending_message_id)
90+
8791
del task_message_dict[message_task]
8892

8993

@@ -100,12 +104,26 @@ def validate_pending_message(pending: Dict):
100104
)
101105

102106

107+
def get_pending_message_id(pending_message: Dict) -> Tuple:
108+
source = pending_message.get("source", {})
109+
chain_name = source.get("chain_name", None)
110+
height = source.get("height", None)
111+
112+
return (
113+
pending_message["message"]["item_hash"],
114+
pending_message["message"]["sender"],
115+
chain_name,
116+
height,
117+
)
118+
119+
103120
async def process_pending_messages(config: Config, shared_stats: Dict):
104121
"""
105122
Processes all the messages in the pending message queue.
106123
"""
107124

108125
seen_ids: Dict[Tuple, int] = dict()
126+
processing_messages = set()
109127
find_params: Dict = {}
110128

111129
max_concurrent_tasks = config.aleph.jobs.pending_messages.max_concurrency.value
@@ -116,21 +134,30 @@ async def process_pending_messages(config: Config, shared_stats: Dict):
116134
for message_type in MessageType:
117135
shared_stats["message_jobs"][message_type] = 0
118136

137+
# Using a set is required as asyncio.wait takes and returns sets.
138+
pending_tasks: Set[asyncio.Task] = set()
139+
task_message_dict: Dict[asyncio.Task, Dict] = {}
140+
119141
while await PendingMessage.collection.count_documents(find_params):
120-
# Using a set is required as asyncio.wait takes and returns sets.
121-
pending_tasks: Set[asyncio.Task] = set()
122-
task_message_dict: Dict[asyncio.Task, Dict] = {}
123142

124143
async for pending in PendingMessage.collection.find(find_params).sort(
125144
[("retries", ASCENDING), ("message.time", ASCENDING)]
126145
).batch_size(max_concurrent_tasks):
127146

147+
# Check if the message is already processing
148+
pending_message_id = get_pending_message_id(pending)
149+
if pending_message_id in processing_messages:
150+
# Skip the message, we're already processing it
151+
continue
152+
153+
processing_messages.add(pending_message_id)
154+
128155
if len(pending_tasks) == max_concurrent_tasks:
129156
finished_tasks, pending_tasks = await asyncio.wait(
130157
pending_tasks, return_when=asyncio.FIRST_COMPLETED
131158
)
132159
await process_message_job_results(
133-
finished_tasks, task_message_dict, shared_stats
160+
finished_tasks, task_message_dict, shared_stats, processing_messages
134161
)
135162

136163
validate_pending_message(pending)
@@ -148,13 +175,15 @@ async def process_pending_messages(config: Config, shared_stats: Dict):
148175
pending_tasks.add(message_task)
149176
task_message_dict[message_task] = pending
150177

151-
# Wait for the last tasks
178+
# This synchronization point is required when a few pending messages remain.
179+
# We wait for at least one task to finish; the remaining tasks will be collected
180+
# on the next iterations of the loop.
152181
if pending_tasks:
153182
finished_tasks, _ = await asyncio.wait(
154-
pending_tasks, return_when=asyncio.ALL_COMPLETED
183+
pending_tasks, return_when=asyncio.FIRST_COMPLETED
155184
)
156185
await process_message_job_results(
157-
finished_tasks, task_message_dict, shared_stats
186+
finished_tasks, task_message_dict, shared_stats, processing_messages
158187
)
159188

160189
# TODO: move this to a dedicated job and/or check unicity on insertion

0 commit comments

Comments
 (0)