diff --git a/src/worker.c b/src/worker.c index fbfbfe5..8470857 100644 --- a/src/worker.c +++ b/src/worker.c @@ -302,22 +302,29 @@ void pg_net_worker(__attribute__((unused)) Datum main_arg) { elog(DEBUG1, "Consumed " UINT64_FORMAT " request rows", requests_consumed); if (requests_consumed > 0) { - CurlHandle *handles = palloc(mul_size(sizeof(CurlHandle), requests_consumed)); + CurlHandle *handles = palloc0(mul_size(sizeof(CurlHandle), guc_batch_size)); - // initialize curl handles + // keep track of which slots are currently in use + bool *slot_in_use = palloc0(mul_size(sizeof(bool), guc_batch_size)); + // and the amount of slots that are actively processing requests + int active_count = 0; + + // initialize curl handles for the initial batch for (size_t j = 0; j < requests_consumed; j++) { init_curl_handle(&handles[j], get_request_queue_row(SPI_tuptable->vals[j], SPI_tuptable->tupdesc)); - EREPORT_MULTI(curl_multi_add_handle(worker_state->curl_mhandle, handles[j].ez_handle)); + slot_in_use[j] = true; + active_count++; } // start curl event loop int running_handles = 0; - int maxevents = requests_consumed + 1; // 1 extra for the timer + int maxevents = guc_batch_size + 1; // 1 extra for the timer, need batch_size since we might fill more slots event events[maxevents]; + CurlHandle *finished_handles[guc_batch_size]; - do { + while (active_count > 0) { int nfds = wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms); @@ -349,30 +356,93 @@ void pg_net_worker(__attribute__((unused)) Datum main_arg) { // insert finished responses CURLMsg *msg = NULL; int msgs_left = 0; + int num_finished = 0; // keep track of how many handles have finished to clear later while ((msg = curl_multi_info_read(worker_state->curl_mhandle, &msgs_left))) { if (msg->msg == CURLMSG_DONE) { CurlHandle *handle = NULL; EREPORT_CURL_GETINFO(msg->easy_handle, CURLINFO_PRIVATE, &handle); insert_response(handle, msg->data.result); + + // detach the finished handle from the 
multi handle + // (msg pointer is invalidated after this call, but all reads from + // msg have already been done, and msg->easy_handle as a function + // argument is evaluated before the call executes) + EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, msg->easy_handle)); + + // keep a list of finished handles to cleanup and free later + finished_handles[num_finished] = handle; + num_finished++; } else { ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg)); } } - elog(DEBUG1, "Pending curl running_handles: %d", running_handles); - // run while there are curl handles, some won't finish in a single iteration since they - // could be slow and waiting for a timeout - } while (running_handles > 0); + // we now run two loops to be safe: + // 1. clear the finished handles and count how many we found + // 2. read up to that many requests from the queue + // 3. fill the emptied slots with these new requests + // More optimised would be to do this in one loop since we know how many finished, + // but we do not want to risk reading a request from the queue and then not having a slot for it + // doing it in two loops gives us a safer number of free slots + + // find the slot and mark it free + for (int i = 0; i < guc_batch_size; i++) { + for (int j = 0; j < num_finished; j++) { + if (slot_in_use[i] && &handles[i] == finished_handles[j]) { + curl_easy_cleanup(handles[i].ez_handle); + pfree_handle(&handles[i]); + memset(&handles[i], 0, sizeof(CurlHandle)); + slot_in_use[i] = false; + active_count--; + break; + } + } + } + + int free_slots = guc_batch_size - active_count; + // we refill free slots only if the worker is not supposed to restart, we will continue + // after restart instead + if (!worker_should_restart && free_slots > 0) { + // read up to free_slots number of requests + uint64 new_requests = consume_request_queue(free_slots); + if (new_requests > 0) { + elog(DEBUG1, "Refilling " UINT64_FORMAT " new requests into %d free 
slots", + new_requests, free_slots); + + uint64 filled = 0; + for (int i = 0; i < guc_batch_size && filled < new_requests; i++) { + if (!slot_in_use[i]) { + init_curl_handle( + &handles[i], + get_request_queue_row(SPI_tuptable->vals[filled], SPI_tuptable->tupdesc)); + EREPORT_MULTI( + curl_multi_add_handle(worker_state->curl_mhandle, handles[i].ez_handle)); + slot_in_use[i] = true; + active_count++; + filled++; + } + } + } + } + + // these two counts should always be in sync + elog(DEBUG1, "Active curl handles: %d, curl running_handles: %d", active_count, + running_handles); + } - // cleanup - for (uint64 i = 0; i < requests_consumed; i++) { - EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle)); + + // cleanup whatever is remaining + for (int i = 0; i < guc_batch_size; i++) { + if (slot_in_use[i]) { + EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle)); - curl_easy_cleanup(handles[i].ez_handle); + curl_easy_cleanup(handles[i].ez_handle); - pfree_handle(&handles[i]); + pfree_handle(&handles[i]); + } } + pfree(slot_in_use); pfree(handles); }