From f947b86e7d14bd957d68f46485a02fb77fb0ae3b Mon Sep 17 00:00:00 2001 From: Justin Karneges Date: Thu, 15 May 2025 13:59:15 -0700 Subject: [PATCH] add durable sse subscriptions --- Cargo.lock | 27 +++- Cargo.toml | 1 + src/admin.rs | 4 +- src/auth.rs | 72 +++++++--- src/config.rs | 8 ++ src/events.rs | 317 +++++++++++++++++++++++++++++++++++++++---- src/main.rs | 5 +- src/mqtthandler.rs | 6 +- src/mqtttransport.rs | 12 ++ src/publish.rs | 32 +++-- src/routes.rs | 7 +- 11 files changed, 425 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f79d762..32f7bb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -305,7 +305,7 @@ dependencies = [ "serde_urlencoded", "sha2 0.9.9", "smallvec", - "thiserror", + "thiserror 1.0.69", "time", "url", ] @@ -674,7 +674,7 @@ dependencies = [ "serde", "serde_json", "spki 0.6.0", - "thiserror", + "thiserror 1.0.69", "zeroize", ] @@ -927,6 +927,7 @@ dependencies = [ "serde", "serde_json", "sha1", + "thiserror 2.0.12", "time", ] @@ -1237,7 +1238,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl 2.0.12", ] [[package]] @@ -1251,6 +1261,17 @@ dependencies = [ "syn 2.0.90", ] +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "time" version = "0.3.41" diff --git a/Cargo.toml b/Cargo.toml index 0de7e72..7631b92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ rand = "0.9" serde = "1" serde_json = "1" sha1 = "0.10" +thiserror = "2" time = "0.3" [lints.rust] diff --git a/src/admin.rs b/src/admin.rs index 8a2752e..294eb4d 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -16,8 +16,8 @@ fn text_response(status: StatusCode, text: &str) -> Response { Response::from_status(status).with_body_text_plain(&format!("{text}\n")) } -pub fn post_keys(req: Request) -> Response { - if !req.fastly_key_is_valid() { +pub fn post_keys(fastly_authed: bool, _req: Request) -> Response { + if !fastly_authed { return text_response( StatusCode::UNAUTHORIZED, "Fastly-Key header invalid or not specified", diff --git a/src/auth.rs b/src/auth.rs index e4198a2..7d6f0ae 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -77,6 +77,26 @@ fn validate_token(token: &str, key: &[u8]) -> Result { Ok(caps) } +pub fn create_token( + readable: Vec, + writable: Vec, + key_id: &str, + key: &[u8], +) -> String { + let key = HS256Key::from_bytes(key).with_key_id(key_id); + + let claims = Claims::with_custom_claims( + CustomClaims { + x_fastly_read: readable, + x_fastly_write: writable, + }, + Duration::from_hours(1), + ); + + key.authenticate(claims) + .expect("token creation should always succeed") +} + #[derive(Debug)] pub enum AuthorizationError { Token(TokenError), @@ -92,7 +112,11 @@ impl From for AuthorizationError { } pub trait Authorizor { - fn validate_token(&self, token: &str) -> Result; + fn validate_token( + &self, + token: &str, + internal_key: Option<&[u8]>, + ) -> Result; } pub struct KVStoreAuthorizor { @@ -108,7 +132,11 @@ impl KVStoreAuthorizor { } impl Authorizor for KVStoreAuthorizor { - fn validate_token(&self, token: &str) -> Result { + fn validate_token( + &self, + token: &str, + internal_key: Option<&[u8]>, + ) -> Result { let Ok(metadata) = Token::decode_metadata(token) else { return Err(AuthorizationError::Token(TokenError::Invalid)); }; @@ -117,28 +145,40 @@ impl Authorizor for KVStoreAuthorizor { return Err(AuthorizationError::Token(TokenError::NoKeyId)); }; - let store = match kv_store::KVStore::open(&self.store_name) { - Ok(Some(store)) => store, - Ok(None) => return Err(AuthorizationError::StoreNotFound), - Err(_) => return Err(AuthorizationError::StoreError), - }; - - let v = match store.lookup(key_id) { - Ok(mut lookup) => lookup.take_body_bytes(), - Err(kv_store::KVStoreError::ItemNotFound) => { - return Err(AuthorizationError::KeyNotFound) + let key = if key_id == "internal" { + let Some(internal_key) = internal_key else { + return Err(AuthorizationError::KeyNotFound); + }; + + internal_key.to_vec() + } else { + let store = match kv_store::KVStore::open(&self.store_name) { + Ok(Some(store)) => store, + Ok(None) => return Err(AuthorizationError::StoreNotFound), + Err(_) => return Err(AuthorizationError::StoreError), + }; + + match store.lookup(key_id) { + Ok(mut lookup) => lookup.take_body_bytes(), + Err(kv_store::KVStoreError::ItemNotFound) => { + return Err(AuthorizationError::KeyNotFound) + } + Err(_) => return Err(AuthorizationError::StoreError), } - Err(_) => return Err(AuthorizationError::StoreError), }; - Ok(validate_token(token, &v)?) + Ok(validate_token(token, &key)?) } } pub struct TestAuthorizor; impl Authorizor for TestAuthorizor { - fn validate_token(&self, token: &str) -> Result { + fn validate_token( + &self, + token: &str, + _internal_key: Option<&[u8]>, + ) -> Result { Ok(validate_token(token, b"notasecret")?) } } @@ -160,7 +200,7 @@ mod tests { let key = HS256Key::from_bytes(b"notasecret"); let token = key.authenticate(claims).unwrap(); - let caps = TestAuthorizor.validate_token(&token).unwrap(); + let caps = TestAuthorizor.validate_token(&token, None).unwrap(); assert!(caps.can_subscribe("readable")); assert!(!caps.can_subscribe("foo")); assert!(caps.can_publish("writable")); diff --git a/src/config.rs b/src/config.rs index 4c9b478..7347a02 100644 --- a/src/config.rs +++ b/src/config.rs @@ -7,6 +7,7 @@ pub struct Config { pub mqtt_enabled: bool, pub admin_enabled: bool, pub publish_token: String, + pub internal_key: Vec, } impl Default for Config { @@ -17,6 +18,7 @@ impl Default for Config { mqtt_enabled: true, admin_enabled: true, publish_token: String::new(), + internal_key: Vec::new(), } } } @@ -101,6 +103,12 @@ impl Source for ConfigAndSecretStoreSource { Ok(None) => {} Err(_) => return Err(ConfigError::StoreError), } + + match store.try_get("internal-key") { + Ok(Some(v)) => config.internal_key = v.plaintext().to_vec(), + Ok(None) => {} + Err(_) => return Err(ConfigError::StoreError), + } } Ok(config) diff --git a/src/events.rs b/src/events.rs index 9db44d9..a40a5af 100644 --- a/src/events.rs +++ b/src/events.rs @@ -1,15 +1,21 @@ -use crate::auth::{AuthorizationError, Authorizor, Capabilities}; +use crate::auth::{create_token, AuthorizationError, Authorizor, Capabilities}; use crate::config::Config; use crate::publish::{publish, Sequencing, MESSAGE_SIZE_MAX}; -use crate::storage::Storage; +use crate::storage::{RetainedVersion, Storage, StorageError}; +use base64::Engine; use fastly::http::{header, StatusCode}; use fastly::{Request, Response}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; +use std::fmt::Write; use std::str; use std::time::Duration; +use thiserror::Error; const TOPICS_PER_REQUEST_MAX: usize = 10; +struct VersionParseError; + +#[derive(Debug, Copy, Clone)] struct Version { generation: u64, seq: u64, @@ -17,8 +23,68 @@ struct Version { impl Version { fn to_id(&self) -> String { - format!("{}-{}", self.generation, self.seq) + format!("{:16x}-{}", self.generation, self.seq) } + + fn parse(s: &str) -> Result { + let pos = match s.find('-') { + Some(pos) => pos, + None => return Err(VersionParseError), + }; + + let generation = &s[..pos]; + let seq = &s[(pos + 1)..]; + + let Ok(generation) = u64::from_str_radix(generation, 16) else { + return Err(VersionParseError); + }; + + let Ok(seq) = seq.parse() else { + return Err(VersionParseError); + }; + + Ok(Self { generation, seq }) + } +} + +#[derive(Error, Debug)] +enum GripLastError<'a> { + #[error("failed to parse grip last header: [{0}]")] + ParseHeader(&'a str), +} + +// if there is at least one Grip-Last header, this function is guaranteed +// to return at least one item or error +fn parse_grip_last(req: &Request) -> Result, GripLastError> { + let mut out = Vec::new(); + + for hvalue in req.get_header_all_str("Grip-Last") { + for value in hvalue.split(',') { + let Some(pos) = value.find(';') else { + return Err(GripLastError::ParseHeader(hvalue)); + }; + + let channel = value[..pos].trim(); + let params = &value[(pos + 1)..]; + + let Some(pos) = params.find("last-id=") else { + return Err(GripLastError::ParseHeader(hvalue)); + }; + + let remainder = ¶ms[(pos + 8)..]; + + let end = match remainder.find(';') { + Some(pos) => pos, + None => remainder.len(), + }; + + let id = remainder[..end].trim(); + + out.push((channel, id)); + } + } + + Ok(out) } fn text_response(status: StatusCode, text: &str) -> Response { @@ -33,34 +99,123 @@ fn sse_error(condition: &str, text: &str) -> Response { let data = serde_json::to_string(&data).unwrap(); - Response::from_status(StatusCode::OK) + Response::new() .with_header(header::CONTENT_TYPE, "text/event-stream") .with_body(format!("event: stream-error\ndata: {data}\n\n")) } -pub fn get(authorizor: &dyn Authorizor, req: Request) -> Response { - let topics = { - let mut topics = HashSet::new(); +pub fn get( + config: &Config, + authorizor: &dyn Authorizor, + storage: &dyn Storage, + fastly_authed: bool, + req: Request, +) -> Response { + let grip_last = match parse_grip_last(&req) { + Ok(v) => v, + Err(e) => { + println!("{e}"); + + // close + return Response::new(); + } + }; + + let is_next = !grip_last.is_empty(); + + let mut topics = HashMap::new(); + + if !grip_last.is_empty() { + for &(channel, last_id) in &grip_last { + if !channel.starts_with("d:") { + continue; + } + + let topic = &channel[2..]; + + let version = if last_id != "none" { + let Ok(version) = Version::parse(last_id) else { + println!("grip last ID not a valid version: [last_id]"); + + // close + return Response::new(); + }; + + Some(version) + } else { + None + }; + + topics.insert(topic.to_string(), version); + } + + if topics.is_empty() { + println!("no valid grip last topics"); + // close + return Response::new(); + } + } else { for (k, v) in req.get_url().query_pairs() { if k == "topic" { - topics.insert(v.to_string()); + topics.insert(v.to_string(), None); } } - topics - }; - - if topics.is_empty() { - return sse_error("bad-request", "Missing 'topic' parameter"); + if topics.is_empty() { + return sse_error("bad-request", "Missing 'topic' parameter"); + } } if topics.len() >= TOPICS_PER_REQUEST_MAX { return sse_error("bad-request", "Too many topics"); } - let caps = if req.fastly_key_is_valid() { - Capabilities::new_admin() + if grip_last.is_empty() { + let last_event_id = if let Some(s) = req.get_query_parameter("lastEventId") { + Some(s) + } else if let Some(s) = req.get_header_str("Last-Event-ID") { + Some(s) + } else { + None + }; + + if let Some(last_event_id) = last_event_id { + for part in last_event_id.split(',') { + let Some(pos) = part.find(':') else { + return sse_error("bad-request", "Last-Event-ID part missing ':'\n"); + }; + + let topic = &part[..pos]; + let version = &part[(pos + 1)..]; + + let Ok(version) = Version::parse(version) else { + return sse_error( + "bad-request", + &format!("Last-Event-ID part not a valid version: [{}]\n", version), + ); + }; + + if let Some(v) = topics.get_mut(topic) { + *v = Some(version); + } + } + } + } + + let durable = req.get_query_parameter("durable") == Some("true"); + + let (token, caps) = if fastly_authed { + let readable_topics = topics.keys().cloned().collect(); + + let token = create_token( + readable_topics, + Vec::new(), + "internal", + &config.internal_key, + ); + + (token, Capabilities::new_admin()) } else { let token = if let Some(v) = req.get_query_parameter("auth") { v @@ -88,7 +243,7 @@ pub fn get(authorizor: &dyn Authorizor, req: Request) -> Response { ); }; - match authorizor.validate_token(token) { + let caps = match authorizor.validate_token(token, Some(&config.internal_key)) { Ok(caps) => caps, Err(AuthorizationError::Token(_)) => { return sse_error("forbidden", "Invalid token"); @@ -98,16 +253,103 @@ pub fn get(authorizor: &dyn Authorizor, req: Request) -> Response { return sse_error("internal-server-error", "Auth process failed"); } - } + }; + + (token.to_string(), caps) }; - for topic in &topics { + for topic in topics.keys() { if !caps.can_subscribe(topic) { return sse_error("forbidden", &format!("Cannot subscribe to topic: {topic}")); } } - let mut resp = Response::from_status(StatusCode::OK) + let mut events = Vec::new(); + + if durable { + let mut keys: Vec = topics.keys().cloned().collect(); + keys.sort(); + + for topic in &keys { + let version = topics.get_mut(topic).unwrap(); + + let after = version.map(|v| RetainedVersion { + generation: v.generation, + seq: v.seq, + }); + + let retained = match storage.read_retained(topic, after) { + Ok(Some(r)) => r, + Ok(None) | Err(StorageError::StoreNotFound) => continue, + Err(e) => { + println!("failed to read message from storage: {:?}", e); + + return sse_error( + "internal-server-error", + "Failed to read message from storage", + ); + } + }; + + let v = Version { + generation: retained.version.generation, + seq: retained.version.seq, + }; + + *version = Some(v); + + let Some(message) = retained.message else { + continue; + }; + + let id = { + let mut parts = Vec::new(); + + for topic in &keys { + if let Some(v) = &topics[topic] { + let id = v.to_id(); + parts.push(format!("{topic}:{id}")); + } + } + + parts.join(",") + }; + + let sse_content = match str::from_utf8(&message.data) { + Ok(s) => { + let mut content = String::new(); + content.push_str("event: message\n"); + content.write_fmt(format_args!("id: {id}\n")).unwrap(); + + for line in s.split('\n') { + content.write_fmt(format_args!("data: {line}\n")).unwrap(); + } + + content.push('\n'); + + content + } + Err(_) => { + let encoded = base64::prelude::BASE64_STANDARD.encode(message.data); + + let mut content = String::new(); + content.push_str("event: message-base64\n"); + content.write_fmt(format_args!("id: {id}\n")).unwrap(); + content.push_str("data: "); + content.push_str(&encoded); + content.push_str("\n\n"); + + content + } + }; + + events.push(sse_content); + } + } + + println!("topics: {:?}", topics); + + let mut resp = Response::new() .with_header(header::CONTENT_TYPE, "text/event-stream") .with_header("Grip-Hold", "stream") .with_header( @@ -115,17 +357,44 @@ pub fn get(authorizor: &dyn Authorizor, req: Request) -> Response { "event: keep-alive\\ndata: \\n\\n; format=cstring; timeout=55", ); - for topic in topics { + for (topic, version) in &topics { resp.append_header("Grip-Channel", format!("s:{topic}")); + + if durable { + let prev_id = match version { + Some(v) => v.to_id(), + None => "none".to_string(), + }; + + resp.append_header("Grip-Channel", format!("d:{topic}; prev-id={prev_id}")); + } + } + + if durable { + resp.append_header( + "Grip-Link", + &format!("; rel=next; timeout=10"), + ); + } + + let mut body = String::new(); + + if !is_next { + body.push_str("event: stream-open\ndata: \n\n"); + } + + for s in events { + body.push_str(&s); } - resp.with_body("event: stream-open\ndata: \n\n") + resp.with_body(body) } pub fn post( config: &Config, authorizor: &dyn Authorizor, storage: &dyn Storage, + fastly_authed: bool, mut req: Request, ) -> Response { let body = req.take_body(); @@ -149,7 +418,7 @@ pub fn post( None => None, }; - let caps = if req.fastly_key_is_valid() { + let caps = if fastly_authed { Capabilities::new_admin() } else { let token = if let Some(v) = req.get_header_str(header::AUTHORIZATION) { @@ -175,7 +444,7 @@ pub fn post( return text_response(StatusCode::BAD_REQUEST, "Missing 'Authorization' header"); }; - match authorizor.validate_token(token) { + match authorizor.validate_token(token, None) { Ok(caps) => caps, Err(AuthorizationError::Token(_)) => { return text_response(StatusCode::FORBIDDEN, "Invalid token"); diff --git a/src/main.rs b/src/main.rs index d59ece1..41b46d9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,11 +13,12 @@ fn main() -> Result<(), Error> { if local { let config_source = config::TestSource; - routes::handle_request(&config_source, &authorizor, &storage, req)?; + routes::handle_request(&config_source, &authorizor, &storage, false, req)?; } else { let config_source = config::ConfigAndSecretStoreSource::new("config", "secrets"); + let fastly_authed = req.fastly_key_is_valid(); - routes::handle_request(&config_source, &authorizor, &storage, req)?; + routes::handle_request(&config_source, &authorizor, &storage, fastly_authed, req)?; } Ok(()) diff --git a/src/mqtthandler.rs b/src/mqtthandler.rs index 2469f81..54600a6 100644 --- a/src/mqtthandler.rs +++ b/src/mqtthandler.rs @@ -24,7 +24,7 @@ pub struct Version { impl Version { pub fn to_id(&self) -> String { - format!("{}-{}", self.generation, self.seq) + format!("{:16x}-{}", self.generation, self.seq) } } @@ -141,7 +141,7 @@ fn handle_subscribe<'a>(ctx: &mut Context, p: Subscribe<'a>) -> Vec> let mut allowed = false; if let Some(s) = &ctx.state.token { - if let Ok(caps) = ctx.authorizor.validate_token(s) { + if let Ok(caps) = ctx.authorizor.validate_token(s, None) { if caps.can_subscribe(p.topic) { allowed = true; } @@ -241,7 +241,7 @@ fn handle_publish<'a>(ctx: &mut Context, p: Publish<'a>) -> Vec> { let mut allowed = false; if let Some(s) = &ctx.state.token { - if let Ok(caps) = ctx.authorizor.validate_token(s) { + if let Ok(caps) = ctx.authorizor.validate_token(s, None) { if caps.can_publish(p.topic.as_ref()) { allowed = true; } diff --git a/src/mqtttransport.rs b/src/mqtttransport.rs index d8515d8..24ff656 100644 --- a/src/mqtttransport.rs +++ b/src/mqtttransport.rs @@ -240,6 +240,12 @@ where filters, ..Default::default() }); + + cmsgs.push(ControlMessage { + ctype: "subscribe".to_string(), + channel: Some(format!("d:{topic}")), + ..Default::default() + }); } } @@ -250,6 +256,12 @@ where channel: Some(format!("s:{topic}")), ..Default::default() }); + + cmsgs.push(ControlMessage { + ctype: "unsubscribe".to_string(), + channel: Some(format!("d:{topic}")), + ..Default::default() + }); } } diff --git a/src/publish.rs b/src/publish.rs index 1deb97b..4adcd6b 100644 --- a/src/publish.rs +++ b/src/publish.rs @@ -49,9 +49,17 @@ pub fn publish( } }; - let ws_message = if sequencing.is_some() { + let mut item = if sequencing.is_some() { serde_json::json!({ - "action": "refresh" // currently the only way to reliably deliver over websockets + "channel": format!("d:{topic}"), + "formats": { + "http-stream": { + "action": "hint", // TODO: send content instead + }, + "ws-message": { + "action": "refresh", // currently the only way to reliably deliver over websockets + } + } }) } else { let mqtt_content = { @@ -70,20 +78,18 @@ pub fn publish( }; serde_json::json!({ - "content-bin": mqtt_content + "channel": format!("s:{topic}"), + "formats": { + "http-stream": { + "content": sse_content + }, + "ws-message": { + "content-bin": mqtt_content, + } + } }) }; - let mut item = serde_json::json!({ - "channel": format!("s:{topic}"), - "formats": { - "http-stream": { - "content": sse_content - }, - "ws-message": ws_message, - } - }); - if let Some(sender) = sender { item["meta"] = serde_json::json!({ "sender": sender, diff --git a/src/routes.rs b/src/routes.rs index 53ef719..5a5b97e 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -26,6 +26,7 @@ pub fn handle_request( config_source: &dyn config::Source, authorizor: &dyn auth::Authorizor, storage: &dyn storage::Storage, + fastly_authed: bool, req: Request, ) -> Result<(), Error> { let config = match config_source.config() { @@ -55,9 +56,9 @@ pub fn handle_request( return Ok(()); } - events::get(authorizor, req) + events::get(&config, authorizor, storage, fastly_authed, req) } else if req.get_method() == Method::POST && config.http_publish_enabled { - events::post(&config, authorizor, storage, req) + events::post(&config, authorizor, storage, fastly_authed, req) } else { let mut allow = "OPTIONS".to_string(); @@ -89,7 +90,7 @@ pub fn handle_request( } } else if path == "/admin/keys" && config.admin_enabled { if req.get_method() == "POST" { - admin::post_keys(req) + admin::post_keys(fastly_authed, req) } else { Response::from_status(StatusCode::METHOD_NOT_ALLOWED) .with_header(header::ALLOW, "POST")