Re-Use batch slots while worker is running through its batch and waiting on requests to finish.#250
ftyl wants to merge 3 commits into supabase:master from
Conversation
…nd fill up to batchsize while we are waiting for other requests to finish
📝 Walkthrough

Summary by CodeRabbit

The worker shifts from per-request dynamic allocation to fixed-size batch processing using guc_batch_size. It adds per-slot state (slot_in_use, active_count), a finished_handles array with a num_finished counter, and initializes an initial set of slots. The main loop runs while active_count > 0, processing completed curl handles (detach, cleanup), clearing finished slots, and refilling freed slots from the request queue until no active handles remain. All per-slot resources are freed at exit.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Worker as Worker
    participant Queue as Request Queue
    participant Curl as libcurl Multi
    participant SlotMgr as Slot Tracker
    Worker->>Queue: read up to guc_batch_size requests
    Note right of SlotMgr: mark slots in use, create per-slot handles
    Worker->>Curl: add handles to multi-handle
    loop while active_count > 0
        Worker->>Curl: perform/select / wait for events
        Curl-->>Worker: report completed handles
        Worker->>SlotMgr: detach finished handles, record in finished_handles
        Worker->>Curl: remove finished handles
        Worker->>SlotMgr: cleanup per-slot resources, decrement active_count
        alt free_slots available and not restarting
            Worker->>Queue: read new requests to refill slots
            Note right of SlotMgr: initialize and add new handles to Curl
            SlotMgr-->>Worker: increment active_count, mark slots in use
        end
    end
    Worker->>SlotMgr: final cleanup of all per-slot arrays and handles
```
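The slot lifecycle in the diagram can be sketched in plain C with the curl handles and queue stubbed out. The names slot_in_use, active_count, finished (for finished_handles) and num_finished follow the walkthrough; everything else here is illustrative, not the PR's actual code:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical slot tracker modeling the walkthrough above. */
#define BATCH_SIZE 4

typedef struct {
    bool slot_in_use[BATCH_SIZE];
    int  active_count;
    int  finished[BATCH_SIZE];  /* slots completed in this pass */
    int  num_finished;
} SlotTracker;

/* Claim the first free slot; returns its index, or -1 if the batch is full. */
static int claim_slot(SlotTracker *t) {
    for (int i = 0; i < BATCH_SIZE; i++) {
        if (!t->slot_in_use[i]) {
            t->slot_in_use[i] = true;
            t->active_count++;
            return i;
        }
    }
    return -1;
}

/* Record a completed slot; it is cleared later, mirroring the
 * detach-then-cleanup split shown in the diagram. */
static void mark_finished(SlotTracker *t, int slot) {
    t->finished[t->num_finished++] = slot;
}

/* Clear all recorded finished slots so they can be refilled. */
static void clear_finished(SlotTracker *t) {
    for (int i = 0; i < t->num_finished; i++) {
        t->slot_in_use[t->finished[i]] = false;
        t->active_count--;
    }
    t->num_finished = 0;
}
```

A freed slot is immediately reusable by the next claim_slot call, which is the core of the refilling behavior this PR adds.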
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/worker.c`:
- Around line 325-326: The stack-allocated VLA finished_handles[batch_size] can
overflow for large batch_size; replace this VLA with a heap allocation (e.g.,
palloc0 or malloc/calloc) sized by batch_size for CurlHandle* pointers, assign
that pointer name (finished_handles) where the VLA was used, and ensure you free
it (pfree or free consistent with allocator used) before the existing
pfree(handles) cleanup; update all references to finished_handles in the
function so they use the heap pointer and add the matching deallocation to avoid
leaks.
src/worker.c (Outdated)

```c
event events[maxevents];
CurlHandle *finished_handles[batch_size];
```
Stack overflow risk with large batch_size values.

finished_handles[batch_size] is a Variable Length Array allocated on the stack. With guc_batch_size at its maximum of PG_INT16_MAX (32767), this allocates ~256 KB on the stack, risking stack overflow crashes.

Consider heap allocation:

```diff
- CurlHandle *finished_handles[batch_size];
+ CurlHandle **finished_handles = palloc(mul_size(sizeof(CurlHandle *), batch_size));
```

And add the matching cleanup before pfree(handles):

```diff
+ pfree(finished_handles);
  pfree(handles);
```
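Outside of PostgreSQL's allocator, the same fix can be sketched with calloc standing in for palloc; CurlHandle is treated as an opaque type here, and the sizing comment assumes 8-byte pointers:

```c
#include <stdlib.h>

typedef struct CurlHandle CurlHandle;  /* opaque stand-in for the real type */

/* Heap-allocate the finished-handles array instead of a stack VLA.
 * With batch_size = 32767 and 8-byte pointers this is ~256 KB:
 * acceptable on the heap, but risky on a thread stack. */
static CurlHandle **alloc_finished_handles(size_t batch_size) {
    /* calloc zeroes the array, like palloc0 would */
    return calloc(batch_size, sizeof(CurlHandle *));
}
```

The caller must free the array (free here, pfree in the real code) on every exit path, matching the review's note about adding the deallocation before pfree(handles).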
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/worker.c (1)
327-426: ⚠️ Potential issue | 🟠 Major

Unbounded loop can ignore restart/shutdown signals under continuous load.

worker_should_restart is only updated in wait_while_processing_interrupts() outside this loop. With refilling enabled, active_count may never reach 0 if the queue stays busy, so the loop can run indefinitely, keeping the transaction open and deferring restart/shutdown/config reload. Add an in-loop interrupt/restart check so refilling stops and the loop can drain and exit.

✅ Minimal fix to make restarts observable inside the loop:

```diff
  while (active_count > 0) {
+   CHECK_FOR_INTERRUPTS();
+   if (pg_atomic_exchange_u32(&worker_state->got_restart, 0)) {
+     worker_should_restart = true;
+   }
    int nfds = wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/worker.c` around lines 327 - 426, The loop can keep refilling handles while the queue is busy, preventing restarts because worker_should_restart is only updated elsewhere; to fix, re-check for interrupts before refilling by calling the existing interrupt-handling update (e.g. call wait_while_processing_interrupts() or otherwise refresh worker_should_restart) after processing curl_multi_info_read() and before computing free_slots, and if worker_should_restart is true, skip the refill block (or set free_slots = 0) so the loop can drain and exit normally; reference worker_should_restart, wait_while_processing_interrupts(), active_count, free_slots and the refill block that calls consume_request_queue()/curl_multi_add_handle().
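The effect of the suggested guard can be modeled without PostgreSQL's actual interrupt machinery. This is a minimal sketch with the queue stubbed to be permanently busy and worker_should_restart treated as a plain flag; the real worker would use CHECK_FOR_INTERRUPTS() and the atomic restart flag instead:

```c
#include <stdbool.h>

/* Flag normally set by interrupt handling; a plain variable here. */
static bool worker_should_restart = false;

/* Stub: pretend the request queue always has one pending item. */
static int pending_requests(void) { return 1; }

/* One drain loop: when the restart flag is set, refilling is skipped,
 * so active_count can only fall and the loop drains to zero.
 * max_iterations bounds the simulation; returns the final count. */
static int drain_loop(int active_count, int max_iterations) {
    while (active_count > 0 && max_iterations-- > 0) {
        /* ... wait_event() / process finished curl handles ... */
        active_count--;  /* one request finished this pass */

        /* refill only when not restarting, as the review suggests */
        int free_slots = worker_should_restart ? 0 : pending_requests();
        active_count += free_slots;
    }
    return active_count;
}
```

With the flag cleared the busy queue keeps active_count constant forever; with the flag set, each pass strictly decreases it, so the loop drains and exits.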
What kind of change does this PR introduce?
This PR suggests re-using slots in batches while curl waits for other requests, letting the worker work off the queue more efficiently. Since the most blocking part of the code is the network layer, this approach should not increase CPU usage dramatically.
What is the current behavior?
Currently pg_net uses a robust queue system and a worker that works off the queue in batches. The worker uses epoll to do its work efficiently; however, because it processes requests in fixed batches, the slowest request in a batch dictates the throughput that can be achieved!
Under heavy load, this means 200 requests are started and fired all at once; if one of them is slow and takes 30 seconds to complete (because it hits some block, for example), the next requests are not run until this last one finishes.
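In other words, with strict batching a batch's latency is the maximum of its requests' latencies. A toy model (illustrative only, not the worker's code):

```c
/* Strict batch mode: the batch takes as long as its slowest request,
 * since no new work starts until every request has completed. */
static int batch_time_ms(const int *times_ms, int n) {
    int worst = 0;
    for (int i = 0; i < n; i++)
        if (times_ms[i] > worst)
            worst = times_ms[i];
    return worst;
}
```

A batch of mostly-fast requests with one 30-second outlier still costs 30 seconds per batch, which is exactly the head-of-line blocking this PR removes.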
What is the new behavior?
If curl returns with some requests finished but others still pending, we check which slots are now unused, clean them up, and then see whether items can be fetched from the queue, up to the number of freed slots. If so, we refill the slots and continue the loop.
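That refill step can be sketched as follows, with requests reduced to plain ints and the queue to an array; the real worker would instead consume the request queue and add curl easy handles to the multi-handle:

```c
#include <stddef.h>

#define NSLOTS 4  /* stands in for guc_batch_size */

/* Refill empty slots (value 0) from the queue, up to the number of
 * freed slots or until the queue runs dry. Advances *queue_pos and
 * returns how many slots were refilled. */
static int refill_slots(int slots[NSLOTS], const int *queue,
                        size_t queue_len, size_t *queue_pos) {
    int refilled = 0;
    for (int i = 0; i < NSLOTS && *queue_pos < queue_len; i++) {
        if (slots[i] == 0) {               /* freed slot found */
            slots[i] = queue[(*queue_pos)++];  /* start next request */
            refilled++;
        }
    }
    return refilled;
}
```

Busy slots are left untouched, so a single slow request no longer stalls the rest of the batch: its neighbors keep cycling through fresh queue items.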