Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions c/src/proactor/epoll-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ typedef struct pconnection_t {
pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/
bool io_doublecheck; /* callbacks made and new IO may have arrived */
uint64_t expected_timeout;
bool name_lookup_pending;
char addr_buf[1];
} pconnection_t;

Expand Down
30 changes: 27 additions & 3 deletions c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), no pending timer.
// Return true when all possible outstanding epoll events associated with this pconnection have been processed.
static inline bool pconnection_is_final(pconnection_t *pc) {
return !pc->current_arm && !pc->task.ready && !pc->tick_pending;
return !pc->current_arm && !pc->task.ready && !pc->tick_pending && !pc->name_lookup_pending;
}

static void pconnection_final_free(pconnection_t *pc) {
Expand Down Expand Up @@ -1400,6 +1400,7 @@ static void pconnection_start(pconnection_t *pc, int fd) {
}

/* Called on initial connect, and if connection fails to try another address */
/* May be called within the pconnection task or from an external name_lookup task */
static void pconnection_maybe_connect_lh(pconnection_t *pc) {
errno = 0;
if (!pc->connected) { /* Not yet connected */
Expand Down Expand Up @@ -1447,7 +1448,11 @@ bool schedule_if_inactive(pn_proactor_t *p) {
static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, int gai_error) {
pn_proactor_t *p = pc->task.proactor;
bool notify = false;
if (gai_error) {

if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to only have one of these set in this condition?

// Closed by application. Skip pending connect(). gai_error no longer of interest.
freeaddrinfo(ai);
} else if (gai_error) {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
} else if (ai) {
pc->addrinfo = ai;
Expand All @@ -1464,6 +1469,7 @@ static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, in
static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) {
pconnection_t *pc = (pconnection_t *)user_data;
lock(&pc->task.mutex);
pc->name_lookup_pending = false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is within the lock, maybe it should be in the _lh function not here? Especially as setting the flag is done in a _lh function.

connection_lookup_done_lh(pc, ai, gai_error);
unlock(&pc->task.mutex);
}
Expand All @@ -1472,10 +1478,28 @@ static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_err
// Return true if the socket is connecting and there are no Proton events to deliver.
static bool pconnection_first_connect_lh(pconnection_t *pc) {
pn_proactor_t *p = pc->task.proactor;
pn_transport_t *tp = pc->driver.transport;
pc->name_lookup_pending = true;

unlock(&pc->task.mutex);
bool rc = pni_name_lookup_start(&p->name_lookup, pc->host, pc->port, pc, connection_done_cb);
lock(&pc->task.mutex);
return rc;

if (!rc) {
// Either the callback was synchronous or no callback was possible
if (pc->name_lookup_pending) {
// Clean up since there will be no callback.
pc->name_lookup_pending = false;
psocket_error(&pc->psocket, EAI_FAIL, "internal error on connect");
}
return false;
}
if (!pc->name_lookup_pending) {
// connection_done_cb already completed
if (pn_condition_is_set(pn_transport_condition(tp)))
return false;
}
return !pc->queued_disconnect && !pni_task_wake_pending(&pc->task);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty obscure compared to the previous return value (which would essentially be true at his point I think if there wer no errors from starting the lookup), perhaps an explanation in a comment is in order.

}

void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
Expand Down
64 changes: 51 additions & 13 deletions c/src/proactor/epoll_raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ struct praw_connection_t {
bool hup_detected;
bool read_check;
bool first_schedule;
bool name_lookup_pending;
char *taddr;
};

Expand Down Expand Up @@ -110,8 +111,14 @@ static void praw_connection_start(praw_connection_t *prc, int fd) {
}

/* Called on initial connect, and if connection fails to try another address */
/* May be called within the praw_connection task or from an external name_lookup task */
static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
int err = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to just before the while loop where it belongs, also no need to set it to 0 here - it is initialised first thing in the loop.

if (prc->task.closing) {
return;
}
while (prc->ai) { /* Have an address */
err = 0;
struct addrinfo *ai = prc->ai;
prc->ai = prc->ai->ai_next; /* Move to next address in case this fails */
int fd = socket(ai->ai_family, SOCK_STREAM, 0);
Expand All @@ -125,14 +132,19 @@ static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
praw_connection_start(prc, fd);
return; /* Async connection started */
} else {
err = errno;
close(fd);
}
} else {
err = errno;
}
/* connect failed immediately, go round the loop to try the next addr */
}
int err;
socklen_t errlen = sizeof(err);
getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);

if (err == 0 && prc->psocket.epoll_io.fd >= 0) {
socklen_t errlen = sizeof(err);
getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
}
psocket_error(prc, err, "on connect");

freeaddrinfo(prc->addrinfo);
Expand Down Expand Up @@ -161,6 +173,7 @@ static void raw_connection_lookup_done_lh(praw_connection_t *prc, struct addrinf
static void raw_connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) {
praw_connection_t *prc = (praw_connection_t *)user_data;
lock(&prc->task.mutex);
prc->name_lookup_pending = false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above shouldn't this be inside the _lh function? Even though there is no _lh function anymore for lookup start here.

raw_connection_lookup_done_lh(prc, ai, gai_error);
unlock(&prc->task.mutex);
}
Expand Down Expand Up @@ -211,6 +224,9 @@ static void praw_initiate_cleanup(praw_connection_t *prc) {
shutdown(prc->psocket.epoll_io.fd, SHUT_RDWR);
return;
}
if (prc->name_lookup_pending) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the protecting lock?

return; // name lookup callback will reschedule
}
pni_raw_finalize(&prc->raw_connection);
praw_connection_cleanup(prc);
}
Expand All @@ -224,21 +240,30 @@ pn_raw_connection_t *pn_raw_connection(void) {
return &conn->raw_connection;
}

// Call from pconnection_process with task lock held.
// Return true if the socket is connecting and there are no Proton events to deliver.
static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
// Call from pconnection_process with no locks.
// Callback may complete before pni_name_lookup_start returns.
static void praw_connection_first_connect(praw_connection_t *prc) {
const char *host;
const char *port;
Comment on lines 246 to 247
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually these lines should also be just before they are used in (new) line 253

pn_proactor_t *p = prc->task.proactor;
bool notify = false;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this down to where it's used - I think inside the if (!rc) block


unlock(&prc->task.mutex);
size_t addrlen = strlen(prc->taddr);
char *addr_buf = (char*) alloca(addrlen+1);
pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
bool rc = pni_name_lookup_start(&p->name_lookup, host, port, prc, raw_connection_done_cb);
lock(&prc->task.mutex);

return rc;
if (!rc) {
// Either the callback was synchronous or no callback was possible
lock(&prc->task.mutex);
if (prc->name_lookup_pending) {
// Clean up since there will be no callback.
prc->name_lookup_pending = false;
psocket_error(prc, EAI_FAIL, "internal error on connect");
notify = schedule(&prc->task);
}
unlock(&prc->task.mutex);
if (notify) notify_poller(p);
}
}

void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
Expand Down Expand Up @@ -443,17 +468,30 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
praw_initiate_cleanup(rc);
return NULL;
}
if (rc->task.closing) {
// rclosed and wclosed. Allow final events to be processed.
unlock(&rc->task.mutex);
return &rc->batch;
}
int events = io_events;
int fd = rc->psocket.epoll_io.fd;

if (rc->first_schedule) {
rc->first_schedule = false;
assert(!events); // No socket yet.
assert(!rc->connected);
if (praw_connection_first_connect_lh(rc)) {
unlock(&rc->task.mutex);
return NULL;
bool wake_event = pni_task_wake_pending(&rc->task);

t->working = false;
rc->name_lookup_pending = true;
unlock(&rc->task.mutex);
praw_connection_first_connect(rc);
if (wake_event) {
lock(&rc->task.mutex);
t->working = true;
return &rc->batch;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks wrong to return without unlocking here - if it actually correct then a comment explaining why is needed.

}
return NULL;
}
if (!rc->connected) {
if (events & (EPOLLHUP | EPOLLERR)) {
Expand Down