diff --git a/crates/test/src/xdp_util.rs b/crates/test/src/xdp_util.rs index 9b830be58e..96e3565cf8 100644 --- a/crates/test/src/xdp_util.rs +++ b/crates/test/src/xdp_util.rs @@ -1,4 +1,4 @@ -use quilkin::net::xdp::process; +use quilkin::net::io::nic::xdp::process; use xdp::{Packet, packet::net_types::UdpHeaders}; #[inline] diff --git a/crates/test/tests/proxy.rs b/crates/test/tests/proxy.rs index b7bcaf6d1b..30ec528933 100644 --- a/crates/test/tests/proxy.rs +++ b/crates/test/tests/proxy.rs @@ -3,7 +3,7 @@ #![cfg(debug_assertions)] use qt::*; -use quilkin::{components::proxy, net, test::TestConfig}; +use quilkin::{net, test::TestConfig}; trace_test!(server, { let mut sc = qt::sandbox_config!(); @@ -103,12 +103,12 @@ trace_test!(uring_receiver, { let pending_sends = net::queue(1).unwrap(); // we'll test a single DownstreamReceiveWorkerConfig - proxy::packet_router::DownstreamReceiveWorkerConfig { + quilkin::net::io::Listener { worker_id: 1, port: addr.port(), config: config.clone(), buffer_pool: quilkin::test::BUFFER_POOL.clone(), - sessions: proxy::SessionPool::new( + sessions: quilkin::net::sessions::SessionPool::new( config, vec![pending_sends.0.clone()], BUFFER_POOL.clone(), @@ -152,7 +152,7 @@ trace_test!( .into_iter() .collect(); - let sessions = proxy::SessionPool::new( + let sessions = net::SessionPool::new( config.clone(), pending_sends.iter().map(|ps| ps.0.clone()).collect(), BUFFER_POOL.clone(), @@ -161,7 +161,7 @@ trace_test!( const WORKER_COUNT: usize = 3; let (socket, addr) = sb.socket(); - proxy::packet_router::spawn_receivers( + net::packet::spawn_receivers( config, socket, pending_sends, diff --git a/crates/test/tests/xdp.rs b/crates/test/tests/xdp.rs index 9bcb196f61..ea658e4721 100644 --- a/crates/test/tests/xdp.rs +++ b/crates/test/tests/xdp.rs @@ -4,7 +4,7 @@ use qt::xdp_util::{endpoints, make_config}; use quilkin::{ filters, - net::xdp::process::{ + net::io::nic::xdp::process::{ self, xdp::{ self, diff --git a/src/cli/service.rs b/src/cli/service.rs index 8a58a920ac..6a36060754 100644 --- a/src/cli/service.rs +++ b/src/cli/service.rs @@ -537,13 +537,7 @@ impl Service { let sessions = SessionPool::new(config.clone(), session_sends, buffer_pool.clone()); - crate::components::proxy::packet_router::spawn_receivers( - config, - socket, - worker_sends, - &sessions, - buffer_pool, - )?; + crate::net::packet::spawn_receivers(config, socket, worker_sends, &sessions, buffer_pool)?; Ok(( std::future::pending(), @@ -554,7 +548,7 @@ impl Service { #[cfg(target_os = "linux")] fn spawn_xdp(&self, config: Arc, force_xdp: bool) -> eyre::Result> { - use crate::net::xdp; + use crate::net::io::nic::xdp; use eyre::{Context as _, ContextCompat as _}; // TODO: remove this once it's been more stabilized @@ -573,7 +567,7 @@ impl Service { .context("XDP requires a cluster map")? .clone(); - let config = crate::net::xdp::process::ConfigState { filters, clusters }; + let config = crate::net::io::nic::xdp::process::ConfigState { filters, clusters }; let udp_port = if self.udp_enabled { self.udp_port } else { 0 }; let qcmp_port = if self.qcmp_enabled { self.qcmp_port } else { 0 }; diff --git a/src/components/proxy.rs b/src/components/proxy.rs index e520bfc748..4c7f9d835a 100644 --- a/src/components/proxy.rs +++ b/src/components/proxy.rs @@ -14,15 +14,6 @@ * limitations under the License. */ -pub(crate) mod error; -pub mod packet_router; -pub(crate) mod sessions; - -use crate::config::IcaoCode; - -use super::RunArgs; -pub use error::PipelineError; -pub use sessions::SessionPool; use std::{ net::SocketAddr, sync::{ @@ -31,6 +22,12 @@ use std::{ }, }; +pub use crate::{ + components::RunArgs, + config::IcaoCode, + net::{error::PipelineError, sessions::SessionPool}, +}; + #[derive(Clone, Debug)] pub struct Ready { pub idle_request_interval: std::time::Duration, diff --git a/src/components/proxy/packet_router.rs b/src/components/proxy/packet_router.rs deleted file mode 100644 index 1debb33347..0000000000 --- a/src/components/proxy/packet_router.rs +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use super::{ - PipelineError, SessionPool, - sessions::{SessionKey, SessionManager}, -}; -use crate::{ - Config, - filters::{Filter as _, ReadContext}, - metrics, -}; -use std::{net::SocketAddr, sync::Arc}; - -#[cfg(target_os = "linux")] -mod io_uring; -#[cfg(not(target_os = "linux"))] -mod reference; - -/// Representation of an immutable set of bytes pulled from the network, this trait -/// provides an abstraction over however the packet was received (epoll, io-uring, xdp) -/// -/// Use [`PacketMut`] if you need a mutable representation. -pub trait Packet: Sized { - /// Returns the underlying slice of bytes representing the packet. - fn as_slice(&self) -> &[u8]; - - /// Returns the size of the packet. - fn len(&self) -> usize; - - /// Returns whether the given packet is empty. - fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -/// Representation of an mutable set of bytes pulled from the network, this trait -/// provides an abstraction over however the packet was received (epoll, io-uring, xdp) -pub trait PacketMut: Sized + Packet { - type FrozenPacket: Packet; - fn remove_head(&mut self, length: usize); - fn remove_tail(&mut self, length: usize); - fn extend_head(&mut self, bytes: &[u8]); - fn extend_tail(&mut self, bytes: &[u8]); - /// Returns an immutable version of the packet, this allows certain types - /// return a type that can be more cheaply cloned and shared. - fn freeze(self) -> Self::FrozenPacket; -} - -/// Packet received from local port -pub(crate) struct DownstreamPacket

{ - pub(crate) contents: P, - pub(crate) source: SocketAddr, -} - -impl DownstreamPacket

{ - #[inline] - pub(crate) fn process>( - self, - worker_id: usize, - config: &Arc, - sessions: &S, - destinations: &mut Vec, - ) { - tracing::trace!( - id = worker_id, - size = self.contents.len(), - source = %self.source, - "received packet from downstream" - ); - - let timer = metrics::processing_time(metrics::READ).start_timer(); - if let Err(error) = self.process_inner(config, sessions, destinations) { - let discriminant = error.discriminant(); - - error.inc_system_errors_total(metrics::READ, &metrics::EMPTY); - metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc(); - } - - timer.stop_and_record(); - } - - /// Processes a packet by running it through the filter chain. - #[inline] - fn process_inner>( - self, - config: &Arc, - sessions: &S, - destinations: &mut Vec, - ) -> Result<(), PipelineError> { - let Some(clusters) = config - .dyn_cfg - .clusters() - .filter(|c| c.read().has_endpoints()) - else { - tracing::trace!("no upstream endpoints"); - return Err(PipelineError::NoUpstreamEndpoints); - }; - - let cm = clusters.clone_value(); - let Some(filters) = config.dyn_cfg.filters() else { - return Err(PipelineError::Filter(crate::filters::FilterError::Custom( - "no filters loaded", - ))); - }; - - #[cfg(not(debug_assertions))] - { - match self.source.ip() { - std::net::IpAddr::V4(ipv4) => { - if ipv4.is_loopback() || ipv4.is_multicast() || ipv4.is_broadcast() { - return Err(PipelineError::DisallowedSourceIP(self.source.ip())); - } - } - std::net::IpAddr::V6(ipv6) => { - if ipv6.is_loopback() || ipv6.is_multicast() { - return Err(PipelineError::DisallowedSourceIP(self.source.ip())); - } - } - } - } - - let mut context = ReadContext::new(&cm, self.source.into(), self.contents, destinations); - filters.read(&mut context).map_err(PipelineError::Filter)?; - - let ReadContext { contents, .. } = context; - - // Similar to bytes::BytesMut::freeze, we turn the mutable pool buffer - // into an immutable one with its own internal arc so it can be cloned - // cheaply and returned to the pool once all references are dropped - let contents = contents.freeze(); - - for epa in destinations.drain(0..) { - let session_key = SessionKey { - source: self.source, - dest: epa.to_socket_addr()?, - }; - - sessions.send(session_key, &contents)?; - } - - Ok(()) - } -} - -/// Represents the required arguments to run a worker task that -/// processes packets received downstream. -pub struct DownstreamReceiveWorkerConfig { - /// ID of the worker. - pub worker_id: usize, - pub port: u16, - pub config: Arc, - pub sessions: Arc, - pub buffer_pool: Arc, -} - -/// Spawns a background task that sits in a loop, receiving packets from the passed in socket. -/// Each received packet is placed on a queue to be processed by a worker task. -/// This function also spawns the set of worker tasks responsible for consuming packets -/// off the aforementioned queue and processing them through the filter chain and session -/// pipeline. -pub fn spawn_receivers( - config: Arc, - socket: socket2::Socket, - worker_sends: Vec, - sessions: &Arc, - buffer_pool: Arc, -) -> crate::Result<()> { - let port = crate::net::socket_port(&socket); - - for (worker_id, ws) in worker_sends.into_iter().enumerate() { - let worker = DownstreamReceiveWorkerConfig { - worker_id, - port, - config: config.clone(), - sessions: sessions.clone(), - buffer_pool: buffer_pool.clone(), - }; - - worker.spawn(ws)?; - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - #![cfg(not(debug_assertions))] - - use quilkin_xds::locality::Locality; - - use crate::collections::BufferPool; - use crate::net::Endpoint; - use crate::test::alloc_buffer; - - use super::*; - use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; - use std::net::{SocketAddrV4, SocketAddrV6}; - - // Ensure we disallow certain source IP addresses to protect against UDP amplification attacks - #[tokio::test] - async fn disallowed_ips() { - let nl1 = Locality::with_region("nl-1"); - let endpoint = Endpoint::new((Ipv4Addr::LOCALHOST, 7777).into()); - - let config = Arc::new(Config::default_agent().cluster( - None, - Some(nl1.clone()), - [endpoint.clone()].into(), - )); - let buffer_pool = Arc::new(BufferPool::new(1, 10)); - let session_manager = SessionPool::new(config.clone(), vec![], buffer_pool.clone()); - - let packet_data: [u8; 4] = [1, 2, 3, 4]; - for ip in [ - IpAddr::V4(Ipv4Addr::LOCALHOST), - IpAddr::V4(Ipv4Addr::BROADCAST), - // multicast = 224.0.0.0/4 - IpAddr::V4(Ipv4Addr::new(224, 0, 0, 0)), - IpAddr::V4(Ipv4Addr::new(239, 255, 255, 255)), - IpAddr::V6(Ipv6Addr::LOCALHOST), - // multicast = any address starting with 0xff - IpAddr::V6(Ipv6Addr::new(0xff00, 0, 0, 0, 0, 0, 0, 0)), - ] { - let packet = DownstreamPacket { - contents: alloc_buffer(packet_data), - source: match ip { - IpAddr::V4(ipv4) => SocketAddr::V4(SocketAddrV4::new(ipv4, 0)), - IpAddr::V6(ipv6) => SocketAddr::V6(SocketAddrV6::new(ipv6, 0, 0, 0)), - }, - }; - - let mut endpoints = vec![endpoint.address.clone()]; - let res = packet.process_inner(&config, &session_manager, &mut endpoints); - - assert_eq!(res, Err(PipelineError::DisallowedSourceIP(ip))); - } - } -} diff --git a/src/components/proxy/packet_router/io_uring.rs b/src/components/proxy/packet_router/io_uring.rs deleted file mode 100644 index f2d2f0361e..0000000000 --- a/src/components/proxy/packet_router/io_uring.rs +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use eyre::Context as _; - -impl super::DownstreamReceiveWorkerConfig { - pub fn spawn(self, pending_sends: crate::net::PacketQueue) -> eyre::Result<()> { - use crate::net::io_uring; - - let Self { - worker_id, - port, - config, - sessions, - buffer_pool, - } = self; - - let socket = - crate::net::DualStackLocalSocket::new(port).context("failed to bind socket")?; - - let io_loop = io_uring::IoUringLoop::new(2000, socket)?; - io_loop - .spawn( - format!("packet-router-{worker_id}"), - io_uring::PacketProcessorCtx::Router { - config, - sessions, - worker_id, - destinations: Vec::with_capacity(1), - }, - pending_sends, - buffer_pool, - ) - .context("failed to spawn io-uring loop") - } -} diff --git a/src/components/proxy/sessions/io_uring.rs b/src/components/proxy/sessions/io_uring.rs deleted file mode 100644 index 981483062a..0000000000 --- a/src/components/proxy/sessions/io_uring.rs +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use crate::components::proxy; -use std::sync::Arc; - -static SESSION_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); - -impl super::SessionPool { - pub(super) fn spawn_session( - self: Arc, - raw_socket: socket2::Socket, - port: u16, - pending_sends: crate::net::PacketQueue, - ) -> Result<(), proxy::PipelineError> { - use crate::net::io_uring; - - let pool = self; - let id = SESSION_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let _thread_span = uring_span!(tracing::debug_span!("session", id).or_current()); - - let io_loop = io_uring::IoUringLoop::new( - 2000, - crate::net::DualStackLocalSocket::from_raw(raw_socket), - )?; - let buffer_pool = pool.buffer_pool.clone(); - - io_loop.spawn( - format!("session-{id}"), - io_uring::PacketProcessorCtx::SessionPool { pool, port }, - pending_sends, - buffer_pool, - ) - } -} diff --git a/src/components/proxy/sessions/reference.rs b/src/components/proxy/sessions/reference.rs deleted file mode 100644 index 8d6f04fa29..0000000000 --- a/src/components/proxy/sessions/reference.rs +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2024 Google LLC All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use crate::{components::proxy, net::PacketQueue}; - -impl super::SessionPool { - pub(super) fn spawn_session( - self: std::sync::Arc, - raw_socket: socket2::Socket, - port: u16, - pending_sends: PacketQueue, - ) -> Result<(), proxy::PipelineError> { - let pool = self; - - uring_spawn!( - uring_span!(tracing::debug_span!("session pool")), - async move { - let mut last_received_at = None; - - let socket = - std::sync::Arc::new(crate::net::DualStackLocalSocket::from_raw(raw_socket)); - let socket2 = socket.clone(); - let (tx, mut rx) = tokio::sync::oneshot::channel(); - - uring_inner_spawn!(async move { - let (pending_sends, mut sends_rx) = pending_sends; - let mut sends_double_buffer = Vec::with_capacity(pending_sends.capacity()); - - while sends_rx.changed().await.is_ok() { - if !*sends_rx.borrow() { - tracing::trace!("io loop shutdown requested"); - break; - } - - sends_double_buffer = pending_sends.swap(sends_double_buffer); - - for packet in sends_double_buffer.drain(..sends_double_buffer.len()) { - let destination = packet.destination.as_socket().unwrap(); - tracing::trace!( - %destination, - length = packet.data.len(), - "sending packet upstream" - ); - let (result, _) = socket2.send_to(packet.data, destination).await; - let asn_info = packet.asn_info.as_ref().into(); - match result { - Ok(size) => { - crate::metrics::packets_total(crate::metrics::READ, &asn_info) - .inc(); - crate::metrics::bytes_total(crate::metrics::READ, &asn_info) - .inc_by(size as u64); - } - Err(error) => { - tracing::trace!(%error, "sending packet upstream failed"); - let source = error.to_string(); - crate::metrics::errors_total( - crate::metrics::READ, - &source, - &asn_info, - ) - .inc(); - crate::metrics::packets_dropped_total( - crate::metrics::READ, - &source, - &asn_info, - ) - .inc(); - } - } - } - } - - let _ = tx.send(()); - }); - - loop { - let buf = pool.buffer_pool.clone().alloc(); - tokio::select! { - received = socket.recv_from(buf) => { - let (result, buf) = received; - match result { - Err(error) => { - tracing::trace!(%error, "error receiving packet"); - crate::metrics::errors_total(crate::metrics::WRITE, &error.to_string(), &crate::metrics::EMPTY).inc(); - }, - Ok((_size, recv_addr)) => pool.process_received_upstream_packet(buf, recv_addr, port, &mut last_received_at), - } - } - _ = &mut rx => { - tracing::debug!("Closing upstream socket loop, downstream closed"); - return; - } - } - } - } - ); - - Ok(()) - } -} diff --git a/src/filters.rs b/src/filters.rs index 489b25b3db..f47637699f 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -71,7 +71,7 @@ pub use self::{ pub use crate::test::TestFilter; pub use self::chain::FilterChain; -pub use crate::components::proxy::packet_router::{Packet, PacketMut}; +pub use crate::net::{Packet, PacketMut}; #[enum_dispatch::enum_dispatch(Filter)] pub enum FilterKind { diff --git a/src/net.rs b/src/net.rs index 02be5ebe64..ae12bf104e 100644 --- a/src/net.rs +++ b/src/net.rs @@ -14,64 +14,17 @@ * limitations under the License. */ -pub mod packet; - -/// On linux spawns a io-uring runtime + thread, everywhere else spawns a regular tokio task. -#[cfg(not(target_os = "linux"))] -macro_rules! uring_spawn { - ($span:expr_2021, $future:expr_2021) => {{ - let (tx, rx) = std::sync::mpsc::channel::<()>(); - use tracing::Instrument as _; - - use tracing::instrument::WithSubscriber as _; - - let fut = async move { - let _ = tx.send(()); - $future.await - }; - - if let Some(span) = $span { - tokio::spawn(fut.instrument(span).with_current_subscriber()); - } else { - tokio::spawn(fut.with_current_subscriber()); - } - rx - }}; -} - -/// On linux spawns a io-uring task, everywhere else spawns a regular tokio task. -#[cfg(not(target_os = "linux"))] -macro_rules! uring_inner_spawn { - ($future:expr_2021) => { - tokio::spawn($future); - }; -} - -/// Allows creation of spans only when `debug_assertions` are enabled, to avoid -/// hitting the cap of 4096 threads that is unconfigurable in -/// `tracing_subscriber` -> `sharded_slab` for span ids -macro_rules! uring_span { - ($span:expr_2021) => {{ - cfg_if::cfg_if! { - if #[cfg(debug_assertions)] { - Some($span) - } else { - Option::::None - } - } - }}; -} - pub mod cluster; pub mod endpoint; +pub mod error; +pub mod io; pub(crate) mod maxmind_db; +pub mod packet; pub mod phoenix; - -pub use quilkin_xds as xds; -pub use xds::net::TcpListener; +pub mod sessions; use std::{ - io, + io::Result as IoResult, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, }; @@ -80,22 +33,24 @@ use socket2::{Protocol, Socket, Type}; cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { use std::net::UdpSocket; - - pub(crate) mod io_uring; - pub mod xdp; } else { use tokio::net::UdpSocket; } } -pub use self::{ - cluster::ClusterMap, - endpoint::{Endpoint, EndpointAddress}, +pub use { + self::{ + cluster::ClusterMap, + endpoint::{Endpoint, EndpointAddress}, + error::PipelineError, + packet::{Packet, PacketMut, PacketQueue, PacketQueueSender, queue}, + sessions::SessionPool, + }, + quilkin_xds as xds, + xds::net::TcpListener, }; -pub use self::packet::{PacketQueue, PacketQueueSender, queue}; - -fn socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result { +fn socket_with_reuse_and_address(addr: SocketAddr) -> IoResult { cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { raw_socket_with_reuse_and_address(addr) @@ -106,24 +61,24 @@ fn socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result } } -fn epoll_socket_with_reuse(port: u16) -> std::io::Result { +fn epoll_socket_with_reuse(port: u16) -> IoResult { raw_socket_with_reuse_and_address((Ipv6Addr::UNSPECIFIED, port).into()) .map(From::from) .and_then(tokio::net::UdpSocket::from_std) } -fn epoll_socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result { +fn epoll_socket_with_reuse_and_address(addr: SocketAddr) -> IoResult { raw_socket_with_reuse_and_address(addr) .map(From::from) .and_then(tokio::net::UdpSocket::from_std) } #[inline] -pub fn raw_socket_with_reuse(port: u16) -> std::io::Result { +pub fn raw_socket_with_reuse(port: u16) -> IoResult { raw_socket_with_reuse_and_address((Ipv6Addr::UNSPECIFIED, port).into()) } -pub fn raw_socket_with_reuse_and_address(addr: SocketAddr) -> std::io::Result { +pub fn raw_socket_with_reuse_and_address(addr: SocketAddr) -> IoResult { let domain = match addr { SocketAddr::V4(_) => socket2::Domain::IPV4, SocketAddr::V6(_) => socket2::Domain::IPV6, @@ -150,13 +105,13 @@ pub fn socket_port(socket: &socket2::Socket) -> u16 { } #[cfg(not(target_family = "windows"))] -fn enable_reuse(sock: &Socket) -> io::Result<()> { +fn enable_reuse(sock: &Socket) -> IoResult<()> { sock.set_reuse_port(true)?; Ok(()) } #[cfg(target_family = "windows")] -fn enable_reuse(sock: &Socket) -> io::Result<()> { +fn enable_reuse(sock: &Socket) -> IoResult<()> { sock.set_reuse_address(true)?; Ok(()) } @@ -184,24 +139,24 @@ impl DualStackLocalSocket { Self { socket, local_addr } } - pub fn new(port: u16) -> std::io::Result { + pub fn new(port: u16) -> IoResult { raw_socket_with_reuse(port).map(Self::from_raw) } - pub fn bind_local(port: u16) -> std::io::Result { + pub fn bind_local(port: u16) -> IoResult { let local_addr = (Ipv6Addr::LOCALHOST, port).into(); let socket = socket_with_reuse_and_address(local_addr)?; Ok(Self { socket, local_addr }) } - pub fn local_ipv4_addr(&self) -> io::Result { + pub fn local_ipv4_addr(&self) -> IoResult { Ok(match self.local_addr { SocketAddr::V4(_) => self.local_addr, SocketAddr::V6(_) => (Ipv4Addr::UNSPECIFIED, self.local_addr.port()).into(), }) } - pub fn local_ipv6_addr(&self) -> io::Result { + pub fn local_ipv6_addr(&self) -> IoResult { Ok(match self.local_addr { SocketAddr::V4(v4addr) => SocketAddr::new( IpAddr::V6(v4addr.ip().to_ipv6_mapped()), @@ -213,12 +168,12 @@ impl DualStackLocalSocket { cfg_if::cfg_if! { if #[cfg(not(target_os = "linux"))] { - pub async fn recv_from>(&self, mut buf: B) -> (io::Result<(usize, SocketAddr)>, B) { + pub async fn recv_from>(&self, mut buf: B) -> (IoResult<(usize, SocketAddr)>, B) { let result = self.socket.recv_from(&mut buf).await; (result, buf) } - pub async fn send_to>(&self, buf: B, target: SocketAddr) -> (io::Result, B) { + pub async fn send_to>(&self, buf: B, target: SocketAddr) -> (IoResult, B) { let result = self.socket.send_to(&buf, target).await; (result, buf) } @@ -251,34 +206,34 @@ pub struct DualStackEpollSocket { } impl DualStackEpollSocket { - pub fn new(port: u16) -> std::io::Result { + pub fn new(port: u16) -> IoResult { Ok(Self { socket: epoll_socket_with_reuse(port)?, }) } - pub fn bind_local(port: u16) -> std::io::Result { + pub fn bind_local(port: u16) -> IoResult { Ok(Self { socket: epoll_socket_with_reuse_and_address((Ipv6Addr::LOCALHOST, port).into())?, }) } /// Primarily used for testing of ipv4 vs ipv6 addresses. - pub(crate) fn new_with_address(addr: SocketAddr) -> std::io::Result { + pub(crate) fn new_with_address(addr: SocketAddr) -> IoResult { Ok(Self { socket: epoll_socket_with_reuse_and_address(addr)?, }) } - pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub async fn recv_from(&self, buf: &mut [u8]) -> IoResult<(usize, SocketAddr)> { self.socket.recv_from(buf).await } - pub fn local_addr(&self) -> io::Result { + pub fn local_addr(&self) -> IoResult { self.socket.local_addr() } - pub fn local_ipv4_addr(&self) -> io::Result { + pub fn local_ipv4_addr(&self) -> IoResult { let addr = self.socket.local_addr()?; match addr { SocketAddr::V4(_) => Ok(addr), @@ -286,7 +241,7 @@ impl DualStackEpollSocket { } } - pub fn local_ipv6_addr(&self) -> io::Result { + pub fn local_ipv6_addr(&self) -> IoResult { let addr = self.socket.local_addr()?; match addr { SocketAddr::V4(v4addr) => Ok(SocketAddr::new( @@ -301,7 +256,7 @@ impl DualStackEpollSocket { &self, buf: &[u8], target: A, - ) -> io::Result { + ) -> IoResult { self.socket.send_to(buf, target).await } } diff --git a/src/components/proxy/error.rs b/src/net/error.rs similarity index 100% rename from src/components/proxy/error.rs rename to src/net/error.rs diff --git a/src/net/io.rs b/src/net/io.rs new file mode 100644 index 0000000000..0b6c81a0d2 --- /dev/null +++ b/src/net/io.rs @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/// Allows creation of spans only when `debug_assertions` are enabled, to avoid +/// hitting the cap of 4096 threads that is unconfigurable in +/// `tracing_subscriber` -> `sharded_slab` for span ids +macro_rules! uring_span { + ($span:expr_2021) => {{ + cfg_if::cfg_if! { + if #[cfg(debug_assertions)] { + Some($span) + } else { + Option::::None + } + } + }}; +} + +use std::sync::Arc; + +use crate::Config; + +#[cfg(target_os = "linux")] +pub mod completion; +pub mod nic; +#[cfg(not(target_os = "linux"))] +pub mod poll; + +/// Represents the required arguments to run a worker task that +/// processes packets received downstream. +pub struct Listener { + /// ID of the worker. + pub worker_id: usize, + pub port: u16, + pub config: Arc, + pub sessions: Arc, + pub buffer_pool: Arc, +} diff --git a/src/net/io/completion.rs b/src/net/io/completion.rs new file mode 100644 index 0000000000..c0067fb7a0 --- /dev/null +++ b/src/net/io/completion.rs @@ -0,0 +1,2 @@ +#[cfg(target_os = "linux")] +pub mod io_uring; diff --git a/src/net/io_uring.rs b/src/net/io/completion/io_uring.rs similarity index 91% rename from src/net/io_uring.rs rename to src/net/io/completion/io_uring.rs index 8e48ca7117..1b7172fbf8 100644 --- a/src/net/io_uring.rs +++ b/src/net/io/completion/io_uring.rs @@ -20,19 +20,53 @@ //! Note there is also the QCMP loop, but that one is simpler and is different //! enough that it doesn't make sense to share the same code +use std::{ + os::fd::{AsRawFd, FromRawFd}, + sync::Arc, +}; + +use eyre::Context as _; +use io_uring::{squeue::Entry, types::Fd}; +use socket2::SockAddr; + use crate::{ collections::PoolBuffer, - components::proxy::{self, PipelineError}, metrics, - net::{PacketQueue, packet::queue::SendPacket}, + net::{PacketQueue, error::PipelineError, packet::queue::SendPacket, sessions::SessionPool}, time::UtcTimestamp, }; -use io_uring::{squeue::Entry, types::Fd}; -use socket2::SockAddr; -use std::{ - os::fd::{AsRawFd, FromRawFd}, - sync::Arc, -}; + +static SESSION_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); + +impl crate::net::io::Listener { + pub fn spawn(self, pending_sends: crate::net::PacketQueue) -> eyre::Result<()> { + let Self { + worker_id, + port, + config, + sessions, + buffer_pool, + } = self; + + let socket = + crate::net::DualStackLocalSocket::new(port).context("failed to bind socket")?; + + let io_loop = IoUringLoop::new(2000, socket)?; + io_loop + .spawn( + format!("packet-router-{worker_id}"), + PacketProcessorCtx::Router { + config, + sessions, + worker_id, + destinations: Vec::with_capacity(1), + }, + pending_sends, + buffer_pool, + ) + .context("failed to spawn io-uring loop") + } +} /// A simple wrapper around [eventfd](https://man7.org/linux/man-pages/man2/eventfd.2.html) /// @@ -214,12 +248,12 @@ impl LoopPacket { pub enum PacketProcessorCtx { Router { config: Arc, - sessions: Arc, + sessions: Arc, worker_id: usize, destinations: Vec, }, SessionPool { - pool: Arc, + pool: Arc, port: u16, }, } @@ -243,7 +277,7 @@ fn process_packet( } *last_received_at = Some(received_at); - let ds_packet = proxy::packet_router::DownstreamPacket { + let ds_packet = crate::net::packet::DownstreamPacket { contents: packet.buffer, source: packet.source, }; @@ -566,6 +600,30 @@ impl IoUringLoop { } } +impl SessionPool { + pub(crate) fn spawn_session( + self: Arc, + raw_socket: socket2::Socket, + port: u16, + pending_sends: crate::net::PacketQueue, + ) -> Result<(), PipelineError> { + let pool = self; + let id = SESSION_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let _thread_span = uring_span!(tracing::debug_span!("session", id).or_current()); + + let io_loop = + IoUringLoop::new(2000, crate::net::DualStackLocalSocket::from_raw(raw_socket))?; + let buffer_pool = pool.buffer_pool.clone(); + + io_loop.spawn( + format!("session-{id}"), + PacketProcessorCtx::SessionPool { pool, port }, + pending_sends, + buffer_pool, + ) + } +} + #[cfg(test)] mod test { use super::*; diff --git a/src/net/io/nic.rs b/src/net/io/nic.rs new file mode 100644 index 0000000000..c9c7e19fae --- /dev/null +++ b/src/net/io/nic.rs @@ -0,0 +1,2 @@ +#[cfg(target_os = "linux")] +pub mod xdp; diff --git a/src/net/xdp.rs b/src/net/io/nic/xdp.rs similarity index 100% rename from src/net/xdp.rs rename to src/net/io/nic/xdp.rs diff --git a/src/net/xdp/process.rs b/src/net/io/nic/xdp/process.rs similarity index 99% rename from src/net/xdp/process.rs rename to src/net/io/nic/xdp/process.rs index 742cbd4816..2bce81cfab 100644 --- a/src/net/xdp/process.rs +++ b/src/net/io/nic/xdp/process.rs @@ -1,10 +1,11 @@ use crate::{ - components::proxy::{PipelineError, sessions::inner_metrics as session_metrics}, filters::{self, Filter as _}, metrics::{self, AsnInfo}, net::{ EndpointAddress, + error::PipelineError, maxmind_db::{self, IpNetEntry}, + sessions::inner_metrics as session_metrics, }, time::UtcTimestamp, }; diff --git a/src/net/io/poll.rs b/src/net/io/poll.rs new file mode 100644 index 0000000000..dc18c92764 --- /dev/null +++ b/src/net/io/poll.rs @@ -0,0 +1 @@ +pub mod tokio; diff --git a/src/components/proxy/packet_router/reference.rs b/src/net/io/poll/tokio.rs similarity index 53% rename from src/components/proxy/packet_router/reference.rs rename to src/net/io/poll/tokio.rs index 7ff1664b74..69d54e3f13 100644 --- a/src/components/proxy/packet_router/reference.rs +++ b/src/net/io/poll/tokio.rs @@ -16,7 +16,35 @@ //! The reference implementation is used for non-Linux targets -impl super::DownstreamReceiveWorkerConfig { +/// On linux spawns a io-uring runtime + thread, everywhere else spawns a regular tokio task. +macro_rules! uring_spawn { + ($span:expr_2021, $future:expr_2021) => {{ + let (tx, rx) = std::sync::mpsc::channel::<()>(); + use tracing::Instrument as _; + + use tracing::instrument::WithSubscriber as _; + + let fut = async move { + let _ = tx.send(()); + $future.await + }; + + if let Some(span) = $span { + tokio::spawn(fut.instrument(span).with_current_subscriber()); + } else { + tokio::spawn(fut.with_current_subscriber()); + } + rx + }}; +} + +macro_rules! uring_inner_spawn { + ($future:expr_2021) => { + tokio::spawn($future); + }; +} + +impl crate::net::io::Listener { pub fn spawn(self, packet_queue: crate::net::PacketQueue) -> eyre::Result<()> { let Self { worker_id, @@ -109,7 +137,7 @@ impl super::DownstreamReceiveWorkerConfig { match result { Ok((_size, mut source)) => { source.set_ip(source.ip().to_canonical()); - let packet = super::DownstreamPacket { contents: buffer, source }; + let packet = crate::net::packet::DownstreamPacket { contents: buffer, source }; if let Some(last_received_at) = last_received_at { crate::metrics::packet_jitter( @@ -147,3 +175,99 @@ impl super::DownstreamReceiveWorkerConfig { Ok(()) } } + +impl crate::net::sessions::SessionPool { + pub fn spawn_session( + self: std::sync::Arc, + raw_socket: socket2::Socket, + port: u16, + pending_sends: crate::net::PacketQueue, + ) -> Result<(), crate::net::error::PipelineError> { + let pool = self; + + uring_spawn!( + uring_span!(tracing::debug_span!("session pool")), + async move { + let mut last_received_at = None; + + let socket = + std::sync::Arc::new(crate::net::DualStackLocalSocket::from_raw(raw_socket)); + let socket2 = socket.clone(); + let (tx, mut rx) = tokio::sync::oneshot::channel(); + + uring_inner_spawn!(async move { + let (pending_sends, mut sends_rx) = pending_sends; + let mut sends_double_buffer = Vec::with_capacity(pending_sends.capacity()); + + while sends_rx.changed().await.is_ok() { + if !*sends_rx.borrow() { + tracing::trace!("io loop shutdown requested"); + break; + } + + sends_double_buffer = pending_sends.swap(sends_double_buffer); + + for packet in sends_double_buffer.drain(..sends_double_buffer.len()) { + let destination = packet.destination.as_socket().unwrap(); + tracing::trace!( + %destination, + length = packet.data.len(), + "sending packet upstream" + ); + let (result, _) = socket2.send_to(packet.data, destination).await; + let asn_info = packet.asn_info.as_ref().into(); + match result { + Ok(size) => { + crate::metrics::packets_total(crate::metrics::READ, &asn_info) + .inc(); + crate::metrics::bytes_total(crate::metrics::READ, &asn_info) + .inc_by(size as u64); + } + Err(error) => { + tracing::trace!(%error, "sending packet upstream failed"); + let source = error.to_string(); + crate::metrics::errors_total( + crate::metrics::READ, + &source, + &asn_info, + ) + .inc(); + crate::metrics::packets_dropped_total( + crate::metrics::READ, + &source, + &asn_info, + ) + .inc(); + } + } + } + } + + let _ = tx.send(()); + }); + + loop { + let buf = pool.buffer_pool.clone().alloc(); + tokio::select! { + received = socket.recv_from(buf) => { + let (result, buf) = received; + match result { + Err(error) => { + tracing::trace!(%error, "error receiving packet"); + crate::metrics::errors_total(crate::metrics::WRITE, &error.to_string(), &crate::metrics::EMPTY).inc(); + }, + Ok((_size, recv_addr)) => pool.process_received_upstream_packet(buf, recv_addr, port, &mut last_received_at), + } + } + _ = &mut rx => { + tracing::debug!("Closing upstream socket loop, downstream closed"); + return; + } + } + } + } + ); + + Ok(()) + } +} diff --git a/src/net/packet.rs b/src/net/packet.rs index c9fe3ae997..9cbb1aa40a 100644 --- a/src/net/packet.rs +++ b/src/net/packet.rs @@ -1,3 +1,240 @@ +/* + * Copyright 2024 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + pub mod queue; +use crate::{ + Config, + filters::{Filter as _, ReadContext}, + metrics, + net::{ + PipelineError, + sessions::{SessionKey, SessionManager, SessionPool}, + }, +}; +use std::{net::SocketAddr, sync::Arc}; + pub use self::queue::{PacketQueue, PacketQueueSender, queue}; + +/// Representation of an immutable set of bytes pulled from the network, this trait +/// provides an abstraction over however the packet was received (epoll, io-uring, xdp) +/// +/// Use [`PacketMut`] if you need a mutable representation. +pub trait Packet: Sized { + /// Returns the underlying slice of bytes representing the packet. + fn as_slice(&self) -> &[u8]; + + /// Returns the size of the packet. + fn len(&self) -> usize; + + /// Returns whether the given packet is empty. + fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +/// Representation of an mutable set of bytes pulled from the network, this trait +/// provides an abstraction over however the packet was received (epoll, io-uring, xdp) +pub trait PacketMut: Sized + Packet { + type FrozenPacket: Packet; + fn remove_head(&mut self, length: usize); + fn remove_tail(&mut self, length: usize); + fn extend_head(&mut self, bytes: &[u8]); + fn extend_tail(&mut self, bytes: &[u8]); + /// Returns an immutable version of the packet, this allows certain types + /// return a type that can be more cheaply cloned and shared. + fn freeze(self) -> Self::FrozenPacket; +} + +/// Packet received from local port +pub(crate) struct DownstreamPacket

{ + pub(crate) contents: P, + pub(crate) source: SocketAddr, +} + +impl DownstreamPacket

{ + #[inline] + pub(crate) fn process>( + self, + worker_id: usize, + config: &Arc, + sessions: &S, + destinations: &mut Vec, + ) { + tracing::trace!( + id = worker_id, + size = self.contents.len(), + source = %self.source, + "received packet from downstream" + ); + + let timer = metrics::processing_time(metrics::READ).start_timer(); + if let Err(error) = self.process_inner(config, sessions, destinations) { + let discriminant = error.discriminant(); + + error.inc_system_errors_total(metrics::READ, &metrics::EMPTY); + metrics::packets_dropped_total(metrics::READ, discriminant, &metrics::EMPTY).inc(); + } + + timer.stop_and_record(); + } + + /// Processes a packet by running it through the filter chain. + #[inline] + fn process_inner>( + self, + config: &Arc, + sessions: &S, + destinations: &mut Vec, + ) -> Result<(), PipelineError> { + let Some(clusters) = config + .dyn_cfg + .clusters() + .filter(|c| c.read().has_endpoints()) + else { + tracing::trace!("no upstream endpoints"); + return Err(PipelineError::NoUpstreamEndpoints); + }; + + let cm = clusters.clone_value(); + let Some(filters) = config.dyn_cfg.filters() else { + return Err(PipelineError::Filter(crate::filters::FilterError::Custom( + "no filters loaded", + ))); + }; + + #[cfg(not(debug_assertions))] + { + match self.source.ip() { + std::net::IpAddr::V4(ipv4) => { + if ipv4.is_loopback() || ipv4.is_multicast() || ipv4.is_broadcast() { + return Err(PipelineError::DisallowedSourceIP(self.source.ip())); + } + } + std::net::IpAddr::V6(ipv6) => { + if ipv6.is_loopback() || ipv6.is_multicast() { + return Err(PipelineError::DisallowedSourceIP(self.source.ip())); + } + } + } + } + + let mut context = ReadContext::new(&cm, self.source.into(), self.contents, destinations); + filters.read(&mut context).map_err(PipelineError::Filter)?; + + let ReadContext { contents, .. } = context; + + // Similar to bytes::BytesMut::freeze, we turn the mutable pool buffer + // into an immutable one with its own internal arc so it can be cloned + // cheaply and returned to the pool once all references are dropped + let contents = contents.freeze(); + + for epa in destinations.drain(0..) { + let session_key = SessionKey { + source: self.source, + dest: epa.to_socket_addr()?, + }; + + sessions.send(session_key, &contents)?; + } + + Ok(()) + } +} + +/// Spawns a background task that sits in a loop, receiving packets from the passed in socket. +/// Each received packet is placed on a queue to be processed by a worker task. +/// This function also spawns the set of worker tasks responsible for consuming packets +/// off the aforementioned queue and processing them through the filter chain and session +/// pipeline. +pub fn spawn_receivers( + config: Arc, + socket: socket2::Socket, + worker_sends: Vec, + sessions: &Arc, + buffer_pool: Arc, +) -> crate::Result<()> { + let port = crate::net::socket_port(&socket); + + for (worker_id, ws) in worker_sends.into_iter().enumerate() { + let worker = crate::net::io::Listener { + worker_id, + port, + config: config.clone(), + sessions: sessions.clone(), + buffer_pool: buffer_pool.clone(), + }; + + worker.spawn(ws)?; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + #![cfg(not(debug_assertions))] + + use quilkin_xds::locality::Locality; + + use crate::collections::BufferPool; + use crate::net::Endpoint; + use crate::test::alloc_buffer; + + use super::*; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + use std::net::{SocketAddrV4, SocketAddrV6}; + + // Ensure we disallow certain source IP addresses to protect against UDP amplification attacks + #[tokio::test] + async fn disallowed_ips() { + let nl1 = Locality::with_region("nl-1"); + let endpoint = Endpoint::new((Ipv4Addr::LOCALHOST, 7777).into()); + + let config = Arc::new(Config::default_agent().cluster( + None, + Some(nl1.clone()), + [endpoint.clone()].into(), + )); + let buffer_pool = Arc::new(BufferPool::new(1, 10)); + let session_manager = SessionPool::new(config.clone(), vec![], buffer_pool.clone()); + + let packet_data: [u8; 4] = [1, 2, 3, 4]; + for ip in [ + IpAddr::V4(Ipv4Addr::LOCALHOST), + IpAddr::V4(Ipv4Addr::BROADCAST), + // multicast = 224.0.0.0/4 + IpAddr::V4(Ipv4Addr::new(224, 0, 0, 0)), + IpAddr::V4(Ipv4Addr::new(239, 255, 255, 255)), + IpAddr::V6(Ipv6Addr::LOCALHOST), + // multicast = any address starting with 0xff + IpAddr::V6(Ipv6Addr::new(0xff00, 0, 0, 0, 0, 0, 0, 0)), + ] { + let packet = DownstreamPacket { + contents: alloc_buffer(packet_data), + source: match ip { + IpAddr::V4(ipv4) => SocketAddr::V4(SocketAddrV4::new(ipv4, 0)), + IpAddr::V6(ipv6) => SocketAddr::V6(SocketAddrV6::new(ipv6, 0, 0, 0)), + }, + }; + + let mut endpoints = vec![endpoint.address.clone()]; + let res = packet.process_inner(&config, &session_manager, &mut endpoints); + + assert_eq!(res, Err(PipelineError::DisallowedSourceIP(ip))); + } + } +} diff --git a/src/net/packet/queue.rs b/src/net/packet/queue.rs index 35800c7538..20afce33a2 100644 --- a/src/net/packet/queue.rs +++ b/src/net/packet/queue.rs @@ -57,11 +57,11 @@ pub struct SendPacket { cfg_if::cfg_if! { if #[cfg(target_os = "linux")] { - pub type PacketQueueReceiver = crate::net::io_uring::EventFd; - type PacketQueueNotifier = crate::net::io_uring::EventFdWriter; + pub type PacketQueueReceiver = crate::net::io::completion::io_uring::EventFd; + type PacketQueueNotifier = crate::net::io::completion::io_uring::EventFdWriter; fn packet_queue() -> std::io::Result<(PacketQueueNotifier, PacketQueueReceiver)> { - let rx = crate::net::io_uring::EventFd::new()?; + let rx = crate::net::io::completion::io_uring::EventFd::new()?; Ok((rx.writer(), rx)) } diff --git a/src/components/proxy/sessions.rs b/src/net/sessions.rs similarity index 99% rename from src/components/proxy/sessions.rs rename to src/net/sessions.rs index 95e13d8078..326e43f8b7 100644 --- a/src/components/proxy/sessions.rs +++ b/src/net/sessions.rs @@ -44,14 +44,6 @@ pub(crate) mod inner_metrics; pub type SessionMap = crate::collections::ttl::TtlMap; -cfg_if::cfg_if! { - if #[cfg(target_os = "linux")] { - mod io_uring; - } else { - mod reference; - } -} - /// Responsible for managing sending processed traffic to its destination and /// tracking metrics and other information about the session. pub trait SessionManager { @@ -101,7 +93,7 @@ pub struct SessionPool { ports_to_sockets: RwLock>, storage: Arc>, session_map: SessionMap, - buffer_pool: Arc, + pub(super) buffer_pool: Arc, config: Arc, downstream_sends: Vec, downstream_index: atomic::AtomicUsize, diff --git a/src/components/proxy/sessions/inner_metrics.rs b/src/net/sessions/inner_metrics.rs similarity index 100% rename from src/components/proxy/sessions/inner_metrics.rs rename to src/net/sessions/inner_metrics.rs diff --git a/src/net/sessions/reference.rs b/src/net/sessions/reference.rs new file mode 100644 index 0000000000..262468c93f --- /dev/null +++ b/src/net/sessions/reference.rs @@ -0,0 +1,16 @@ +/* + * Copyright 2024 Google LLC All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +