diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 7ae86f136..967464e4e 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -271,6 +271,7 @@ typedef struct pconnection_t { pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/ bool io_doublecheck; /* callbacks made and new IO may have arrived */ uint64_t expected_timeout; + bool name_lookup_pending; char addr_buf[1]; } pconnection_t; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index df1bde8b2..b77e7d772 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -836,7 +836,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), no pending timer. // Return true when all possible outstanding epoll events associated with this pconnection have been processed. static inline bool pconnection_is_final(pconnection_t *pc) { - return !pc->current_arm && !pc->task.ready && !pc->tick_pending; + return !pc->current_arm && !pc->task.ready && !pc->tick_pending && !pc->name_lookup_pending; } static void pconnection_final_free(pconnection_t *pc) { @@ -1400,6 +1400,7 @@ static void pconnection_start(pconnection_t *pc, int fd) { } /* Called on initial connect, and if connection fails to try another address */ +/* May be called within the pconnection task or from an external name_lookup task */ static void pconnection_maybe_connect_lh(pconnection_t *pc) { errno = 0; if (!pc->connected) { /* Not yet connected */ @@ -1447,7 +1448,11 @@ bool schedule_if_inactive(pn_proactor_t *p) { static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, int gai_error) { pn_proactor_t *p = pc->task.proactor; bool notify = false; - if (gai_error) { + + if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) { + // Closed by application. Skip pending connect(). gai_error no longer of interest. + freeaddrinfo(ai); + } else if (gai_error) { psocket_gai_error(&pc->psocket, gai_error, "connect to "); } else if (ai) { pc->addrinfo = ai; @@ -1464,6 +1469,7 @@ static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, in static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) { pconnection_t *pc = (pconnection_t *)user_data; lock(&pc->task.mutex); + pc->name_lookup_pending = false; connection_lookup_done_lh(pc, ai, gai_error); unlock(&pc->task.mutex); } @@ -1472,10 +1478,28 @@ static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_err // Return true if the socket is connecting and there are no Proton events to deliver. static bool pconnection_first_connect_lh(pconnection_t *pc) { pn_proactor_t *p = pc->task.proactor; + pn_transport_t *tp = pc->driver.transport; + pc->name_lookup_pending = true; + unlock(&pc->task.mutex); bool rc = pni_name_lookup_start(&p->name_lookup, pc->host, pc->port, pc, connection_done_cb); lock(&pc->task.mutex); - return rc; + + if (!rc) { + // Either the callback was synchronous or no callback was possible + if (pc->name_lookup_pending) { + // Clean up since there will be no callback. + pc->name_lookup_pending = false; + psocket_error(&pc->psocket, EAI_FAIL, "internal error on connect"); + } + return false; + } + if (!pc->name_lookup_pending) { + // connection_done_cb already completed + if (pn_condition_is_set(pn_transport_condition(tp))) + return false; + } + return !pc->queued_disconnect && !pni_task_wake_pending(&pc->task); } void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) { diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 1f056c85c..bd5f787f7 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -56,6 +56,7 @@ struct praw_connection_t { bool hup_detected; bool read_check; bool first_schedule; + bool name_lookup_pending; char *taddr; }; @@ -110,8 +111,14 @@ static void praw_connection_start(praw_connection_t *prc, int fd) { } /* Called on initial connect, and if connection fails to try another address */ +/* May be called within the praw_connection task or from an external name_lookup task */ static void praw_connection_maybe_connect_lh(praw_connection_t *prc) { + int err = 0; + if (prc->task.closing) { + return; + } while (prc->ai) { /* Have an address */ + err = 0; struct addrinfo *ai = prc->ai; prc->ai = prc->ai->ai_next; /* Move to next address in case this fails */ int fd = socket(ai->ai_family, SOCK_STREAM, 0); @@ -125,14 +132,19 @@ static void praw_connection_maybe_connect_lh(praw_connection_t *prc) { praw_connection_start(prc, fd); return; /* Async connection started */ } else { + err = errno; close(fd); } + } else { + err = errno; } /* connect failed immediately, go round the loop to try the next addr */ } - int err; - socklen_t errlen = sizeof(err); - getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen); + + if (err == 0 && prc->psocket.epoll_io.fd >= 0) { + socklen_t errlen = sizeof(err); + getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen); + } psocket_error(prc, err, "on connect"); freeaddrinfo(prc->addrinfo); @@ -161,6 +173,7 @@ static void raw_connection_lookup_done_lh(praw_connection_t *prc, struct addrinf static void raw_connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) { praw_connection_t *prc = (praw_connection_t *)user_data; lock(&prc->task.mutex); + prc->name_lookup_pending = false; raw_connection_lookup_done_lh(prc, ai, gai_error); unlock(&prc->task.mutex); } @@ -211,6 +224,9 @@ static void praw_initiate_cleanup(praw_connection_t *prc) { shutdown(prc->psocket.epoll_io.fd, SHUT_RDWR); return; } + if (prc->name_lookup_pending) { + return; // name lookup callback will reschedule + } pni_raw_finalize(&prc->raw_connection); praw_connection_cleanup(prc); } @@ -224,21 +240,30 @@ pn_raw_connection_t *pn_raw_connection(void) { return &conn->raw_connection; } -// Call from pconnection_process with task lock held. -// Return true if the socket is connecting and there are no Proton events to deliver. -static bool praw_connection_first_connect_lh(praw_connection_t *prc) { +// Call from pconnection_process with no locks. +// Callback may complete before pni_name_lookup_start returns. +static void praw_connection_first_connect(praw_connection_t *prc) { const char *host; const char *port; pn_proactor_t *p = prc->task.proactor; + bool notify = false; - unlock(&prc->task.mutex); size_t addrlen = strlen(prc->taddr); char *addr_buf = (char*) alloca(addrlen+1); pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port); bool rc = pni_name_lookup_start(&p->name_lookup, host, port, prc, raw_connection_done_cb); - lock(&prc->task.mutex); - - return rc; + if (!rc) { + // Either the callback was synchronous or no callback was possible + lock(&prc->task.mutex); + if (prc->name_lookup_pending) { + // Clean up since there will be no callback. + prc->name_lookup_pending = false; + psocket_error(prc, EAI_FAIL, "internal error on connect"); + notify = schedule(&prc->task); + } + unlock(&prc->task.mutex); + if (notify) notify_poller(p); + } } void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) { @@ -443,6 +468,11 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool praw_initiate_cleanup(rc); return NULL; } + if (rc->task.closing) { + // rclosed and wclosed. Allow final events to be processed. + unlock(&rc->task.mutex); + return &rc->batch; + } int events = io_events; int fd = rc->psocket.epoll_io.fd; @@ -450,10 +480,18 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool rc->first_schedule = false; assert(!events); // No socket yet. assert(!rc->connected); - if (praw_connection_first_connect_lh(rc)) { - unlock(&rc->task.mutex); - return NULL; + bool wake_event = pni_task_wake_pending(&rc->task); + + t->working = false; + rc->name_lookup_pending = true; + unlock(&rc->task.mutex); + praw_connection_first_connect(rc); + if (wake_event) { + lock(&rc->task.mutex); + t->working = true; + return &rc->batch; } + return NULL; } if (!rc->connected) { if (events & (EPOLLHUP | EPOLLERR)) {