Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

h3i: implement close trigger frames #1890

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions h3i/examples/content_length_mismatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ fn main() {
},
];

let summary =
sync_client::connect(config, &actions).expect("connection failed");
// This example doesn't use close trigger frames, since we manually close the
// connection upon receiving a HEADERS frame on stream 0.
let close_trigger_frames = None;

let summary = sync_client::connect(config, &actions, close_trigger_frames)
.expect("connection failed");

println!(
"=== received connection summary! ===\n\n{}",
Expand Down
233 changes: 219 additions & 14 deletions h3i/src/client/connection_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use std::cmp;
use std::collections::HashMap;
use std::iter::FromIterator;

use crate::frame::CloseTriggerFrame;
use crate::frame::EnrichedHeaders;
use crate::frame::H3iFrame;

Expand Down Expand Up @@ -74,22 +75,36 @@ impl Serialize for ConnectionSummary {
self.path_stats.iter().map(SerializablePathStats).collect();
state.serialize_field("path_stats", &p)?;
state.serialize_field("error", &self.conn_close_details)?;
state.serialize_field(
"missed_close_trigger_frames",
&self.stream_map.missing_close_trigger_frames(),
)?;
state.end()
}
}

/// A read-only aggregation of frames received over a connection, mapped to the
/// stream ID over which they were received.
evanrittenhouse marked this conversation as resolved.
Show resolved Hide resolved
///
/// [`StreamMap`] also contains the [`CloseTriggerFrames`] for the connection so
/// that its state can be updated as new frames are received.
#[derive(Clone, Debug, Default, Serialize)]
pub struct StreamMap(HashMap<u64, Vec<H3iFrame>>);
pub struct StreamMap {
stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
close_trigger_frames: Option<CloseTriggerFrames>,
}

impl<T> From<T> for StreamMap
where
T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
{
fn from(value: T) -> Self {
let map = HashMap::from_iter(value);
Self(map)
let stream_frame_map = HashMap::from_iter(value);

Self {
stream_frame_map,
close_trigger_frames: None,
}
}
}

Expand All @@ -113,7 +128,7 @@ impl StreamMap {
/// assert_eq!(stream_map.all_frames(), vec![headers]);
/// ```
pub fn all_frames(&self) -> Vec<H3iFrame> {
self.0
self.stream_frame_map
.values()
.flatten()
.map(Clone::clone)
Expand All @@ -140,7 +155,10 @@ impl StreamMap {
/// assert_eq!(stream_map.stream(0), vec![headers]);
/// ```
pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
self.0.get(&stream_id).cloned().unwrap_or_default()
self.stream_frame_map
.get(&stream_id)
.cloned()
.unwrap_or_default()
}

/// Check if a provided [`H3iFrame`] was received, regardless of what stream
Expand All @@ -155,8 +173,6 @@ impl StreamMap {
/// use quiche::h3::Header;
/// use std::iter::FromIterator;
///
/// let mut stream_map = StreamMap::default();
///
/// let h = Header::new(b"hello", b"world");
/// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
///
Expand All @@ -178,8 +194,6 @@ impl StreamMap {
/// use quiche::h3::Header;
/// use std::iter::FromIterator;
///
/// let mut stream_map = StreamMap::default();
///
/// let h = Header::new(b"hello", b"world");
/// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
///
Expand All @@ -189,7 +203,10 @@ impl StreamMap {
pub fn received_frame_on_stream(
&self, stream: u64, frame: &H3iFrame,
) -> bool {
self.0.get(&stream).map(|v| v.contains(frame)).is_some()
self.stream_frame_map
.get(&stream)
.map(|v| v.contains(frame))
.is_some()
}

/// Check if the stream map is empty, e.g., no frames were received.
Expand All @@ -213,7 +230,7 @@ impl StreamMap {
/// assert!(!stream_map.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.0.is_empty()
self.stream_frame_map.is_empty()
}

/// See all HEADERS received on a given stream.
Expand All @@ -227,8 +244,6 @@ impl StreamMap {
/// use quiche::h3::Header;
/// use std::iter::FromIterator;
///
/// let mut stream_map = StreamMap::default();
///
/// let h = Header::new(b"hello", b"world");
/// let enriched = EnrichedHeaders::from(vec![h]);
/// let headers = H3iFrame::Headers(enriched.clone());
Expand All @@ -246,8 +261,121 @@ impl StreamMap {
.collect()
}

/// If all [`CloseTriggerFrame`]s were seen. If no triggers were expected,
/// this will return `false`.
pub fn all_close_trigger_frames_seen(&self) -> bool {
if let Some(triggers) = self.close_trigger_frames.as_ref() {
triggers.saw_all_trigger_frames()
} else {
false
}
}

/// The set of all [`CloseTriggerFrame`]s that were _not_ seen on the
/// connection. Returns `None` if
pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
self.close_trigger_frames
.as_ref()
.map(|e| e.missing_triggers())
}

/// Not `pub` as users aren't expected to build their own [`StreamMap`]s.
pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
Self {
close_trigger_frames,
..Default::default()
}
}

pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
self.0.entry(stream_id).or_default().push(frame);
if let Some(expected) = self.close_trigger_frames.as_mut() {
expected.receive_frame(stream_id, &frame);
}

self.stream_frame_map
.entry(stream_id)
.or_default()
.push(frame);
}

/// Close a [`quiche::Connection`] with the CONNECTION_CLOSE frame specified
/// by [`CloseTriggerFrames`]. If no [`CloseTriggerFrames`] exist, this is a
/// no-op.
pub(crate) fn close_due_to_trigger_frames(
&self, qconn: &mut quiche::Connection,
) {
if let Some(ConnectionError {
is_app,
error_code,
reason,
}) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
{
let _ = qconn.close(*is_app, *error_code, reason);
}
}
}

/// A container for frames that h3i expects to see over a given connection. If
/// h3i receives all the frames it expects, it will send a CONNECTION_CLOSE
/// frame to the server. This bypasses the idle timeout and vastly quickens test
/// suites which depend heavily on h3i.
///
/// The specific CONNECTION_CLOSE frame can be customized by passing a
/// [`ConnectionError`] to [`Self::new_with_close`]. h3i will send an
/// application CONNECTION_CLOSE frame with error code 0x100 if this struct is
/// constructed with the [`Self::new`] constructor.
#[derive(Clone, Serialize, Debug)]
pub struct CloseTriggerFrames {
missing: Vec<CloseTriggerFrame>,
#[serde(skip)]
close_with: ConnectionError,
}

impl CloseTriggerFrames {
/// Create a new [`CloseTriggerFrames`]. If all expected frames are
/// received, h3i will close the connection with an application-level
/// CONNECTION_CLOSE frame with error code 0x100.
pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
Self::new_with_connection_close(frames, ConnectionError {
is_app: true,
error_code: quiche::h3::WireErrorCode::NoError as u64,
reason: b"saw all close trigger frames".to_vec(),
})
}

/// Create a new [`CloseTriggerFrames`] with a custom close frame. When all
/// close trigger frames are received, h3i will close the connection with
/// the level, error code, and reason from `close_with`.
pub fn new_with_connection_close(
frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
) -> Self {
Self {
missing: frames,
close_with,
}
}

fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
for (i, trigger) in self.missing.iter_mut().enumerate() {
if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id {
self.missing.remove(i);
break;
}
}
}

fn saw_all_trigger_frames(&self) -> bool {
self.missing.is_empty()
}

fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
self.missing.clone()
}
}

impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
fn from(value: Vec<CloseTriggerFrame>) -> Self {
Self::new(value)
}
}

Expand Down Expand Up @@ -404,6 +532,7 @@ impl Serialize for SerializableStats<'_> {
}

/// A wrapper to help serialize a [quiche::ConnectionError]
#[derive(Clone, Debug)]
pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);

impl Serialize for SerializableConnectionError<'_> {
Expand All @@ -422,3 +551,79 @@ impl Serialize for SerializableConnectionError<'_> {
state.end()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::frame::EnrichedHeaders;
use quiche::h3::Header;

fn h3i_frame() -> H3iFrame {
vec![Header::new(b"hello", b"world")].into()
}

#[test]
fn close_trigger_frame() {
let frame = h3i_frame();
let mut triggers = CloseTriggerFrames::new(vec![CloseTriggerFrame::new(
0,
frame.clone(),
)]);

triggers.receive_frame(0, &frame);

assert!(triggers.saw_all_trigger_frames());
}

#[test]
fn trigger_frame_missing() {
let frame = h3i_frame();
let expected_frames = vec![
CloseTriggerFrame::new(0, frame.clone()),
CloseTriggerFrame::new(4, frame.clone()),
CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]),
];
let mut expected = CloseTriggerFrames::new(expected_frames.clone());

expected.receive_frame(0, &frame);

assert!(!expected.saw_all_trigger_frames());
assert_eq!(expected.missing_triggers(), expected_frames[1..].to_vec());
}

fn stream_map_data() -> Vec<H3iFrame> {
let headers =
H3iFrame::Headers(EnrichedHeaders::from(vec![Header::new(
b"hello", b"world",
)]));
let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
payload: b"hello world".to_vec(),
});

vec![headers, data]
}

#[test]
fn test_stream_map_trigger_frames_with_none() {
let stream_map: StreamMap = vec![(0, stream_map_data())].into();
assert!(!stream_map.all_close_trigger_frames_seen());
}

#[test]
fn test_stream_map_trigger_frames() {
let data = stream_map_data();
let mut stream_map = StreamMap::new(Some(
vec![
CloseTriggerFrame::new(0, data[0].clone()),
CloseTriggerFrame::new(0, data[1].clone()),
]
.into(),
));

stream_map.insert(0, data[0].clone());
assert!(!stream_map.all_close_trigger_frames_seen());
assert_eq!(stream_map.missing_close_trigger_frames().unwrap(), vec![
CloseTriggerFrame::new(0, data[1].clone())
]);
}
}
3 changes: 2 additions & 1 deletion h3i/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use qlog::events::h3::H3FrameParsed;
use qlog::events::h3::Http3Frame;
use qlog::events::EventData;
use qlog::streamer::QlogStreamer;
use serde::Serialize;

use quiche::h3::frame::Frame as QFrame;
use quiche::h3::Error;
Expand Down Expand Up @@ -160,7 +161,7 @@ fn handle_qlog(
}
}

#[derive(Debug)]
#[derive(Debug, Serialize)]
/// Represents different errors that can occur when [sync_client] runs.
pub enum ClientError {
/// An error during the QUIC handshake.
Expand Down
Loading
Loading