diff --git a/crates/ebpf/Cargo.lock b/crates/ebpf/Cargo.lock index 10068ec68..9f2562bc8 100644 --- a/crates/ebpf/Cargo.lock +++ b/crates/ebpf/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "aya-ebpf" @@ -41,74 +41,12 @@ dependencies = [ "syn", ] -[[package]] -name = "aya-log-common" -version = "0.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "befef9fe882e63164a2ba0161874e954648a72b0e1c4b361f532d590638c4eec" -dependencies = [ - "num_enum", -] - -[[package]] -name = "aya-log-ebpf" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae348f459df78a79e5cd5e164b6562b927033b97ca3b033605b341a474f44510" -dependencies = [ - "aya-ebpf", - "aya-log-common", - "aya-log-ebpf-macros", -] - -[[package]] -name = "aya-log-ebpf-macros" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6d8251a75f56077db51892041aa6b77c70ef2723845d7a210979700b2f01bc4" -dependencies = [ - "aya-log-common", - "aya-log-parser", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "aya-log-parser" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14b102eb5c88c9aa0b49102d3fbcee08ecb0dfa81014f39b373311de7a7032cb" -dependencies = [ - "aya-log-common", -] - [[package]] name = "network-types" version = "0.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e82e9f64c09f56aa7c80c3fa087997bd99a913f91d9c74d36cf5fd75dd5773e6" -[[package]] -name = "num_enum" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" -dependencies = [ - "num_enum_derive", -] - -[[package]] -name = "num_enum_derive" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "proc-macro-error" version = "1.0.4" @@ -146,7 +84,6 @@ name = "quilkin-ebpf" version = "0.1.0" dependencies = [ "aya-ebpf", - "aya-log-ebpf", "network-types", ] diff --git a/crates/ebpf/Cargo.toml b/crates/ebpf/Cargo.toml index dd17c429e..70562a21f 100644 --- a/crates/ebpf/Cargo.toml +++ b/crates/ebpf/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" [dependencies] aya-ebpf = "0.1.1" -aya-log-ebpf = "0.1.1" network-types = "0.0.7" [[bin]] diff --git a/crates/ebpf/src/ebpf-main.rs b/crates/ebpf/src/ebpf-main.rs index 8c0cb1dab..19ca041ca 100644 --- a/crates/ebpf/src/ebpf-main.rs +++ b/crates/ebpf/src/ebpf-main.rs @@ -46,6 +46,9 @@ static mut COUNTER: u32 = 0; /// The external port used by clients. Network order. #[no_mangle] static EXTERNAL_PORT_NO: u16 = u16::to_be(7777); +/// The port used to respond to QCMP messages. Network order. +#[no_mangle] +static QCMP_PORT_NO: u16 = u16::to_be(7600); /// The beginning of the port range quilkin will use for server sessions, we /// take advantage of the fact that, by default, the range Linux uses for @@ -116,6 +119,7 @@ pub fn packet_router(ctx: &XdpContext) -> Result<(), ()> { if dest_port == unsafe { core::ptr::read_volatile(&EXTERNAL_PORT_NO) } || u16::from_be(dest_port) >= EPHEMERAL_PORT_START + || dest_port == unsafe { core::ptr::read_volatile(&QCMP_PORT_NO) } { Ok(()) } else { diff --git a/crates/test/tests/xdp.rs b/crates/test/tests/xdp.rs index 7f2c74f4f..f6489a8b6 100644 --- a/crates/test/tests/xdp.rs +++ b/crates/test/tests/xdp.rs @@ -5,7 +5,10 @@ use quilkin::{ filters::{self, StaticFilter as _}, net::{ self, - xdp::process::{self, xdp}, + xdp::process::{ + self, + xdp::{self, packet::net_types as nt}, + }, }, }; use std::{ @@ -56,6 +59,7 @@ async fn simple_forwarding() { let mut state = process::State { external_port: PROXY.port().into(), + qcmp_port: 0.into(), config: Arc::new(config), destinations: Vec::with_capacity(1), sessions: Arc::new(Default::default()), @@ -138,6 +142,7 @@ async fn changes_ip_version() { let mut state = process::State { external_port: PROXY4.port().into(), + qcmp_port: 0.into(), config: Arc::new(config), destinations: Vec::with_capacity(1), sessions: Arc::new(Default::default()), @@ -269,6 +274,7 @@ async fn packet_manipulation() { let mut state = process::State { external_port: PROXY.port().into(), + qcmp_port: 0.into(), config: Arc::new(config), destinations: Vec::with_capacity(1), sessions: Arc::new(Default::default()), @@ -330,6 +336,7 @@ async fn packet_manipulation() { let mut state = process::State { external_port: PROXY.port().into(), + qcmp_port: 0.into(), config: Arc::new(config), destinations: Vec::with_capacity(1), sessions: Arc::new(Default::default()), @@ -399,6 +406,7 @@ async fn packet_manipulation() { let mut state = process::State { external_port: PROXY.port().into(), + qcmp_port: 0.into(), config: Arc::new(config), destinations: Vec::with_capacity(1), sessions: Arc::new(Default::default()), @@ -491,6 +499,7 @@ async fn multiple_servers() { let mut state = process::State { external_port: PROXY.port().into(), + qcmp_port: 0.into(), config: Arc::new(config), destinations: Vec::with_capacity(1), sessions: Arc::new(Default::default()), @@ -542,8 +551,6 @@ async fn multiple_servers() { /// Ensures that surpassing the session limits doesn't completely break #[tokio::test] async fn many_sessions() { - use xdp::packet::net_types as nt; - const SERVER: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(1, 1, 1, 1), 1111); const PROXY: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 7777); @@ -568,6 +575,7 @@ async fn many_sessions() { let mut state = process::State { external_port: PROXY.port().into(), + qcmp_port: 0.into(), config: Arc::new(config), destinations: Vec::with_capacity(1), sessions: Arc::new(Default::default()), @@ -695,6 +703,7 @@ async fn frees_dropped_packets() { let mut state = process::State { external_port: PROXY4.port().into(), + qcmp_port: 0.into(), config: Arc::new(config), destinations: Vec::with_capacity(1), sessions: Arc::new(Default::default()), @@ -777,3 +786,119 @@ async fn frees_dropped_packets() { unsafe { umem.alloc().expect("umem should have available memory") }; } } + +/// Validates we can process QCMP packets with the same loop as regular packets +#[tokio::test] +async fn qcmp() { + use quilkin::{codec::qcmp, time::UtcTimestamp}; + + const PROXY: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 2020); + const CLIENT: SocketAddrV4 = SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 9999); + + let mut state = process::State { + external_port: 7777.into(), + qcmp_port: PROXY.port().into(), + config: Arc::new(quilkin::Config::default_non_agent()), + destinations: Vec::with_capacity(1), + sessions: Arc::new(Default::default()), + local_ipv4: *PROXY.ip(), + local_ipv6: Ipv6Addr::from_bits(0), + }; + + let mut umem = xdp::Umem::map( + xdp::umem::UmemCfgBuilder { + frame_size: xdp::umem::FrameSize::TwoK, + head_room: 0, + frame_count: 1, + tx_metadata: false, + } + .build() + .unwrap(), + ) + .unwrap(); + + let mut rx_slab = xdp::HeapSlab::with_capacity(1); + let mut tx_slab = xdp::HeapSlab::with_capacity(1); + + // sanity check the umem won't allow more than 1 packet at a time + unsafe { + let first = umem.alloc().unwrap(); + assert!(umem.alloc().is_none()); + umem.free_packet(first); + }; + + let mut qp = qcmp::QcmpPacket::default(); + + let ping_time = UtcTimestamp::from_nanos(100000); + + // Valid ping packet + { + // If this fails, the dropped packet wasn't freed + let mut ping_packet = unsafe { umem.alloc().expect("umem has no available packets") }; + + let ping = qcmp::Protocol::Ping { + client_timestamp: ping_time, + nonce: 99, + }; + + ping.encode(&mut qp); + + etherparse::PacketBuilder::ethernet2([3, 3, 3, 3, 3, 3], [4, 4, 4, 4, 4, 4]) + .ipv4(CLIENT.ip().octets(), PROXY.ip().octets(), 64) + .udp(CLIENT.port(), PROXY.port()) + .write(&mut ping_packet, &qp) + .unwrap(); + + rx_slab.push_front(ping_packet); + process::process_packets(&mut rx_slab, &mut umem, &mut tx_slab, &mut state); + + let pong_packet = tx_slab.pop_back().unwrap(); + let udp = nt::UdpPacket::parse_packet(&pong_packet).unwrap().unwrap(); + let pong = qcmp::Protocol::parse( + pong_packet + .slice_at_offset(udp.data_offset, udp.data_length) + .unwrap(), + ) + .unwrap() + .unwrap(); + + match pong { + qcmp::Protocol::PingReply { + client_timestamp, + nonce, + .. + } => { + assert_eq!(ping_time, client_timestamp); + assert_eq!(nonce, 99); + } + _ => unreachable!(), + } + + umem.free_packet(pong_packet); + } + + // A pong packet, should be rejected + { + let mut bad_packet = unsafe { umem.alloc().expect("umem has no available packets") }; + + let pong = qcmp::Protocol::PingReply { + client_timestamp: ping_time, + nonce: 200, + server_start_timestamp: UtcTimestamp::from_nanos(100001), + server_transmit_timestamp: UtcTimestamp::from_nanos(100002), + }; + pong.encode(&mut qp); + + etherparse::PacketBuilder::ethernet2([3, 3, 3, 3, 3, 3], [4, 4, 4, 4, 4, 4]) + .ipv4(CLIENT.ip().octets(), PROXY.ip().octets(), 64) + .udp(CLIENT.port(), PROXY.port()) + .write(&mut bad_packet, &qp) + .unwrap(); + + rx_slab.push_front(bad_packet); + process::process_packets(&mut rx_slab, &mut umem, &mut tx_slab, &mut state); + + assert!(tx_slab.is_empty()); + unsafe { umem.alloc().expect("umem should have available memory") }; + } +} diff --git a/crates/xdp/bin/packet-router.bin b/crates/xdp/bin/packet-router.bin index 32580878e..5d27dc7ce 100644 Binary files a/crates/xdp/bin/packet-router.bin and b/crates/xdp/bin/packet-router.bin differ diff --git a/crates/xdp/src/lib.rs b/crates/xdp/src/lib.rs index b9f682705..27b384665 100644 --- a/crates/xdp/src/lib.rs +++ b/crates/xdp/src/lib.rs @@ -87,6 +87,8 @@ pub struct EbpfProgram { /// be the same port used in the I/O loop to determine if the packet is sent /// from a client or a server pub external_port: xdp::packet::net_types::NetworkU16, + /// The port QCMP packets are sent to + pub qcmp_port: xdp::packet::net_types::NetworkU16, } impl EbpfProgram { @@ -94,11 +96,14 @@ impl EbpfProgram { /// /// The external port, the port used by clients, must be passed in due to /// how globals work in eBPF. - pub fn load(external_port: u16) -> Result { + pub fn load(external_port: u16, qcmp_port: u16) -> Result { let mut loader = aya::EbpfLoader::new(); let external_port_no = external_port.to_be(); loader.set_global("EXTERNAL_PORT_NO", &external_port_no, true); + let qcmp_port_no = qcmp_port.to_be(); + loader.set_global("QCMP_PORT_NO", &qcmp_port_no, true); + // We exploit the fact that Linux by default does not assign ephemeral // ports in the full range allowed by IANA, but we want to sanity check // it here, as otherwise something else could have been assigned an @@ -133,6 +138,7 @@ impl EbpfProgram { Ok(Self { bpf: loader.load(PROGRAM)?, external_port: xdp::packet::net_types::NetworkU16(external_port_no), + qcmp_port: xdp::packet::net_types::NetworkU16(qcmp_port_no), }) } diff --git a/src/cli/service.rs b/src/cli/service.rs index ee7dd01b4..4d68b1c9e 100644 --- a/src/cli/service.rs +++ b/src/cli/service.rs @@ -175,7 +175,7 @@ impl Service { /// spawn any and all enabled services, if successful returning a future /// that can be await to wait on services to be cancelled. pub fn spawn_services( - self, + mut self, config: &Arc, shutdown_rx: &crate::signal::ShutdownRx, ) -> crate::Result>> { @@ -184,8 +184,10 @@ impl Service { Ok(tokio::spawn(async move { let mds_task = self.publish_mds(&config)?; let phoenix_task = self.publish_phoenix(&config, &shutdown_rx)?; - let qcmp_task = self.publish_qcmp(&shutdown_rx)?; + // We need to call this before qcmp since if we use XDP we handle QCMP + // internally without a separate task let (udp_task, finalizer) = self.publish_udp(&config)?; + let qcmp_task = self.publish_qcmp(&shutdown_rx)?; let xds_task = self.publish_xds(&config)?; let result = tokio::select! { @@ -293,13 +295,13 @@ impl Service { #[allow(clippy::type_complexity)] pub fn publish_udp( - &self, + &mut self, config: &Arc, ) -> eyre::Result<( impl Future>, Option>, )> { - if !self.udp_enabled { + if !self.udp_enabled && !self.qcmp_enabled { return Ok((either::Left(std::future::pending()), None)); } @@ -308,7 +310,10 @@ impl Service { #[cfg(target_os = "linux")] { match self.spawn_xdp(config.clone(), self.xdp.force_xdp) { - Ok(xdp) => return Ok((either::Left(std::future::pending()), Some(xdp))), + Ok(xdp) => { + self.qcmp_enabled = false; + return Ok((either::Left(std::future::pending()), Some(xdp))); + } Err(err) => { if self.xdp.force_xdp { return Err(err); @@ -322,6 +327,10 @@ impl Service { } } + if !self.udp_enabled { + return Ok((either::Left(std::future::pending()), None)); + } + self.spawn_user_space_router(config.clone()) .map(|(fut, func)| (either::Right(fut), Some(func))) } @@ -389,7 +398,8 @@ impl Service { .network_interface .as_deref() .map_or(xdp::NicConfig::Default, xdp::NicConfig::Name), - external_port: self.udp_port, + external_port: if self.udp_enabled { self.udp_port } else { 0 }, + qcmp_port: if self.qcmp_enabled { self.qcmp_port } else { 0 }, maximum_packet_memory: self.xdp.maximum_memory, require_zero_copy: self.xdp.force_zerocopy, require_tx_checksum: self.xdp.force_tx_checksum_offload, diff --git a/src/net/xdp.rs b/src/net/xdp.rs index f1fc3e7c8..bcb98739e 100644 --- a/src/net/xdp.rs +++ b/src/net/xdp.rs @@ -23,6 +23,8 @@ pub struct XdpConfig<'n> { pub nic: NicConfig<'n>, /// The external port that downstream clients use to communicate with Quilkin pub external_port: u16, + /// The port QCMP packets can be sent to + pub qcmp_port: u16, /// The maximum amount of memory, in bytes, that the memory mappings used for /// packet buffers will be allowed to take. /// @@ -50,6 +52,7 @@ impl Default for XdpConfig<'_> { Self { nic: NicConfig::Default, external_port: 7777, + qcmp_port: 7600, maximum_packet_memory: None, require_zero_copy: false, require_tx_checksum: false, @@ -62,6 +65,7 @@ pub struct XdpWorkers { workers: Vec, nic: NicIndex, external_port: NetworkU16, + qcmp_port: NetworkU16, ipv6: std::net::Ipv6Addr, ipv4: std::net::Ipv4Addr, } @@ -214,7 +218,7 @@ pub fn setup_xdp_io(config: XdpConfig<'_>) -> Result 2 * 1024 }; - let mut ebpf_prog = quilkin_xdp::EbpfProgram::load(config.external_port)?; + let mut ebpf_prog = quilkin_xdp::EbpfProgram::load(config.external_port, config.qcmp_port)?; let umem_cfg = xdp::umem::UmemCfgBuilder { frame_size: xdp::umem::FrameSize::TwoK, @@ -239,7 +243,8 @@ pub fn setup_xdp_io(config: XdpConfig<'_>) -> Result ebpf_prog, workers, nic: nic_index, - external_port: NetworkU16(config.external_port.to_be()), + external_port: config.external_port.into(), + qcmp_port: config.qcmp_port.into(), ipv4, ipv6, }) @@ -293,6 +298,7 @@ impl XdpLoop { /// more likely reason for failure is the inability to attach the eBPF program pub fn spawn(workers: XdpWorkers, config: Arc) -> Result { let external_port = workers.external_port; + let qcmp_port = workers.qcmp_port; let ipv4 = workers.ipv4; let ipv6 = workers.ipv6; let session_state = Arc::new(process::SessionState::default()); @@ -311,7 +317,16 @@ pub fn spawn(workers: XdpWorkers, config: Arc) -> Result, sessions: Arc, local_ipv4: std::net::Ipv4Addr, @@ -369,6 +386,7 @@ fn io_loop( let mut state = process::State { external_port, + qcmp_port, config, destinations: Vec::with_capacity(1), sessions, diff --git a/src/net/xdp/process.rs b/src/net/xdp/process.rs index bc5171e7d..c50a02153 100644 --- a/src/net/xdp/process.rs +++ b/src/net/xdp/process.rs @@ -114,6 +114,7 @@ pub struct State { /// The external port is how we determine if packets come from clients (downstream) /// or servers (upstream) pub external_port: NetworkU16, + pub qcmp_port: NetworkU16, pub config: Arc, pub destinations: Vec, pub sessions: Arc, @@ -371,6 +372,11 @@ pub fn process_packets( unreachable!("we somehow got a non-UDP packet, this should be impossible with the eBPF program we use to route packets"); }; + if udp.dst_port == state.qcmp_port { + process_qcmp_packet(inner, udp, umem, tx_slab); + continue; + } + let is_client = udp.dst_port == state.external_port; let direction = if is_client { metrics::READ @@ -600,3 +606,110 @@ fn fill_packet( frame.insert(hdr_len, data)?; Ok(()) } + +fn process_qcmp_packet( + mut packet: Packet, + udp: UdpPacket, + umem: &mut Umem, + tx_slab: &mut HeapSlab, +) { + use crate::{codec::qcmp, time::UtcTimestamp}; + + fn inner(packet: &mut Packet, udp: UdpPacket) -> bool { + let received_at = UtcTimestamp::now(); + let data = match packet.slice_at_offset(udp.data_offset, udp.data_length) { + Ok(d) => d, + Err(error) => { + tracing::debug!(%error, "corrupt UDP packet"); + return false; + } + }; + let command = match qcmp::Protocol::parse(data) { + Ok(Some(command)) => command, + Ok(None) => { + tracing::debug!("rejected non-qcmp packet"); + return false; + } + Err(error) => { + tracing::debug!(%error, "rejected malformed packet"); + return false; + } + }; + + let qcmp::Protocol::Ping { + client_timestamp, + nonce, + } = command + else { + tracing::warn!("rejected unsupported QCMP packet"); + return false; + }; + + let mut ob = qcmp::QcmpPacket::default(); + let buf = qcmp::Protocol::ping_reply(nonce, client_timestamp, received_at).encode(&mut ob); + + if let Err(error) = packet.adjust_tail(-(udp.data_length as i32)) { + tracing::debug!(%error, "unable to trim QCMP ping data"); + return false; + } + + if let Err(error) = packet.insert(udp.data_offset, buf) { + tracing::debug!(%error, "unable to write QCMP pong data"); + return false; + } + + let new = UdpPacket { + src_mac: udp.dst_mac, + dst_mac: udp.src_mac, + ips: match udp.ips { + IpAddresses::V4 { + source, + destination, + } => IpAddresses::V4 { + source: destination, + destination: source, + }, + IpAddresses::V6 { + source, + destination, + } => IpAddresses::V6 { + source: destination, + destination: source, + }, + }, + src_port: udp.dst_port, + dst_port: udp.src_port, + data_offset: udp.data_offset, + data_length: buf.len(), + hop: udp.hop - 1, + checksum: 0.into(), + }; + + if let Err(error) = modify_packet_headers(&udp, &new, packet) { + tracing::debug!(%error, "unable to modify QCMP packet headers"); + return false; + } + + if let Err(error) = packet.calc_udp_checksum() { + tracing::debug!(%error, "failed to calculate QCMP packet checksum"); + return false; + } + + true + } + + let packet = if inner(&mut packet, udp) { + tracing::debug!("sending QCMP pong"); + + if let Some(packet) = tx_slab.push_back(packet) { + tracing::debug!("tx slab full, unable to send QCMP pong"); + packet + } else { + return; + } + } else { + packet + }; + + umem.free_packet(packet); +} diff --git a/src/time.rs b/src/time.rs index bbe4e10cf..83726280e 100644 --- a/src/time.rs +++ b/src/time.rs @@ -60,6 +60,12 @@ impl fmt::Debug for UtcTimestamp { } } +impl PartialEq for UtcTimestamp { + fn eq(&self, other: &Self) -> bool { + self.inner == other.inner + } +} + #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq)] #[cfg_attr(test, derive(Debug))] pub struct DurationNanos(i64);