Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 84 additions & 14 deletions src/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -302,22 +302,29 @@ void pg_net_worker(__attribute__((unused)) Datum main_arg) {
elog(DEBUG1, "Consumed " UINT64_FORMAT " request rows", requests_consumed);

if (requests_consumed > 0) {
CurlHandle *handles = palloc(mul_size(sizeof(CurlHandle), requests_consumed));
CurlHandle *handles = palloc0(mul_size(sizeof(CurlHandle), guc_batch_size));

// initialize curl handles
// keep track of which slots are currently in use
bool *slot_in_use = palloc0(mul_size(sizeof(bool), guc_batch_size));
// and the number of slots that are actively processing requests
int active_count = 0;

// initialize curl handles for the initial batch
for (size_t j = 0; j < requests_consumed; j++) {
init_curl_handle(&handles[j],
get_request_queue_row(SPI_tuptable->vals[j], SPI_tuptable->tupdesc));

EREPORT_MULTI(curl_multi_add_handle(worker_state->curl_mhandle, handles[j].ez_handle));
slot_in_use[j] = true;
active_count++;
}

// start curl event loop
int running_handles = 0;
int maxevents = requests_consumed + 1; // 1 extra for the timer
int maxevents = guc_batch_size + 1; // 1 extra for the timer, need batch_size since we might fill more slots
event events[maxevents];
CurlHandle *finished_handles[guc_batch_size];

do {
while (active_count > 0) {
int nfds =
wait_event(worker_state->epfd, events, maxevents, curl_handle_event_timeout_ms);

Expand Down Expand Up @@ -349,30 +356,93 @@ void pg_net_worker(__attribute__((unused)) Datum main_arg) {
// insert finished responses
CURLMsg *msg = NULL;
int msgs_left = 0;
int num_finished = 0; // keep track of how many handles have finished to clear later
while ((msg = curl_multi_info_read(worker_state->curl_mhandle, &msgs_left))) {
if (msg->msg == CURLMSG_DONE) {
CurlHandle *handle = NULL;
EREPORT_CURL_GETINFO(msg->easy_handle, CURLINFO_PRIVATE, &handle);
insert_response(handle, msg->data.result);

// detach the finished handle from the multi handle
// (msg pointer is invalidated after this call, but all reads from
// msg have already been done, and msg->easy_handle as a function
// argument is evaluated before the call executes)
EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, msg->easy_handle));

// keep a list of finished handles to cleanup and free later
finished_handles[num_finished] = handle;
num_finished++;
} else {
ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg));
}
}

elog(DEBUG1, "Pending curl running_handles: %d", running_handles);
// run while there are curl handles, some won't finish in a single iteration since they
// could be slow and waiting for a timeout
} while (running_handles > 0);
// we now run two loops to be safe:
// 1. clear the finished handles and count how many we found
// 2. read up-to that many requests from the queue
// 3. fill the emptied slots with these new requests
// More optimised would be to do this in one loop since we know how many finished,
// but we do not want to risk reading a request from the queue and then not having a slot for it
// doing it in two loops gives us a safer number of free slots

// find the slot and mark it free
for (int i = 0; i < guc_batch_size; i++) {
for (int j = 0; j < num_finished; j++) {
if (slot_in_use[i] && &handles[i] == finished_handles[j]) {
curl_easy_cleanup(handles[i].ez_handle);
pfree_handle(&handles[i]);
memset(&handles[i], 0, sizeof(CurlHandle));
slot_in_use[i] = false;
active_count--;
break;
}
}
}

int free_slots = guc_batch_size - active_count;
// we refill free slots only if the worker is not supposed to restart; otherwise we
// continue processing after the restart instead
if (!worker_should_restart && free_slots > 0) {
// read up to free_slots number of requests
uint64 new_requests = consume_request_queue(free_slots);
if (new_requests > 0) {
elog(DEBUG1, "Refilling " UINT64_FORMAT " new requests into %d free slots",
new_requests, free_slots);

uint64 filled = 0;
for (int i = 0; i < guc_batch_size && filled < new_requests; i++) {
if (!slot_in_use[i]) {
init_curl_handle(
&handles[i],
get_request_queue_row(SPI_tuptable->vals[filled], SPI_tuptable->tupdesc));
EREPORT_MULTI(
curl_multi_add_handle(worker_state->curl_mhandle, handles[i].ez_handle));
slot_in_use[i] = true;
active_count++;
filled++;
}
}
}
}

// these two counts should always be in sync
elog(DEBUG1, "Active curl handles: %d, curl running_handles: %d", active_count,
running_handles);
}

// cleanup
for (uint64 i = 0; i < requests_consumed; i++) {
EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle));

// cleanup whatever is remaining
for (int i = 0; i < guc_batch_size; i++) {
if (slot_in_use[i]) {
EREPORT_MULTI(curl_multi_remove_handle(worker_state->curl_mhandle, handles[i].ez_handle));

curl_easy_cleanup(handles[i].ez_handle);
curl_easy_cleanup(handles[i].ez_handle);

pfree_handle(&handles[i]);
pfree_handle(&handles[i]);
}
}

pfree(slot_in_use);
pfree(handles);
}

Expand Down
Loading