Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 85 additions & 14 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,22 +302,30 @@ void pg_net_worker(__attribute__((unused)) Datum main_arg) {
elog(DEBUG1, "Consumed " UINT64_FORMAT " request rows", requests_consumed);

if (requests_consumed > 0) {
CurlHandle *handles = palloc(mul_size(sizeof(CurlHandle), requests_consumed));
int batch_size = guc_batch_size;
CurlHandle *handles = palloc0(mul_size(sizeof(CurlHandle), batch_size));

// initialize curl handles
// keep track of which slots are currently in use
bool *slot_in_use = palloc0(mul_size(sizeof(bool), batch_size));
// and the number of slots that are actively processing requests
int active_count = 0;

// initialize curl handles for the initial batch
for (size_t j = 0; j < requests_consumed; j++) {
init_curl_handle(&handles[j],
get_request_queue_row(SPI_tuptable->vals[j], SPI_tuptable->tupdesc));

EREPORT_MULTI(curl_multi_add_handle(worker_state->curl_mhandle, handles[j].ez_handle));
slot_in_use[j] = true;
active_count++;
}

// start curl event loop
int running_handles = 0;
int maxevents = requests_consumed + 1; // 1 extra for the timer
int maxevents = batch_size + 1; // 1 extra for the timer, need batch_size since we might fill more slots
event events[maxevents];
CurlHandle *finished_handles[batch_size];
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Stack overflow risk with large batch_size values.

finished_handles[batch_size] is a Variable Length Array allocated on the stack. With guc_batch_size max of PG_INT16_MAX (32767), this allocates ~256KB on the stack, risking stack overflow crashes.

Consider heap allocation:

Proposed fix
-        CurlHandle *finished_handles[batch_size];
+        CurlHandle **finished_handles = palloc(mul_size(sizeof(CurlHandle *), batch_size));

And add cleanup before pfree(handles):

+        pfree(finished_handles);
         pfree(handles);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
event events[maxevents];
CurlHandle *finished_handles[batch_size];
event events[maxevents];
CurlHandle **finished_handles = palloc(mul_size(sizeof(CurlHandle *), batch_size));
Suggested change
event events[maxevents];
CurlHandle *finished_handles[batch_size];
pfree(finished_handles);
pfree(handles);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/worker.c` around lines 325 - 326, The stack-allocated VLA
finished_handles[batch_size] can overflow for large batch_size; replace this VLA
with a heap allocation (e.g., palloc0 or malloc/calloc) sized by batch_size for
CurlHandle* pointers, assign that pointer name (finished_handles) where the VLA
was used, and ensure you free it (pfree or free consistent with allocator used)
before the existing pfree(handles) cleanup; update all references to
finished_handles in the function so they use the heap pointer and add the
matching deallocation to avoid leaks.


do {
while (active_count > 0) {
int nfds =
wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms);

Expand Down Expand Up @@ -349,30 +357,93 @@ void pg_net_worker(__attribute__((unused)) Datum main_arg) {
// insert finished responses
CURLMsg *msg = NULL;
int msgs_left = 0;
int num_finished = 0; // keep track of how many handles have finished to clear later
while ((msg = curl_multi_info_read(worker_state->curl_mhandle, &msgs_left))) {
if (msg->msg == CURLMSG_DONE) {
CurlHandle *handle = NULL;
EREPORT_CURL_GETINFO(msg->easy_handle, CURLINFO_PRIVATE, &handle);
insert_response(handle, msg->data.result);

// detach the finished handle from the multi handle
// (msg pointer is invalidated after this call, but all reads from
// msg have already been done, and msg->easy_handle as a function
// argument is evaluated before the call executes)
EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, msg->easy_handle));

// keep a list of finished handles to cleanup and free later
finished_handles[num_finished] = handle;
num_finished++;
} else {
ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg));
}
}

elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
// run while there are curl handles, some won't finish in a single iteration since they
// could be slow and waiting for a timeout
} while (running_handles > 0);
// we now run two loops to be safe:
// 1. clear the finished handles and count how many we found
// 2. read up-to that many requests from the queue
// 3. fill the emptied slots with these new requests
// More optimised would be to do this in one loop since we know how many finished,
// but we do not want to risk reading a request from the queue and then not having a slot for it
// doing it in two loops gives us a safer number of free slots

// find the slot and mark it free
for (int i = 0; i < batch_size; i++) {
for (int j = 0; j < num_finished; j++) {
if (slot_in_use[i] && &handles[i] == finished_handles[j]) {
curl_easy_cleanup(handles[i].ez_handle);
pfree_handle(&handles[i]);
memset(&handles[i], 0, sizeof(CurlHandle));
slot_in_use[i] = false;
active_count--;
break;
}
}
}

int free_slots = batch_size - active_count;
// we refill free slots only if the worker is not supposed to restart, we will continue
// after restart instead
if (!worker_should_restart && free_slots > 0) {
// read up to free_slots number of requests
uint64 new_requests = consume_request_queue(free_slots);
if (new_requests > 0) {
elog(DEBUG1, "Refilling " UINT64_FORMAT " new requests into %d free slots",
new_requests, free_slots);

uint64 filled = 0;
for (int i = 0; i < batch_size && filled < new_requests; i++) {
if (!slot_in_use[i]) {
init_curl_handle(
&handles[i],
get_request_queue_row(SPI_tuptable->vals[filled], SPI_tuptable->tupdesc));
EREPORT_MULTI(
curl_multi_add_handle(worker_state->curl_mhandle, handles[i].ez_handle));
slot_in_use[i] = true;
active_count++;
filled++;
}
}
}
}

// these two counts should always be in sync
elog(DEBUG1, "Active curl handles: %d, curl running_handles: %d", active_count,
running_handles);
}

// cleanup
for (uint64 i = 0; i < requests_consumed; i++) {
EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle));

// cleanup whatever is remaining
for (size_t i = 0; i < batch_size; i++) {
if (slot_in_use[i]) {
EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle));

curl_easy_cleanup(handles[i].ez_handle);
curl_easy_cleanup(handles[i].ez_handle);

pfree_handle(&handles[i]);
pfree_handle(&handles[i]);
}
}

pfree(slot_in_use);
pfree(handles);
}

Expand Down
Loading