diff --git a/configure.ac b/configure.ac
index 08f65c1e5e2e..1c47bfbd886b 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1599,6 +1599,17 @@ AC_CHECK_FUNCS([pollts], [
 	AC_DEFINE([HAVE_POLLTS], [1], [have NetBSD pollts()])
 ])

+AC_CHECK_FUNC([epoll_pwait], [AC_DEFINE([HAVE_EPOLL_PWAIT], [1],
+	[Define if you have epoll_pwait])])
+# Define USE_EPOLL if any of the above is found
+AC_MSG_CHECKING([for epoll API support])
+if test "x$ac_cv_func_epoll_pwait" = "xyes" ; then
+	AC_DEFINE([USE_EPOLL], [1], [Define if any epoll API is supported])
+	AC_MSG_RESULT([yes])
+else
+	AC_MSG_RESULT([no])
+fi
+
 AC_CHECK_HEADER([asm-generic/unistd.h], [
 	AC_CHECK_DECL(__NR_setns,
 		AC_DEFINE([HAVE_NETNS], [1], [Have netns]),,
diff --git a/lib/event.c b/lib/event.c
index 6aac5159c1ad..7938d0c0b492 100644
--- a/lib/event.c
+++ b/lib/event.c
@@ -9,6 +9,7 @@

 #include <zebra.h>
 #include <sys/resource.h>
+#include <sys/epoll.h>

 #include "frrevent.h"
 #include "memory.h"
@@ -30,6 +31,28 @@ DEFINE_MTYPE_STATIC(LIB, EVENT_MASTER, "Thread master");
 DEFINE_MTYPE_STATIC(LIB, EVENT_POLL, "Thread Poll Info");
 DEFINE_MTYPE_STATIC(LIB, EVENT_STATS, "Thread stats");

+#if EPOLL_ENABLED
+DEFINE_MTYPE_STATIC(LIB, EVENT_EPOLL, "Thread epoll events");
+
+static int epoll_event_hash_cmp(const struct frr_epoll_event *f,
+				const struct frr_epoll_event *s)
+{
+	return f->ev.data.fd - s->ev.data.fd;
+}
+
+static uint32_t epoll_event_hash_key(const struct frr_epoll_event *e)
+{
+	uint32_t val, initval = 0xd4297c53;
+
+	val = e->ev.data.fd;
+	return jhash_1word(val, initval);
+}
+
+DECLARE_HASH(epoll_event_hash, struct frr_epoll_event, link, epoll_event_hash_cmp,
+	     epoll_event_hash_key);
+
+#endif
+
 DECLARE_LIST(event_list, struct event, eventitem);

 struct cancel_req {
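For readers who don't have FRR's typesafe containers in their head: the DECLARE_HASH() above generates a concrete epoll_event_hash_* API from the cmp/key pair. A minimal usage sketch (illustration only, not part of the patch; assumes the declarations above plus FRR's memory and assert headers):

	struct epoll_event_hash_head head;
	struct frr_epoll_event *ev, lookup = {};

	epoll_event_hash_init(&head);

	ev = XCALLOC(MTYPE_EVENT_EPOLL, sizeof(*ev));
	ev->ev.data.fd = 7;
	ev->ev.events = EPOLLIN;
	/* _add() returns NULL on success, or the existing element on collision */
	assert(epoll_event_hash_add(&head, ev) == NULL);

	/* lookups key on the fd alone, per epoll_event_hash_cmp() */
	lookup.ev.data.fd = 7;
	ev = epoll_event_hash_find(&head, &lookup);
	assert(ev && (ev->ev.events & EPOLLIN));

	epoll_event_hash_del(&head, ev);
	XFREE(MTYPE_EVENT_EPOLL, ev);
	epoll_event_hash_fini(&head);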
m->name : "main"; + char underline[strlen(name) + 1]; + const struct frr_epoll_event *ev; + + memset(underline, '-', sizeof(underline)); + underline[sizeof(underline) - 1] = '\0'; + + vty_out(vty, "\nShowing epoll FD's count for %s\n", name); + vty_out(vty, "----------------------%s\n", underline); + for (int i = 0; i < m->handler.eventsize; i++) { + if (m->handler.fd_poll_counter[i] > 0) + vty_out(vty, "\tfd: %d, event count: %lu\n", i, + m->handler.fd_poll_counter[i]); + } + + vty_out(vty, "\nShowing epoll FD's for %s\n", name); + vty_out(vty, "----------------------%s\n", underline); + vty_out(vty, "Count: %u/%d\n", + (uint32_t)(m->handler.regular_revent_count + + epoll_event_hash_count(&m->handler.epoll_event_hash)), + m->fd_limit); + + frr_each (epoll_event_hash_const, &m->handler.epoll_event_hash, ev) + show_epoll_event_helper(vty, ev, m); +} +#else static void show_event_poll_helper(struct vty *vty, struct event_loop *m) { const char *name = m->name ? m->name : "main"; @@ -410,6 +495,7 @@ static void show_event_poll_helper(struct vty *vty, struct event_loop *m) vty_out(vty, "\n"); } } +#endif DEFUN_NOSH (show_event_poll, show_event_poll_cmd, @@ -521,11 +607,49 @@ static void initializer(void) pthread_key_create(&thread_current, NULL); } +#if EPOLL_ENABLED +/* Alloc, free epoll wrapper structs */ +static struct frr_epoll_event *frr_epoll_event_new(int fd, uint32_t events) +{ + struct frr_epoll_event *ev = XCALLOC(MTYPE_EVENT_EPOLL, + sizeof(struct frr_epoll_event)); + ev->ev.data.fd = fd; + ev->ev.events = events; + return ev; +} + +static void frr_epoll_event_del(struct frr_epoll_event **ev) +{ + XFREE(MTYPE_EVENT_EPOLL, *ev); +} + +static void get_fd_stat(int fd, struct stat *fd_stat, bool *fd_closed) +{ + assert(fd_stat != NULL); + if (fstat(fd, fd_stat) == -1) { + /* fd is probably already closed */ + if (errno == EBADF) { + if (fd_closed != NULL) + *fd_closed = true; + return; + } + zlog_debug("[!] 
@@ -562,10 +686,6 @@ struct event_loop *event_master_create(const char *name)
 	rv->write = XCALLOC(MTYPE_EVENT_POLL,
 			    sizeof(struct event *) * rv->fd_limit);

-	char tmhashname[strlen(name) + 32];
-
-	snprintf(tmhashname, sizeof(tmhashname), "%s - threadmaster event hash",
-		 name);
 	cpu_records_init(rv->cpu_records);

 	event_list_init(&rv->event);
@@ -593,6 +713,31 @@ struct event_loop *event_master_create(const char *name)
 	set_nonblocking(rv->io_pipe[0]);
 	set_nonblocking(rv->io_pipe[1]);

+#if EPOLL_ENABLED
+	/* Initialize data structures for epoll */
+	rv->handler.epoll_fd = epoll_create1(0);
+	epoll_event_hash_init(&rv->handler.epoll_event_hash);
+	rv->handler.eventsize = rv->fd_limit;
+	rv->handler.revents = XCALLOC(MTYPE_EVENT_MASTER,
+				      sizeof(struct epoll_event) * rv->handler.eventsize);
+	rv->handler.regular_revents = XCALLOC(MTYPE_EVENT_MASTER,
+					      sizeof(struct epoll_event) *
+						      rv->handler.eventsize);
+	rv->handler.regular_revent_count = 0;
+	memset(&pipe_read_ev, 0, sizeof(pipe_read_ev));
+	pipe_read_ev.data.fd = rv->io_pipe[0];
+	pipe_read_ev.events = EPOLLIN;
+	if (epoll_ctl(rv->handler.epoll_fd, EPOLL_CTL_ADD, rv->io_pipe[0],
+		      &pipe_read_ev) == -1) {
+		flog_err(EC_LIB_NO_THREAD,
+			 "Attempting to call epoll_ctl to add io_pipe[0] but failed, fd: %d!",
+			 rv->io_pipe[0]);
+		exit(1);
+	}
+	rv->handler.fd_poll_counter =
+		XCALLOC(MTYPE_EVENT_MASTER,
+			sizeof(unsigned long) * rv->handler.eventsize);
+#else
 	/* Initialize data structures for poll() */
 	rv->handler.pfdsize = rv->fd_limit;
 	rv->handler.pfdcount = 0;
@@ -600,8 +745,9 @@
 			    sizeof(struct pollfd) * rv->handler.pfdsize);
 	rv->handler.copy = XCALLOC(MTYPE_EVENT_MASTER,
 				   sizeof(struct pollfd) * rv->handler.pfdsize);
+#endif

-	/* add to list of threadmasters */
+	/* add to list of loops */
 	frr_with_mutex (&masters_mtx) {
 		if (!masters)
 			masters = list_new();
@@ -699,8 +845,23 @@ void event_master_free(struct event_loop *m)
 	cpu_records_fini(m->cpu_records);

 	XFREE(MTYPE_EVENT_MASTER, m->name);
+#if EPOLL_ENABLED
+	struct frr_epoll_event *ev;
+
+	close(m->handler.epoll_fd);
+
+	while ((ev = epoll_event_hash_pop(&(m->handler.epoll_event_hash))) != NULL)
+		frr_epoll_event_del(&ev);
+
+	epoll_event_hash_fini(&(m->handler.epoll_event_hash));
+
+	XFREE(MTYPE_EVENT_MASTER, m->handler.revents);
+	XFREE(MTYPE_EVENT_MASTER, m->handler.regular_revents);
+	XFREE(MTYPE_EVENT_MASTER, m->handler.fd_poll_counter);
+#else
 	XFREE(MTYPE_EVENT_MASTER, m->handler.pfds);
 	XFREE(MTYPE_EVENT_MASTER, m->handler.copy);
+#endif
 	XFREE(MTYPE_EVENT_MASTER, m);
 }
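event_master_create() and event_master_free() bracket a classic self-pipe wakeup: the pipe's read end is permanently registered with the epoll set so another pthread can interrupt a blocking wait by writing a byte. The same setup reduced to plain libc calls (sketch; error handling trimmed):

	#include <fcntl.h>
	#include <stdio.h>
	#include <sys/epoll.h>
	#include <unistd.h>

	int main(void)
	{
		int pipefd[2], epfd;
		struct epoll_event ev = { .events = EPOLLIN };

		if (pipe(pipefd) == -1 || (epfd = epoll_create1(0)) == -1) {
			perror("setup");
			return 1;
		}
		/* mirror set_nonblocking(): the drain loop must not block */
		fcntl(pipefd[0], F_SETFL, O_NONBLOCK);
		fcntl(pipefd[1], F_SETFL, O_NONBLOCK);

		ev.data.fd = pipefd[0];
		if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) == -1) {
			perror("epoll_ctl");
			return 1;
		}
		/* any thread writing one byte to pipefd[1] now wakes epoll_wait() */
		return 0;
	}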
@@ -821,7 +982,9 @@ static int fd_poll(struct event_loop *m, const struct timeval *timer_wait,
 {
 	sigset_t origsigs;
 	unsigned char trash[64];
+#if !EPOLL_ENABLED
 	nfds_t count = m->handler.copycount;
+#endif

 	/* number of file descriptors with events */
 	int num;
@@ -830,11 +993,13 @@ static int fd_poll(struct event_loop *m, const struct timeval *timer_wait,
 	rcu_read_unlock();
 	rcu_assert_read_unlocked();

+#if !EPOLL_ENABLED
 	/* add poll pipe poker */
 	assert(count + 1 < m->handler.pfdsize);
 	m->handler.copy[count].fd = m->io_pipe[0];
 	m->handler.copy[count].events = POLLIN;
 	m->handler.copy[count].revents = 0x00;
+#endif

 	/* We need to deal with a signal-handling race here: we
 	 * don't want to miss a crucial signal, such as SIGTERM or SIGINT,
@@ -859,11 +1024,15 @@ static int fd_poll(struct event_loop *m, const struct timeval *timer_wait,
 		pthread_sigmask(SIG_SETMASK, NULL, &origsigs);
 	}

-#if defined(HAVE_PPOLL)
-	/* ppoll supports nanosecond timeout through timespec
-	 * Notice that we are not using previously calculated
-	 * timeout here.
+	/*
+	 * Timeout computation. We use apis that take two different timeout
+	 * forms. Some apis take a timeout scalar in milliseconds, with
+	 * special meaning for the values '0' and '-1'. Some apis take a
+	 * timespec with resolution in nanoseconds, and with a special
+	 * meaning for NULL.
 	 */
+#if !EPOLL_ENABLED && defined(HAVE_PPOLL)
+	/* Support for timeout via timespec */
 	struct timespec ts, *tsp;

 	if (timer_wait != NULL) {
@@ -873,8 +1042,6 @@ static int fd_poll(struct event_loop *m, const struct timeval *timer_wait,
 	} else {
 		tsp = NULL; /* block indefinitely, because there is no timer to wait for */
 	}
-	num = ppoll(m->handler.copy, count + 1, tsp, &origsigs);
-	pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
 #else
 	/*
 	 * If timer_wait is null here, that means poll() should block
 	 * indefinitely, unless the event_master has overridden it by setting
 	 * ->selectpoll_timeout.
 	 *
 	 * If the value is positive, it specifies the maximum number of
-	 * milliseconds to wait. If the timeout is -1, it specifies that
-	 * we should never wait and always return immediately even if no
-	 * event is detected. If the value is zero, the behavior is default.
+	 * milliseconds to wait. If the timeout is zero, it specifies that
+	 * we should never wait and return immediately even if no
+	 * event is detected. If the value is -1, the call blocks indefinitely.
 	 */
 	int timeout = -1;

 	if (timer_wait != NULL && m->selectpoll_timeout == 0) {
-		/* use the default value */
+		/* Convert to milliseconds */
 		timeout = (timer_wait->tv_sec * 1000)
 			  + (timer_wait->tv_usec / 1000);
+		/* Round up if there are only fractional usecs */
 		if (timeout == 0 && timer_wait->tv_usec != 0)
 			timeout = 1;
 	} else if (m->selectpoll_timeout > 0) {
@@ -900,6 +1068,16 @@
 		/* effect a poll (return immediately) */
 		timeout = 0;
 	}
+#endif /* timeout computation */
+
+#if defined(USE_EPOLL) && defined(HAVE_EPOLL_PWAIT)
+	num = epoll_pwait(m->handler.epoll_fd, m->handler.revents, m->handler.eventsize,
+			  timeout, &origsigs);
+	pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
+#elif defined(HAVE_PPOLL)
+	num = ppoll(m->handler.copy, count + 1, tsp, &origsigs);
+	pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
+#else
 	/* Not ideal - there is a race after we restore the signal mask */
 	pthread_sigmask(SIG_SETMASK, &origsigs, NULL);
 	num = poll(m->handler.copy, count + 1, timeout);
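The sigmask juggling above exists because a signal arriving between the shutdown-flag check and the blocking wait would otherwise be lost until the next wakeup. epoll_pwait(), like ppoll(), installs the caller-supplied mask atomically for the duration of the wait. The pattern in isolation (sketch; epfd/revents/max are hypothetical):

	#include <pthread.h>
	#include <signal.h>
	#include <sys/epoll.h>

	static int wait_with_mask(int epfd, struct epoll_event *revents, int max)
	{
		sigset_t blocked, origsigs;

		sigemptyset(&blocked);
		sigaddset(&blocked, SIGTERM);
		/* block SIGTERM; origsigs receives the previous (open) mask */
		pthread_sigmask(SIG_BLOCK, &blocked, &origsigs);

		/* ... check shutdown flags here: no signal can sneak in ... */

		/* the kernel swaps in origsigs only while the wait blocks, so a
		 * pending SIGTERM interrupts it with EINTR instead of getting
		 * lost between the check above and the wait below
		 */
		return epoll_pwait(epfd, revents, max, -1, &origsigs);
	}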
@@ -910,15 +1088,143 @@ static int fd_poll(struct event_loop *m, const struct timeval *timer_wait,
 	if (num < 0 && errno == EINTR)
 		*eintr_p = true;

-	if (num > 0 && m->handler.copy[count].revents != 0 && num--)
-		while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
-			;
+	/* Drain the pipe */
+	while (read(m->io_pipe[0], &trash, sizeof(trash)) > 0)
+		;
+
+	/* When poll() is used, we need to remove io_pipe[0]
+	 * from m->handler.copy and decrease "num" as fast as
+	 * possible. Otherwise, when the current thread is awakened,
+	 * even if there is no ready I/O task, thread_process_io
+	 * will still iterate over m->handler.copy until io_pipe[0]
+	 * is found, which is inefficient.
+	 *
+	 * When the epoll APIs are used, we can instead postpone the
+	 * handling of io_pipe[0] to thread_process_io. In the case
+	 * mentioned above, thread_process_io then only needs to iterate
+	 * over a single-element m->handler.revents, which is much faster
+	 * than the poll() case (thanks to epoll_wait's behavior). Of
+	 * course, removing io_pipe[0] from m->handler.revents here would
+	 * still be feasible. However, it is not as easy as dropping the
+	 * last element of m->handler.copy was: we no longer know where
+	 * io_pipe[0] is located in m->handler.revents, so only a full
+	 * traversal of m->handler.revents could find and remove it. We
+	 * therefore postpone that work to thread_process_io and save the
+	 * extra traversal.
+	 */
+#if !EPOLL_ENABLED
+	if (num > 0 && m->handler.copy[count].revents != 0)
+		num--;
+#endif

 	rcu_read_lock();
 	return num;
 }
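The regular_revents machinery referenced throughout exists because the kernel flatly refuses to put regular files into an epoll set, whereas poll() quietly reports them as always ready. A standalone demonstration of the failure mode (Linux):

	#include <assert.h>
	#include <errno.h>
	#include <fcntl.h>
	#include <sys/epoll.h>
	#include <unistd.h>

	int main(void)
	{
		int epfd = epoll_create1(0);
		int fd = open("epoll-demo.tmp", O_RDWR | O_CREAT, 0600);
		struct epoll_event ev = { .events = EPOLLIN };
		int rc;

		assert(epfd >= 0 && fd >= 0);
		ev.data.fd = fd;
		rc = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
		assert(rc == -1 && errno == EPERM); /* regular files not pollable */

		close(fd);
		close(epfd);
		unlink("epoll-demo.tmp");
		return 0;
	}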
m->name : "", fd); + } + } + + /* Modify existing hash element */ + hash_ev->ev.events = set_ev.ev.events; + + } else { + /* New fd */ + if (S_ISREG(fd_stat.st_mode)) { + /* Regular file, add into m->handler.regular_events */ + assert(m->handler.regular_revent_count < m->handler.eventsize); + + i = m->handler.regular_revent_count; + m->handler.regular_revents[i].data.fd = fd; + m->handler.regular_revents[i].events = set_ev.ev.events; + m->handler.regular_revent_count++; + } else { + /* Not regular file, add into the epoll set */ + ret = epoll_ctl(m->handler.epoll_fd, EPOLL_CTL_ADD, fd, + &set_ev.ev); + if (ret == -1) { + zlog_debug("%s: EPOLL_CTL_ADD error, errno: %d", + __func__, errno); + zlog_debug("[!] loop: %s | fd: %d", + m->name ? m->name : "", fd); + } + } + + /* Add hash element */ + hash_ev = frr_epoll_event_new(fd, set_ev.ev.events); + tmp_ev = epoll_event_hash_add(&m->handler.epoll_event_hash, hash_ev); + + /* We just looked the item up, and the loop object is locked: + * don't expect to find it in the hash now + */ + assert(tmp_ev == NULL); + } +} +#endif /* EPOLL */ + /* Add new read thread. */ void _event_add_read_write(const struct xref_eventsched *xref, struct event_loop *m, void (*func)(struct event *), @@ -946,14 +1252,12 @@ void _event_add_read_write(const struct xref_eventsched *xref, if (t_ptr && *t_ptr) break; +#if EPOLL_ENABLED + add_epoll_rw_helper(m, fd, dir); +#else /* default to a new pollfd */ nfds_t queuepos = m->handler.pfdcount; - if (dir == EVENT_READ) - thread_array = m->read; - else - thread_array = m->write; - /* * if we already have a pollfd for our file descriptor, find and * use it @@ -961,15 +1265,6 @@ void _event_add_read_write(const struct xref_eventsched *xref, for (nfds_t i = 0; i < m->handler.pfdcount; i++) { if (m->handler.pfds[i].fd == fd) { queuepos = i; - -#ifdef DEV_BUILD - /* - * What happens if we have a thread already - * created for this event? - */ - if (thread_array[fd]) - assert(!"Thread already scheduled for file descriptor"); -#endif break; } /* @@ -985,14 +1280,29 @@ void _event_add_read_write(const struct xref_eventsched *xref, /* make sure we have room for this fd + pipe poker fd */ assert(queuepos + 1 < m->handler.pfdsize); - event = event_get(m, dir, func, arg, xref); - m->handler.pfds[queuepos].fd = fd; m->handler.pfds[queuepos].events |= (dir == EVENT_READ ? POLLIN : POLLOUT); if (queuepos == m->handler.pfdcount) m->handler.pfdcount++; +#endif + + if (dir == EVENT_READ) + thread_array = m->read; + else + thread_array = m->write; + +#ifdef DEV_BUILD + /* + * What happens if we have a thread already + * created for this event? + */ + if (thread_array[fd]) + assert(!"Thread already scheduled for file descriptor"); +#endif + + event = event_get(m, dir, func, arg, xref); if (event) { frr_with_mutex (&event->mtx) { @@ -1148,9 +1458,109 @@ void _event_add_event(const struct xref_eventsched *xref, struct event_loop *m, * @param fd * @param state the event to cancel. One or more (OR'd together) of the * following: - * - POLLIN - * - POLLOUT + * - POLLIN/EPOLLIN + * - POLLOUT/EPOLLOUT */ +#if EPOLL_ENABLED +static void event_cancel_rw(struct event_loop *master, int fd, short state, + int idx_hint) +{ + struct frr_epoll_event set_ev = {}; + struct frr_epoll_event *hash_ev; + struct stat fd_stat = {}; + bool fd_closed = false; + int i, idx; + + get_fd_stat(fd, &fd_stat, &fd_closed); + + set_ev.ev.data.fd = fd; + hash_ev = epoll_event_hash_find(&(master->handler.epoll_event_hash), &set_ev); + if (!hash_ev) { + zlog_debug("[!] 
@@ -1148,9 +1458,109 @@ void _event_add_event(const struct xref_eventsched *xref, struct event_loop *m,
  * @param fd
  * @param state the event to cancel. One or more (OR'd together) of the
  * following:
- * - POLLIN
- * - POLLOUT
+ * - POLLIN/EPOLLIN
+ * - POLLOUT/EPOLLOUT
  */
+#if EPOLL_ENABLED
+static void event_cancel_rw(struct event_loop *master, int fd, short state,
+			    int idx_hint)
+{
+	struct frr_epoll_event set_ev = {};
+	struct frr_epoll_event *hash_ev;
+	struct stat fd_stat = {};
+	bool fd_closed = false;
+	int i, idx;
+
+	get_fd_stat(fd, &fd_stat, &fd_closed);
+
+	set_ev.ev.data.fd = fd;
+	hash_ev = epoll_event_hash_find(&(master->handler.epoll_event_hash), &set_ev);
+	if (!hash_ev) {
+		zlog_debug("[!] Received cancellation request for nonexistent rw job");
+		zlog_debug("[!] loop: %s | fd: %d",
+			   master->name ? master->name : "", fd);
+		return;
+	}
+
+	/* Unset the specified event bit(s). */
+	set_ev.ev.events = hash_ev->ev.events &= ~(state);
+
+	if (set_ev.ev.events == 0) {
+		/* All events are canceled, unregister the fd */
+		if (S_ISREG(fd_stat.st_mode)) {
+			/* Regular file, remove the fd from m->handler.regular_revents */
+			for (i = 0; i < master->handler.regular_revent_count; i++) {
+				if (master->handler.regular_revents[i].data.fd == fd)
+					break;
+			}
+
+			/* Whoops - didn't find the fd? */
+			if (i >= master->handler.regular_revent_count && !fd_closed) {
+				zlog_debug("%s: A regular file I/O event registered in epoll_event_hash, but not in regular_revents",
+					   __func__);
+				zlog_debug("[!] loop: %s | fd: %d",
+					   master->name ? master->name : "", fd);
+				return;
+			}
+
+			memmove(master->handler.regular_revents + i,
+				master->handler.regular_revents + i + 1,
+				(master->handler.regular_revent_count - i - 1) *
+					sizeof(struct epoll_event));
+			master->handler.regular_revent_count--;
+
+			idx = master->handler.regular_revent_count;
+			master->handler.regular_revents[idx].data.fd = 0;
+			master->handler.regular_revents[idx].events = 0;
+		} else if (epoll_ctl(master->handler.epoll_fd, EPOLL_CTL_DEL, fd,
+				     NULL) == -1) {
+			/* Not a regular file, remove the fd from the epoll set */
+			if (errno != ENOENT && errno != EBADF) {
+				zlog_debug("%s: EPOLL_CTL_DEL error, errno: %d",
+					   __func__, errno);
+				zlog_debug("[!] loop: %s | fd: %d",
+					   master->name ? master->name : "", fd);
+			}
+		}
+
+		/* Remove fd from hash table */
+		epoll_event_hash_del(&(master->handler.epoll_event_hash), hash_ev);
+		frr_epoll_event_del(&hash_ev);
+	} else {
+		/* Not all events are canceled */
+		if (S_ISREG(fd_stat.st_mode)) {
+			/* Regular file, update the fd's events in
+			 * m->handler.regular_revents
+			 */
+			for (i = 0; i < master->handler.regular_revent_count; i++) {
+				if (master->handler.regular_revents[i].data.fd == fd)
+					break;
+			}
+
+			if (i < master->handler.regular_revent_count) {
+				master->handler.regular_revents[i].events =
+					set_ev.ev.events;
+			} else {
+				zlog_debug("%s: A regular file I/O event registered in epoll_event_hash, but not in regular_revents",
+					   __func__);
+				zlog_debug("[!] loop: %s | fd: %d",
+					   master->name ? master->name : "", fd);
+			}
+		} else if (epoll_ctl(master->handler.epoll_fd, EPOLL_CTL_MOD, fd,
+				     &set_ev.ev) == -1) {
+			/* Not a regular file, update the fd's events
+			 * in the epoll set
+			 */
+			zlog_debug("%s: EPOLL_CTL_MOD error, errno: %d", __func__, errno);
+			zlog_debug("[!] loop: %s | fd: %d",
+				   master->name ? master->name : "", fd);
+		}

+		/* update the fd's events in the hash table. */
+		hash_ev->ev.events = set_ev.ev.events;
+	}
+}
+#else
 static void event_cancel_rw(struct event_loop *master, int fd, short state,
 			    int idx_hint)
 {
@@ -1178,7 +1588,7 @@ static void event_cancel_rw(struct event_loop *master, int fd, short state,
 	if (!found) {
 		zlog_debug(
 			"[!] Received cancellation request for nonexistent rw job");
-		zlog_debug("[!] threadmaster: %s | fd: %d",
+		zlog_debug("[!] loop: %s | fd: %d",
 			   master->name ? master->name : "", fd);
 		return;
 	}
@@ -1214,6 +1624,47 @@ static void event_cancel_rw(struct event_loop *master, int fd, short state,
 			master->handler.copy[master->handler.copycount].events = 0;
 	}
 }
+#endif
+
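The regular_revents bookkeeping in event_cancel_rw() is a plain array compaction: remove slot i with memmove(), shrink the count, and zero the vacated tail slot. Extracted into a runnable form:

	#include <stdio.h>
	#include <string.h>
	#include <sys/epoll.h>

	int main(void)
	{
		struct epoll_event arr[4];
		int count = 3, i = 1; /* remove the middle entry */

		memset(arr, 0, sizeof(arr));
		arr[0].data.fd = 10;
		arr[1].data.fd = 11;
		arr[2].data.fd = 12;

		memmove(arr + i, arr + i + 1,
			(count - i - 1) * sizeof(struct epoll_event));
		count--;
		arr[count].data.fd = 0; /* clear the now-unused tail slot */
		arr[count].events = 0;

		printf("%d %d\n", arr[0].data.fd, arr[1].data.fd); /* 10 12 */
		return 0;
	}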
+#if EPOLL_ENABLED
+/*
+ * Helper for epoll task cancellation; check for a match with the cancelled
+ * 'arg' value.
+ */
+static bool epoll_cancel_arg_helper(struct frr_epoll_event *ev, struct event_loop *m,
+				    void *arg)
+{
+	struct event *t;
+	int fd;
+	bool ret = false;
+
+	fd = ev->ev.data.fd;
+	if (fd == m->io_pipe[0] || fd == m->io_pipe[1])
+		return ret;
+
+	if (ev->ev.events & EPOLLIN)
+		t = m->read[fd];
+	else
+		t = m->write[fd];
+
+	if (t && t->arg == arg) {
+		/* Found a match to cancel: clean up fd arrays */
+		event_cancel_rw(m, fd, ev->ev.events, -1);
+
+		/* Clean up thread arrays */
+		m->read[fd] = NULL;
+		m->write[fd] = NULL;
+
+		/* Clear caller's ref */
+		if (t->ref)
+			*t->ref = NULL;
+
+		thread_add_unuse(m, t);
+		ret = true;
+	}
+
+	return ret;
+}
+#endif

 /*
  * Process task cancellation given a task argument: iterate through the
@@ -1223,9 +1674,13 @@ static void cancel_arg_helper(struct event_loop *master,
 			      const struct cancel_req *cr)
 {
 	struct event *t;
+#if EPOLL_ENABLED
+	struct frr_epoll_event *ev;
+#else
 	nfds_t i;
 	int fd;
 	struct pollfd *pfd;
+#endif

 	/* We're only processing arg-based cancellations here. */
 	if (cr->eventobj == NULL)
@@ -1255,6 +1710,11 @@ static void cancel_arg_helper(struct event_loop *master,
 		return;

 	/* Check the io tasks */
+#if EPOLL_ENABLED
+	frr_each_safe (epoll_event_hash, &(master->handler.epoll_event_hash), ev) {
+		epoll_cancel_arg_helper(ev, master, cr->eventobj);
+	}
+#else
 	for (i = 0; i < master->handler.pfdcount;) {
 		pfd = master->handler.pfds + i;

@@ -1293,6 +1753,7 @@ static void cancel_arg_helper(struct event_loop *master,
 		} else
 			i++;
 	}
+#endif

 	/* Check the timer tasks */
 	t = event_timer_list_first(&master->timer);
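cancel_arg_helper() above iterates with frr_each_safe, which caches the next element before the loop body runs, so deleting the current element mid-iteration is legal. The shape of that usage in isolation (fragment; matches_cancelled_arg() is a hypothetical predicate standing in for epoll_cancel_arg_helper()):

	struct frr_epoll_event *ev;

	frr_each_safe (epoll_event_hash, &m->handler.epoll_event_hash, ev) {
		if (matches_cancelled_arg(ev))
			/* safe: the iterator already holds the next node */
			epoll_event_hash_del(&m->handler.epoll_event_hash, ev);
	}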
@@ -1356,11 +1817,19 @@ static void do_event_cancel(struct event_loop *master)
 		/* Determine the appropriate queue to cancel the thread from */
 		switch (event->type) {
 		case EVENT_READ:
+#if EPOLL_ENABLED
+			event_cancel_rw(master, event->u.fd, EPOLLIN, -1);
+#else
 			event_cancel_rw(master, event->u.fd, POLLIN, -1);
+#endif
 			thread_array = master->read;
 			break;
 		case EVENT_WRITE:
+#if EPOLL_ENABLED
+			event_cancel_rw(master, event->u.fd, EPOLLOUT, -1);
+#else
 			event_cancel_rw(master, event->u.fd, POLLOUT, -1);
+#endif
 			thread_array = master->write;
 			break;
 		case EVENT_TIMER:
@@ -1570,6 +2039,178 @@ static struct event *event_run(struct event_loop *m, struct event *event, struct
 	return fetch;
 }

+/*
+ * Note two versions of the io handling loops: one for the epoll apis, and one
+ * for the poll apis.
+ */
+#if EPOLL_ENABLED
+static int thread_process_io_helper(struct event_loop *m, struct event *event,
+				    short state, short actual_state, int i,
+				    int fd, const struct stat *fd_stat,
+				    struct frr_epoll_event *hash_ev)
+{
+	struct event **thread_array;
+	struct epoll_event set_ev = {};
+
+	/*
+	 * Clear the events corresponding to "state" in
+	 * regular_revents/the epoll set, and in the hash table.
+	 *
+	 * This cleans up a possible infinite loop where we refuse
+	 * to respond to an epoll event but epoll is insistent that
+	 * we should.
+	 */
+	set_ev.data.fd = fd;
+	set_ev.events = hash_ev->ev.events & ~(state);
+
+	if (S_ISREG(fd_stat->st_mode)) {
+		/* Regular file, update the fd's events in
+		 * m->handler.regular_revents
+		 */
+		m->handler.regular_revents[i].events = set_ev.events;
+	} else if (epoll_ctl(m->handler.epoll_fd, EPOLL_CTL_MOD, fd, &set_ev) == -1) {
+		/* Not a regular file, update the fd's events
+		 * in the epoll set
+		 */
+		zlog_debug("%s: EPOLL_CTL_MOD error, errno: %d", __func__,
+			   errno);
+		zlog_debug("[!] loop: %s | fd: %d",
+			   m->name ? m->name : "", fd);
+	}

+	/* update the fd's events in the hash table. */
+	hash_ev->ev.events = set_ev.events;
+
+	if (!event) {
+		if ((actual_state & (EPOLLHUP | EPOLLIN)) != EPOLLHUP)
+			flog_err(EC_LIB_NO_THREAD,
+				 "Attempting to process an I/O event but for fd: %d(%d) no event to handle this!",
+				 fd, actual_state);
+		return 0;
+	}
+
+	if (state == EPOLLIN)
+		thread_array = m->read;
+	else
+		thread_array = m->write;
+
+	thread_array[fd] = NULL;
+	event_list_add_tail(&m->ready, event);
+	event->type = EVENT_READY;
+
+	return 1;
+}
+
+static inline void thread_process_io_inner_loop(struct event_loop *m,
+						const struct epoll_event *revents, int *i)
+{
+	struct frr_epoll_event *hash_ev;
+	struct frr_epoll_event set_ev = {};
+	int fd;
+	struct stat fd_stat = {};
+	bool fd_closed = false;
+	int idx, ret;
+	struct event *event;
+
+	fd = revents[*i].data.fd;
+	m->handler.fd_poll_counter[fd] += 1;
+	if (fd == m->io_pipe[0])
+		return;
+
+	get_fd_stat(fd, &fd_stat, &fd_closed);
+
+	set_ev.ev.data.fd = fd;
+	hash_ev = epoll_event_hash_find(&(m->handler.epoll_event_hash), &set_ev);
+	assert(hash_ev);
+
+	/* Process the I/O event. Handle errors also, which may occur for READ or WRITE
+	 * events.
+	 */
+
+	/*
+	 * Error detected: notify the application code appropriately,
+	 * and remove the fd from regular_revents/the epoll set, and from
+	 * the hash table.
+	 */
+	if (fd_closed || (revents[*i].events & (EPOLLHUP | EPOLLERR | EPOLLRDHUP))) {
+		/* Return any application tasks back to the application. */
+		event = m->read[fd];
+		if (event) {
+			m->read[fd] = NULL;
+			event->type = EVENT_READY;
+			event_list_add_tail(&m->ready, event);
+		}
+		event = m->write[fd];
+		if (event) {
+			m->write[fd] = NULL;
+			event->type = EVENT_READY;
+			event_list_add_tail(&m->ready, event);
+		}
+
+		if (S_ISREG(fd_stat.st_mode)) {
+			/* Regular file, remove the fd from m->handler.regular_revents */
+			memmove(m->handler.regular_revents + *i,
+				m->handler.regular_revents + *i + 1,
+				(m->handler.regular_revent_count - *i - 1) *
+					sizeof(struct epoll_event));
+
+			m->handler.regular_revent_count--;
+			idx = m->handler.regular_revent_count;
+			m->handler.regular_revents[idx].data.fd = 0;
+			m->handler.regular_revents[idx].events = 0;
+
+			/* regular_revents is modified while iterating on it; roll back */
+			*i = *i - 1;
+		} else {
+			/* Not a regular file, remove the fd from the epoll set */
+			ret = epoll_ctl(m->handler.epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+
+			if (!fd_closed && ret == -1 && errno != EBADF && errno != ENOENT) {
+				zlog_debug("%s: EPOLL_CTL_DEL error, errno: %d",
+					   __func__, errno);
+				zlog_debug("[!] loop: %s | fd: %d",
+					   m->name ? m->name : "", fd);
+			}
+		}
+
+		/* Remove fd from hash table */
+		epoll_event_hash_del(&(m->handler.epoll_event_hash), hash_ev);
+		frr_epoll_event_del(&hash_ev);
+		return;
+	}
+
+	if (revents[*i].events & (EPOLLIN))
+		thread_process_io_helper(m, m->read[fd], EPOLLIN, revents[*i].events,
+					 *i, fd, &fd_stat, hash_ev);
+
+	if (revents[*i].events & (EPOLLOUT)) {
+		thread_process_io_helper(m, m->write[fd], EPOLLOUT, revents[*i].events,
+					 *i, fd, &fd_stat, hash_ev);
+	}
+}
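thread_process_io_inner_loop() compacts regular_revents[] while its caller is iterating over it, then steps the index back one slot so the element that just shifted in is not skipped. The idiom in runnable form:

	#include <stdio.h>
	#include <string.h>

	int main(void)
	{
		int arr[5] = { 1, 2, 2, 3, 4 }, count = 5;

		for (int i = 0; i < count; i++) {
			if (arr[i] == 2) { /* drop matching entries */
				memmove(arr + i, arr + i + 1,
					(count - i - 1) * sizeof(arr[0]));
				count--;
				i--; /* re-examine the slot that shifted in */
			}
		}

		for (int i = 0; i < count; i++)
			printf("%d ", arr[i]); /* prints: 1 3 4 */
		return 0;
	}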
+ */ + for (i = 0; i < num; ++i) + thread_process_io_inner_loop(m, m->handler.revents, &i); +} +#else +/* + * version for poll family of apis + */ static int thread_process_io_helper(struct event_loop *m, struct event *event, short state, short actual_state, int pos) { @@ -1695,6 +2336,7 @@ static void thread_process_io(struct event_loop *m, unsigned int num) m->last_read++; } +#endif /* Add all timers that have popped to the ready list. */ static unsigned int thread_process_timers(struct event_loop *m, @@ -1730,11 +2372,10 @@ static unsigned int thread_process(struct event_list_head *list) return ready; } - -/* Fetch next ready thread. */ -struct event *event_fetch(struct event_loop *m, struct event *fetch) +static void event_fetch_inner_loop(struct event_loop *m, struct event *event, + struct event *fetch, bool *broken, + bool *continued) { - struct event *event = NULL; struct timeval now; struct timeval zerotime = {0, 0}; struct timeval tv; @@ -1742,110 +2383,145 @@ struct event *event_fetch(struct event_loop *m, struct event *fetch) bool eintr_p = false; int num = 0; - do { - /* Handle signals if any */ - if (m->handle_signals) - frr_sigevent_process(); + /* Handle signals if any */ + if (m->handle_signals) + frr_sigevent_process(); - pthread_mutex_lock(&m->mtx); + pthread_mutex_lock(&m->mtx); - /* Process any pending cancellation requests */ - do_event_cancel(m); + /* Process any pending cancellation requests */ + do_event_cancel(m); - /* - * Attempt to flush ready queue before going into poll(). - * This is performance-critical. Think twice before modifying. - */ - if ((event = event_list_pop(&m->ready))) { - fetch = event_run(m, event, fetch); - if (fetch->ref) - *fetch->ref = NULL; - pthread_mutex_unlock(&m->mtx); - if (!m->ready_run_loop) - GETRUSAGE(&m->last_getrusage); - m->ready_run_loop = true; - break; - } + /* + * Attempt to flush ready queue before going into poll(). + * This is performance-critical. Think twice before modifying. + */ + if ((event = event_list_pop(&m->ready))) { + fetch = event_run(m, event, fetch); + if (fetch->ref) + *fetch->ref = NULL; + pthread_mutex_unlock(&m->mtx); + if (!m->ready_run_loop) + GETRUSAGE(&m->last_getrusage); + m->ready_run_loop = true; + *broken = true; + return; + } - m->ready_run_loop = false; - /* otherwise, tick through scheduling sequence */ + m->ready_run_loop = false; + /* otherwise, tick through scheduling sequence */ - /* - * Post events to ready queue. This must come before the - * following block since events should occur immediately - */ - thread_process(&m->event); + /* + * Post events to ready queue. This must come before the + * following block since events should occur immediately + */ + thread_process(&m->event); - /* - * If there are no tasks on the ready queue, we will poll() - * until a timer expires or we receive I/O, whichever comes - * first. The strategy for doing this is: - * - * - If there are events pending, set the poll() timeout to zero - * - If there are no events pending, but there are timers - * pending, set the timeout to the smallest remaining time on - * any timer. 
@@ -1730,11 +2372,10 @@ static unsigned int thread_process(struct event_list_head *list)
 	return ready;
 }

-
-/* Fetch next ready thread. */
-struct event *event_fetch(struct event_loop *m, struct event *fetch)
+static void event_fetch_inner_loop(struct event_loop *m, struct event *event,
+				   struct event *fetch, bool *broken,
+				   bool *continued)
 {
-	struct event *event = NULL;
 	struct timeval now;
 	struct timeval zerotime = {0, 0};
 	struct timeval tv;
@@ -1742,110 +2383,145 @@ struct event *event_fetch(struct event_loop *m, struct event *fetch)
 	bool eintr_p = false;
 	int num = 0;

-	do {
-		/* Handle signals if any */
-		if (m->handle_signals)
-			frr_sigevent_process();
+	/* Handle signals if any */
+	if (m->handle_signals)
+		frr_sigevent_process();

-		pthread_mutex_lock(&m->mtx);
+	pthread_mutex_lock(&m->mtx);

-		/* Process any pending cancellation requests */
-		do_event_cancel(m);
+	/* Process any pending cancellation requests */
+	do_event_cancel(m);

-		/*
-		 * Attempt to flush ready queue before going into poll().
-		 * This is performance-critical. Think twice before modifying.
-		 */
-		if ((event = event_list_pop(&m->ready))) {
-			fetch = event_run(m, event, fetch);
-			if (fetch->ref)
-				*fetch->ref = NULL;
-			pthread_mutex_unlock(&m->mtx);
-			if (!m->ready_run_loop)
-				GETRUSAGE(&m->last_getrusage);
-			m->ready_run_loop = true;
-			break;
-		}
+	/*
+	 * Attempt to flush ready queue before going into poll().
+	 * This is performance-critical. Think twice before modifying.
+	 */
+	if ((event = event_list_pop(&m->ready))) {
+		fetch = event_run(m, event, fetch);
+		if (fetch->ref)
+			*fetch->ref = NULL;
+		pthread_mutex_unlock(&m->mtx);
+		if (!m->ready_run_loop)
+			GETRUSAGE(&m->last_getrusage);
+		m->ready_run_loop = true;
+		*broken = true;
+		return;
+	}

-		m->ready_run_loop = false;
-		/* otherwise, tick through scheduling sequence */
+	m->ready_run_loop = false;
+	/* otherwise, tick through scheduling sequence */

-		/*
-		 * Post events to ready queue. This must come before the
-		 * following block since events should occur immediately
-		 */
-		thread_process(&m->event);
+	/*
+	 * Post events to ready queue. This must come before the
+	 * following block since events should occur immediately
+	 */
+	thread_process(&m->event);

-		/*
-		 * If there are no tasks on the ready queue, we will poll()
-		 * until a timer expires or we receive I/O, whichever comes
-		 * first. The strategy for doing this is:
-		 *
-		 * - If there are events pending, set the poll() timeout to zero
-		 * - If there are no events pending, but there are timers
-		 * pending, set the timeout to the smallest remaining time on
-		 * any timer.
-		 * - If there are neither timers nor events pending, but there
-		 * are file descriptors pending, block indefinitely in poll()
-		 * - If nothing is pending, it's time for the application to die
-		 *
-		 * In every case except the last, we need to hit poll() at least
-		 * once per loop to avoid starvation by events
-		 */
-		if (!event_list_count(&m->ready))
-			tw = thread_timer_wait(&m->timer, &tv);
+	/*
+	 * If there are no tasks on the ready queue, we will poll()
+	 * until a timer expires or we receive I/O, whichever comes
+	 * first. The strategy for doing this is:
+	 *
+	 * - If there are events pending, set the poll() timeout to zero
+	 * - If there are no events pending, but there are timers
+	 * pending, set the timeout to the smallest remaining time on
+	 * any timer.
+	 * - If there are neither timers nor events pending, but there
+	 * are file descriptors pending, block indefinitely in poll()
+	 * - If nothing is pending, it's time for the application to die
+	 *
+	 * In every case except the last, we need to hit poll() at least
+	 * once per loop to avoid starvation by events
+	 */
+	if (!event_list_count(&m->ready))
+		tw = thread_timer_wait(&m->timer, &tv);

-		if (event_list_count(&m->ready) ||
-		    (tw && !timercmp(tw, &zerotime, >)))
-			tw = &zerotime;
+	if (event_list_count(&m->ready) || (tw && !timercmp(tw, &zerotime, >)))
+		tw = &zerotime;

-		if (!tw && m->handler.pfdcount == 0) { /* die */
-			pthread_mutex_unlock(&m->mtx);
-			fetch = NULL;
-			break;
-		}
+#if EPOLL_ENABLED
+	if (!tw && m->handler.regular_revent_count == 0 &&
+	    epoll_event_hash_count(&m->handler.epoll_event_hash) == 0) { /* die */
+		pthread_mutex_unlock(&m->mtx);
+		fetch = NULL;
+		*broken = true;
+		return;
+	}
+#else
+	if (!tw && m->handler.pfdcount == 0) { /* die */
+		pthread_mutex_unlock(&m->mtx);
+		fetch = NULL;
+		*broken = true;
+		return;
+	}
+#endif

-		/*
-		 * Copy pollfd array + # active pollfds in it. Not necessary to
-		 * copy the array size as this is fixed.
-		 */
-		m->handler.copycount = m->handler.pfdcount;
-		memcpy(m->handler.copy, m->handler.pfds,
-		       m->handler.copycount * sizeof(struct pollfd));
+#if !EPOLL_ENABLED
+	/*
+	 * Copy pollfd array + # active pollfds in it. Not necessary to
+	 * copy the array size as this is fixed.
+	 */
+	m->handler.copycount = m->handler.pfdcount;
+	memcpy(m->handler.copy, m->handler.pfds,
+	       m->handler.copycount * sizeof(struct pollfd));
+#endif

-		pthread_mutex_unlock(&m->mtx);
-		{
-			eintr_p = false;
-			num = fd_poll(m, tw, &eintr_p);
-		}
-		pthread_mutex_lock(&m->mtx);
+	pthread_mutex_unlock(&m->mtx);
+	{
+		eintr_p = false;
+		num = fd_poll(m, tw, &eintr_p);
+	}
+	pthread_mutex_lock(&m->mtx);

-		/* Handle any errors received in poll() */
-		if (num < 0) {
-			if (eintr_p) {
-				pthread_mutex_unlock(&m->mtx);
-				/* loop around to signal handler */
-				continue;
-			}
+	/* Handle any errors received in poll()/epoll_wait() */
+	if (num < 0) {
+		if (eintr_p) {
+			pthread_mutex_unlock(&m->mtx);
+			/* loop around to signal handler */
+			*continued = true;
+			return;
+		}

-			/* else die */
-			flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
-				 safe_strerror(errno));
-			pthread_mutex_unlock(&m->mtx);
-			fetch = NULL;
-			break;
-		}
+		/* else die */
+#if EPOLL_ENABLED
+		flog_err(EC_LIB_SYSTEM_CALL, "epoll_wait() error: %s",
+			 safe_strerror(errno));
+#else
+		flog_err(EC_LIB_SYSTEM_CALL, "poll() error: %s",
+			 safe_strerror(errno));
+#endif
+		pthread_mutex_unlock(&m->mtx);
+		fetch = NULL;
+		*broken = true;
+		return;
+	}

-		/* Post timers to ready queue. */
-		monotime(&now);
-		thread_process_timers(m, &now);
+	/* Post timers to ready queue. */
+	monotime(&now);
+	thread_process_timers(m, &now);

-		/* Post I/O to ready queue. */
-		if (num > 0)
-			thread_process_io(m, num);
+	/* Post I/O to ready queue. */
+	if (num > 0)
+		thread_process_io(m, num);

-		pthread_mutex_unlock(&m->mtx);
+	pthread_mutex_unlock(&m->mtx);
+}
+
+/* Fetch next ready thread. */
+struct event *event_fetch(struct event_loop *m, struct event *fetch)
+{
+	struct event *event = NULL;
+	bool broken = false;
+	bool continued = false;
+
+	do {
+		broken = false;
+		continued = false;
+		event_fetch_inner_loop(m, event, fetch, &broken, &continued);
+		if (broken)
+			break;
+		if (continued)
+			continue;
 	} while (!event && m->spin);

 	return fetch;
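The refactor keeps event_fetch()'s external contract intact (it still returns NULL when the loop should die), so daemon main loops are untouched. For reference, the canonical consumer pattern used across FRR (sketch):

	struct event thread;

	while (event_fetch(master, &thread))
		event_call(&thread);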
diff --git a/lib/frrevent.h b/lib/frrevent.h
index 42e52bc33d44..4c04158dc5de 100644
--- a/lib/frrevent.h
+++ b/lib/frrevent.h
@@ -6,10 +6,19 @@
 #ifndef _ZEBRA_THREAD_H
 #define _ZEBRA_THREAD_H

+#if defined(USE_EPOLL) && defined(HAVE_EPOLL_PWAIT)
+#define EPOLL_ENABLED 1
+#else
+#define EPOLL_ENABLED 0
+#endif
+
 #include <zebra.h>
 #include <pthread.h>
 #include <poll.h>
 #include <sys/types.h>
+#if EPOLL_ENABLED
+#include <sys/epoll.h>
+#endif
 #include "monotime.h"
 #include "frratomic.h"
 #include "typesafe.h"
@@ -39,6 +48,42 @@ struct rusage_t {
 PREDECL_LIST(event_list);
 PREDECL_HEAP(event_timer_list);

+#if EPOLL_ENABLED
+
+PREDECL_HASH(epoll_event_hash);
+
+struct frr_epoll_event {
+	struct epoll_event ev;
+	struct epoll_event_hash_item link;
+};
+
+struct fd_handler {
+	/* The epoll set file descriptor */
+	int epoll_fd;
+
+	/* A hash table in which monitored I/O file descriptors and events
+	 * are registered
+	 */
+	struct epoll_event_hash_head epoll_event_hash;
+
+	/* Maximum size of the .revents and .regular_revents arrays */
+	int eventsize;
+
+	/* The buffer which stores the results of epoll_wait() */
+	struct epoll_event *revents;
+
+	/* Vtysh might redirect stdin/stdout to regular files. However,
+	 * regular files can't be added to an epoll set and need special
+	 * treatment. I/O events from/to regular files are added directly
+	 * to regular_revents, not to the epoll set, thereby sidestepping
+	 * epoll_wait().
+	 */
+	struct epoll_event *regular_revents;
+	int regular_revent_count;
+
+	unsigned long *fd_poll_counter;
+};
+#else
 struct fd_handler {
 	/* number of pfd that fit in the allocated space of pfds. This is a
 	 * constant and is the same for both pfds and copy.
@@ -55,6 +100,7 @@ struct fd_handler {
 	/* number of pollfds stored in copy */
 	nfds_t copycount;
 };
+#endif

 struct xref_eventsched {
 	struct xref xref;
@@ -87,7 +133,9 @@ struct event_loop {
 	pthread_mutex_t mtx;
 	pthread_t owner;

+#if !EPOLL_ENABLED
 	nfds_t last_read;
+#endif
 	bool ready_run_loop;
 	RUSAGE_T last_getrusage;