diff --git a/docs/graph.svg b/docs/graph.svg index 7a4818243..d0e1b2a0e 100644 --- a/docs/graph.svg +++ b/docs/graph.svg @@ -1,733 +1,727 @@ - - - - -gr-0002 - + + + +gr-0-0002-dplane + control_input - -control_input + +control_input loopback_input - -loopback_input + +loopback_input control_input->loopback_input - - + + arp_output_request - -arp_output_request + +arp_output_request control_input->arp_output_request - - + + icmp_local_send - -icmp_local_send + +icmp_local_send control_input->icmp_local_send - - + + icmp6_local_send - -icmp6_local_send + +icmp6_local_send control_input->icmp6_local_send - - + + ndp_ns_output - -ndp_ns_output + +ndp_ns_output control_input->ndp_ns_output - - - - - -ip_output - -ip_output - - - -control_input->ip_output - - - - - -arp_output_reply - -arp_output_reply - - - -control_input->arp_output_reply - - + + - + ip6_output - -ip6_output + +ip6_output - -control_input->ip6_output - - - - - + control_input->ip6_output - - - - - -ndp_na_output - -ndp_na_output - - - -control_input->ndp_na_output - - + + - + ip_input - -ip_input + +ip_input - + loopback_input->ip_input - - + + - + ip6_input - -ip6_input + +ip6_input - + loopback_input->ip6_input - - + + - + eth_output - -eth_output + +eth_output - + arp_output_request->eth_output - - + + - + icmp_output - -icmp_output + +icmp_output - + icmp_local_send->icmp_output - - + + - + icmp6_output - -icmp6_output + +icmp6_output - + icmp6_local_send->icmp6_output - - + + - + ndp_ns_output->icmp6_output - - - - - -ip_output->eth_output - - - - - -loopback_output - -loopback_output - - - -ip_output->loopback_output - - - - - -ip_hold - -ip_hold - - - -ip_output->ip_hold - - - - - -ipip_output - -ipip_output - - - -ip_output->ipip_output - - - - - -sr6_output - -sr6_output - - - -ip_output->sr6_output - - - - - -arp_output_reply->eth_output - - + + - + ip6_output->eth_output - - + + - - -ip6_output->loopback_output - - + + +sr6_output + +sr6_output - + ip6_output->sr6_output - - + + - + ip6_hold - -ip6_hold + +ip6_hold - + ip6_output->ip6_hold - - - - - -ndp_na_output->icmp6_output - - - - - -control_output - -control_output + + - + eth_input - -eth_input + +eth_input - + arp_input - -arp_input + +arp_input - + eth_input->arp_input - - + + - + eth_input->ip_input - - + + - + eth_input->ip6_input - - + + - + arp_input_request - -arp_input_request + +arp_input_request - + arp_input->arp_input_request - - + + - + arp_input_reply - -arp_input_reply + +arp_input_reply - + arp_input->arp_input_reply - - + + + + + +ip_output + +ip_output - + ip_input->ip_output - - + + - + dnat44 - -dnat44 + +dnat44 - + ip_input->dnat44 - - + + - + ip_forward - -ip_forward + +ip_forward - + ip_input->ip_forward - - + + - + ip_input_local - -ip_input_local + +ip_input_local - + ip_input->ip_input_local - - + + - + ip6_input->ip6_output - - + + - + ip6_forward - -ip6_forward + +ip6_forward - + ip6_input->ip6_forward - - + + - + ip6_input_local - -ip6_input_local + +ip6_input_local - + ip6_input->ip6_input_local - - + + - + sr6_local - -sr6_local + +sr6_local - + ip6_input->sr6_local - - + + - + port_tx - -port_tx + +port_tx - + eth_output->port_tx - - + + - + l1_xconnect - -l1_xconnect + +l1_xconnect - + l1_xconnect->port_tx - - - - - -loopback_output->control_output - - + + - + port_rx - -port_rx + +port_rx - + port_rx->eth_input - - + + - + port_rx->l1_xconnect - - + + - - -arp_input_request->control_output - - + + +nh4_unreachable + +nh4_unreachable + + + +nh4_unreachable->ip_output + + + + + +ip_output->eth_output + + + + + +ip_hold + +ip_hold + + + +ip_output->ip_hold + + + + + +ipip_output + +ipip_output + + + +ip_output->ipip_output + + - + + +ip_output->sr6_output + + + + + +arp_probe + +arp_probe + + + +arp_probe->ip_output + + + + + +arp_output_reply + +arp_output_reply + + + +arp_probe->arp_output_reply + + + + -arp_input_reply->control_output - - +arp_output_reply->eth_output + + + + + +arp_input_request->arp_probe + + + + + +arp_input_reply->arp_probe + + - + dnat44->ip_forward - - + + - + dnat44->ip_input_local - - + + - + ip_forward->ip_output - - + + - + icmp_input - -icmp_input + +icmp_input - + ip_input_local->icmp_input - - + + - + ipip_input - -ipip_input + +ipip_input - + ip_input_local->ipip_input - - + + - + l4_input_local - -l4_input_local + +l4_input_local - + ip_input_local->l4_input_local - - - - - -icmp_input->control_output - - + + - + icmp_input->icmp_output - - + + - + icmp_output->ip_output - - + + - - -ip_hold->control_output - - + + +ip_hold->nh4_unreachable + + - + ipip_input->ip_input - - + + - + l4_loopback_output - -l4_loopback_output + +l4_loopback_output - + l4_input_local->l4_loopback_output - - + + - + ipip_output->ip_output - - + + - + sr6_output->ip6_output - - + + - + + +nh6_unreachable + +nh6_unreachable + + + +nh6_unreachable->ip6_output + + + + -icmp6_input - -icmp6_input +ndp_probe + +ndp_probe - - -icmp6_input->control_output - - + + +ndp_probe->ip6_output + + + + + +ndp_na_output + +ndp_na_output + + + +ndp_probe->ndp_na_output + + + + + +ndp_na_output->icmp6_output + + + + + +ndp_router_solicit + +ndp_router_solicit + + + +icmp6_input + +icmp6_input - + icmp6_input->icmp6_output - - + + - + ndp_ns_input - -ndp_ns_input + +ndp_ns_input - + icmp6_input->ndp_ns_input - - + + - + ndp_na_input - -ndp_na_input + +ndp_na_input - + icmp6_input->ndp_na_input - - + + - + ndp_rs_input - -ndp_rs_input + +ndp_rs_input - + icmp6_input->ndp_rs_input - - + + - + icmp6_output->ip6_output - - + + - - -ndp_ns_input->control_output - - + + +ndp_ns_input->ndp_probe + + - - -ndp_na_input->control_output - - + + +ndp_na_input->ndp_probe + + - - -ndp_rs_input->control_output - - + + +ndp_rs_input->ndp_router_solicit + + - + ip6_forward->ip6_output - - + + - - -ip6_hold->control_output - - + + +ip6_hold->nh6_unreachable + + - + ip6_input_local->l4_input_local - - + + - + ip6_input_local->icmp6_input - - + + - + sr6_local->ip_input - - + + - + sr6_local->ip6_input - - + + - + sr6_local->ip6_input_local - - - - - -l4_loopback_output->loopback_output - - + + diff --git a/modules/infra/api/affinity.c b/modules/infra/api/affinity.c index 31ee43606..9d83ebcbf 100644 --- a/modules/infra/api/affinity.c +++ b/modules/infra/api/affinity.c @@ -3,7 +3,6 @@ #include #include -#include #include #include #include @@ -38,10 +37,6 @@ static struct api_out affinity_set(const void *request, void ** /*response*/) { if (ret < 0) goto out; - ret = -control_output_set_affinity(CPU_SETSIZE, &req->control_cpus); - if (ret < 0) - goto out; - gr_config.control_cpus = req->control_cpus; } if (CPU_COUNT(&req->datapath_cpus) > 0) { diff --git a/modules/infra/api/stats.c b/modules/infra/api/stats.c index 0289c7845..225fb7333 100644 --- a/modules/infra/api/stats.c +++ b/modules/infra/api/stats.c @@ -50,7 +50,17 @@ static struct api_out stats_get(const void *request, void **response) { continue; for (unsigned i = 0; i < w_stats->n_stats; i++) { const struct node_stats *n = &w_stats->stats[i]; - const char *name = rte_node_id_to_name(n->node_id); + const struct rte_node_register *nr = gr_node_info_get(n->node_id) + ->node; + char name[RTE_NODE_NAMESIZE]; + snprintf( + name, + RTE_NODE_NAMESIZE, + "%s%c", + rte_node_id_to_name(n->node_id), + nr->flags & GR_NODE_FLAG_CONTROL_PLANE ? '*' : '\0' + ); + s = find_stat(stats, name); if (s != NULL) { s->objs += n->objs; @@ -66,6 +76,39 @@ static struct api_out stats_get(const void *request, void **response) { gr_vec_add(stats, stat); } } + + if (worker->stats_ctl) + for (unsigned i = 0; i < worker->stats_ctl->n_stats; i++) { + const struct node_stats *n = &w_stats->stats[i]; + const struct rte_node_register *nr = gr_node_info_get( + n->node_id + ) + ->node; + char name[RTE_NODE_NAMESIZE]; + snprintf( + name, + RTE_NODE_NAMESIZE, + "%s%c", + rte_node_id_to_name(n->node_id), + nr->flags & GR_NODE_FLAG_CONTROL_PLANE ? '*' : '\0' + ); + + s = find_stat(stats, name); + if (s != NULL) { + s->objs += n->objs; + s->calls += n->calls; + s->cycles += n->cycles; + } else { + struct stat stat = { + .objs = n->objs, + .calls = n->calls, + .cycles = n->cycles, + }; + memccpy(stat.name, name, 0, sizeof(stat.name)); + gr_vec_add(stats, stat); + } + } + s = find_stat(stats, "idle"); if (s != NULL) { s->calls += w_stats->n_sleeps; diff --git a/modules/infra/control/control_output.c b/modules/infra/control/control_output.c deleted file mode 100644 index 378665566..000000000 --- a/modules/infra/control/control_output.c +++ /dev/null @@ -1,117 +0,0 @@ -// SPDX-License-Identifier: BSD-3-Clause -// Copyright (c) 2024 Christophe Fontaine - -#include -#include -#include -#include -#include -#include - -#include -#include - -#include -#include -#include - -static struct rte_ring *ctrlout_ring; - -int control_output_enqueue(struct rte_mbuf *m) { - return rte_ring_enqueue(ctrlout_ring, m); -} - -static void control_output_poll(evutil_socket_t, short, void *) { - struct control_output_mbuf_data *data; - struct rte_mbuf *mbuf; - void *ring_item; - - while (rte_ring_dequeue(ctrlout_ring, &ring_item) == 0) { - mbuf = ring_item; - data = control_output_mbuf_data(mbuf); - if (data->callback != NULL) - control_output_mbuf_data(mbuf)->callback(mbuf); - else - rte_pktmbuf_free(mbuf); - } -} - -static atomic_bool thread_shutdown; -static pthread_t thread_id; -static pthread_cond_t cond; -static pthread_mutex_t mutex; -static struct event *ctrlout_ev; - -void control_output_done(void) { - pthread_cond_signal(&cond); -} - -int control_output_set_affinity(size_t set_size, const cpu_set_t *affinity) { - return pthread_setaffinity_np(thread_id, set_size, affinity); -} - -static void *cond_wait_to_event(void *) { - pthread_setname_np(pthread_self(), "grout:ctrl"); - - while (!atomic_load(&thread_shutdown)) { - pthread_mutex_lock(&mutex); - if (pthread_cond_wait(&cond, &mutex) == 0) - evuser_trigger(ctrlout_ev); - pthread_mutex_unlock(&mutex); - } - - return NULL; -} - -static pthread_attr_t attr; - -static void control_output_init(struct event_base *ev_base) { - atomic_init(&thread_shutdown, false); - - if (pthread_attr_init(&attr)) - ABORT("pthread_attr_init"); - - if (pthread_mutex_init(&mutex, NULL)) - ABORT("pthread_mutex_init failed"); - - if (pthread_cond_init(&cond, NULL)) - ABORT("pthread_cond_init failed"); - - ctrlout_ring = rte_ring_create( - "control_output", - RTE_GRAPH_BURST_SIZE * 4, - SOCKET_ID_ANY, - RING_F_MP_RTS_ENQ | RING_F_SC_DEQ - ); - if (ctrlout_ring == NULL) - ABORT("rte_ring_create(ctrl_output): %s", rte_strerror(rte_errno)); - - ctrlout_ev = event_new(ev_base, -1, EV_PERSIST | EV_FINALIZE, control_output_poll, NULL); - if (ctrlout_ev == NULL) - ABORT("event_new() failed"); - - if (pthread_create(&thread_id, &attr, cond_wait_to_event, NULL)) - ABORT("pthread_create() failed"); -} - -static void control_output_fini(struct event_base *) { - atomic_store(&thread_shutdown, true); - control_output_done(); - pthread_join(thread_id, NULL); - pthread_attr_destroy(&attr); - pthread_cond_destroy(&cond); - event_free(ctrlout_ev); - rte_ring_free(ctrlout_ring); - pthread_mutex_destroy(&mutex); -} - -static struct gr_module control_output_module = { - .name = "control_output", - .depends_on = "graph", - .init = control_output_init, - .fini = control_output_fini, -}; - -RTE_INIT(control_output_module_init) { - gr_register_module(&control_output_module); -} diff --git a/modules/infra/control/gr_graph.h b/modules/infra/control/gr_graph.h index be774ee15..b548f493a 100644 --- a/modules/infra/control/gr_graph.h +++ b/modules/infra/control/gr_graph.h @@ -10,6 +10,8 @@ #include +#define RTE_GRAPH_MODEL_SELECT RTE_GRAPH_MODEL_MCORE_DISPATCH + #ifdef __GROUT_UNIT_TEST__ #include @@ -39,6 +41,8 @@ int drop_format(char *buf, size_t buf_len, const void *data, size_t data_len); typedef void (*gr_node_register_cb_t)(void); struct gr_node_info { +// Flag used in node->flags to specify that a node is running on a control plane thread +#define GR_NODE_FLAG_CONTROL_PLANE (1 << 31) struct rte_node_register *node; gr_node_register_cb_t register_callback; gr_node_register_cb_t unregister_callback; diff --git a/modules/infra/control/gr_loopback.h b/modules/infra/control/gr_loopback.h index 18527c871..6eb2a8ed6 100644 --- a/modules/infra/control/gr_loopback.h +++ b/modules/infra/control/gr_loopback.h @@ -7,6 +7,5 @@ #include -void loopback_tx(struct rte_mbuf *m); control_input_t loopback_get_control_id(void); void loopback_input_add_type(rte_be16_t eth_type, const char *next_node); diff --git a/modules/infra/control/gr_worker.h b/modules/infra/control/gr_worker.h index 38109b108..af3b54b3b 100644 --- a/modules/infra/control/gr_worker.h +++ b/modules/infra/control/gr_worker.h @@ -62,6 +62,11 @@ struct worker { pthread_t thread; struct queue_map *rxqs; struct queue_map *txqs; + + struct rte_graph *base[2]; // Base graph, not walked + struct rte_graph *ctl_graph[2]; // graph used to process ctl packets + _Atomic(const struct worker_stats *) stats_ctl; + STAILQ_ENTRY(worker) next; } __rte_cache_aligned; diff --git a/modules/infra/control/graph.c b/modules/infra/control/graph.c index 58a2ca801..9f03dc98f 100644 --- a/modules/infra/control/graph.c +++ b/modules/infra/control/graph.c @@ -124,12 +124,27 @@ void worker_graph_free(struct worker *worker) { LOG(ERR, "rte_graph_destroy: %s", rte_strerror(-ret)); worker->graph[i] = NULL; } + if (worker->ctl_graph[i] != NULL) { + if ((ret = rte_graph_destroy(worker->ctl_graph[i]->id)) < 0) + LOG(ERR, "rte_graph_destroy: %s", rte_strerror(-ret)); + worker->ctl_graph[i] = NULL; + } + if (worker->base[i] != NULL) { + if ((ret = rte_graph_destroy(worker->base[i]->id)) < 0) + LOG(ERR, "rte_graph_destroy: %s", rte_strerror(-ret)); + worker->base[i] = NULL; + } } } static int worker_graph_new(struct worker *worker, uint8_t index) { + rte_graph_t dplane_graph = RTE_GRAPH_ID_INVALID; + rte_graph_t ctl_graph = RTE_GRAPH_ID_INVALID; + rte_graph_t graph = RTE_GRAPH_ID_INVALID; struct rx_node_queues *rx = NULL; struct tx_node_queues *tx = NULL; + char dplane_name[RTE_GRAPH_NAMESIZE]; + char ctl_name[RTE_GRAPH_NAMESIZE]; char name[RTE_GRAPH_NAMESIZE]; struct queue_map *qmap; uint16_t graph_uid; @@ -149,7 +164,9 @@ static int worker_graph_new(struct worker *worker, uint8_t index) { // unique suffix for this graph graph_uid = (worker->cpu_id << 1) | (0x1 & index); - snprintf(name, sizeof(name), "gr-%04x", graph_uid); + snprintf(name, sizeof(name), "gr-%01d-%04x", index, graph_uid); + snprintf(ctl_name, sizeof(ctl_name), "gr-%01d-%04x-ctl", index, graph_uid); + snprintf(dplane_name, sizeof(dplane_name), "gr-%01d-%04x-dplane", index, graph_uid); // build rx & tx nodes data len = sizeof(*rx) + n_rxqs * sizeof(struct rx_port_queue); @@ -172,7 +189,7 @@ static int worker_graph_new(struct worker *worker, uint8_t index) { n_rxqs++; } rx->n_queues = n_rxqs; - if (gr_node_data_set(name, "port_rx", rx) < 0) { + if (gr_node_data_set(dplane_name, "port_rx", rx) < 0) { if (rte_errno == 0) rte_errno = EINVAL; ret = -rte_errno; @@ -197,7 +214,7 @@ static int worker_graph_new(struct worker *worker, uint8_t index) { qmap->queue_id); tx->txq_ids[qmap->port_id] = qmap->queue_id; } - if (gr_node_data_set(name, "port_tx", tx) < 0) { + if (gr_node_data_set(dplane_name, "port_tx", tx) < 0) { if (rte_errno == 0) rte_errno = EINVAL; ret = -rte_errno; @@ -211,19 +228,78 @@ static int worker_graph_new(struct worker *worker, uint8_t index) { .nb_node_patterns = gr_vec_len(node_names), .node_patterns = (const char **)node_names, }; - if (rte_graph_create(name, ¶ms) == RTE_GRAPH_ID_INVALID) { + if ((graph = rte_graph_create(name, ¶ms)) == RTE_GRAPH_ID_INVALID) { + if (rte_errno == 0) + rte_errno = EINVAL; + ret = -rte_errno; + goto err; + } + + if ((ret = rte_graph_worker_model_set(RTE_GRAPH_MODEL_MCORE_DISPATCH)) != 0) + ABORT("Set graph mcore dispatch model failed %s", rte_strerror(-ret)); + + struct rte_node *node_tmp; + rte_graph_off_t off; + rte_node_t count; + rte_graph_foreach_node (count, off, rte_graph_lookup(name), node_tmp) { + int lid = node_tmp->dispatch.lcore_id == rte_lcore_id() ? + rte_lcore_id() : + worker->lcore_id; + + ret = rte_graph_model_mcore_dispatch_node_lcore_affinity_set(node_tmp->name, lid); + if (ret) + ABORT("rte_graph_model_mcore_dispatch_node_lcore_affinity_set: %s lcore %u " + "(%s)", + node_tmp->name, + lid, + rte_strerror(rte_errno)); + } + + if ((dplane_graph = rte_graph_clone(graph, "dplane", ¶ms)) == RTE_GRAPH_ID_INVALID) { if (rte_errno == 0) rte_errno = EINVAL; ret = -rte_errno; goto err; } - worker->graph[index] = rte_graph_lookup(name); + params.socket_id = rte_lcore_to_socket_id(rte_lcore_id()); + + if ((ctl_graph = rte_graph_clone(graph, "ctl", ¶ms)) == RTE_GRAPH_ID_INVALID) { + if (rte_errno == 0) + rte_errno = EINVAL; + ret = -rte_errno; + goto err; + } + + if (rte_graph_model_mcore_dispatch_core_bind(dplane_graph, worker->lcore_id) != 0) { + LOG(ERR, + "bind graph %s to lcore %u failed: %s", + rte_graph_id_to_name(dplane_graph), + worker->lcore_id, + rte_strerror(rte_errno)); + goto err; + } + + if (rte_graph_model_mcore_dispatch_core_bind(ctl_graph, rte_lcore_id()) != 0) { + LOG(ERR, + "bind graph %s to lcore %u failed: %s", + rte_graph_id_to_name(ctl_graph), + rte_lcore_id(), + rte_strerror(rte_errno)); + goto err; + } + + worker->base[index] = rte_graph_lookup(rte_graph_id_to_name(graph)); + worker->graph[index] = rte_graph_lookup(rte_graph_id_to_name(dplane_graph)); + worker->ctl_graph[index] = rte_graph_lookup(rte_graph_id_to_name(ctl_graph)); return 0; err: + rte_graph_destroy(graph); + rte_graph_destroy(ctl_graph); + rte_graph_destroy(dplane_graph); free(rx); free(tx); - node_data_reset(name); + node_data_reset(dplane_name); return errno_set(-ret); } @@ -253,6 +329,19 @@ int worker_graph_reload(struct worker *worker) { worker->graph[next] = NULL; } + if (worker->ctl_graph[next] != NULL) { + if ((ret = rte_graph_destroy(worker->ctl_graph[next]->id)) < 0) + errno_log(-ret, "rte_graph_destroy"); + worker->ctl_graph[next] = NULL; + } + + if (worker->base[next] != NULL) { + int id = worker->base[next]->id; + if ((ret = rte_graph_destroy(id)) < 0) + errno_log(-ret, "rte_graph_destroy"); + worker->base[next] = NULL; + } + return 0; } @@ -291,6 +380,14 @@ static void graph_init(struct event_base *) { reg->id = __rte_node_register(reg); if (reg->id == RTE_NODE_ID_INVALID) ABORT("__rte_node_register(%s): %s", reg->name, rte_strerror(rte_errno)); + if (reg->flags & GR_NODE_FLAG_CONTROL_PLANE) + if (rte_graph_model_mcore_dispatch_node_lcore_affinity_set( + reg->name, rte_lcore_id() + )) + ABORT("control plane node %s on core %d: %s ", + reg->name, + rte_lcore_id(), + rte_strerror(rte_errno)); gr_vec_add(node_names, reg->name); } diff --git a/modules/infra/control/loop_output.c b/modules/infra/control/loop_output.c new file mode 100644 index 000000000..34c778558 --- /dev/null +++ b/modules/infra/control/loop_output.c @@ -0,0 +1,96 @@ +// SPDX-License-Identifier: BSD-3-Clause +// Copyright (c) 2024 Christophe Fontaine + +#include "loopback.h" + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +static uint16_t +loopback_output_process(struct rte_graph *, struct rte_node *, void **objs, uint16_t nb_objs) { + struct iface_info_loopback *lo; + struct iovec iov[2]; + struct mbuf_data *d; + struct rte_mbuf *m; + struct tun_pi pi; + char *data; + + for (uint16_t i = 0; i < nb_objs; i++) { + m = objs[i]; + d = mbuf_data(m); + + lo = (struct iface_info_loopback *)d->iface->info; + if (rte_pktmbuf_linearize(m) == 0) { + data = rte_pktmbuf_mtod(m, char *); + } else { + data = rte_malloc(NULL, rte_pktmbuf_pkt_len(m), 0); + if (data == NULL) { + LOG(ERR, "rte_malloc failed %s", rte_strerror(rte_errno)); + goto next; + } + // with a non-contiguous mbuf, rte_pktmbuf_read returns a pointer + // to the user provided buffer. + rte_pktmbuf_read(m, 0, rte_pktmbuf_pkt_len(m), data); + } + pi.flags = 0; + if ((data[0] & 0xf0) == 0x40) + pi.proto = RTE_BE16(RTE_ETHER_TYPE_IPV4); + else if ((data[0] & 0xf0) == 0x60) + pi.proto = RTE_BE16(RTE_ETHER_TYPE_IPV6); + else { + LOG(ERR, "Bad proto: 0x%x - drop packet", data[0]); + goto next; + } + // Do not retry even in case of if EAGAIN || EWOULDBLOCK + // If the tun device queue is full, something really bad is + // already happening on the management plane side. + iov[0].iov_base = π + iov[0].iov_len = sizeof(pi); + iov[1].iov_base = data; + iov[1].iov_len = rte_pktmbuf_pkt_len(m); + + if (writev(lo->fd, iov, ARRAY_DIM(iov)) < 0) { + // The user messed up and removed gr-loopX + // release resources on our side to try to recover + if (errno == EBADFD) { + iface_destroy(d->iface->id); + } + LOG(ERR, "write to tun device failed %s", strerror(errno)); + } + +next: + if (!rte_pktmbuf_is_contiguous(m)) + rte_free(data); + rte_pktmbuf_free(m); + } + return nb_objs; +} + +static struct rte_node_register loopback_output_node = { + .name = "loopback_output", + .process = loopback_output_process, + .nb_edges = 0, + .next_nodes = {}, +}; + +static void loopback_output_register(void) { + ip_output_register_interface_type(GR_IFACE_TYPE_LOOPBACK, "loopback_output"); + ip6_output_register_interface_type(GR_IFACE_TYPE_LOOPBACK, "loopback_output"); +} + +static struct gr_node_info info = { + .node = &loopback_output_node, + .register_callback = loopback_output_register, +}; + +GR_NODE_REGISTER(info); diff --git a/modules/infra/control/loopback.c b/modules/infra/control/loopback.c index cd3cb8d27..710e6b962 100644 --- a/modules/infra/control/loopback.c +++ b/modules/infra/control/loopback.c @@ -1,8 +1,9 @@ // SPDX-License-Identifier: BSD-3-Clause // Copyright (c) 2024 Christophe Fontaine +#include "loopback.h" + #include -#include #include #include #include @@ -29,69 +30,12 @@ static struct rte_mempool *loopback_pool; static struct event_base *ev_base; -struct iface_info_loopback { - int fd; - struct event *ev; -}; - static void finalize_fd(struct event *ev, void * /*priv*/) { int fd = event_get_fd(ev); if (fd >= 0) close(fd); } -void loopback_tx(struct rte_mbuf *m) { - struct mbuf_data *d = mbuf_data(m); - struct iface_info_loopback *lo; - struct iovec iov[2]; - struct tun_pi pi; - char *data; - - lo = (struct iface_info_loopback *)d->iface->info; - if (rte_pktmbuf_linearize(m) == 0) { - data = rte_pktmbuf_mtod(m, char *); - } else { - data = rte_malloc(NULL, rte_pktmbuf_pkt_len(m), 0); - if (data == NULL) { - LOG(ERR, "rte_malloc failed %s", rte_strerror(rte_errno)); - goto end; - } - // with a non-contiguous mbuf, rte_pktmbuf_read returns a pointer - // to the user provided buffer. - rte_pktmbuf_read(m, 0, rte_pktmbuf_pkt_len(m), data); - } - pi.flags = 0; - if ((data[0] & 0xf0) == 0x40) - pi.proto = RTE_BE16(RTE_ETHER_TYPE_IPV4); - else if ((data[0] & 0xf0) == 0x60) - pi.proto = RTE_BE16(RTE_ETHER_TYPE_IPV6); - else { - LOG(ERR, "Bad proto: 0x%x - drop packet", data[0]); - goto end; - } - // Do not retry even in case of if EAGAIN || EWOULDBLOCK - // If the tun device queue is full, something really bad is - // already happening on the management plane side. - iov[0].iov_base = π - iov[0].iov_len = sizeof(pi); - iov[1].iov_base = data; - iov[1].iov_len = rte_pktmbuf_pkt_len(m); - - if (writev(lo->fd, iov, ARRAY_DIM(iov)) < 0) { - // The user messed up and removed gr-loopX - // release resources on our side to try to recover - if (errno == EBADFD) { - iface_destroy(d->iface->id); - } - LOG(ERR, "write to tun device failed %s", strerror(errno)); - } - -end: - if (!rte_pktmbuf_is_contiguous(m)) - rte_free(data); - rte_pktmbuf_free(m); -} - static void iface_loopback_poll(evutil_socket_t, short reason, void *ev_iface) { struct eth_input_mbuf_data *e; struct iface_info_loopback *lo; diff --git a/modules/infra/control/loopback.h b/modules/infra/control/loopback.h new file mode 100644 index 000000000..5c4b90187 --- /dev/null +++ b/modules/infra/control/loopback.h @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: BSD-3-Clause +// Copyright (c) 2025 Christophe Fontaine + +#pragma once + +struct iface_info_loopback { + int fd; + struct event *ev; +}; diff --git a/modules/infra/control/main_loop.c b/modules/infra/control/main_loop.c new file mode 100644 index 000000000..c51745ab9 --- /dev/null +++ b/modules/infra/control/main_loop.c @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: BSD-3-Clause +// Copyright (c) 2025 Christophe Fontaine + +#include +#include +#include +#include +#include +#include + +#include + +static struct event *ctrlloop_ev; + +static void control_loop(evutil_socket_t, short, void *) { + struct rte_graph *graph; + struct worker *worker; + + STAILQ_FOREACH (worker, &workers, next) { + graph = worker->ctl_graph[atomic_load(&worker->next_config)]; + if (atomic_load(&worker->shutdown) || graph == NULL) + continue; + rte_graph_walk(graph); + } +} + +static void control_loop_init(struct event_base *ev_base) { + struct timeval time; + time.tv_sec = 0; + time.tv_usec = 10000; + + ctrlloop_ev = event_new(ev_base, -1, EV_PERSIST | EV_FINALIZE, control_loop, NULL); + evtimer_add(ctrlloop_ev, &time); +} + +static void control_loop_fini(struct event_base *) { + event_free(ctrlloop_ev); +} + +static struct gr_module control_module = { + .name = "control_loop", + .init = control_loop_init, + .fini = control_loop_fini, +}; + +RTE_INIT(control_module_init) { + gr_register_module(&control_module); +} diff --git a/modules/infra/control/meson.build b/modules/infra/control/meson.build index 2ccca3e3a..394fdda27 100644 --- a/modules/infra/control/meson.build +++ b/modules/infra/control/meson.build @@ -2,9 +2,10 @@ # Copyright (c) 2023 Robin Jarry src += files( - 'control_output.c', 'iface.c', + 'loop_output.c', 'loopback.c', + 'main_loop.c', 'mempool.c', 'nexthop.c', 'port.c', diff --git a/modules/infra/control/worker.c b/modules/infra/control/worker.c index 378aac05a..ba7e250e0 100644 --- a/modules/infra/control/worker.c +++ b/modules/infra/control/worker.c @@ -421,7 +421,7 @@ static void worker_fini(struct event_base *) { static struct gr_module worker_module = { .name = "worker", - .depends_on = "control_output", + .depends_on = "graph", .init = worker_init, .fini = worker_fini, }; diff --git a/modules/infra/datapath/control_input.c b/modules/infra/datapath/control_input.c index b542b2fc1..8f85c8be6 100644 --- a/modules/infra/datapath/control_input.c +++ b/modules/infra/datapath/control_input.c @@ -127,7 +127,7 @@ static void control_input_unregister(void) { } static struct rte_node_register control_input_node = { - .flags = RTE_NODE_SOURCE_F, + .flags = RTE_NODE_SOURCE_F | GR_NODE_FLAG_CONTROL_PLANE, .name = "control_input", .process = control_input_process, .nb_edges = EDGE_COUNT, diff --git a/modules/infra/datapath/control_output.c b/modules/infra/datapath/control_output.c deleted file mode 100644 index 789cfdd4c..000000000 --- a/modules/infra/datapath/control_output.c +++ /dev/null @@ -1,55 +0,0 @@ -// SPDX-License-Identifier: BSD-3-Clause -// Copyright (c) 2024 Christophe Fontaine - -#include -#include -#include -#include -#include - -#include - -#define ERROR 0 - -static uint16_t control_output_process( - struct rte_graph *graph, - struct rte_node *node, - void **objs, - uint16_t n_objs -) { - unsigned sent = 0; - - for (unsigned i = 0; i < n_objs; i++) { - if (control_output_enqueue(objs[i]) < 0) - rte_node_enqueue_x1(graph, node, ERROR, objs[i]); - else { - sent++; - if (gr_mbuf_is_traced(objs[i])) { - // FIXME racy: we are operating on mbufs already enqueued in ring - gr_mbuf_trace_add(objs[i], node, 0); - gr_mbuf_trace_finish(objs[i]); - } - } - } - if (sent > 0) - control_output_done(); - - return n_objs; -} - -static struct rte_node_register control_output_node = { - .name = "control_output", - .process = control_output_process, - .nb_edges = 1, - .next_nodes = { - [ERROR] = "control_output_error", - }, -}; - -static struct gr_node_info info = { - .node = &control_output_node, -}; - -GR_NODE_REGISTER(info); - -GR_DROP_REGISTER(control_output_error); diff --git a/modules/infra/datapath/drop.c b/modules/infra/datapath/drop.c index e3d6117e1..dd2349b4e 100644 --- a/modules/infra/datapath/drop.c +++ b/modules/infra/datapath/drop.c @@ -31,3 +31,4 @@ int drop_format(char *buf, size_t len, const void * /*data*/, size_t /*data_len* // Global drop counters, used by multiple nodes GR_DROP_REGISTER(error_no_headroom); +GR_DROP_REGISTER(ctlplane_sink); diff --git a/modules/infra/datapath/gr_control_output.h b/modules/infra/datapath/gr_control_output.h deleted file mode 100644 index 1fd053f32..000000000 --- a/modules/infra/datapath/gr_control_output.h +++ /dev/null @@ -1,33 +0,0 @@ -// SPDX-License-Identifier: BSD-3-Clause -// Copyright (c) 2024 Christophe Fontaine - -#pragma once - -#include - -#include -#include - -// Callback definition when a packet is sent from the data plane to the control plane. -// It is up to the function to free the received mbuf. -// -// @param struct rte_mbuf * -// Packet with the data offset set to the OSI layer of the originating node. -typedef void (*control_output_cb_t)(struct rte_mbuf *); - -GR_MBUF_PRIV_DATA_TYPE(control_output_mbuf_data, { - control_output_cb_t callback; - clock_t timestamp; - uint8_t cb_data[GR_MBUF_PRIV_MAX_SIZE - 6 * sizeof(size_t)]; -}); - -// Enqueue a packet from data plane to a control plane ring. -// -// NB: control_output_done() must be called explicitly to wake up the control plane event loop. -int control_output_enqueue(struct rte_mbuf *m); - -// Wake up the control plane event loop so that it processes the pending packets. -void control_output_done(void); - -// Change the thread affinity of the control output thread. -int control_output_set_affinity(size_t set_size, const cpu_set_t *affinity); diff --git a/modules/infra/datapath/gr_mbuf.h b/modules/infra/datapath/gr_mbuf.h index 2770b09df..72828c893 100644 --- a/modules/infra/datapath/gr_mbuf.h +++ b/modules/infra/datapath/gr_mbuf.h @@ -40,6 +40,21 @@ STAILQ_HEAD(gr_trace_head, gr_trace_item); return &priv->data; \ } +#define GR_MBUF_PRIV_DATA_EXTENDS(type_name, base, fields) \ + struct type_name { \ + struct base; \ + struct fields; \ + }; \ + struct __##type_name { \ + struct gr_trace_head traces; \ + struct type_name data; \ + }; \ + static inline struct type_name *type_name(struct rte_mbuf *m) { \ + static_assert(sizeof(struct __##type_name) <= GR_MBUF_PRIV_MAX_SIZE); \ + struct __##type_name *priv = rte_mbuf_to_priv(m); \ + return &priv->data; \ + } + GR_MBUF_PRIV_DATA_TYPE(mbuf_data, {}); GR_MBUF_PRIV_DATA_TYPE(queue_mbuf_data, { struct rte_mbuf *next; }); diff --git a/modules/infra/datapath/loop_output.c b/modules/infra/datapath/loop_output.c deleted file mode 100644 index 7244700ad..000000000 --- a/modules/infra/datapath/loop_output.c +++ /dev/null @@ -1,54 +0,0 @@ -// SPDX-License-Identifier: BSD-3-Clause -// Copyright (c) 2024 Christophe Fontaine - -#include -#include -#include -#include -#include -#include -#include - -enum { - CONTROL_OUTPUT, - EDGE_COUNT, -}; - -static uint16_t loopback_output_process( - struct rte_graph *graph, - struct rte_node *node, - void **objs, - uint16_t nb_objs -) { - struct control_output_mbuf_data *co; - struct rte_mbuf *mbuf; - - for (uint16_t i = 0; i < nb_objs; i++) { - mbuf = objs[i]; - co = control_output_mbuf_data(mbuf); - co->callback = loopback_tx; - rte_node_enqueue_x1(graph, node, CONTROL_OUTPUT, mbuf); - } - return nb_objs; -} - -static struct rte_node_register loopback_output_node = { - .name = "loopback_output", - .process = loopback_output_process, - .nb_edges = EDGE_COUNT, - .next_nodes = { - [CONTROL_OUTPUT] = "control_output", - }, -}; - -static void loopback_output_register(void) { - ip_output_register_interface_type(GR_IFACE_TYPE_LOOPBACK, "loopback_output"); - ip6_output_register_interface_type(GR_IFACE_TYPE_LOOPBACK, "loopback_output"); -} - -static struct gr_node_info info = { - .node = &loopback_output_node, - .register_callback = loopback_output_register, -}; - -GR_NODE_REGISTER(info); diff --git a/modules/infra/datapath/main_loop.c b/modules/infra/datapath/main_loop.c index 2a8a2c0be..fff3ea511 100644 --- a/modules/infra/datapath/main_loop.c +++ b/modules/infra/datapath/main_loop.c @@ -61,9 +61,13 @@ static inline void stats_reset(struct worker_stats *stats) { stats->n_sleeps = 0; } -static int stats_reload(const struct rte_graph *graph, struct stats_context *ctx) { +static int stats_reload( + const struct rte_graph *graph, + const struct rte_graph *ctl_graph, + struct stats_context *ctx +) { struct rte_graph_cluster_stats_param stats_param; - const char *graph_names[1]; + const char *graph_names[2]; assert(graph != NULL); @@ -73,9 +77,10 @@ static int stats_reload(const struct rte_graph *graph, struct stats_context *ctx } graph_names[0] = graph->name; + graph_names[1] = ctl_graph->name; memset(&stats_param, 0, sizeof(stats_param)); stats_param.socket_id = graph->socket; - stats_param.nb_graph_patterns = 1; + stats_param.nb_graph_patterns = 2; stats_param.graph_patterns = graph_names; stats_param.cookie = ctx; stats_param.fn = node_stats_callback; @@ -117,9 +122,9 @@ static struct rte_rcu_qsbr *rcu; void *gr_datapath_loop(void *priv) { struct stats_context ctx = {.last_count = 0}; uint64_t timestamp, timestamp_tmp, cycles; + struct rte_graph *graph, *ctl_graph; uint32_t sleep, max_sleep_us; struct worker *w = priv; - struct rte_graph *graph; unsigned cur, loop; char name[16]; @@ -162,6 +167,7 @@ void *gr_datapath_loop(void *priv) { cur = atomic_load(&w->next_config); graph = w->graph[cur]; + ctl_graph = w->ctl_graph[cur]; atomic_store(&w->cur_config, cur); if (graph == NULL) { @@ -171,7 +177,7 @@ void *gr_datapath_loop(void *priv) { goto reconfig; } - if (stats_reload(graph, &ctx) < 0) + if (stats_reload(graph, ctl_graph, &ctx) < 0) goto shutdown; atomic_store(&w->stats, ctx.w_stats); diff --git a/modules/infra/datapath/meson.build b/modules/infra/datapath/meson.build index 3439f759c..949db36c2 100644 --- a/modules/infra/datapath/meson.build +++ b/modules/infra/datapath/meson.build @@ -3,13 +3,11 @@ src += files( 'control_input.c', - 'control_output.c', 'drop.c', 'eth_input.c', 'eth_output.c', 'l1_xconnect.c', 'loop_input.c', - 'loop_output.c', 'main_loop.c', 'rx.c', 'trace.c', diff --git a/modules/infra/datapath/rx.c b/modules/infra/datapath/rx.c index 56359d968..3a21b3b93 100644 --- a/modules/infra/datapath/rx.c +++ b/modules/infra/datapath/rx.c @@ -97,6 +97,10 @@ static int rx_init(const struct rte_graph *graph, struct rte_node *node) { const struct rx_node_queues *data; struct rx_ctx *ctx; + // run init only for dataplane graph + if (strstr(graph->name, "-dplane") == NULL) + return 0; + if ((data = gr_node_data_get(graph->name, node->name)) == NULL) return -1; diff --git a/modules/infra/datapath/tx.c b/modules/infra/datapath/tx.c index 78cb4b6aa..4f02a7bfd 100644 --- a/modules/infra/datapath/tx.c +++ b/modules/infra/datapath/tx.c @@ -93,6 +93,10 @@ static int tx_init(const struct rte_graph *graph, struct rte_node *node) { const struct tx_node_queues *data; struct tx_ctx *ctx; + // run init only for dataplane graph + if (strstr(graph->name, "-dplane") == NULL) + return 0; + if ((data = gr_node_data_get(graph->name, node->name)) == NULL) return -1; diff --git a/modules/ip/control/gr_ip4_control.h b/modules/ip/control/gr_ip4_control.h index e1cca03ee..66f8a09d5 100644 --- a/modules/ip/control/gr_ip4_control.h +++ b/modules/ip/control/gr_ip4_control.h @@ -20,9 +20,6 @@ static inline struct nexthop *nh4_lookup(uint16_t vrf_id, ip4_addr_t ip) { return nexthop_lookup(GR_AF_IP4, vrf_id, GR_IFACE_ID_UNDEF, &ip); } -void nh4_unreachable_cb(struct rte_mbuf *m); -void arp_probe_input_cb(struct rte_mbuf *m); - int snat44_static_rule_add(struct iface *, ip4_addr_t match, ip4_addr_t replace); int snat44_static_rule_del(struct iface *, ip4_addr_t match); diff --git a/modules/ip/control/icmp.c b/modules/ip/control/icmp.c index fce076d35..815937992 100644 --- a/modules/ip/control/icmp.c +++ b/modules/ip/control/icmp.c @@ -3,13 +3,16 @@ #include #include -#include +#include +#include +#include #include #include #include #include #include #include +#include #include #include @@ -32,20 +35,6 @@ static void icmp_queue_pop(struct icmp_queue_item *i, bool free_mbuf) { rte_mempool_put(pool, i); } -// Callback invoked by control plane for each ICMP packet received for a local address. -// The packet is added at the end of a linked list. -static void icmp_input_cb(struct rte_mbuf *m) { - struct icmp_queue_item *i; - void *data; - - while (rte_mempool_get(pool, &data) < 0) - icmp_queue_pop(STAILQ_FIRST(&icmp_queue), true); - - i = data; - i->mbuf = m; - STAILQ_INSERT_TAIL(&icmp_queue, i, next); -} - // Search for the oldest ICMP response matching the given identifier. // If found, the packet is removed from the queue. static struct rte_mbuf *get_icmp_response(uint16_t ident, uint16_t seq_num) { @@ -106,6 +95,7 @@ static struct api_out icmp_send(const void *request, void ** /*response*/) { static struct api_out icmp_recv(const void *request, void **response) { const struct gr_ip4_icmp_recv_req *icmp_req = request; struct gr_ip4_icmp_recv_resp *resp = NULL; + struct icmp_mbuf_data *icmp_data; struct rte_icmp_hdr *icmp; struct rte_ipv4_hdr *ip; clock_t *timestamp; @@ -122,12 +112,10 @@ static struct api_out icmp_recv(const void *request, void **response) { goto out; } - // Ugly, there is no guarantee that the outer packet is actually IPv4 - ip = rte_pktmbuf_mtod_offset(m, struct rte_ipv4_hdr *, -sizeof(*ip)); - + icmp_data = icmp_mbuf_data(m); icmp = rte_pktmbuf_mtod(m, struct rte_icmp_hdr *); - resp->src_addr = ip->src_addr; - resp->ttl = ip->time_to_live; + resp->src_addr = icmp_data->src; + resp->ttl = icmp_data->ttl; resp->type = icmp->icmp_type; resp->code = icmp->icmp_code; @@ -144,7 +132,7 @@ static struct api_out icmp_recv(const void *request, void **response) { resp->ident = rte_be_to_cpu_16(icmp->icmp_ident); resp->seq_num = rte_be_to_cpu_16(icmp->icmp_seq_nb); timestamp = PAYLOAD(icmp); - resp->response_time = control_output_mbuf_data(m)->timestamp - *timestamp; + resp->response_time = icmp_data->timestamp - *timestamp; *response = resp; len = sizeof(*resp); @@ -201,11 +189,45 @@ static struct gr_module icmp_module = { .fini = icmp_fini, }; +static uint16_t +icmp_input_ctl_process(struct rte_graph *, struct rte_node *, void **objs, uint16_t nb_objs) { + struct icmp_queue_item *item; + void *data; + + for (uint16_t i = 0; i < nb_objs; i++) { + while (rte_mempool_get(pool, &data) < 0) + icmp_queue_pop(STAILQ_FIRST(&icmp_queue), true); + item = data; + item->mbuf = objs[i]; + STAILQ_INSERT_TAIL(&icmp_queue, item, next); + } + return nb_objs; +} + +static struct rte_node_register icmp_input_ctl_node = { + .flags = GR_NODE_FLAG_CONTROL_PLANE, + .name = "icmp_input_ctl", + .process = icmp_input_ctl_process, + .nb_edges = 0, + .next_nodes = {}, +}; + +static void icmp_input_register(void) { + icmp_input_register_type(RTE_ICMP_TYPE_DEST_UNREACHABLE, "icmp_input_ctl"); + icmp_input_register_type(RTE_ICMP_TYPE_TTL_EXCEEDED, "icmp_input_ctl"); + icmp_input_register_type(RTE_ICMP_TYPE_ECHO_REPLY, "icmp_input_ctl"); +} + +static struct gr_node_info icmp_input_info = { + .node = &icmp_input_ctl_node, + .register_callback = icmp_input_register, + .trace_format = (gr_trace_format_cb_t)trace_icmp_format, +}; + +GR_NODE_REGISTER(icmp_input_info); + RTE_INIT(icmp_module_init) { gr_register_module(&icmp_module); gr_register_api_handler(&ip4_icmp_send_handler); gr_register_api_handler(&ip4_icmp_recv_handler); - icmp_input_register_callback(RTE_ICMP_TYPE_DEST_UNREACHABLE, icmp_input_cb); - icmp_input_register_callback(RTE_ICMP_TYPE_TTL_EXCEEDED, icmp_input_cb); - icmp_input_register_callback(RTE_ICMP_TYPE_ECHO_REPLY, icmp_input_cb); } diff --git a/modules/ip/control/nexthop.c b/modules/ip/control/nexthop.c index 3a92394fe..dd9526b7f 100644 --- a/modules/ip/control/nexthop.c +++ b/modules/ip/control/nexthop.c @@ -3,7 +3,6 @@ #include #include -#include #include #include #include @@ -25,167 +24,232 @@ #include #include -static control_input_t ip_output_node; +enum { + NH4_IP_OUTPUT = 0, + SINK, + NH4_EDGE_COUNT, +}; -void nh4_unreachable_cb(struct rte_mbuf *m) { - struct rte_ipv4_hdr *ip = rte_pktmbuf_mtod(m, struct rte_ipv4_hdr *); - ip4_addr_t dst = ip->dst_addr; - struct nexthop *nh; +static uint16_t nh4_unreachable_process( + struct rte_graph *graph, + struct rte_node *node, + void **objs, + uint16_t nb_objs +) { + struct rte_mbuf *m; + rte_edge_t edge; - nh = rib4_lookup(control_output_mbuf_data(m)->iface->vrf_id, dst); - if (nh == NULL) - goto free; // route to dst has disappeared + for (uint16_t i = 0; i < nb_objs; i++) { + m = objs[i]; + struct rte_ipv4_hdr *ip = rte_pktmbuf_mtod(m, struct rte_ipv4_hdr *); + ip4_addr_t dst = ip->dst_addr; + struct nexthop *nh; + edge = SINK; - if (nh->flags & GR_NH_F_LINK && dst != nh->ipv4) { - // The resolved nexthop is associated with a "connected" route. - // We currently do not have an explicit route entry for this - // destination IP. - struct nexthop *remote = nh4_lookup(nh->vrf_id, dst); + if (gr_mbuf_is_traced(m)) + gr_mbuf_trace_add(m, node, 0); - if (remote == NULL) { - // No existing nexthop for this IP, create one. - remote = nexthop_new(&(struct gr_nexthop) { - .type = GR_NH_T_L3, - .af = GR_AF_IP4, - .vrf_id = nh->vrf_id, - .iface_id = nh->iface_id, - .ipv4 = dst, - .origin = GR_NH_ORIGIN_INTERNAL, - }); - } + nh = rib4_lookup(mbuf_data(m)->iface->vrf_id, dst); + if (nh == NULL) + goto next; // route to dst has disappeared - if (remote == NULL) { - LOG(ERR, "cannot allocate nexthop: %s", strerror(errno)); - goto free; - } - if (remote->iface_id != nh->iface_id) - ABORT(IP4_F " nexthop lookup gives wrong interface", &ip); - - // Create an associated /32 route so that next packets take it - // in priority with a single route lookup. - if (rib4_insert(nh->vrf_id, dst, 32, GR_NH_ORIGIN_INTERNAL, remote) < 0) { - LOG(ERR, "failed to insert route: %s", strerror(errno)); - goto free; + if (nh->flags & GR_NH_F_LINK && dst != nh->ipv4) { + // The resolved nexthop is associated with a "connected" route. + // We currently do not have an explicit route entry for this + // destination IP. + struct nexthop *remote = nh4_lookup(nh->vrf_id, dst); + + if (remote == NULL) { + // No existing nexthop for this IP, create one. + remote = nexthop_new(&(struct gr_nexthop) { + .type = GR_NH_T_L3, + .af = GR_AF_IP4, + .vrf_id = nh->vrf_id, + .iface_id = nh->iface_id, + .ipv4 = dst, + .origin = GR_NH_ORIGIN_INTERNAL, + }); + } + + if (remote == NULL) { + LOG(ERR, "cannot allocate nexthop: %s", strerror(errno)); + goto next; + } + if (remote->iface_id != nh->iface_id) + ABORT(IP4_F " nexthop lookup gives wrong interface", &ip); + + // Create an associated /32 route so that next packets take it + // in priority with a single route lookup. + if (rib4_insert(nh->vrf_id, dst, 32, GR_NH_ORIGIN_INTERNAL, remote) < 0) { + LOG(ERR, "failed to insert route: %s", strerror(errno)); + goto next; + } + nh = remote; } - nh = remote; - } - if (nh->state == GR_NH_S_REACHABLE) { - // The nexthop may have become reachable while the packet was - // passed from the datapath to here. Re-send it to datapath. - struct ip_output_mbuf_data *d = ip_output_mbuf_data(m); - d->nh = nh; - if (post_to_stack(ip_output_node, m) < 0) { - LOG(ERR, "post_to_stack: %s", strerror(errno)); - goto free; + if (nh->state & GR_NH_S_REACHABLE) { + // The nexthop may have become reachable while the packet was + // passed from the datapath to here. Re-send it to datapath. + struct ip_output_mbuf_data *d = ip_output_mbuf_data(m); + d->nh = nh; + edge = NH4_IP_OUTPUT; + goto next; } - return; - } - if (nh->held_pkts < nh_conf.max_held_pkts) { - queue_mbuf_data(m)->next = NULL; - if (nh->held_pkts_head == NULL) - nh->held_pkts_head = m; - else - queue_mbuf_data(nh->held_pkts_tail)->next = m; - nh->held_pkts_tail = m; - nh->held_pkts++; - if (nh->state != GR_NH_S_PENDING) { - arp_output_request_solicit(nh); - nh->state = GR_NH_S_PENDING; + if (nh->held_pkts < nh_conf.max_held_pkts) { + queue_mbuf_data(m)->next = NULL; + if (nh->held_pkts_head == NULL) + nh->held_pkts_head = m; + else + queue_mbuf_data(nh->held_pkts_tail)->next = m; + nh->held_pkts_tail = m; + nh->held_pkts++; + if (nh->state != GR_NH_S_PENDING) { + arp_output_request_solicit(nh); + nh->state = GR_NH_S_PENDING; + } + continue; + } else { + LOG(DEBUG, IP4_F " hold queue full", &dst); } - return; - } else { - LOG(DEBUG, IP4_F " hold queue full", &dst); +next: + rte_node_enqueue_x1(graph, node, edge, m); } -free: - rte_pktmbuf_free(m); + + return nb_objs; } -static control_input_t arp_output_reply_node; +static struct rte_node_register nh4_unreachable_node = { + .flags = GR_NODE_FLAG_CONTROL_PLANE, + .name = "nh4_unreachable", + .process = nh4_unreachable_process, + .nb_edges = NH4_EDGE_COUNT, + .next_nodes = { + [NH4_IP_OUTPUT] = "ip_output", + [SINK] = "ctlplane_sink", + }, +}; + +static struct gr_node_info nh4_unreachable_info = { + .node = &nh4_unreachable_node, +}; + +GR_NODE_REGISTER(nh4_unreachable_info); + +enum { + IP_OUTPUT = 0, + ARP_OUTPUT_REPLY, + ARP_SINK, + ARP_EDGE_COUNT, +}; -void arp_probe_input_cb(struct rte_mbuf *m) { +static uint16_t +arp_probe_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t nb_objs) { const struct iface *iface; struct rte_arp_hdr *arp; struct rte_mbuf *held; struct nexthop *nh; + struct rte_mbuf *m; + rte_edge_t edge; ip4_addr_t sip; - arp = rte_pktmbuf_mtod(m, struct rte_arp_hdr *); - iface = mbuf_data(m)->iface; - - sip = arp->arp_data.arp_sip; - nh = nh4_lookup(iface->vrf_id, sip); - if (nh == NULL) { - // We don't have an entry for the ARP request sender address yet. - // - // Create one now. If the sender has requested our mac address, - // they will certainly contact us soon and it will save us an - // ARP request. - nh = nexthop_new(&(struct gr_nexthop) { - .type = GR_NH_T_L3, - .af = GR_AF_IP4, - .vrf_id = iface->vrf_id, - .iface_id = iface->id, - .ipv4 = sip, - .origin = GR_NH_ORIGIN_INTERNAL, - }); + for (uint16_t i = 0; i < nb_objs; i++) { + m = objs[i]; + edge = ARP_SINK; + + arp = rte_pktmbuf_mtod(m, struct rte_arp_hdr *); + iface = mbuf_data(m)->iface; + + sip = arp->arp_data.arp_sip; + nh = nh4_lookup(iface->vrf_id, sip); if (nh == NULL) { - LOG(ERR, "ip4_nexthop_new: %s", strerror(errno)); - goto free; - } - // Add an internal /32 route to reference the newly created nexthop. - if (rib4_insert(iface->vrf_id, sip, 32, GR_NH_ORIGIN_INTERNAL, nh) < 0) { - LOG(ERR, "ip4_nexthop_insert: %s", strerror(errno)); - goto free; + // We don't have an entry for the ARP request sender address yet. + // + // Create one now. If the sender has requested our mac address, + // they will certainly contact us soon and it will save us an + // ARP request. + nh = nexthop_new(&(struct gr_nexthop) { + .type = GR_NH_T_L3, + .af = GR_AF_IP4, + .vrf_id = iface->vrf_id, + .iface_id = iface->id, + .ipv4 = sip, + .origin = GR_NH_ORIGIN_INTERNAL, + }); + if (nh == NULL) { + LOG(ERR, "ip4_nexthop_new: %s", strerror(errno)); + goto next; + } + // Add an internal /32 route to reference the newly created nexthop. + if (rib4_insert(iface->vrf_id, sip, 32, GR_NH_ORIGIN_INTERNAL, nh) < 0) { + LOG(ERR, "ip4_nexthop_insert: %s", strerror(errno)); + goto next; + } } - } - // static next hops never need updating - if (!(nh->flags & GR_NH_F_STATIC)) { - // Refresh all fields. - nh->last_reply = gr_clock_us(); - nh->state = GR_NH_S_REACHABLE; - nh->ucast_probes = 0; - nh->bcast_probes = 0; - nh->mac = arp->arp_data.arp_sha; - } + // static next hops never need updating + if (!(nh->flags & GR_NH_F_STATIC)) { + // Refresh all fields. + nh->last_reply = gr_clock_us(); + nh->state = GR_NH_S_REACHABLE; + nh->ucast_probes = 0; + nh->bcast_probes = 0; + nh->mac = arp->arp_data.arp_sha; + } - if (arp->arp_opcode == RTE_BE16(RTE_ARP_OP_REQUEST)) { - // send a reply for our local ip - struct nexthop *local = nh4_lookup(iface->vrf_id, arp->arp_data.arp_tip); - struct arp_reply_mbuf_data *d = arp_reply_mbuf_data(m); - d->local = local; - d->iface = iface; - if (post_to_stack(arp_output_reply_node, m) < 0) { - LOG(ERR, "post_to_stack: %s", strerror(errno)); - goto free; + if (arp->arp_opcode == RTE_BE16(RTE_ARP_OP_REQUEST)) { + // send a reply for our local ip + struct nexthop *local = nh4_lookup(iface->vrf_id, arp->arp_data.arp_tip); + struct arp_reply_mbuf_data *d = arp_reply_mbuf_data(m); + d->local = local; + d->iface = iface; + edge = ARP_OUTPUT_REPLY; + goto next; } - // prevent double free, mbuf has been re-consumed by datapath - m = NULL; - } - // Flush all held packets. - held = nh->held_pkts_head; - while (held != NULL) { - struct ip_output_mbuf_data *o; - struct rte_mbuf *next; - - next = queue_mbuf_data(held)->next; - o = ip_output_mbuf_data(held); - o->nh = nh; - o->iface = NULL; - post_to_stack(ip_output_node, held); - held = next; + // Flush all held packets. + held = nh->held_pkts_head; + while (held != NULL) { + struct ip_output_mbuf_data *o; + struct rte_mbuf *next; + + next = queue_mbuf_data(held)->next; + o = ip_output_mbuf_data(held); + o->nh = nh; + o->iface = NULL; + rte_node_enqueue_x1(graph, node, IP_OUTPUT, held); + held = next; + } + nh->held_pkts_head = NULL; + nh->held_pkts_tail = NULL; + nh->held_pkts = 0; +next: + if (gr_mbuf_is_traced(m)) + gr_mbuf_trace_add(m, node, 0); + rte_node_enqueue_x1(graph, node, edge, m); } - nh->held_pkts_head = NULL; - nh->held_pkts_tail = NULL; - nh->held_pkts = 0; -free: - rte_pktmbuf_free(m); + return nb_objs; } +static struct rte_node_register arp_probe_node = { + .flags = GR_NODE_FLAG_CONTROL_PLANE, + .name = "arp_probe", + .process = arp_probe_process, + .nb_edges = ARP_EDGE_COUNT, + .next_nodes = { + [IP_OUTPUT] = "ip_output", + [ARP_OUTPUT_REPLY] = "arp_output_reply", + [ARP_SINK] = "ctlplane_sink", + }, +}; + +static struct gr_node_info arp_probe_info = { + .node = &arp_probe_node, +}; + +GR_NODE_REGISTER(arp_probe_info); + static int nh4_add(struct nexthop *nh) { return rib4_insert(nh->vrf_id, nh->ipv4, 32, GR_NH_ORIGIN_INTERNAL, nh); } @@ -198,15 +262,9 @@ static void nh4_del(struct nexthop *nh) { } } -static void nh4_init(struct event_base *) { - ip_output_node = gr_control_input_register_handler("ip_output", true); - arp_output_reply_node = gr_control_input_register_handler("arp_output_reply", true); -} - static struct gr_module nh4_module = { .name = "ipv4 nexthop", .depends_on = "graph", - .init = nh4_init, }; static struct nexthop_af_ops nh_ops = { diff --git a/modules/ip/datapath/arp_input_reply.c b/modules/ip/datapath/arp_input_reply.c index ecf4a440b..ecead1aa7 100644 --- a/modules/ip/datapath/arp_input_reply.c +++ b/modules/ip/datapath/arp_input_reply.c @@ -14,7 +14,7 @@ #include enum { - CONTROL = 0, + ARP_PROBE = 0, DROP, EDGE_COUNT, }; @@ -41,10 +41,7 @@ static uint16_t arp_input_reply_process( gr_mbuf_trace_add(mbuf, node, 0); if (remote != NULL) { - struct control_output_mbuf_data *d = control_output_mbuf_data(mbuf); - d->callback = arp_probe_input_cb; - d->iface = iface; - rte_node_enqueue_x1(graph, node, CONTROL, mbuf); + rte_node_enqueue_x1(graph, node, ARP_PROBE, mbuf); } else { rte_node_enqueue_x1(graph, node, DROP, mbuf); } @@ -60,7 +57,7 @@ static struct rte_node_register node = { .nb_edges = EDGE_COUNT, .next_nodes = { - [CONTROL] = "control_output", + [ARP_PROBE] = "arp_probe", [DROP] = "arp_input_reply_drop", }, }; diff --git a/modules/ip/datapath/arp_input_request.c b/modules/ip/datapath/arp_input_request.c index 44855c660..6f9e11707 100644 --- a/modules/ip/datapath/arp_input_request.c +++ b/modules/ip/datapath/arp_input_request.c @@ -14,7 +14,7 @@ #include enum { - CONTROL = 0, + ARP_PROBE = 0, DROP, ERROR, EDGE_COUNT, @@ -26,7 +26,6 @@ static uint16_t arp_input_request_process( void **objs, uint16_t nb_objs ) { - struct control_output_mbuf_data *ctrl_data; const struct nexthop *local; const struct iface *iface; struct rte_arp_hdr *arp; @@ -39,17 +38,11 @@ static uint16_t arp_input_request_process( arp = rte_pktmbuf_mtod(mbuf, struct rte_arp_hdr *); iface = mbuf_data(mbuf)->iface; local = nh4_lookup(iface->vrf_id, arp->arp_data.arp_tip); - if (local == NULL || !(local->flags & GR_NH_F_LOCAL)) { - // ARP request not for us - edge = DROP; - goto next; - } + if (local == NULL || !(local->flags & GR_NH_F_LOCAL)) + edge = DROP; // ARP request not for us + else + edge = ARP_PROBE; - ctrl_data = control_output_mbuf_data(mbuf); - ctrl_data->callback = arp_probe_input_cb; - ctrl_data->iface = iface; - edge = CONTROL; -next: if (gr_mbuf_is_traced(mbuf)) gr_mbuf_trace_add(mbuf, node, 0); rte_node_enqueue_x1(graph, node, edge, mbuf); @@ -65,7 +58,7 @@ static struct rte_node_register node = { .nb_edges = EDGE_COUNT, .next_nodes = { - [CONTROL] = "control_output", + [ARP_PROBE] = "arp_probe", [DROP] = "arp_input_request_drop", [ERROR] = "arp_input_request_error", }, diff --git a/modules/ip/datapath/gr_ip4_datapath.h b/modules/ip/datapath/gr_ip4_datapath.h index b9f1e9f23..926a0e864 100644 --- a/modules/ip/datapath/gr_ip4_datapath.h +++ b/modules/ip/datapath/gr_ip4_datapath.h @@ -3,7 +3,6 @@ #pragma once -#include #include #include #include @@ -27,6 +26,10 @@ GR_MBUF_PRIV_DATA_TYPE(ip_local_mbuf_data, { uint8_t ttl; }); +GR_MBUF_PRIV_DATA_EXTENDS(icmp_mbuf_data, ip_local_mbuf_data, { + clock_t timestamp; // time when the ICMP request was sent +}); + GR_NH_PRIV_DATA_TYPE(dnat44_nh_data, { ip4_addr_t replace; }); void ip_input_register_nexthop_type(gr_nh_type_t type, const char *next_node); @@ -68,7 +71,7 @@ int icmp_local_send( uint8_t ttl ); -void icmp_input_register_callback(uint8_t icmp_type, control_output_cb_t cb); +void icmp_input_register_type(uint8_t icmp_type, const char *next_node); static inline rte_be16_t fixup_checksum(rte_be16_t old_cksum, ip4_addr_t old_addr, ip4_addr_t new_addr) { diff --git a/modules/ip/datapath/icmp_input.c b/modules/ip/datapath/icmp_input.c index fb9c75277..5c3df3b9f 100644 --- a/modules/ip/datapath/icmp_input.c +++ b/modules/ip/datapath/icmp_input.c @@ -16,20 +16,20 @@ #include enum { - OUTPUT = 0, - CONTROL, + UNSUPPORTED = 0, + OUTPUT, INVALID, - UNSUPPORTED, EDGE_COUNT, }; #define ICMP_MIN_SIZE 8 -static control_output_cb_t icmp_cb[UINT8_MAX]; +static rte_edge_t edges[UINT8_MAX] = {UNSUPPORTED}; static uint16_t icmp_input_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t nb_objs) { struct ip_local_mbuf_data *ip_data; + struct icmp_mbuf_data *icmp_data; struct rte_icmp_hdr *icmp; struct rte_mbuf *mbuf; rte_edge_t edge; @@ -57,13 +57,10 @@ icmp_input_process(struct rte_graph *graph, struct rte_node *node, void **objs, ip_data->dst = ip_data->src; ip_data->src = ip; edge = OUTPUT; - } else if (icmp_cb[icmp->icmp_type]) { - struct control_output_mbuf_data *c = control_output_mbuf_data(mbuf); - c->callback = icmp_cb[icmp->icmp_type]; - c->timestamp = gr_clock_us(); - edge = CONTROL; } else { - edge = UNSUPPORTED; + icmp_data = icmp_mbuf_data(mbuf); + icmp_data->timestamp = gr_clock_us(); + edge = edges[icmp->icmp_type]; } next: if (gr_mbuf_is_traced(mbuf)) { @@ -76,13 +73,14 @@ icmp_input_process(struct rte_graph *graph, struct rte_node *node, void **objs, return nb_objs; } -void icmp_input_register_callback(uint8_t icmp_type, control_output_cb_t cb) { +void icmp_input_register_type(uint8_t icmp_type, const char *next_node) { + LOG(DEBUG, "icmp_input_register_type: type=%hhu -> %s", icmp_type, next_node); if (icmp_type == RTE_ICMP_TYPE_ECHO_REQUEST) ABORT("cannot register callback for echo request"); - if (icmp_cb[icmp_type]) - ABORT("callback already registered for %d", icmp_type); + if (edges[icmp_type] != UNSUPPORTED) + ABORT("icmp_input edge already registered for %d", icmp_type); - icmp_cb[icmp_type] = cb; + edges[icmp_type] = gr_node_attach_parent("icmp_input", next_node); } static void icmp_input_register(void) { @@ -97,7 +95,6 @@ static struct rte_node_register icmp_input_node = { .nb_edges = EDGE_COUNT, .next_nodes = { [OUTPUT] = "icmp_output", - [CONTROL] = "control_output", [INVALID] = "icmp_input_invalid", [UNSUPPORTED] = "icmp_input_unsupported", }, diff --git a/modules/ip/datapath/icmp_local_send.c b/modules/ip/datapath/icmp_local_send.c index c663844da..b08f50805 100644 --- a/modules/ip/datapath/icmp_local_send.c +++ b/modules/ip/datapath/icmp_local_send.c @@ -3,7 +3,6 @@ #include #include -#include #include #include #include diff --git a/modules/ip/datapath/ip_hold.c b/modules/ip/datapath/ip_hold.c index bb736972d..f3bf54a13 100644 --- a/modules/ip/datapath/ip_hold.c +++ b/modules/ip/datapath/ip_hold.c @@ -1,34 +1,26 @@ // SPDX-License-Identifier: BSD-3-Clause // Copyright (c) 2024 Robin Jarry -#include #include -#include #include #include #include enum { - CONTROL = 0, + NH4_UNREACH, EDGE_COUNT, }; static uint16_t ip_hold_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t nb_objs) { - struct control_output_mbuf_data *d; struct rte_mbuf *mbuf; for (uint16_t i = 0; i < nb_objs; i++) { mbuf = objs[i]; - // TODO: Allocate a new mbuf from a control plane pool and copy - // the packet into it so that the datapath mbuf can be freed and - // returned to the stack for hardware RX. - d = control_output_mbuf_data(mbuf); - d->callback = nh4_unreachable_cb; if (gr_mbuf_is_traced(mbuf)) gr_mbuf_trace_add(mbuf, node, 0); - rte_node_enqueue_x1(graph, node, CONTROL, mbuf); + rte_node_enqueue_x1(graph, node, NH4_UNREACH, mbuf); } return nb_objs; @@ -39,7 +31,7 @@ static struct rte_node_register node = { .process = ip_hold_process, .nb_edges = EDGE_COUNT, .next_nodes = { - [CONTROL] = "control_output", + [NH4_UNREACH] = "nh4_unreachable", }, }; diff --git a/modules/ip/datapath/ip_output.c b/modules/ip/datapath/ip_output.c index ec30a9872..e0a10986b 100644 --- a/modules/ip/datapath/ip_output.c +++ b/modules/ip/datapath/ip_output.c @@ -1,7 +1,6 @@ // SPDX-License-Identifier: BSD-3-Clause // Copyright (c) 2024 Robin Jarry -#include #include #include #include diff --git a/modules/ip6/control/gr_ip6_control.h b/modules/ip6/control/gr_ip6_control.h index 213d82067..8485ff90d 100644 --- a/modules/ip6/control/gr_ip6_control.h +++ b/modules/ip6/control/gr_ip6_control.h @@ -20,10 +20,6 @@ nh6_lookup(uint16_t vrf_id, uint16_t iface_id, const struct rte_ipv6_addr *ip) { return nexthop_lookup(GR_AF_IP6, vrf_id, iface_id, ip); } -void nh6_unreachable_cb(struct rte_mbuf *m); -void ndp_probe_input_cb(struct rte_mbuf *m); -void ndp_router_sollicit_input_cb(struct rte_mbuf *m); - int rib6_insert( uint16_t vrf_id, uint16_t iface_id, diff --git a/modules/ip6/control/icmp6.c b/modules/ip6/control/icmp6.c index aa50ab909..ddcd6b4c2 100644 --- a/modules/ip6/control/icmp6.c +++ b/modules/ip6/control/icmp6.c @@ -2,8 +2,6 @@ // Copyright (c) 2025 Olivier Gournet #include -#include -#include #include #include #include @@ -31,19 +29,6 @@ static void icmp6_queue_pop(struct icmp_queue_item *i, bool free_mbuf) { rte_mempool_put(pool, i); } -// called from dataplane context -static void icmp6_input_cb(struct rte_mbuf *m) { - struct icmp_queue_item *i; - void *data; - - while (rte_mempool_get(pool, &data) < 0) - icmp6_queue_pop(STAILQ_FIRST(&icmp_queue), true); - - i = data; - i->mbuf = m; - STAILQ_INSERT_TAIL(&icmp_queue, i, next); -} - #define ICMP6_ERROR_PKT_LEN \ (GR_ICMP6_HDR_LEN + sizeof(struct rte_ipv6_hdr) + GR_ICMP6_HDR_LEN + sizeof(clock_t)) @@ -104,11 +89,10 @@ static struct api_out icmp6_send(const void *request, void ** /* response */) { static struct api_out icmp6_recv(const void *request, void **response) { const struct gr_ip6_icmp_recv_req *recvreq = request; + struct icmp6_echo_reply *icmp6_echo; struct gr_ip6_icmp_recv_resp *resp; - struct control_output_mbuf_data *d_ctl; - struct ip6_local_mbuf_data *d_ip6; + struct icmp6_mbuf_data *d_ip6; struct icmp6 *icmp6; - struct icmp6_echo_reply *icmp6_echo; clock_t *timestamp; struct rte_mbuf *m; int ret = 0; @@ -117,8 +101,7 @@ static struct api_out icmp6_recv(const void *request, void **response) { if (m == NULL) return api_out(0, 0); - d_ctl = control_output_mbuf_data(m); - d_ip6 = (struct ip6_local_mbuf_data *)d_ctl->cb_data; + d_ip6 = icmp6_mbuf_data(m); icmp6_echo = PAYLOAD(icmp6); timestamp = PAYLOAD(icmp6_echo); @@ -128,7 +111,7 @@ static struct api_out icmp6_recv(const void *request, void **response) { resp->ttl = d_ip6->hop_limit; resp->ident = rte_be_to_cpu_16(icmp6_echo->ident); resp->seq_num = rte_be_to_cpu_16(icmp6_echo->seqnum); - resp->response_time = d_ctl->timestamp - *timestamp; + resp->response_time = d_ip6->timestamp - *timestamp; icmp6 = rte_pktmbuf_mtod(m, struct icmp6 *); resp->type = icmp6->type; resp->code = icmp6->code; @@ -188,13 +171,48 @@ static struct gr_module icmp6_module = { .fini = icmp_fini, }; +static uint16_t +icmp6_input_ctl_process(struct rte_graph *, struct rte_node *, void **objs, uint16_t nb_objs) { + struct icmp_queue_item *qi; + void *data; + + for (uint16_t i = 0; i < nb_objs; i++) { + while (rte_mempool_get(pool, &data) < 0) + icmp6_queue_pop(STAILQ_FIRST(&icmp_queue), true); + + qi = data; + qi->mbuf = objs[i]; + STAILQ_INSERT_TAIL(&icmp_queue, qi, next); + } + return nb_objs; +} + +static struct rte_node_register icmp6_input_ctl_node = { + .flags = GR_NODE_FLAG_CONTROL_PLANE, + .name = "icmp6_input_ctl", + .process = icmp6_input_ctl_process, + .nb_edges = 0, + .next_nodes = {}, +}; + +static void icmp6_input_register(void) { + icmp6_input_register_type(ICMP6_TYPE_ECHO_REPLY, "icmp6_input_ctl"); + icmp6_input_register_type(ICMP6_ERR_DEST_UNREACH, "icmp6_input_ctl"); + icmp6_input_register_type(ICMP6_ERR_TTL_EXCEEDED, "icmp6_input_ctl"); + icmp6_input_register_type(ICMP6_ERR_PKT_TOO_BIG, "icmp6_input_ctl"); + icmp6_input_register_type(ICMP6_ERR_PARAM_PROBLEM, "icmp6_input_ctl"); +} + +static struct gr_node_info icmp6_input_info = { + .node = &icmp6_input_ctl_node, + .register_callback = icmp6_input_register, + .trace_format = (gr_trace_format_cb_t)trace_icmp6_format, +}; + +GR_NODE_REGISTER(icmp6_input_info); + RTE_INIT(icmp_module_init) { gr_register_module(&icmp6_module); gr_register_api_handler(&ip6_icmp_send_handler); gr_register_api_handler(&ip6_icmp_recv_handler); - icmp6_input_register_callback(ICMP6_TYPE_ECHO_REPLY, icmp6_input_cb); - icmp6_input_register_callback(ICMP6_ERR_DEST_UNREACH, icmp6_input_cb); - icmp6_input_register_callback(ICMP6_ERR_TTL_EXCEEDED, icmp6_input_cb); - icmp6_input_register_callback(ICMP6_ERR_PKT_TOO_BIG, icmp6_input_cb); - icmp6_input_register_callback(ICMP6_ERR_PARAM_PROBLEM, icmp6_input_cb); } diff --git a/modules/ip6/control/nexthop.c b/modules/ip6/control/nexthop.c index 8a9c8d1d8..cc29ff88a 100644 --- a/modules/ip6/control/nexthop.c +++ b/modules/ip6/control/nexthop.c @@ -3,8 +3,6 @@ #include #include -#include -#include #include #include #include @@ -26,210 +24,319 @@ #include #include -static control_input_t ip6_output_node; +enum { + NH6_SINK = 0, + NH6_IP6_OUTPUT, + INVALID_ROUTE, + ERR_NH_ALLOC, + ERR_ROUTE_INSERT, + DROP_QUEUE_FULL, + NH6_EDGE_COUNT, +}; -void nh6_unreachable_cb(struct rte_mbuf *m) { - struct rte_ipv6_hdr *ip = rte_pktmbuf_mtod(m, struct rte_ipv6_hdr *); - const struct rte_ipv6_addr *dst = &ip->dst_addr; +static uint16_t nh6_unreachable_process( + struct rte_graph *graph, + struct rte_node *node, + void **objs, + uint16_t nb_objs +) { + struct rte_mbuf *m; struct nexthop *nh; + rte_edge_t edge; - nh = rib6_lookup(mbuf_data(m)->iface->vrf_id, mbuf_data(m)->iface->id, dst); - if (nh == NULL) - goto free; // route to dst has disappeared - - if (nh->flags & GR_NH_F_LINK && !rte_ipv6_addr_eq(dst, &nh->ipv6)) { - // The resolved nexthop is associated with a "connected" route. - // We currently do not have an explicit route entry for this - // destination IP. - struct nexthop *remote = nh6_lookup(nh->vrf_id, mbuf_data(m)->iface->id, dst); - - if (remote == NULL) { - // No existing nexthop for this IP, create one. - remote = nexthop_new(&(struct gr_nexthop) { - .type = GR_NH_T_L3, - .af = GR_AF_IP6, - .vrf_id = nh->vrf_id, - .iface_id = nh->iface_id, - .ipv6 = *dst, - .origin = GR_NH_ORIGIN_INTERNAL, - }); - } + for (uint16_t i = 0; i < nb_objs; i++) { + m = objs[i]; + struct rte_ipv6_hdr *ip = rte_pktmbuf_mtod(m, struct rte_ipv6_hdr *); + const struct rte_ipv6_addr *dst = &ip->dst_addr; + edge = NH6_SINK; - if (remote == NULL) { - LOG(ERR, "cannot allocate nexthop: %s", strerror(errno)); - goto free; + if (gr_mbuf_is_traced(m)) + gr_mbuf_trace_add(m, node, 0); + + nh = rib6_lookup(mbuf_data(m)->iface->vrf_id, mbuf_data(m)->iface->id, dst); + if (nh == NULL) { + edge = INVALID_ROUTE; + goto next; // route to dst has disappeared } - if (remote->iface_id != nh->iface_id) - ABORT(IP6_F " nexthop lookup gives wrong interface", &ip); - - // Create an associated /128 route so that next packets take it - // in priority with a single route lookup. - int ret = rib6_insert( - nh->vrf_id, - nh->iface_id, - dst, - RTE_IPV6_MAX_DEPTH, - GR_NH_ORIGIN_INTERNAL, - remote - ); - if (ret < 0) { - LOG(ERR, "failed to insert route: %s", strerror(errno)); - goto free; + if (nh->flags & GR_NH_F_LINK && !rte_ipv6_addr_eq(dst, &nh->ipv6)) { + // The resolved nexthop is associated with a "connected" route. + // We currently do not have an explicit route entry for this + // destination IP. + struct nexthop *remote = nh6_lookup( + nh->vrf_id, mbuf_data(m)->iface->id, dst + ); + + if (remote == NULL) { + // No existing nexthop for this IP, create one. + remote = nexthop_new(&(struct gr_nexthop) { + .type = GR_NH_T_L3, + .af = GR_AF_IP6, + .vrf_id = nh->vrf_id, + .iface_id = nh->iface_id, + .ipv6 = *dst, + .origin = GR_NH_ORIGIN_INTERNAL, + }); + } + + if (remote == NULL) { + LOG(ERR, "cannot allocate nexthop: %s", strerror(errno)); + edge = ERR_NH_ALLOC; + goto next; + } + if (remote->iface_id != nh->iface_id) + ABORT(IP6_F " nexthop lookup gives wrong interface", &ip); + + // Create an associated /128 route so that next packets take it + // in priority with a single route lookup. + int ret = rib6_insert( + nh->vrf_id, + nh->iface_id, + dst, + RTE_IPV6_MAX_DEPTH, + GR_NH_ORIGIN_INTERNAL, + remote + ); + if (ret < 0) { + LOG(ERR, "failed to insert route: %s", strerror(errno)); + edge = ERR_ROUTE_INSERT; + goto next; + } + nh = remote; } - nh = remote; - } - if (nh->state == GR_NH_S_REACHABLE) { - // The nexthop may have become reachable while the packet was - // passed from the datapath to here. Re-send it to datapath. - struct ip6_output_mbuf_data *d = ip6_output_mbuf_data(m); - d->nh = nh; - if (post_to_stack(ip6_output_node, m) < 0) { - LOG(ERR, "post_to_stack: %s", strerror(errno)); - goto free; + if (nh->state == GR_NH_S_REACHABLE) { + // The nexthop may have become reachable while the packet was + // passed from the datapath to here. Re-send it to datapath. + struct ip6_output_mbuf_data *d = ip6_output_mbuf_data(m); + d->nh = nh; + edge = NH6_IP6_OUTPUT; + goto next; } - return; - } - if (nh->held_pkts < nh_conf.max_held_pkts) { - queue_mbuf_data(m)->next = NULL; - if (nh->held_pkts_head == NULL) - nh->held_pkts_head = m; - else - queue_mbuf_data(nh->held_pkts_tail)->next = m; - nh->held_pkts_tail = m; - nh->held_pkts++; - if (nh->state != GR_NH_S_PENDING) { - nh6_solicit(nh); - nh->state = GR_NH_S_PENDING; + if (nh->held_pkts < nh_conf.max_held_pkts) { + queue_mbuf_data(m)->next = NULL; + if (nh->held_pkts_head == NULL) + nh->held_pkts_head = m; + else + queue_mbuf_data(nh->held_pkts_tail)->next = m; + nh->held_pkts_tail = m; + nh->held_pkts++; + if (nh->state != GR_NH_S_PENDING) { + nh6_solicit(nh); + nh->state = GR_NH_S_PENDING; + } + continue; // Do NOT enqueue the packet, it will be sent later + } else { + LOG(DEBUG, IP6_F " hold queue full", &dst); + edge = DROP_QUEUE_FULL; } - return; - } else { - LOG(DEBUG, IP4_F " hold queue full", &dst); +next: + rte_node_enqueue_x1(graph, node, edge, m); } -free: - rte_pktmbuf_free(m); + + return nb_objs; } -static control_input_t ndp_na_output_node; +static struct rte_node_register nh6_unreachable_node = { + .flags = GR_NODE_FLAG_CONTROL_PLANE, + .name = "nh6_unreachable", + .process = nh6_unreachable_process, + .nb_edges = NH6_EDGE_COUNT, + .next_nodes = { + [NH6_SINK] = "ctlplane_sink", + [NH6_IP6_OUTPUT] = "ip6_output", + [INVALID_ROUTE] = "nh6_invalid_route", + [ERR_NH_ALLOC] = "nh6_nh_alloc", + [ERR_ROUTE_INSERT] = "nh6_route_insert", + [DROP_QUEUE_FULL] = "nh6_queue_full", + }, +}; + +static struct gr_node_info nh6_unreachable_info = { + .node = &nh6_unreachable_node, +}; + +GR_NODE_REGISTER(nh6_unreachable_info); + +GR_DROP_REGISTER(nh6_invalid_route); +GR_DROP_REGISTER(nh6_nh_alloc); +GR_DROP_REGISTER(nh6_route_insert); +GR_DROP_REGISTER(nh6_queue_full); -void ndp_probe_input_cb(struct rte_mbuf *m) { - const struct icmp6 *icmp6 = rte_pktmbuf_mtod(m, const struct icmp6 *); +enum { + SINK = 0, + NDP_NA_OUTPUT, + IP6_OUTPUT, + ERR_ICMP6_OPT_INVAL, + ERR_ICMP6_TYPE_INVAL, + ERR_NDP_NH_ALLOC, + ERR_NDP_ROUTE_INSERT, + NDP_EDGE_COUNT, +}; + +static uint16_t ndp_probe_input_process( + struct rte_graph *graph, + struct rte_node *node, + void **objs, + uint16_t nb_objs +) { const struct rte_ipv6_addr *remote, *local; const struct ip6_local_mbuf_data *d; const struct icmp6_neigh_solicit *ns; const struct icmp6_neigh_advert *na; icmp6_opt_found_t lladdr_found; + const struct icmp6 *icmp6; const struct iface *iface; struct rte_ether_addr mac; - struct nexthop *nh = NULL; - - d = (const struct ip6_local_mbuf_data *)control_output_mbuf_data(m)->cb_data; - iface = control_output_mbuf_data(m)->iface; - - switch (icmp6->type) { - case ICMP6_TYPE_NEIGH_SOLICIT: - ns = PAYLOAD(icmp6); - local = &ns->target; - remote = &d->src; - lladdr_found = icmp6_get_opt( - m, sizeof(*icmp6) + sizeof(*ns), ICMP6_OPT_SRC_LLADDR, &mac - ); - break; - case ICMP6_TYPE_NEIGH_ADVERT: - na = PAYLOAD(icmp6); - local = NULL; - remote = &na->target; - lladdr_found = icmp6_get_opt( - m, sizeof(*icmp6) + sizeof(*na), ICMP6_OPT_TARGET_LLADDR, &mac - ); - break; - default: - goto free; - } + struct nexthop *nh; + rte_edge_t edge; - if (lladdr_found == ICMP6_OPT_INVAL) - goto free; + struct rte_mbuf *m; + for (uint16_t i = 0; i < nb_objs; i++) { + m = objs[i]; + edge = SINK; + nh = NULL; - if (!rte_ipv6_addr_is_unspec(remote) && !rte_ipv6_addr_is_mcast(remote)) { - nh = nh6_lookup(iface->vrf_id, iface->id, remote); - if (nh == NULL) { - // We don't have an entry for the probe sender address yet. - // - // Create one now. If the sender has requested our mac address, they - // will certainly contact us soon and it will save us an NDP solicitation. - nh = nexthop_new(&(struct gr_nexthop) { - .type = GR_NH_T_L3, - .af = GR_AF_IP6, - .vrf_id = iface->vrf_id, - .iface_id = iface->id, - .ipv6 = *remote, - .origin = GR_NH_ORIGIN_INTERNAL, - }); - if (nh == NULL) { - LOG(ERR, "ip6_nexthop_new: %s", strerror(errno)); - goto free; - } + if (gr_mbuf_is_traced(m)) + gr_mbuf_trace_add(m, node, 0); - // Add an internal /128 route to reference the newly created nexthop. - int ret = rib6_insert( - iface->vrf_id, - iface->id, - remote, - RTE_IPV6_MAX_DEPTH, - GR_NH_ORIGIN_INTERNAL, - nh + icmp6 = rte_pktmbuf_mtod(m, const struct icmp6 *); + d = ip6_local_mbuf_data(m); + iface = d->iface; + + switch (icmp6->type) { + case ICMP6_TYPE_NEIGH_SOLICIT: + ns = PAYLOAD(icmp6); + local = &ns->target; + remote = &d->src; + lladdr_found = icmp6_get_opt( + m, sizeof(*icmp6) + sizeof(*ns), ICMP6_OPT_SRC_LLADDR, &mac ); - if (ret < 0) { - LOG(ERR, "ip6_route_insert: %s", strerror(errno)); - goto free; + break; + case ICMP6_TYPE_NEIGH_ADVERT: + na = PAYLOAD(icmp6); + local = NULL; + remote = &na->target; + lladdr_found = icmp6_get_opt( + m, sizeof(*icmp6) + sizeof(*na), ICMP6_OPT_TARGET_LLADDR, &mac + ); + break; + default: + edge = ERR_ICMP6_TYPE_INVAL; + goto next; + } + + if (lladdr_found == ICMP6_OPT_INVAL) { + LOG(DEBUG, "invalid ICMP6 option %d", icmp6->type); + edge = ERR_ICMP6_OPT_INVAL; + goto next; + }; + + if (!rte_ipv6_addr_is_unspec(remote) && !rte_ipv6_addr_is_mcast(remote)) { + nh = nh6_lookup(iface->vrf_id, iface->id, remote); + if (nh == NULL) { + // We don't have an entry for the probe sender address yet. + // + // Create one now. If the sender has requested our mac address, they + // will certainly contact us soon and it will save us an NDP solicitation. + nh = nexthop_new(&(struct gr_nexthop) { + .type = GR_NH_T_L3, + .af = GR_AF_IP6, + .vrf_id = iface->vrf_id, + .iface_id = iface->id, + .ipv6 = *remote, + .origin = GR_NH_ORIGIN_INTERNAL, + }); + if (nh == NULL) { + LOG(ERR, "ip6_nexthop_new: %s", strerror(errno)); + edge = ERR_NDP_NH_ALLOC; + goto next; + } + + // Add an internal /128 route to reference the newly created nexthop. + int ret = rib6_insert( + iface->vrf_id, + iface->id, + remote, + RTE_IPV6_MAX_DEPTH, + GR_NH_ORIGIN_INTERNAL, + nh + ); + if (ret < 0) { + LOG(ERR, "ip6_route_insert: %s", strerror(errno)); + edge = ERR_NDP_ROUTE_INSERT; + goto next; + } } } - } - if (nh && !(nh->flags & GR_NH_F_STATIC) && lladdr_found == ICMP6_OPT_FOUND) { - // Refresh all fields. - nh->last_reply = gr_clock_us(); - nh->state = GR_NH_S_REACHABLE; - nh->ucast_probes = 0; - nh->bcast_probes = 0; - nh->mac = mac; - } + if (nh && !(nh->flags & GR_NH_F_STATIC) && lladdr_found == ICMP6_OPT_FOUND) { + // Refresh all fields. + nh->last_reply = gr_clock_us(); + nh->state = GR_NH_S_REACHABLE; + nh->ucast_probes = 0; + nh->bcast_probes = 0; + nh->mac = mac; + } - if (icmp6->type == ICMP6_TYPE_NEIGH_SOLICIT && local != NULL) { - // send a reply for our local ip - struct ndp_na_output_mbuf_data *d = ndp_na_output_mbuf_data(m); - d->local = nh6_lookup(iface->vrf_id, iface->id, local); - d->remote = nh; - d->iface = iface; - if (post_to_stack(ndp_na_output_node, m) < 0) { - LOG(ERR, "post_to_stack: %s", strerror(errno)); - goto free; + if (icmp6->type == ICMP6_TYPE_NEIGH_SOLICIT && local != NULL) { + // send a reply for our local ip + struct ndp_na_output_mbuf_data *d = ndp_na_output_mbuf_data(m); + d->local = nh6_lookup(iface->vrf_id, iface->id, local); + d->remote = nh; + d->iface = iface; + edge = NDP_NA_OUTPUT; } - // prevent double free, mbuf has been re-consumed by datapath - m = NULL; - } - // Flush all held packets. - struct rte_mbuf *held = nh->held_pkts_head; - while (held != NULL) { - struct ip6_output_mbuf_data *o; - struct rte_mbuf *next; - - next = queue_mbuf_data(held)->next; - o = ip6_output_mbuf_data(held); - o->nh = nh; - o->iface = NULL; - post_to_stack(ip6_output_node, held); - held = next; + // Flush all held packets. + struct rte_mbuf *held = nh->held_pkts_head; + while (held != NULL) { + struct ip6_output_mbuf_data *o; + struct rte_mbuf *next; + + next = queue_mbuf_data(held)->next; + o = ip6_output_mbuf_data(held); + o->nh = nh; + o->iface = NULL; + rte_node_enqueue_x1(graph, node, IP6_OUTPUT, held); + held = next; + } + nh->held_pkts_head = NULL; + nh->held_pkts_tail = NULL; + nh->held_pkts = 0; +next: + rte_node_enqueue_x1(graph, node, edge, m); + continue; } - nh->held_pkts_head = NULL; - nh->held_pkts_tail = NULL; - nh->held_pkts = 0; -free: - rte_pktmbuf_free(m); + return nb_objs; } +static struct rte_node_register ndp_probe_node = { + .flags = GR_NODE_FLAG_CONTROL_PLANE, + .name = "ndp_probe", + .process = ndp_probe_input_process, + .nb_edges = NDP_EDGE_COUNT, + .next_nodes = { + [SINK] = "ctlplane_sink", + [IP6_OUTPUT] = "ip6_output", + [NDP_NA_OUTPUT] = "ndp_na_output", + [ERR_ICMP6_OPT_INVAL] = "ndp_icmp6_opt_inval", + [ERR_ICMP6_TYPE_INVAL] = "ndp_icmp6_type_inval", + [ERR_NDP_ROUTE_INSERT] = "ndp_route_insert", + [ERR_NDP_NH_ALLOC] = "ndp_nh_alloc", + }, +}; + +static struct gr_node_info ndp_probe_info = { + .node = &ndp_probe_node, +}; + +GR_NODE_REGISTER(ndp_probe_info); +GR_DROP_REGISTER(ndp_icmp6_opt_inval); +GR_DROP_REGISTER(ndp_icmp6_type_inval); +GR_DROP_REGISTER(ndp_route_insert); +GR_DROP_REGISTER(ndp_nh_alloc); + static int nh6_add(struct nexthop *nh) { return rib6_insert(nh->vrf_id, nh->iface_id, &nh->ipv6, 128, GR_NH_ORIGIN_INTERNAL, nh); } @@ -242,15 +349,9 @@ static void nh6_del(struct nexthop *nh) { } } -static void nh6_init(struct event_base *) { - ip6_output_node = gr_control_input_register_handler("ip6_output", true); - ndp_na_output_node = gr_control_input_register_handler("ndp_na_output", true); -} - static struct gr_module nh6_module = { .name = "ipv6 nexthop", .depends_on = "graph", - .init = nh6_init, }; static struct nexthop_af_ops nh_ops = { diff --git a/modules/ip6/control/router_advert.c b/modules/ip6/control/router_advert.c index 8143c24cc..7ae363ff0 100644 --- a/modules/ip6/control/router_advert.c +++ b/modules/ip6/control/router_advert.c @@ -120,12 +120,47 @@ static struct api_out iface_ra_show(const void *request, void **response) { return api_out(0, len); } -void ndp_router_sollicit_input_cb(struct rte_mbuf *m) { - uint16_t iface_id = mbuf_data(m)->iface->id; - rte_pktmbuf_free(m); - event_active(ra_conf[iface_id].timer, 0, 0); +enum { + SINK = 0, +}; + +static uint16_t ndp_router_solicit_process( + struct rte_graph *graph, + struct rte_node *node, + void **objs, + uint16_t nb_objs +) { + struct rte_mbuf *m; + uint16_t iface_id; + + for (uint16_t i = 0; i < nb_objs; i++) { + m = objs[i]; + iface_id = mbuf_data(m)->iface->id; + if (gr_mbuf_is_traced(m)) + gr_mbuf_trace_add(m, node, 0); + event_active(ra_conf[iface_id].timer, 0, 0); + } + rte_node_next_stream_move(graph, node, SINK); + + return nb_objs; } +static struct rte_node_register node_rs = { + .flags = GR_NODE_FLAG_CONTROL_PLANE, + .name = "ndp_router_solicit", + .process = ndp_router_solicit_process, + .nb_edges = 1, + .next_nodes = { + [SINK] = "ctlplane_sink", + }, +}; + +static struct gr_node_info info_rs = { + .node = &node_rs, +}; + +GR_NODE_REGISTER(info_rs); + static void build_ra_packet(struct rte_mbuf *m, struct rte_ipv6_addr *srcv6) { struct rte_ipv6_addr dst = RTE_IPV6_ADDR_ALLNODES_LINK_LOCAL; struct rte_ipv6_addr src = *srcv6; diff --git a/modules/ip6/datapath/gr_ip6_datapath.h b/modules/ip6/datapath/gr_ip6_datapath.h index df861e640..874ba46f2 100644 --- a/modules/ip6/datapath/gr_ip6_datapath.h +++ b/modules/ip6/datapath/gr_ip6_datapath.h @@ -3,7 +3,6 @@ #pragma once -#include #include #include #include @@ -32,6 +31,8 @@ GR_MBUF_PRIV_DATA_TYPE(ndp_na_output_mbuf_data, { const struct nexthop *remote; }); +GR_MBUF_PRIV_DATA_EXTENDS(icmp6_mbuf_data, ip6_local_mbuf_data, { clock_t timestamp; }); + void ip6_input_local_add_proto(uint8_t proto, const char *next_node); void ip6_input_register_nexthop_type(gr_nh_type_t type, const char *next_node); void ip6_output_register_interface_type(gr_iface_type_t type, const char *next_node); @@ -71,4 +72,4 @@ int icmp6_local_send( uint8_t hop_limit ); -void icmp6_input_register_callback(uint8_t icmp6_type, control_output_cb_t cb); +void icmp6_input_register_type(uint8_t icmp6_type, const char *next_node); diff --git a/modules/ip6/datapath/icmp6_input.c b/modules/ip6/datapath/icmp6_input.c index 034cb1bed..a4a10b51c 100644 --- a/modules/ip6/datapath/icmp6_input.c +++ b/modules/ip6/datapath/icmp6_input.c @@ -2,7 +2,6 @@ // Copyright (c) 2024 Robin Jarry #include -#include #include #include #include @@ -21,7 +20,6 @@ enum { NEIGH_SOLICIT, NEIGH_ADVERT, ROUTER_SOLICIT, - CONTROL, BAD_CHECKSUM, INVALID, UNSUPPORTED, @@ -29,14 +27,15 @@ enum { EDGE_COUNT, }; -static control_output_cb_t icmp6_cb[UINT8_MAX]; +static rte_edge_t edges[UINT8_MAX] = {UNSUPPORTED}; static uint16_t icmp6_input_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t nb_objs) { + struct icmp6_mbuf_data *icmp6_data; struct ip6_local_mbuf_data *d; - struct icmp6 *icmp6; struct rte_ipv6_addr tmp_ip; struct rte_mbuf *mbuf; + struct icmp6 *icmp6; rte_edge_t next; for (uint16_t i = 0; i < nb_objs; i++) { @@ -85,16 +84,9 @@ icmp6_input_process(struct rte_graph *graph, struct rte_node *node, void **objs, break; case ICMP6_TYPE_ROUTER_ADVERT: default: - if (icmp6_cb[icmp6->type] != NULL) { - struct control_output_mbuf_data *c; - c = control_output_mbuf_data(mbuf); - memmove(c->cb_data, d, sizeof(*d)); - c->callback = icmp6_cb[icmp6->type]; - c->timestamp = gr_clock_us(); - next = CONTROL; - } else { - next = UNSUPPORTED; - } + icmp6_data = icmp6_mbuf_data(mbuf); + icmp6_data->timestamp = gr_clock_us(); + next = edges[icmp6->type]; } next: rte_node_enqueue_x1(graph, node, next, mbuf); @@ -103,13 +95,14 @@ icmp6_input_process(struct rte_graph *graph, struct rte_node *node, void **objs, return nb_objs; } -void icmp6_input_register_callback(uint8_t icmp6_type, control_output_cb_t cb) { +void icmp6_input_register_type(uint8_t icmp6_type, const char *next_node) { + LOG(DEBUG, "icmp6_input_register_type: type=%hhu -> %s", icmp6_type, next_node); if (icmp6_type == ICMP6_TYPE_ECHO_REQUEST) ABORT("cannot register callback for echo request"); - if (icmp6_cb[icmp6_type]) - ABORT("callback already registered for %d", icmp6_type); + if (edges[icmp6_type]) + ABORT("icmp6_type edge already registered for %d", icmp6_type); - icmp6_cb[icmp6_type] = cb; + edges[icmp6_type] = gr_node_attach_parent("icmp6_input", next_node); } static void icmp6_input_register(void) { @@ -127,7 +120,6 @@ static struct rte_node_register icmp6_input_node = { [NEIGH_SOLICIT] = "ndp_ns_input", [NEIGH_ADVERT] = "ndp_na_input", [ROUTER_SOLICIT] = "ndp_rs_input", - [CONTROL] = "control_output", [BAD_CHECKSUM] = "icmp6_input_bad_checksum", [INVALID] = "icmp6_input_invalid", [UNSUPPORTED] = "icmp6_input_unsupported", diff --git a/modules/ip6/datapath/icmp6_local_send.c b/modules/ip6/datapath/icmp6_local_send.c index c65208888..0571d1ceb 100644 --- a/modules/ip6/datapath/icmp6_local_send.c +++ b/modules/ip6/datapath/icmp6_local_send.c @@ -3,7 +3,6 @@ #include #include -#include #include #include #include diff --git a/modules/ip6/datapath/ip6_hold.c b/modules/ip6/datapath/ip6_hold.c index b13235fe2..f475040fc 100644 --- a/modules/ip6/datapath/ip6_hold.c +++ b/modules/ip6/datapath/ip6_hold.c @@ -1,7 +1,6 @@ // SPDX-License-Identifier: BSD-3-Clause // Copyright (c) 2024 Robin Jarry -#include #include #include #include @@ -9,22 +8,19 @@ #include enum { - CONTROL = 0, + NH6_UNREACH = 0, EDGE_COUNT, }; static uint16_t ip6_hold_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t nb_objs) { - struct control_output_mbuf_data *d; struct rte_mbuf *mbuf; for (uint16_t i = 0; i < nb_objs; i++) { mbuf = objs[i]; - d = control_output_mbuf_data(mbuf); - d->callback = nh6_unreachable_cb; if (gr_mbuf_is_traced(mbuf)) gr_mbuf_trace_add(mbuf, node, 0); - rte_node_enqueue_x1(graph, node, CONTROL, mbuf); + rte_node_enqueue_x1(graph, node, NH6_UNREACH, mbuf); } return nb_objs; @@ -35,7 +31,7 @@ static struct rte_node_register node = { .process = ip6_hold_process, .nb_edges = EDGE_COUNT, .next_nodes = { - [CONTROL] = "control_output", + [NH6_UNREACH] = "nh6_unreachable", }, }; diff --git a/modules/ip6/datapath/ndp_na_input.c b/modules/ip6/datapath/ndp_na_input.c index 2ebb64930..d4b3d4296 100644 --- a/modules/ip6/datapath/ndp_na_input.c +++ b/modules/ip6/datapath/ndp_na_input.c @@ -1,7 +1,6 @@ // SPDX-License-Identifier: BSD-3-Clause // Copyright (c) 2024 Robin Jarry -#include #include #include #include @@ -15,7 +14,7 @@ #include enum { - CONTROL = 0, + NDP_PROBE = 0, INVAL, DROP, EDGE_COUNT, @@ -27,7 +26,6 @@ static uint16_t ndp_na_input_process( void **objs, uint16_t nb_objs ) { - struct control_output_mbuf_data *ctrl_data; icmp6_opt_found_t lladdr_found; struct icmp6_neigh_advert *na; struct ip6_local_mbuf_data *d; @@ -94,10 +92,7 @@ static uint16_t ndp_na_input_process( // received advertisement. ASSERT_NDP(lladdr_found == ICMP6_OPT_FOUND); - ctrl_data = control_output_mbuf_data(mbuf); - ctrl_data->iface = iface; - ctrl_data->callback = ndp_probe_input_cb; - edge = CONTROL; + edge = NDP_PROBE; next: if (gr_mbuf_is_traced(mbuf)) gr_mbuf_trace_add(mbuf, node, 0); @@ -114,7 +109,7 @@ static struct rte_node_register node = { .nb_edges = EDGE_COUNT, .next_nodes = { - [CONTROL] = "control_output", + [NDP_PROBE] = "ndp_probe", [INVAL] = "ndp_na_input_inval", [DROP] = "ndp_na_input_drop", }, diff --git a/modules/ip6/datapath/ndp_ns_input.c b/modules/ip6/datapath/ndp_ns_input.c index 50fdbe37a..52336b59b 100644 --- a/modules/ip6/datapath/ndp_ns_input.c +++ b/modules/ip6/datapath/ndp_ns_input.c @@ -1,7 +1,6 @@ // SPDX-License-Identifier: BSD-3-Clause // Copyright (c) 2024 Robin Jarry -#include #include #include #include @@ -15,7 +14,7 @@ #include enum { - CONTROL = 0, + NDP_PROBE = 0, INVAL, DROP, EDGE_COUNT, @@ -27,7 +26,6 @@ static uint16_t ndp_ns_input_process( void **objs, uint16_t nb_objs ) { - struct control_output_mbuf_data *c; icmp6_opt_found_t lladdr_found; struct icmp6_neigh_solicit *ns; struct ip6_local_mbuf_data d; @@ -87,11 +85,7 @@ static uint16_t ndp_ns_input_process( ASSERT_NDP(lladdr_found == ICMP6_OPT_NOT_FOUND); } - c = control_output_mbuf_data(mbuf); - c->iface = d.iface; - c->callback = ndp_probe_input_cb; - memcpy(c->cb_data, &d, sizeof(d)); - next = CONTROL; + next = NDP_PROBE; next: if (gr_mbuf_is_traced(mbuf)) { uint8_t trace_len = RTE_MIN(d.len, GR_TRACE_ITEM_MAX_LEN); @@ -111,7 +105,7 @@ static struct rte_node_register node = { .nb_edges = EDGE_COUNT, .next_nodes = { - [CONTROL] = "control_output", + [NDP_PROBE] = "ndp_probe", [INVAL] = "ndp_ns_input_inval", [DROP] = "ndp_ns_input_drop", }, diff --git a/modules/ip6/datapath/ndp_rs_input.c b/modules/ip6/datapath/ndp_rs_input.c index ab6974528..7d6abde33 100644 --- a/modules/ip6/datapath/ndp_rs_input.c +++ b/modules/ip6/datapath/ndp_rs_input.c @@ -1,7 +1,6 @@ // SPDX-License-Identifier: BSD-3-Clause // Copyright (c) 2025 Christophe Fontaine -#include #include #include #include @@ -15,7 +14,7 @@ #include enum { - CONTROL, + ROUTER_SOLICIT, INVAL, EDGE_COUNT, }; @@ -26,7 +25,6 @@ static uint16_t ndp_rs_input_process( void **objs, uint16_t nb_objs ) { - struct control_output_mbuf_data *co; struct ip6_local_mbuf_data *d; struct rte_mbuf *mbuf; struct icmp6 *icmp6; @@ -59,9 +57,7 @@ static uint16_t ndp_rs_input_process( // - ICMP length (derived from the IP length) is 8 or more octets. ASSERT_NDP(d->len >= 8); - next = CONTROL; - co = control_output_mbuf_data(mbuf); - co->callback = ndp_router_sollicit_input_cb; + next = ROUTER_SOLICIT; next: if (gr_mbuf_is_traced(mbuf)) gr_mbuf_trace_add(mbuf, node, 0); @@ -76,7 +72,7 @@ static struct rte_node_register node = { .process = ndp_rs_input_process, .nb_edges = EDGE_COUNT, .next_nodes = { - [CONTROL] = "control_output", + [ROUTER_SOLICIT] = "ndp_router_solicit", [INVAL] = "ndp_rs_input_inval", }, }; diff --git a/subprojects/dpdk.wrap b/subprojects/dpdk.wrap index 2c46f7880..b24c9be69 100644 --- a/subprojects/dpdk.wrap +++ b/subprojects/dpdk.wrap @@ -1,6 +1,7 @@ [wrap-git] url = https://github.com/DPDK/dpdk-stable revision = v24.11.1 +diff_files = graph-allow-non-EAL-Thread-for-pipeline-mode.patch depth = 1 [provide] diff --git a/subprojects/packagefiles/graph-allow-non-EAL-Thread-for-pipeline-mode.patch b/subprojects/packagefiles/graph-allow-non-EAL-Thread-for-pipeline-mode.patch new file mode 100644 index 000000000..1142380ca --- /dev/null +++ b/subprojects/packagefiles/graph-allow-non-EAL-Thread-for-pipeline-mode.patch @@ -0,0 +1,52 @@ +From 5b44778a9cfe2996017a9e95c1bd489dac442267 Mon Sep 17 00:00:00 2001 +From: Christophe Fontaine +Date: Thu, 19 Jun 2025 15:35:45 +0000 +Subject: [PATCH] graph: allow non EAL Thread for pipeline mode + +rte_graph_model_mcore_dispatch_core_bind relied on +rte_lcore_is_enabled. +Yet, "rte_lcore_is_enabled" only checks for EAL threads, which forbids +external threads (NON EAL) to run a part of the graph. + +Verify if the lcore role is not "ROLE_OFF", and return relevant +error code otherwise. + +Signed-off-by: Christophe Fontaine +--- + lib/graph/graph.c | 13 +++++++++---- + 1 file changed, 9 insertions(+), 4 deletions(-) + +diff --git a/lib/graph/graph.c b/lib/graph/graph.c +index dff8e69..d8fe43f 100644 +--- a/lib/graph/graph.c ++++ b/lib/graph/graph.c +@@ -322,17 +322,22 @@ rte_graph_model_mcore_dispatch_core_bind(rte_graph_t id, int lcore) + { + struct graph *graph; + +- if (graph_from_id(id) == NULL) ++ if (graph_from_id(id) == NULL) { ++ rte_errno = ENOENT; + goto fail; +- if (!rte_lcore_is_enabled(lcore)) +- SET_ERR_JMP(ENOLINK, fail, "lcore %d not enabled", lcore); ++ } ++ ++ if (rte_lcore_has_role(lcore, ROLE_OFF)) ++ SET_ERR_JMP(ENOLINK, fail, "lcore %d is invalid", lcore); + + STAILQ_FOREACH(graph, &graph_list, next) + if (graph->id == id) + break; + +- if (graph->graph->model != RTE_GRAPH_MODEL_MCORE_DISPATCH) ++ if (graph->graph->model != RTE_GRAPH_MODEL_MCORE_DISPATCH) { ++ rte_errno = EPERM; + goto fail; ++ } + + graph->lcore_id = lcore; + graph->graph->dispatch.lcore_id = graph->lcore_id; +-- +2.43.5 +