diff --git a/packages/tauri-app/src-tauri/src/cli_manager.rs b/packages/tauri-app/src-tauri/src/cli_manager.rs index e44828f5..2ae45add 100644 --- a/packages/tauri-app/src-tauri/src/cli_manager.rs +++ b/packages/tauri-app/src-tauri/src/cli_manager.rs @@ -1,4 +1,5 @@ use crate::managed_node::resolve_bundled_node_binary; +use crate::desktop_event_transport::DesktopEventStreamConfig; use dirs::home_dir; use parking_lot::Mutex; use regex::Regex; @@ -185,12 +186,13 @@ fn kill_process_tree_windows(pid: u32, force: bool) -> bool { } fn navigate_main(app: &AppHandle, url: &str) { if let Some(win) = app.webview_windows().get("main") { - let mut display = url.to_string(); + let final_url = augment_launch_url(url); + let mut display = final_url.clone(); if let Some(hash_index) = display.find('#') { display.replace_range(hash_index + 1.., "[REDACTED]"); } log_line(&format!("navigating main to {display}")); - if let Ok(parsed) = Url::parse(url) { + if let Ok(parsed) = Url::parse(&final_url) { let _ = win.navigate(parsed); } else { log_line("failed to parse URL for navigation"); @@ -200,6 +202,23 @@ fn navigate_main(app: &AppHandle, url: &str) { } } +fn augment_launch_url(base_url: &str) -> String { + let launch_query = std::env::var("CODENOMAD_UI_LAUNCH_QUERY") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()); + + let Some(launch_query) = launch_query else { + return base_url.to_string(); + }; + + if base_url.contains('?') { + return format!("{}&{}", base_url, launch_query.trim_start_matches('?')); + } + + format!("{}?{}", base_url, launch_query.trim_start_matches('?')) +} + fn extract_cookie_value(set_cookie: &str, name: &str) -> Option { let prefix = format!("{name}="); let cookie_kv = set_cookie.split(';').next()?.trim(); @@ -298,6 +317,15 @@ fn generate_auth_cookie_name() -> String { format!("{SESSION_COOKIE_NAME_PREFIX}_{pid}_{timestamp}") } +fn generate_transport_connection_id() -> String { + let ts = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let tid = std::thread::current().id(); + format!("tauri-{}-{:?}", ts, tid) +} + const DEFAULT_CONFIG_PATH: &str = "~/.config/codenomad/config.json"; #[derive(Debug, Deserialize)] @@ -456,6 +484,8 @@ pub struct CliProcessManager { job: Arc>>, ready: Arc, bootstrap_token: Arc>>, + session_cookie: Arc>>, + auth_cookie_name: Arc>>, } impl CliProcessManager { @@ -467,6 +497,8 @@ impl CliProcessManager { job: Arc::new(Mutex::new(None)), ready: Arc::new(AtomicBool::new(false)), bootstrap_token: Arc::new(Mutex::new(None)), + session_cookie: Arc::new(Mutex::new(None)), + auth_cookie_name: Arc::new(Mutex::new(None)), } } @@ -475,6 +507,8 @@ impl CliProcessManager { self.stop()?; self.ready.store(false, Ordering::SeqCst); *self.bootstrap_token.lock() = None; + *self.session_cookie.lock() = None; + *self.auth_cookie_name.lock() = None; { let mut status = self.status.lock(); status.state = CliState::Starting; @@ -491,6 +525,8 @@ impl CliProcessManager { let job_arc = self.job.clone(); let ready_flag = self.ready.clone(); let token_arc = self.bootstrap_token.clone(); + let session_cookie_arc = self.session_cookie.clone(); + let auth_cookie_name_arc = self.auth_cookie_name.clone(); thread::spawn(move || { if let Err(err) = Self::spawn_cli( app.clone(), @@ -500,6 +536,8 @@ impl CliProcessManager { job_arc, ready_flag, token_arc, + session_cookie_arc, + auth_cookie_name_arc, dev, ) { log_line(&format!("cli spawn failed: {err}")); @@ -594,6 +632,7 @@ impl CliProcessManager { status.port = None; status.url = None; status.error = None; + *self.session_cookie.lock() = None; Ok(()) } @@ -602,6 +641,26 @@ impl CliProcessManager { self.status.lock().clone() } + pub fn desktop_event_stream_config(&self) -> Option { + let base_url = self.status.lock().url.clone()?; + let events_url = format!("{}/api/events", base_url.trim_end_matches('/')); + let client_id = format!("tauri-{}", std::process::id()); + let cookie_name = self + .auth_cookie_name + .lock() + .clone() + .unwrap_or_else(|| SESSION_COOKIE_NAME_PREFIX.to_string()); + + Some(DesktopEventStreamConfig { + base_url, + events_url, + client_id, + connection_id: generate_transport_connection_id(), + cookie_name, + session_cookie: self.session_cookie.lock().clone(), + }) + } + fn spawn_cli( app: AppHandle, status: Arc>, @@ -609,6 +668,8 @@ impl CliProcessManager { #[cfg(windows)] job_holder: Arc>>, ready: Arc, bootstrap_token: Arc>>, + session_cookie: Arc>>, + auth_cookie_name_holder: Arc>>, dev: bool, ) -> anyhow::Result<()> { log_line("resolving CLI entry"); @@ -619,6 +680,7 @@ impl CliProcessManager { resolution.runner, resolution.entry, host )); let auth_cookie_name = Arc::new(generate_auth_cookie_name()); + *auth_cookie_name_holder.lock() = Some(auth_cookie_name.as_str().to_string()); let args = resolution.build_args(dev, &host, auth_cookie_name.as_str()); log_line(&format!("CLI args: {:?}", args)); if dev { @@ -723,6 +785,7 @@ impl CliProcessManager { let app_clone = app.clone(); let ready_clone = ready.clone(); let token_clone = bootstrap_token.clone(); + let session_cookie_clone = session_cookie.clone(); let auth_cookie_name_clone = auth_cookie_name.clone(); thread::spawn(move || { @@ -742,6 +805,7 @@ impl CliProcessManager { let status = status_clone.clone(); let ready = ready_clone.clone(); let token = token_clone.clone(); + let session_cookie = session_cookie_clone.clone(); let auth_cookie_name = auth_cookie_name_clone.clone(); thread::spawn(move || { Self::process_stream( @@ -751,6 +815,7 @@ impl CliProcessManager { &status, &ready, &token, + &session_cookie, auth_cookie_name.as_str(), ); }); @@ -761,6 +826,7 @@ impl CliProcessManager { let status = status_clone.clone(); let ready = ready_clone.clone(); let token = token_clone.clone(); + let session_cookie = session_cookie_clone.clone(); let auth_cookie_name = auth_cookie_name_clone.clone(); thread::spawn(move || { Self::process_stream( @@ -770,6 +836,7 @@ impl CliProcessManager { &status, &ready, &token, + &session_cookie, auth_cookie_name.as_str(), ); }); @@ -894,6 +961,7 @@ impl CliProcessManager { status: &Arc>, ready: &Arc, bootstrap_token: &Arc>>, + session_cookie: &Arc>>, auth_cookie_name: &str, ) { let mut buffer = String::new(); @@ -946,6 +1014,7 @@ impl CliProcessManager { status, ready, bootstrap_token, + session_cookie, auth_cookie_name, url, ); @@ -963,6 +1032,7 @@ impl CliProcessManager { status: &Arc>, ready: &Arc, bootstrap_token: &Arc>>, + session_cookie: &Arc>>, auth_cookie_name: &str, base_url: String, ) { @@ -995,6 +1065,7 @@ impl CliProcessManager { log_line(&format!("failed to set session cookie: {err}")); navigate_main(app, &format!("{base_url}/login")); } else { + *session_cookie.lock() = Some(session_id.clone()); navigate_main(app, &base_url); } } @@ -1215,7 +1286,8 @@ fn resolve_dev_entry(_app: &AppHandle) -> Option { } fn resolve_prod_entry(_app: &AppHandle) -> Option { - let mut candidates = Vec::new(); + let base = workspace_root(); + let mut candidates = vec![base.as_ref().map(|p| p.join("packages/server/dist/bin.js"))]; if let Ok(exe) = std::env::current_exe() { if let Some(dir) = exe.parent() { @@ -1233,12 +1305,6 @@ fn resolve_prod_entry(_app: &AppHandle) -> Option { } } - let base = workspace_root(); - candidates.push( - base.as_ref() - .map(|p| p.join("packages/server/dist/bin.js")), - ); - first_existing(candidates) } diff --git a/packages/tauri-app/src-tauri/src/desktop_event_transport.rs b/packages/tauri-app/src-tauri/src/desktop_event_transport.rs new file mode 100644 index 00000000..f9b0240c --- /dev/null +++ b/packages/tauri-app/src-tauri/src/desktop_event_transport.rs @@ -0,0 +1,464 @@ +use parking_lot::Mutex; +use reqwest::blocking::{Client, Response}; +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::io::{BufRead, BufReader}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::mpsc::{self, RecvTimeoutError, SyncSender}; +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tauri::{AppHandle, Emitter, Manager, Url}; + +mod assembler; +mod stream; +mod transport; + +use stream::*; +use transport::*; + +const EVENT_BATCH_NAME: &str = "desktop:event-batch"; +const EVENT_STATUS_NAME: &str = "desktop:event-stream-status"; +const FLUSH_INTERVAL_MS: u64 = 16; +const DELTA_STREAM_WINDOW_MS: u64 = 48; +const MAX_BATCH_EVENTS: usize = 256; +const DEFAULT_RECONNECT_INITIAL_DELAY_MS: u64 = 1_000; +const DEFAULT_RECONNECT_MAX_DELAY_MS: u64 = 10_000; +const DEFAULT_RECONNECT_MULTIPLIER: f64 = 2.0; +const STREAM_CONNECT_TIMEOUT_MS: u64 = 5_000; +const STREAM_TCP_KEEPALIVE_MS: u64 = 30_000; +const STREAM_STALL_TIMEOUT_MS: u64 = 30_000; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DesktopEventStreamConfig { + pub base_url: String, + pub events_url: String, + pub client_id: String, + pub connection_id: String, + pub cookie_name: String, + pub session_cookie: Option, +} + +#[derive(Clone, Debug, Default, Deserialize)] +#[serde(default, rename_all = "camelCase")] +pub struct DesktopEventsStartRequest { + pub reconnect: Option, +} + +#[derive(Clone, Debug, Default, Deserialize)] +#[serde(default, rename_all = "camelCase")] +pub struct DesktopEventReconnectPolicy { + pub initial_delay_ms: Option, + pub max_delay_ms: Option, + pub multiplier: Option, + pub max_attempts: Option, +} + +#[derive(Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DesktopEventsStartResult { + pub started: bool, + pub generation: Option, + pub reason: Option, +} + +#[derive(Clone, Debug, PartialEq)] +struct ResolvedDesktopEventReconnectPolicy { + initial_delay_ms: u64, + max_delay_ms: u64, + multiplier: f64, + max_attempts: Option, +} + +impl ResolvedDesktopEventReconnectPolicy { + fn resolve(policy: Option<&DesktopEventReconnectPolicy>) -> Self { + let initial_delay_ms = policy + .and_then(|value| value.initial_delay_ms) + .unwrap_or(DEFAULT_RECONNECT_INITIAL_DELAY_MS) + .max(1); + let max_delay_ms = policy + .and_then(|value| value.max_delay_ms) + .unwrap_or(DEFAULT_RECONNECT_MAX_DELAY_MS) + .max(initial_delay_ms); + let multiplier = policy + .and_then(|value| value.multiplier) + .filter(|value| value.is_finite() && *value >= 1.0) + .unwrap_or(DEFAULT_RECONNECT_MULTIPLIER); + let max_attempts = policy + .and_then(|value| value.max_attempts) + .filter(|value| *value > 0); + + Self { + initial_delay_ms, + max_delay_ms, + multiplier, + max_attempts, + } + } +} + +#[derive(Clone, Debug, PartialEq)] +struct DesktopEventTransportConfig { + stream: DesktopEventStreamConfig, + reconnect: ResolvedDesktopEventReconnectPolicy, +} + +impl DesktopEventTransportConfig { + fn new(stream: DesktopEventStreamConfig, request: &DesktopEventsStartRequest) -> Self { + Self { + stream, + reconnect: ResolvedDesktopEventReconnectPolicy::resolve(request.reconnect.as_ref()), + } + } +} + +#[derive(Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct WorkspaceEventBatchPayload { + generation: u64, + sequence: u64, + emitted_at: u128, + events: Vec, +} + +#[derive(Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct DesktopEventStreamStatusPayload { + generation: u64, + state: &'static str, + reconnect_attempt: u32, + terminal: bool, + reason: Option, + next_delay_ms: Option, + status_code: Option, + stats: DesktopEventTransportStats, +} + +#[derive(Clone, Default, Serialize)] +#[serde(rename_all = "camelCase")] +struct DesktopEventTransportStats { + raw_events: u64, + emitted_events: u64, + emitted_batches: u64, + delta_coalesces: u64, + snapshot_coalesces: u64, + status_coalesces: u64, + superseded_deltas_dropped: u64, +} + +struct DesktopEventTransportState { + stop: Option>, + config: Option, +} + +pub struct DesktopEventTransportManager { + state: Arc>, + generation: Arc, +} + +enum ReaderMessage { + Activity, + Event(Value), + Ping(Value), + End(Option), +} + +enum PendingEntry { + Delta { + key: String, + scope: String, + event: Value, + started_at: Instant, + }, + Status { + key: String, + event: Value, + }, + Snapshot { + key: String, + event: Value, + }, + Event(Value), +} + +enum EventDeliveryPolicy { + CoalesceDelta(String), + CoalesceStatus(String), + CoalesceSnapshot(String), + Passthrough, +} + +enum OpenStreamErrorKind { + Unauthorized, + Http, + Transport, +} + +struct OpenStreamError { + kind: OpenStreamErrorKind, + message: String, + status_code: Option, +} + +#[derive(Default)] +struct PendingBatch { + events: Vec, +} + +impl DesktopEventTransportManager { + pub fn new() -> Self { + Self { + state: Arc::new(Mutex::new(DesktopEventTransportState { + stop: None, + config: None, + })), + generation: Arc::new(AtomicU64::new(0)), + } + } + + pub fn start( + &self, + app: AppHandle, + stream_config: Option, + request: Option, + ) -> DesktopEventsStartResult { + let Some(stream_config) = stream_config else { + return DesktopEventsStartResult { + started: false, + generation: None, + reason: Some("desktop event stream unavailable".to_string()), + }; + }; + + let request = request.unwrap_or_default(); + let transport_config = DesktopEventTransportConfig::new(stream_config, &request); + + let mut state = self.state.lock(); + if state.config.as_ref() == Some(&transport_config) { + if let Some(stop) = &state.stop { + if !stop.load(Ordering::SeqCst) { + return DesktopEventsStartResult { + started: true, + generation: Some(self.generation.load(Ordering::SeqCst)), + reason: None, + }; + } + } + } + + if let Some(stop) = state.stop.take() { + stop.store(true, Ordering::SeqCst); + } + + let generation = self.generation.fetch_add(1, Ordering::SeqCst) + 1; + let stop = Arc::new(AtomicBool::new(false)); + state.stop = Some(stop.clone()); + state.config = Some(transport_config.clone()); + let shared_generation = self.generation.clone(); + drop(state); + + thread::spawn(move || { + run_transport_loop(app, shared_generation, generation, stop, transport_config) + }); + + DesktopEventsStartResult { + started: true, + generation: Some(generation), + reason: None, + } + } + + pub fn stop(&self) { + let mut state = self.state.lock(); + if let Some(stop) = state.stop.take() { + stop.store(true, Ordering::SeqCst); + } + state.config = None; + self.generation.fetch_add(1, Ordering::SeqCst); + } +} + +fn classify_event(event: &Value) -> EventDeliveryPolicy { + if let Some(key) = delta_key(event) { + return EventDeliveryPolicy::CoalesceDelta(key); + } + + if let Some(key) = status_key(event) { + return EventDeliveryPolicy::CoalesceStatus(key); + } + + if let Some(key) = snapshot_key(event) { + return EventDeliveryPolicy::CoalesceSnapshot(key); + } + + EventDeliveryPolicy::Passthrough +} + +fn coalesced_payload_event<'a>(event: &'a Value) -> &'a Value { + if event.get("type").and_then(Value::as_str) == Some("instance.event") { + event.get("event").unwrap_or(event) + } else { + event + } +} + +fn coalesced_instance_id(event: &Value) -> &str { + event + .get("instanceId") + .and_then(Value::as_str) + .unwrap_or_default() +} + +fn snapshot_key(event: &Value) -> Option { + let instance_id = coalesced_instance_id(event); + let inner = coalesced_payload_event(event); + let inner_type = inner.get("type")?.as_str()?; + let props = inner.get("properties")?; + + match inner_type { + "message.part.updated" => { + let session_id = props + .get("part") + .and_then(|part| part.get("sessionID").or_else(|| part.get("sessionId"))) + .and_then(Value::as_str)?; + let message_id = props + .get("part") + .and_then(|part| part.get("messageID").or_else(|| part.get("messageId"))) + .and_then(Value::as_str)?; + let part_id = props + .get("part") + .and_then(|part| part.get("id")) + .and_then(Value::as_str)?; + + Some(format!( + "message.part.updated:{}:{}:{}:{}", + instance_id, session_id, message_id, part_id + )) + } + "message.updated" => { + let info = props.get("info")?; + let session_id = info + .get("sessionID") + .or_else(|| info.get("sessionId")) + .and_then(Value::as_str)?; + let message_id = info.get("id").and_then(Value::as_str)?; + + Some(format!( + "message.updated:{}:{}:{}", + instance_id, session_id, message_id + )) + } + "session.updated" | "session.status" => { + let session_id = props + .get("info") + .and_then(|info| info.get("id")) + .and_then(Value::as_str) + .or_else(|| { + props + .get("sessionID") + .or_else(|| props.get("sessionId")) + .and_then(Value::as_str) + })?; + + Some(format!("{}:{}:{}", inner_type, instance_id, session_id)) + } + _ => None, + } +} + +fn delta_scope(event: &Value) -> Option { + let instance_id = coalesced_instance_id(event); + let inner = coalesced_payload_event(event); + if inner.get("type")?.as_str()? != "message.part.delta" { + return None; + } + + let props = inner.get("properties")?; + let session_id = props + .get("sessionID") + .or_else(|| props.get("sessionId")) + .and_then(Value::as_str) + .unwrap_or_default(); + let message_id = props + .get("messageID") + .or_else(|| props.get("messageId")) + .and_then(Value::as_str)?; + let part_id = props + .get("partID") + .or_else(|| props.get("partId")) + .and_then(Value::as_str)?; + + Some(format!( + "message.part:{}:{}:{}:{}", + instance_id, session_id, message_id, part_id + )) +} + +fn delta_key(event: &Value) -> Option { + let scope = delta_scope(event)?; + let props = coalesced_payload_event(event).get("properties")?; + let field = props.get("field")?.as_str()?; + + Some(format!("{}:{}", scope, field)) +} + +fn snapshot_superseded_delta_scope(event: &Value) -> Option { + let instance_id = coalesced_instance_id(event); + let inner = coalesced_payload_event(event); + if inner.get("type")?.as_str()? != "message.part.updated" { + return None; + } + + let part = inner.get("properties")?.get("part")?; + let session_id = part + .get("sessionID") + .or_else(|| part.get("sessionId")) + .and_then(Value::as_str)?; + let message_id = part + .get("messageID") + .or_else(|| part.get("messageId")) + .and_then(Value::as_str)?; + let part_id = part.get("id")?.as_str()?; + + Some(format!( + "message.part:{}:{}:{}:{}", + instance_id, session_id, message_id, part_id + )) +} + +fn append_delta(target: &mut Value, event: &Value) { + let next_delta = coalesced_payload_event(event) + .get("properties") + .and_then(|value| value.get("delta")) + .and_then(Value::as_str) + .unwrap_or_default(); + + if let Some(existing_delta) = coalesced_payload_event_mut(target) + .and_then(|event| event.get_mut("properties")) + .and_then(Value::as_object_mut) + .and_then(|props| props.get_mut("delta")) + { + let combined = existing_delta.as_str().unwrap_or_default().to_string() + next_delta; + *existing_delta = Value::String(combined); + } +} + +fn coalesced_payload_event_mut(event: &mut Value) -> Option<&mut serde_json::Map> { + if event.get("type").and_then(Value::as_str) == Some("instance.event") { + event.get_mut("event").and_then(Value::as_object_mut) + } else { + event.as_object_mut() + } +} + +fn status_key(event: &Value) -> Option { + match event.get("type")?.as_str()? { + "instance.eventStatus" => Some(coalesced_instance_id(event).to_string()), + "session.status" => snapshot_key(event), + _ => None, + } +} + +#[cfg(test)] +mod tests; diff --git a/packages/tauri-app/src-tauri/src/desktop_event_transport/assembler.rs b/packages/tauri-app/src-tauri/src/desktop_event_transport/assembler.rs new file mode 100644 index 00000000..f91bcb76 --- /dev/null +++ b/packages/tauri-app/src-tauri/src/desktop_event_transport/assembler.rs @@ -0,0 +1,112 @@ +use super::*; + +impl PendingBatch { + pub(super) fn push(&mut self, event: Value, stats: &mut DesktopEventTransportStats) { + match classify_event(&event) { + EventDeliveryPolicy::CoalesceDelta(key) => { + let Some(scope) = delta_scope(&event) else { + self.events.push(PendingEntry::Event(event)); + return; + }; + + if let Some(PendingEntry::Delta { + key: existing_key, + event: existing_event, + .. + }) = self.events.last_mut() + { + if existing_key == &key { + append_delta(existing_event, &event); + stats.delta_coalesces = stats.delta_coalesces.saturating_add(1); + return; + } + } + + self.events.push(PendingEntry::Delta { + key, + scope, + event, + started_at: Instant::now(), + }); + } + EventDeliveryPolicy::CoalesceStatus(key) => { + if let Some(PendingEntry::Status { + key: existing_key, + event: existing_event, + }) = self.events.last_mut() + { + if existing_key == &key { + *existing_event = event; + stats.status_coalesces = stats.status_coalesces.saturating_add(1); + return; + } + } + + self.events.push(PendingEntry::Status { key, event }); + } + EventDeliveryPolicy::CoalesceSnapshot(key) => { + if let Some(part_scope) = snapshot_superseded_delta_scope(&event) { + let mut dropped = 0_u64; + while matches!( + self.events.last(), + Some(PendingEntry::Delta { scope, .. }) if scope == &part_scope + ) { + self.events.pop(); + dropped = dropped.saturating_add(1); + } + if dropped > 0 { + stats.superseded_deltas_dropped = + stats.superseded_deltas_dropped.saturating_add(dropped); + } + } + + if let Some(PendingEntry::Snapshot { + key: existing_key, + event: existing_event, + }) = self.events.last_mut() + { + if existing_key == &key { + *existing_event = event; + stats.snapshot_coalesces = stats.snapshot_coalesces.saturating_add(1); + return; + } + } + + self.events.push(PendingEntry::Snapshot { key, event }); + } + EventDeliveryPolicy::Passthrough => { + self.events.push(PendingEntry::Event(event)); + } + } + } + + pub(super) fn take_events(&mut self) -> Vec { + let pending = std::mem::take(&mut self.events); + pending + .into_iter() + .map(|entry| match entry { + PendingEntry::Delta { event, .. } => event, + PendingEntry::Status { event, .. } => event, + PendingEntry::Snapshot { event, .. } => event, + PendingEntry::Event(event) => event, + }) + .collect() + } + + pub(super) fn is_empty(&self) -> bool { + self.events.is_empty() + } + + pub(super) fn pending_len(&self) -> usize { + self.events.len() + } + + pub(super) fn should_hold_single_delta(&self, now: Instant) -> bool { + matches!( + self.events.as_slice(), + [PendingEntry::Delta { started_at, .. }] + if now.duration_since(*started_at) + < Duration::from_millis(DELTA_STREAM_WINDOW_MS) + ) + } +} diff --git a/packages/tauri-app/src-tauri/src/desktop_event_transport/stream.rs b/packages/tauri-app/src-tauri/src/desktop_event_transport/stream.rs new file mode 100644 index 00000000..ef6c851f --- /dev/null +++ b/packages/tauri-app/src-tauri/src/desktop_event_transport/stream.rs @@ -0,0 +1,278 @@ +use super::*; +use reqwest::blocking::RequestBuilder; + +pub(super) fn build_stream_client() -> Result { + Client::builder() + .connect_timeout(Duration::from_millis(STREAM_CONNECT_TIMEOUT_MS)) + .tcp_keepalive(Duration::from_millis(STREAM_TCP_KEEPALIVE_MS)) + // Note: reqwest's blocking client doesn't expose a per-read timeout. + // The global `.timeout()` would kill the entire SSE stream, so we + // rely on: + // 1. tcp_keepalive to detect dead connections (OS will RST after + // several unacked probes, typically ~2 min). + // 2. Consumer-side stall detection (STREAM_STALL_TIMEOUT_MS). + // 3. Reader thread breaking on channel send error (consumer dropped). + .build() + .map_err(|error: reqwest::Error| OpenStreamError { + kind: OpenStreamErrorKind::Transport, + message: error.to_string(), + status_code: None, + }) +} + +pub(super) fn open_stream( + app: &AppHandle, + client: &Client, + config: &DesktopEventStreamConfig, +) -> Result { + let url = format!( + "{}?clientId={}&connectionId={}", + config.events_url, config.client_id, config.connection_id + ); + + let request = attach_session_cookie( + client.get(&url).header("Accept", "text/event-stream"), + app, + config, + ); + + let response = request.send().map_err(|error| OpenStreamError { + kind: OpenStreamErrorKind::Transport, + message: error.to_string(), + status_code: None, + })?; + + if response.status().is_success() { + return Ok(response); + } + + let status = response.status(); + let kind = if matches!(status, StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN) { + OpenStreamErrorKind::Unauthorized + } else { + OpenStreamErrorKind::Http + }; + + Err(OpenStreamError { + kind, + message: format!("desktop event stream unavailable ({status})"), + status_code: Some(status.as_u16()), + }) +} + +fn resolve_session_cookie(app: &AppHandle, config: &DesktopEventStreamConfig) -> Option { + read_session_cookie_from_webview(app, &config.base_url, &config.cookie_name) + .or_else(|| config.session_cookie.clone()) + .filter(|value| !value.is_empty()) +} + +pub(super) fn attach_session_cookie( + request: RequestBuilder, + app: &AppHandle, + config: &DesktopEventStreamConfig, +) -> RequestBuilder { + attach_session_cookie_value( + request, + &config.cookie_name, + resolve_session_cookie(app, config).as_deref(), + ) +} + +fn attach_session_cookie_value( + request: RequestBuilder, + cookie_name: &str, + session_cookie: Option<&str>, +) -> RequestBuilder { + let Some(session_cookie) = session_cookie.filter(|value| !value.is_empty()) else { + return request; + }; + + request.header("Cookie", format!("{}={}", cookie_name, session_cookie)) +} + +fn read_session_cookie_from_webview( + app: &AppHandle, + base_url: &str, + cookie_name: &str, +) -> Option { + let url = Url::parse(base_url).ok()?; + let host = url.host_str()?.to_ascii_lowercase(); + let path = url.path(); + let windows = app.webview_windows(); + let window = windows.get("main")?; + let cookies = window.cookies().ok()?; + cookies + .into_iter() + .filter(|cookie: &tauri::webview::cookie::Cookie<'static>| cookie.name() == cookie_name) + .filter(|cookie: &tauri::webview::cookie::Cookie<'static>| { + let Some(domain) = cookie.domain() else { + return true; + }; + + let normalized_domain = domain.trim_start_matches('.').to_ascii_lowercase(); + host == normalized_domain || host.ends_with(&format!(".{}", normalized_domain)) + }) + .filter(|cookie: &tauri::webview::cookie::Cookie<'static>| { + let Some(cookie_path) = cookie.path() else { + return true; + }; + + path.starts_with(cookie_path) + }) + .map(|cookie: tauri::webview::cookie::Cookie<'static>| cookie.value().to_string()) + .next() +} + +pub(super) fn read_sse( + response: Response, + tx: SyncSender, + stop: Arc, + generation_atomic: Arc, + generation: u64, +) { + let mut reader = BufReader::new(response); + let mut line = String::new(); + let mut event_name: Option = None; + let mut data_lines: Vec = Vec::new(); + + loop { + if stop.load(Ordering::SeqCst) || !generation_matches(&generation_atomic, generation) { + let _ = tx.send(ReaderMessage::End(Some("stopped".to_string()))); + return; + } + + line.clear(); + match reader.read_line(&mut line) { + Ok(0) => { + let _ = flush_sse_frame(&tx, &event_name, &data_lines); + let _ = tx.send(ReaderMessage::End(Some("stream closed".to_string()))); + return; + } + Ok(_) => { + if tx.send(ReaderMessage::Activity).is_err() { + return; // consumer dropped — stop reading + } + let trimmed = line.trim_end_matches(['\r', '\n']); + if handle_sse_line(trimmed, &mut event_name, &mut data_lines) { + if flush_sse_frame(&tx, &event_name, &data_lines).is_err() { + return; + } + event_name = None; + data_lines.clear(); + continue; + } + } + Err(error) => { + let _ = flush_sse_frame(&tx, &event_name, &data_lines); + let _ = tx.send(ReaderMessage::End(Some(error.to_string()))); + return; + } + } + } +} + +fn handle_sse_line( + trimmed: &str, + event_name: &mut Option, + data_lines: &mut Vec, +) -> bool { + if trimmed.is_empty() { + return true; + } + + if trimmed.starts_with(':') { + return false; + } + + if let Some(name) = trimmed.strip_prefix("event:") { + *event_name = Some(name.strip_prefix(' ').unwrap_or(name).to_string()); + return false; + } + + if let Some(data) = trimmed.strip_prefix("data:") { + data_lines.push(data.strip_prefix(' ').unwrap_or(data).to_string()); + } + + false +} + +fn flush_sse_frame( + tx: &SyncSender, + event_name: &Option, + lines: &[String], +) -> Result<(), ()> { + let Some(payload) = parse_sse_payload(lines) else { + return Ok(()); + }; + + if event_name.as_deref() == Some("codenomad.client.ping") { + tx.send(ReaderMessage::Ping(payload)).map_err(|_| ()) + } else { + tx.send(ReaderMessage::Event(payload)).map_err(|_| ()) + } +} + +fn parse_sse_payload(lines: &[String]) -> Option { + if lines.is_empty() { + return None; + } + + let payload = lines.join("\n").trim().to_string(); + if payload.is_empty() { + return None; + } + + serde_json::from_str::(&payload).ok() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn named_ping_event_is_routed_to_ping_channel() { + let (tx, rx) = mpsc::sync_channel(1); + let mut event_name = None; + let mut data_lines = Vec::new(); + + assert!(!handle_sse_line( + "event: codenomad.client.ping", + &mut event_name, + &mut data_lines + )); + assert!(!handle_sse_line( + r#"data: {"ts":123}"#, + &mut event_name, + &mut data_lines + )); + assert!(handle_sse_line("", &mut event_name, &mut data_lines)); + + flush_sse_frame(&tx, &event_name, &data_lines).expect("ping frame should flush"); + + match rx.recv().expect("ping frame should be emitted") { + ReaderMessage::Ping(payload) => { + assert_eq!(payload.get("ts").and_then(Value::as_u64), Some(123)); + } + _ => panic!("expected ping frame"), + } + } + + #[test] + fn session_cookie_is_attached_to_requests() { + let request = attach_session_cookie_value( + Client::new().post("http://localhost/api/client-connections/pong"), + "codenomad_session", + Some("cookie-value"), + ) + .build() + .expect("request should build"); + + assert_eq!( + request + .headers() + .get("Cookie") + .and_then(|value| value.to_str().ok()), + Some("codenomad_session=cookie-value") + ); + } +} diff --git a/packages/tauri-app/src-tauri/src/desktop_event_transport/tests.rs b/packages/tauri-app/src-tauri/src/desktop_event_transport/tests.rs new file mode 100644 index 00000000..d9ba344b --- /dev/null +++ b/packages/tauri-app/src-tauri/src/desktop_event_transport/tests.rs @@ -0,0 +1,343 @@ +use super::*; +use serde_json::json; + +fn fresh_stats() -> DesktopEventTransportStats { + DesktopEventTransportStats::default() +} + +fn delta_event(delta: &str) -> Value { + json!({ + "type": "instance.event", + "instanceId": "inst-1", + "event": { + "type": "message.part.delta", + "properties": { + "sessionID": "sess-1", + "messageID": "msg-1", + "partID": "part-1", + "field": "text", + "delta": delta, + } + } + }) +} + +fn delta_event_for(part_id: &str, delta: &str) -> Value { + json!({ + "type": "instance.event", + "instanceId": "inst-1", + "event": { + "type": "message.part.delta", + "properties": { + "sessionID": "sess-1", + "messageID": "msg-1", + "partID": part_id, + "field": "text", + "delta": delta, + } + } + }) +} + +fn direct_delta_event(delta: &str) -> Value { + json!({ + "type": "message.part.delta", + "properties": { + "sessionID": "sess-1", + "messageID": "msg-1", + "partID": "part-1", + "field": "text", + "delta": delta, + } + }) +} + +fn direct_message_part_updated_event(text: &str) -> Value { + json!({ + "type": "message.part.updated", + "properties": { + "part": { + "id": "part-1", + "type": "text", + "text": text, + "sessionID": "sess-1", + "messageID": "msg-1" + } + } + }) +} + +fn message_part_updated_event(text: &str) -> Value { + json!({ + "type": "instance.event", + "instanceId": "inst-1", + "event": { + "type": "message.part.updated", + "properties": { + "part": { + "id": "part-1", + "type": "text", + "text": text, + "sessionID": "sess-1", + "messageID": "msg-1" + } + } + } + }) +} + +#[test] +fn coalesces_message_part_delta_events() { + let mut pending = PendingBatch::default(); + let mut stats = fresh_stats(); + pending.push(delta_event("Hello"), &mut stats); + pending.push(delta_event(" world"), &mut stats); + + let events = pending.take_events(); + assert_eq!(events.len(), 1); + assert_eq!( + events[0]["event"]["properties"]["delta"].as_str(), + Some("Hello world") + ); +} + +#[test] +fn last_write_wins_for_status_events() { + let mut pending = PendingBatch::default(); + let mut stats = fresh_stats(); + pending.push( + json!({ + "type": "instance.eventStatus", + "instanceId": "inst-1", + "status": "connecting" + }), + &mut stats, + ); + pending.push( + json!({ + "type": "instance.eventStatus", + "instanceId": "inst-1", + "status": "connected" + }), + &mut stats, + ); + + let events = pending.take_events(); + assert_eq!(events.len(), 1); + assert_eq!(events[0]["status"].as_str(), Some("connected")); +} + +#[test] +fn last_write_wins_for_consecutive_snapshot_events() { + let mut pending = PendingBatch::default(); + let mut stats = fresh_stats(); + pending.push(message_part_updated_event("Hello"), &mut stats); + pending.push(message_part_updated_event("Hello world"), &mut stats); + + let events = pending.take_events(); + assert_eq!(events.len(), 1); + assert_eq!( + events[0]["event"]["properties"]["part"]["text"].as_str(), + Some("Hello world") + ); +} + +#[test] +fn interleaved_snapshot_keys_keep_order() { + let mut pending = PendingBatch::default(); + let mut stats = fresh_stats(); + pending.push(message_part_updated_event("A1"), &mut stats); + pending.push( + json!({ + "type": "instance.event", + "instanceId": "inst-1", + "event": { + "type": "message.part.updated", + "properties": { + "part": { + "id": "part-2", + "type": "text", + "text": "B1", + "sessionID": "sess-1", + "messageID": "msg-1" + } + } + } + }), + &mut stats, + ); + pending.push(message_part_updated_event("A2"), &mut stats); + + let events = pending.take_events(); + assert_eq!(events.len(), 3); + assert_eq!( + events[0]["event"]["properties"]["part"]["id"].as_str(), + Some("part-1") + ); + assert_eq!( + events[1]["event"]["properties"]["part"]["id"].as_str(), + Some("part-2") + ); + assert_eq!( + events[2]["event"]["properties"]["part"]["text"].as_str(), + Some("A2") + ); +} + +#[test] +fn snapshot_replaces_trailing_deltas_for_same_part() { + let mut pending = PendingBatch::default(); + let mut stats = fresh_stats(); + pending.push(delta_event("Hello"), &mut stats); + pending.push(message_part_updated_event("Hello world"), &mut stats); + + let events = pending.take_events(); + assert_eq!(events.len(), 1); + assert_eq!( + events[0]["event"]["type"].as_str(), + Some("message.part.updated") + ); + assert_eq!( + events[0]["event"]["properties"]["part"]["text"].as_str(), + Some("Hello world") + ); +} + +#[test] +fn structural_events_force_coalesced_flush_before_append() { + let mut pending = PendingBatch::default(); + let mut stats = fresh_stats(); + pending.push(delta_event("Hello"), &mut stats); + pending.push( + json!({ + "type": "instance.event", + "instanceId": "inst-1", + "event": { + "type": "message.updated", + "properties": { + "id": "msg-1" + } + } + }), + &mut stats, + ); + + let events = pending.take_events(); + assert_eq!(events.len(), 2); + assert_eq!( + events[0]["event"]["type"].as_str(), + Some("message.part.delta") + ); + assert_eq!(events[1]["event"]["type"].as_str(), Some("message.updated")); +} + +#[test] +fn interleaved_delta_keys_keep_order() { + let mut pending = PendingBatch::default(); + let mut stats = fresh_stats(); + pending.push(delta_event_for("part-1", "A1"), &mut stats); + pending.push(delta_event_for("part-2", "B1"), &mut stats); + pending.push(delta_event_for("part-1", "A2"), &mut stats); + + let events = pending.take_events(); + assert_eq!(events.len(), 3); + assert_eq!( + events[0]["event"]["properties"]["partID"].as_str(), + Some("part-1") + ); + assert_eq!( + events[0]["event"]["properties"]["delta"].as_str(), + Some("A1") + ); + assert_eq!( + events[1]["event"]["properties"]["partID"].as_str(), + Some("part-2") + ); + assert_eq!( + events[1]["event"]["properties"]["delta"].as_str(), + Some("B1") + ); + assert_eq!( + events[2]["event"]["properties"]["partID"].as_str(), + Some("part-1") + ); + assert_eq!( + events[2]["event"]["properties"]["delta"].as_str(), + Some("A2") + ); +} + +#[test] +fn reconnect_delay_grows_and_caps() { + let policy = ResolvedDesktopEventReconnectPolicy { + initial_delay_ms: 100, + max_delay_ms: 500, + multiplier: 2.0, + max_attempts: None, + }; + + assert_eq!(compute_reconnect_delay_ms(1, &policy), 100); + assert_eq!(compute_reconnect_delay_ms(2, &policy), 200); + assert_eq!(compute_reconnect_delay_ms(3, &policy), 400); + assert_eq!(compute_reconnect_delay_ms(4, &policy), 500); +} + +#[test] +fn holds_single_delta_within_stream_window() { + let pending = PendingBatch { + events: vec![PendingEntry::Delta { + key: "delta-key".to_string(), + scope: "delta-scope".to_string(), + event: delta_event("Hello"), + started_at: Instant::now(), + }], + }; + + assert!(pending.should_hold_single_delta(Instant::now())); +} + +#[test] +fn flushes_single_delta_after_stream_window() { + let started_at = Instant::now() - Duration::from_millis(DELTA_STREAM_WINDOW_MS + 1); + let pending = PendingBatch { + events: vec![PendingEntry::Delta { + key: "delta-key".to_string(), + scope: "delta-scope".to_string(), + event: delta_event("Hello"), + started_at, + }], + }; + + assert!(!pending.should_hold_single_delta(Instant::now())); +} + +#[test] +fn coalesces_direct_message_part_delta_events() { + let mut pending = PendingBatch::default(); + let mut stats = fresh_stats(); + pending.push(direct_delta_event("Hello"), &mut stats); + pending.push(direct_delta_event(" world"), &mut stats); + + let events = pending.take_events(); + assert_eq!(events.len(), 1); + assert_eq!( + events[0]["properties"]["delta"].as_str(), + Some("Hello world") + ); +} + +#[test] +fn direct_snapshot_replaces_trailing_direct_deltas_for_same_part() { + let mut pending = PendingBatch::default(); + let mut stats = fresh_stats(); + pending.push(direct_delta_event("Hello"), &mut stats); + pending.push(direct_message_part_updated_event("Hello world"), &mut stats); + + let events = pending.take_events(); + assert_eq!(events.len(), 1); + assert_eq!(events[0]["type"].as_str(), Some("message.part.updated")); + assert_eq!( + events[0]["properties"]["part"]["text"].as_str(), + Some("Hello world") + ); +} diff --git a/packages/tauri-app/src-tauri/src/desktop_event_transport/transport.rs b/packages/tauri-app/src-tauri/src/desktop_event_transport/transport.rs new file mode 100644 index 00000000..5f0ed231 --- /dev/null +++ b/packages/tauri-app/src-tauri/src/desktop_event_transport/transport.rs @@ -0,0 +1,428 @@ +use super::*; + +fn send_connection_pong( + app: &AppHandle, + client: &Client, + config: &DesktopEventStreamConfig, + payload: &Value, +) { + let body = serde_json::json!({ + "clientId": config.client_id, + "connectionId": config.connection_id, + "pingTs": payload.get("ts").and_then(Value::as_u64), + }); + + let request = client + .post(format!( + "{}/api/client-connections/pong", + config.base_url.trim_end_matches('/') + )) + .json(&body); + + let _ = attach_session_cookie(request, app, config).send(); +} + +pub(super) fn run_transport_loop( + app: AppHandle, + generation_atomic: Arc, + generation: u64, + stop: Arc, + config: DesktopEventTransportConfig, +) { + let mut reconnect_attempt = 0_u32; + let mut stats = DesktopEventTransportStats::default(); + + let client = match build_stream_client() { + Ok(client) => client, + Err(error) => { + emit_status( + &app, + generation, + "error", + 0, + true, + Some(error.message), + None, + None, + &stats, + ); + return; + } + }; + + loop { + if stop.load(Ordering::SeqCst) || !generation_matches(&generation_atomic, generation) { + break; + } + + emit_status( + &app, + generation, + "connecting", + reconnect_attempt, + false, + None, + None, + None, + &stats, + ); + + match open_stream(&app, &client, &config.stream) { + Ok(response) => { + reconnect_attempt = 0; + emit_status( + &app, + generation, + "connected", + reconnect_attempt, + false, + None, + None, + None, + &stats, + ); + + let disconnect_reason = consume_stream( + &app, + &client, + &config.stream, + response, + &generation_atomic, + generation, + stop.clone(), + &mut stats, + ); + if stop.load(Ordering::SeqCst) + || !generation_matches(&generation_atomic, generation) + { + break; + } + + if !schedule_retry( + &app, + &generation_atomic, + generation, + stop.clone(), + &config.reconnect, + &mut reconnect_attempt, + "disconnected", + disconnect_reason, + None, + &stats, + ) { + break; + } + } + Err(error) => { + let state_name = match error.kind { + OpenStreamErrorKind::Unauthorized => "unauthorized", + OpenStreamErrorKind::Http | OpenStreamErrorKind::Transport => "error", + }; + + if !schedule_retry( + &app, + &generation_atomic, + generation, + stop.clone(), + &config.reconnect, + &mut reconnect_attempt, + state_name, + Some(error.message), + error.status_code, + &stats, + ) { + break; + } + } + } + } + + emit_status( + &app, + generation, + "stopped", + reconnect_attempt, + true, + None, + None, + None, + &stats, + ); +} + +fn schedule_retry( + app: &AppHandle, + generation_atomic: &Arc, + generation: u64, + stop: Arc, + policy: &ResolvedDesktopEventReconnectPolicy, + reconnect_attempt: &mut u32, + state_name: &'static str, + reason: Option, + status_code: Option, + stats: &DesktopEventTransportStats, +) -> bool { + *reconnect_attempt = reconnect_attempt.saturating_add(1); + let terminal = policy + .max_attempts + .map(|max_attempts| *reconnect_attempt >= max_attempts) + .unwrap_or(false); + let next_delay_ms = if terminal { + None + } else { + Some(compute_reconnect_delay_ms(*reconnect_attempt, policy)) + }; + + emit_status( + app, + generation, + state_name, + *reconnect_attempt, + terminal, + reason, + next_delay_ms, + status_code, + stats, + ); + + if terminal { + return false; + } + + if let Some(delay_ms) = next_delay_ms { + wait_with_cancellation(generation_atomic, generation, stop, delay_ms); + } + + true +} + +fn wait_with_cancellation( + generation_atomic: &Arc, + generation: u64, + stop: Arc, + delay_ms: u64, +) { + let mut remaining_ms = delay_ms; + while remaining_ms > 0 { + if stop.load(Ordering::SeqCst) || !generation_matches(generation_atomic, generation) { + return; + } + + let chunk_ms = remaining_ms.min(100); + thread::sleep(Duration::from_millis(chunk_ms)); + remaining_ms -= chunk_ms; + } +} + +fn consume_stream( + app: &AppHandle, + client: &Client, + stream_config: &DesktopEventStreamConfig, + response: Response, + generation_atomic: &Arc, + generation: u64, + stop: Arc, + stats: &mut DesktopEventTransportStats, +) -> Option { + let (tx, rx) = mpsc::sync_channel::(4096); + let reader_stop = stop.clone(); + let reader_generation_atomic = generation_atomic.clone(); + thread::spawn(move || { + read_sse( + response, + tx, + reader_stop, + reader_generation_atomic, + generation, + ) + }); + + let mut pending = PendingBatch::default(); + let mut sequence = 0_u64; + let mut last_reader_activity = Instant::now(); + + loop { + if stop.load(Ordering::SeqCst) || !generation_matches(generation_atomic, generation) { + return Some("stopped".to_string()); + } + + match rx.recv_timeout(Duration::from_millis(FLUSH_INTERVAL_MS)) { + Ok(ReaderMessage::Activity) => { + last_reader_activity = Instant::now(); + } + Ok(ReaderMessage::Ping(payload)) => { + last_reader_activity = Instant::now(); + send_connection_pong(app, client, stream_config, &payload); + } + Ok(ReaderMessage::Event(event)) => { + last_reader_activity = Instant::now(); + stats.raw_events = stats.raw_events.saturating_add(1); + + pending.push(event, stats); + if pending.pending_len() >= MAX_BATCH_EVENTS { + emit_pending_batch( + app, + generation, + &mut pending, + &mut sequence, + generation_atomic, + stats, + ); + } + } + Ok(ReaderMessage::End(reason)) => { + if !pending.is_empty() { + emit_pending_batch( + app, + generation, + &mut pending, + &mut sequence, + generation_atomic, + stats, + ); + } + return reason; + } + Err(RecvTimeoutError::Timeout) => { + if last_reader_activity.elapsed() >= Duration::from_millis(STREAM_STALL_TIMEOUT_MS) + { + if !pending.is_empty() { + sequence += 1; + emit_batch( + app, + generation, + &mut pending, + sequence, + generation_atomic, + stats, + ); + } + return Some("stream stalled".to_string()); + } + + if !pending.is_empty() { + if pending.should_hold_single_delta(Instant::now()) { + continue; + } + emit_pending_batch( + app, + generation, + &mut pending, + &mut sequence, + generation_atomic, + stats, + ); + } + } + Err(RecvTimeoutError::Disconnected) => { + if !pending.is_empty() { + emit_pending_batch( + app, + generation, + &mut pending, + &mut sequence, + generation_atomic, + stats, + ); + } + return Some("reader disconnected".to_string()); + } + } + } +} + +fn emit_pending_batch( + app: &AppHandle, + generation: u64, + pending: &mut PendingBatch, + sequence: &mut u64, + generation_atomic: &Arc, + stats: &mut DesktopEventTransportStats, +) { + if pending.is_empty() { + return; + } + + *sequence += 1; + emit_batch( + app, + generation, + pending, + *sequence, + generation_atomic, + stats, + ); +} + +fn emit_batch( + app: &AppHandle, + generation: u64, + pending: &mut PendingBatch, + sequence: u64, + generation_atomic: &Arc, + stats: &mut DesktopEventTransportStats, +) { + if !generation_matches(generation_atomic, generation) { + return; + } + + let events = pending.take_events(); + if events.is_empty() { + return; + } + + stats.emitted_batches = stats.emitted_batches.saturating_add(1); + stats.emitted_events = stats.emitted_events.saturating_add(events.len() as u64); + + let _ = app.emit( + EVENT_BATCH_NAME, + WorkspaceEventBatchPayload { + generation, + sequence, + emitted_at: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(), + events, + }, + ); +} + +fn emit_status( + app: &AppHandle, + generation: u64, + state_name: &'static str, + reconnect_attempt: u32, + terminal: bool, + reason: Option, + next_delay_ms: Option, + status_code: Option, + stats: &DesktopEventTransportStats, +) { + let _ = app.emit( + EVENT_STATUS_NAME, + DesktopEventStreamStatusPayload { + generation, + state: state_name, + reconnect_attempt, + terminal, + reason, + next_delay_ms, + status_code, + stats: stats.clone(), + }, + ); +} + +pub(super) fn generation_matches(generation_atomic: &Arc, generation: u64) -> bool { + generation_atomic.load(Ordering::SeqCst) == generation +} + +pub(super) fn compute_reconnect_delay_ms( + attempt: u32, + policy: &ResolvedDesktopEventReconnectPolicy, +) -> u64 { + let exponent = attempt.saturating_sub(1) as i32; + let scaled = (policy.initial_delay_ms as f64) * policy.multiplier.powi(exponent); + (scaled.round().max(policy.initial_delay_ms as f64) as u64).min(policy.max_delay_ms) +} diff --git a/packages/tauri-app/src-tauri/src/main.rs b/packages/tauri-app/src-tauri/src/main.rs index 43fccc43..7ad5d7c2 100644 --- a/packages/tauri-app/src-tauri/src/main.rs +++ b/packages/tauri-app/src-tauri/src/main.rs @@ -3,11 +3,13 @@ #[allow(dead_code)] mod cert_manager; mod cli_manager; +mod desktop_event_transport; #[cfg(target_os = "linux")] mod linux_tls; mod managed_node; use cli_manager::{CliProcessManager, CliStatus}; +use desktop_event_transport::{DesktopEventTransportManager, DesktopEventsStartRequest, DesktopEventsStartResult}; use keepawake::KeepAwake; use serde::Deserialize; use serde_json::json; @@ -49,6 +51,7 @@ const WINDOWS_APP_USER_MODEL_ID: &str = "ai.neuralnomads.codenomad.client"; pub struct AppState { pub manager: CliProcessManager, + pub desktop_events: DesktopEventTransportManager, pub wake_lock: Mutex>, pub zoom_level: Mutex, pub remote_origins: Mutex>, @@ -133,6 +136,7 @@ fn cli_get_status(state: tauri::State) -> CliStatus { #[tauri::command] fn cli_restart(app: AppHandle, state: tauri::State) -> Result { let dev_mode = is_dev_mode(); + state.desktop_events.stop(); state.manager.stop().map_err(|e| e.to_string())?; state .manager @@ -141,6 +145,21 @@ fn cli_restart(app: AppHandle, state: tauri::State) -> Result, + request: Option, +) -> DesktopEventsStartResult { + let config = state.manager.desktop_event_stream_config(); + state.desktop_events.start(app, config, request) +} + +#[tauri::command] +fn desktop_events_stop(state: tauri::State) { + state.desktop_events.stop(); +} + #[tauri::command] fn wake_lock_start( state: tauri::State, @@ -563,6 +582,7 @@ fn main() { .plugin(navigation_guard) .manage(AppState { manager: CliProcessManager::new(), + desktop_events: DesktopEventTransportManager::new(), wake_lock: Mutex::new(None), zoom_level: Mutex::new(DEFAULT_ZOOM_LEVEL), remote_origins: Mutex::new(HashMap::new()), @@ -617,6 +637,8 @@ fn main() { .invoke_handler(tauri::generate_handler![ cli_get_status, cli_restart, + desktop_events_start, + desktop_events_stop, wake_lock_start, wake_lock_stop, needs_local_certificate_install, @@ -722,6 +744,7 @@ fn main() { let app = app_handle.clone(); std::thread::spawn(move || { if let Some(state) = app.try_state::() { + state.desktop_events.stop(); let _ = state.manager.stop(); } app.exit(0); @@ -773,6 +796,7 @@ fn main() { let app = app_handle.clone(); std::thread::spawn(move || { if let Some(state) = app.try_state::() { + state.desktop_events.stop(); let _ = state.manager.stop(); } app.exit(0); diff --git a/packages/ui/src/App.tsx b/packages/ui/src/App.tsx index 2aa430fc..9db0d2bd 100644 --- a/packages/ui/src/App.tsx +++ b/packages/ui/src/App.tsx @@ -77,6 +77,8 @@ const App: Component = () => { const { t } = useI18n() const { preferences, + useTauriNativeEventTransport, + setUseTauriNativeEventTransport, serverSettings, recordWorkspaceLaunch, toggleShowThinkingBlocks, @@ -444,6 +446,8 @@ const App: Component = () => { const { commands: paletteCommands, executeCommand } = useCommands({ preferences, + useTauriNativeEventTransport, + setUseTauriNativeEventTransport, toggleAutoCleanupBlankSessions, toggleShowThinkingBlocks, toggleKeyboardShortcutHints, diff --git a/packages/ui/src/components/settings/appearance-settings-section.tsx b/packages/ui/src/components/settings/appearance-settings-section.tsx index 281366d6..cfb03981 100644 --- a/packages/ui/src/components/settings/appearance-settings-section.tsx +++ b/packages/ui/src/components/settings/appearance-settings-section.tsx @@ -17,6 +17,8 @@ export const AppearanceSettingsSection: Component = () => { const { themeMode, setThemeMode } = useTheme() const { preferences, + useTauriNativeEventTransport, + setUseTauriNativeEventTransport, updatePreferences, toggleShowThinkingBlocks, toggleKeyboardShortcutHints, @@ -36,6 +38,8 @@ export const AppearanceSettingsSection: Component = () => { const behaviorSettings = createMemo(() => getBehaviorSettings({ preferences, + useTauriNativeEventTransport, + setUseTauriNativeEventTransport, updatePreferences, toggleShowThinkingBlocks, toggleKeyboardShortcutHints, diff --git a/packages/ui/src/lib/desktop-event-transport-preference.ts b/packages/ui/src/lib/desktop-event-transport-preference.ts new file mode 100644 index 00000000..1ee0ebcb --- /dev/null +++ b/packages/ui/src/lib/desktop-event-transport-preference.ts @@ -0,0 +1,25 @@ +export const TAURI_NATIVE_EVENT_TRANSPORT_STORAGE_KEY = "codenomad-use-tauri-native-event-transport" + +export function readUseTauriNativeEventTransportPreference(): boolean { + if (typeof window === "undefined") { + return true + } + + try { + return window.localStorage?.getItem(TAURI_NATIVE_EVENT_TRANSPORT_STORAGE_KEY) !== "0" + } catch { + return true + } +} + +export function writeUseTauriNativeEventTransportPreference(enabled: boolean): void { + if (typeof window === "undefined") { + return + } + + try { + window.localStorage?.setItem(TAURI_NATIVE_EVENT_TRANSPORT_STORAGE_KEY, enabled ? "1" : "0") + } catch { + // Ignore localStorage failures and keep the in-memory preference only. + } +} diff --git a/packages/ui/src/lib/event-transport-contract.ts b/packages/ui/src/lib/event-transport-contract.ts new file mode 100644 index 00000000..e4d91629 --- /dev/null +++ b/packages/ui/src/lib/event-transport-contract.ts @@ -0,0 +1,62 @@ +export interface DesktopEventTransportReconnectPolicy { + initialDelayMs: number + maxDelayMs: number + multiplier: number + maxAttempts?: number +} + +export interface DesktopEventTransportStartOptions { + reconnect?: Partial +} + +export type DesktopEventTransportState = + | "connecting" + | "connected" + | "disconnected" + | "unauthorized" + | "error" + | "stopped" + +export interface DesktopEventTransportStats { + rawEvents: number + emittedEvents: number + emittedBatches: number + deltaCoalesces: number + snapshotCoalesces: number + statusCoalesces: number + supersededDeltasDropped: number +} + +export interface DesktopEventTransportStatusPayload { + generation: number + state: DesktopEventTransportState + reconnectAttempt: number + terminal: boolean + reason?: string + nextDelayMs?: number + statusCode?: number + stats?: DesktopEventTransportStats +} + +export interface DesktopEventsStartResult { + started: boolean + generation?: number + reason?: string +} + +export const DEFAULT_DESKTOP_EVENT_RECONNECT_POLICY: DesktopEventTransportReconnectPolicy = { + initialDelayMs: 1000, + maxDelayMs: 10000, + multiplier: 2, +} + +export function resolveDesktopEventTransportStartOptions( + options?: DesktopEventTransportStartOptions, +): Required { + return { + reconnect: { + ...DEFAULT_DESKTOP_EVENT_RECONNECT_POLICY, + ...options?.reconnect, + }, + } +} diff --git a/packages/ui/src/lib/event-transport.ts b/packages/ui/src/lib/event-transport.ts new file mode 100644 index 00000000..3c13c3d4 --- /dev/null +++ b/packages/ui/src/lib/event-transport.ts @@ -0,0 +1,70 @@ +import type { WorkspaceEventPayload } from "../../../server/src/api-types" +import { serverApi } from "./api-client" +import { + resolveDesktopEventTransportStartOptions, + type DesktopEventTransportStartOptions, +} from "./event-transport-contract" +import { readUseTauriNativeEventTransportPreference } from "./desktop-event-transport-preference" +import { getLogger } from "./logger" +import { runtimeEnv } from "./runtime-env" +import { connectTauriWorkspaceEvents } from "./native/desktop-events" + +const log = getLogger("sse") + +export interface WorkspaceEventTransportCallbacks { + onBatch: (events: WorkspaceEventPayload[]) => void + onError?: () => void + onOpen?: () => void + onPing?: (payload: { ts?: number }) => void +} + +export interface WorkspaceEventConnection { + disconnect: () => void +} + +async function connectBrowserWorkspaceEvents( + callbacks: WorkspaceEventTransportCallbacks, +): Promise { + const source = serverApi.connectEvents((event) => { + callbacks.onBatch([event]) + }, callbacks.onError, callbacks.onPing) + source.onopen = () => callbacks.onOpen?.() + return { + disconnect() { + source.close() + }, + } +} + +export async function connectWorkspaceEvents( + callbacks: WorkspaceEventTransportCallbacks, + options?: DesktopEventTransportStartOptions, +): Promise { + const nativeDesktopTransportEnabled = readUseTauriNativeEventTransportPreference() + + if (runtimeEnv.host === "tauri" && nativeDesktopTransportEnabled) { + try { + const conn = await connectTauriWorkspaceEvents( + callbacks, + resolveDesktopEventTransportStartOptions(options), + ) + log.info("Event transport: rust-native (desktop_event_transport)") + return conn + } catch (error) { + log.warn("Failed to start native desktop event transport, falling back to browser EventSource", error) + } + } else if (runtimeEnv.host === "tauri") { + log.info("Event transport: browser-eventsource forced by settings") + } + + log.info(`Event transport: browser-eventsource (host=${runtimeEnv.host})`) + return connectBrowserWorkspaceEvents(callbacks) +} + +export type { + DesktopEventsStartResult, + DesktopEventTransportReconnectPolicy, + DesktopEventTransportStartOptions, + DesktopEventTransportState, + DesktopEventTransportStatusPayload, +} from "./event-transport-contract" diff --git a/packages/ui/src/lib/hooks/use-commands.ts b/packages/ui/src/lib/hooks/use-commands.ts index 160cd87f..734ceb4e 100644 --- a/packages/ui/src/lib/hooks/use-commands.ts +++ b/packages/ui/src/lib/hooks/use-commands.ts @@ -29,6 +29,8 @@ function splitKeywords(key: string): string[] { export interface UseCommandsOptions { preferences: Accessor + useTauriNativeEventTransport: Accessor + setUseTauriNativeEventTransport: (next: boolean) => void toggleShowThinkingBlocks: () => void toggleKeyboardShortcutHints: () => void toggleShowMessageTimeline: () => void @@ -419,6 +421,8 @@ export function useCommands(options: UseCommandsOptions) { registerBehaviorCommands((command) => commandRegistry.register(command), { preferences: options.preferences, + useTauriNativeEventTransport: options.useTauriNativeEventTransport, + setUseTauriNativeEventTransport: options.setUseTauriNativeEventTransport, toggleShowThinkingBlocks: options.toggleShowThinkingBlocks, toggleKeyboardShortcutHints: options.toggleKeyboardShortcutHints, toggleShowMessageTimeline: options.toggleShowMessageTimeline, diff --git a/packages/ui/src/lib/i18n/messages/en/settings.ts b/packages/ui/src/lib/i18n/messages/en/settings.ts index 79c27144..b39c671b 100644 --- a/packages/ui/src/lib/i18n/messages/en/settings.ts +++ b/packages/ui/src/lib/i18n/messages/en/settings.ts @@ -156,6 +156,8 @@ export const settingsMessages = { "settings.behavior.autoCleanup.subtitle": "Automatically clean up blank sessions when creating new ones.", "settings.behavior.keepUnseenSubagentIdle.title": "Keep subagent idle markers", "settings.behavior.keepUnseenSubagentIdle.subtitle": "Keep subagent idle markers visible until viewed instead of hiding them after 5 seconds.", + "settings.behavior.tauriNativeEventTransport.title": "Native Tauri event transport", + "settings.behavior.tauriNativeEventTransport.subtitle": "Use the Rust-native desktop event transport in Tauri. Disable this to fall back to the browser EventSource path.", "settings.behavior.promptVoiceInput.title": "Prompt voice input", "settings.behavior.promptVoiceInput.subtitle": "Show the microphone control for speech-to-text prompt input when speech is configured.", "settings.behavior.promptSubmit.title": "Enter to submit", diff --git a/packages/ui/src/lib/i18n/messages/es/settings.ts b/packages/ui/src/lib/i18n/messages/es/settings.ts index 44a6bacd..a7970b5c 100644 --- a/packages/ui/src/lib/i18n/messages/es/settings.ts +++ b/packages/ui/src/lib/i18n/messages/es/settings.ts @@ -155,6 +155,8 @@ export const settingsMessages = { "settings.behavior.autoCleanup.subtitle": "Limpia automaticamente las sesiones en blanco al crear nuevas.", "settings.behavior.keepUnseenSubagentIdle.title": "Mantener marcadores idle de subagentes", "settings.behavior.keepUnseenSubagentIdle.subtitle": "Mantiene visibles los marcadores idle de subagentes hasta verlos, en lugar de ocultarlos despues de 5 segundos.", + "settings.behavior.tauriNativeEventTransport.title": "Transporte de eventos nativo de Tauri", + "settings.behavior.tauriNativeEventTransport.subtitle": "Usa el transporte de eventos de escritorio nativo en Rust dentro de Tauri. Desactivalo para volver a la ruta browser EventSource.", "settings.behavior.promptVoiceInput.title": "Prompt voice input", "settings.behavior.promptVoiceInput.subtitle": "Show the microphone control for speech-to-text prompt input when speech is configured.", "settings.behavior.promptSubmit.title": "Enter para enviar", diff --git a/packages/ui/src/lib/i18n/messages/fr/settings.ts b/packages/ui/src/lib/i18n/messages/fr/settings.ts index a462271f..69f4942b 100644 --- a/packages/ui/src/lib/i18n/messages/fr/settings.ts +++ b/packages/ui/src/lib/i18n/messages/fr/settings.ts @@ -155,6 +155,8 @@ export const settingsMessages = { "settings.behavior.autoCleanup.subtitle": "Nettoyer automatiquement les sessions vides lors de la creation de nouvelles.", "settings.behavior.keepUnseenSubagentIdle.title": "Garder les marqueurs inactifs des sous-agents", "settings.behavior.keepUnseenSubagentIdle.subtitle": "Garde les marqueurs inactifs des sous-agents visibles jusqu'a consultation au lieu de les masquer apres 5 secondes.", + "settings.behavior.tauriNativeEventTransport.title": "Transport d'evenements natif Tauri", + "settings.behavior.tauriNativeEventTransport.subtitle": "Utiliser le transport d'evenements desktop natif Rust dans Tauri. Desactivez-le pour revenir au chemin browser EventSource.", "settings.behavior.promptVoiceInput.title": "Prompt voice input", "settings.behavior.promptVoiceInput.subtitle": "Show the microphone control for speech-to-text prompt input when speech is configured.", "settings.behavior.promptSubmit.title": "Entrer pour envoyer", diff --git a/packages/ui/src/lib/i18n/messages/he/settings.ts b/packages/ui/src/lib/i18n/messages/he/settings.ts index b0c0e7f9..57a322ac 100644 --- a/packages/ui/src/lib/i18n/messages/he/settings.ts +++ b/packages/ui/src/lib/i18n/messages/he/settings.ts @@ -154,6 +154,8 @@ export const settingsMessages = { "settings.behavior.autoCleanup.subtitle": "נקה אוטומטית סשנים ריקים בעת יצירת סשנים חדשים.", "settings.behavior.keepUnseenSubagentIdle.title": "השאר סמני idle של תתי-סוכנים", "settings.behavior.keepUnseenSubagentIdle.subtitle": "השאר סמני idle של תתי-סוכנים גלויים עד צפייה במקום להסתיר אותם אחרי 5 שניות.", + "settings.behavior.tauriNativeEventTransport.title": "תעבורת אירועים מקורית של Tauri", + "settings.behavior.tauriNativeEventTransport.subtitle": "השתמש בתעבורת האירועים השולחנית המקורית ב-Rust בתוך Tauri. כבה זאת כדי לחזור למסלול browser EventSource.", "settings.behavior.promptVoiceInput.title": "קלט קולי לפרומפט", "settings.behavior.promptVoiceInput.subtitle": "הצג את כפתור המיקרופון לקלט דיבור-לטקסט כאשר תכונת הקול מוגדרת.", "settings.behavior.promptSubmit.title": "Enter לשליחה", diff --git a/packages/ui/src/lib/i18n/messages/ja/settings.ts b/packages/ui/src/lib/i18n/messages/ja/settings.ts index c69dd402..58675d02 100644 --- a/packages/ui/src/lib/i18n/messages/ja/settings.ts +++ b/packages/ui/src/lib/i18n/messages/ja/settings.ts @@ -155,6 +155,8 @@ export const settingsMessages = { "settings.behavior.autoCleanup.subtitle": "新しいセッション作成時に空のセッションを自動的にクリーンアップします。", "settings.behavior.keepUnseenSubagentIdle.title": "サブエージェントの idle マーカーを保持", "settings.behavior.keepUnseenSubagentIdle.subtitle": "サブエージェントの idle マーカーを 5 秒後に隠さず、表示するまで残します。", + "settings.behavior.tauriNativeEventTransport.title": "Tauri ネイティブイベント転送", + "settings.behavior.tauriNativeEventTransport.subtitle": "Tauri で Rust ネイティブのデスクトップイベント転送を使います。無効にすると browser EventSource 経路に戻ります。", "settings.behavior.promptVoiceInput.title": "Prompt voice input", "settings.behavior.promptVoiceInput.subtitle": "Show the microphone control for speech-to-text prompt input when speech is configured.", "settings.behavior.promptSubmit.title": "Enterで送信", diff --git a/packages/ui/src/lib/i18n/messages/zh-Hans/settings.ts b/packages/ui/src/lib/i18n/messages/zh-Hans/settings.ts index 613a5ca5..bfc0cc3d 100644 --- a/packages/ui/src/lib/i18n/messages/zh-Hans/settings.ts +++ b/packages/ui/src/lib/i18n/messages/zh-Hans/settings.ts @@ -155,6 +155,8 @@ export const settingsMessages = { "settings.behavior.autoCleanup.subtitle": "创建新会话时自动清理空会话。", "settings.behavior.keepUnseenSubagentIdle.title": "保留子智能体 idle 标记", "settings.behavior.keepUnseenSubagentIdle.subtitle": "让子智能体 idle 标记保持可见直到查看,而不是 5 秒后隐藏。", + "settings.behavior.tauriNativeEventTransport.title": "Tauri 原生事件传输", + "settings.behavior.tauriNativeEventTransport.subtitle": "在 Tauri 中使用 Rust 原生桌面事件传输。禁用后将回退到浏览器 EventSource 路径。", "settings.behavior.promptVoiceInput.title": "Prompt voice input", "settings.behavior.promptVoiceInput.subtitle": "Show the microphone control for speech-to-text prompt input when speech is configured.", "settings.behavior.promptSubmit.title": "回车发送", diff --git a/packages/ui/src/lib/native/desktop-events.ts b/packages/ui/src/lib/native/desktop-events.ts new file mode 100644 index 00000000..875a24e1 --- /dev/null +++ b/packages/ui/src/lib/native/desktop-events.ts @@ -0,0 +1,151 @@ +import { invoke } from "@tauri-apps/api/core" +import { listen } from "@tauri-apps/api/event" +import type { WorkspaceEventPayload } from "../../../../server/src/api-types" +import type { + DesktopEventsStartResult, + DesktopEventTransportStartOptions, + DesktopEventTransportStatusPayload, +} from "../event-transport-contract" +import type { WorkspaceEventConnection, WorkspaceEventTransportCallbacks } from "../event-transport" +import { getLogger } from "../logger" + +const log = getLogger("sse") + +interface WorkspaceEventBatchPayload { + generation: number + sequence: number + emittedAt: number + events: WorkspaceEventPayload[] +} + +export async function connectTauriWorkspaceEvents( + callbacks: WorkspaceEventTransportCallbacks, + options: DesktopEventTransportStartOptions, +): Promise { + let closed = false + let opened = false + let expectedGeneration: number | null = null + let terminalErrorRaised = false + const pendingBatches: WorkspaceEventBatchPayload[] = [] + const pendingStatuses: DesktopEventTransportStatusPayload[] = [] + + const matchesGeneration = (generation: number) => expectedGeneration === generation + + const handleBatchPayload = (payload: WorkspaceEventBatchPayload) => { + if (!payload || !matchesGeneration(payload.generation)) return + + if (!opened) { + opened = true + callbacks.onOpen?.() + } + + const events = payload.events ?? [] + if (events.length === 0) { + return + } + + callbacks.onBatch(events) + } + + const handleStatusPayload = (payload: DesktopEventTransportStatusPayload) => { + if (!payload || !matchesGeneration(payload.generation)) return + + if (payload.state === "connected" && !opened) { + opened = true + callbacks.onOpen?.() + } + + if (payload.state === "unauthorized") { + log.warn("Native desktop event transport is waiting for authentication", { + reason: payload.reason, + reconnectAttempt: payload.reconnectAttempt, + nextDelayMs: payload.nextDelayMs, + stats: payload.stats, + }) + } else if (payload.state === "error") { + log.warn("Native desktop event transport reported an error", { + reason: payload.reason, + reconnectAttempt: payload.reconnectAttempt, + nextDelayMs: payload.nextDelayMs, + statusCode: payload.statusCode, + stats: payload.stats, + }) + } else if ((payload.state === "disconnected" || payload.state === "stopped") && payload.stats) { + log.info("Native desktop event transport stats", { + state: payload.state, + reconnectAttempt: payload.reconnectAttempt, + stats: payload.stats, + }) + } + + if (payload.state === "stopped") { + callbacks.onError?.() + return + } + + if (payload.terminal && !terminalErrorRaised) { + terminalErrorRaised = true + callbacks.onError?.() + } + } + + const flushPending = () => { + if (expectedGeneration === null) return + for (const payload of pendingStatuses.splice(0, pendingStatuses.length)) { + handleStatusPayload(payload) + } + for (const payload of pendingBatches.splice(0, pendingBatches.length)) { + handleBatchPayload(payload) + } + } + + const unlistenBatch = await listen("desktop:event-batch", (event) => { + if (closed) return + const payload = event.payload + if (!payload) return + if (expectedGeneration === null) { + pendingBatches.push(payload) + return + } + handleBatchPayload(payload) + }) + + const unlistenStatus = await listen("desktop:event-stream-status", (event) => { + if (closed) return + const payload = event.payload + if (!payload) return + if (expectedGeneration === null) { + pendingStatuses.push(payload) + return + } + handleStatusPayload(payload) + }) + + try { + const result = await invoke("desktop_events_start", { request: options }) + if (!result?.started) { + throw new Error(result?.reason ?? "desktop event transport unavailable") + } + expectedGeneration = result.generation ?? null + flushPending() + } catch (error) { + unlistenBatch() + unlistenStatus() + throw error + } + + return { + disconnect() { + if (closed) { + return + } + + closed = true + unlistenBatch() + unlistenStatus() + void invoke("desktop_events_stop").catch((error) => { + log.warn("Failed to stop native desktop event transport", error) + }) + }, + } +} diff --git a/packages/ui/src/lib/server-events.ts b/packages/ui/src/lib/server-events.ts index 833e6c2a..8f011011 100644 --- a/packages/ui/src/lib/server-events.ts +++ b/packages/ui/src/lib/server-events.ts @@ -1,6 +1,8 @@ +import { batch as solidBatch } from "solid-js" import type { WorkspaceEventPayload, WorkspaceEventType } from "../../../server/src/api-types" import { serverApi } from "./api-client" import { getClientIdentity } from "./client-identity" +import { connectWorkspaceEvents, type WorkspaceEventConnection } from "./event-transport" import { getLogger } from "./logger" const RETRY_BASE_DELAY = 1000 @@ -18,65 +20,118 @@ function logSse(message: string, context?: Record) { class ServerEvents { private handlers = new Map void>>() private openHandlers = new Set<() => void>() - private source: EventSource | null = null + private connection: WorkspaceEventConnection | null = null + private connectGeneration = 0 private retryDelay = RETRY_BASE_DELAY - private reconnectTimer: ReturnType | null = null + private retryTimer: ReturnType | null = null constructor() { - this.connect() + void this.connect() } - private connect() { - if (this.reconnectTimer !== null) { - clearTimeout(this.reconnectTimer) - this.reconnectTimer = null - } - if (this.source) { - this.source.close() + private async connect() { + const generation = ++this.connectGeneration + this.clearReconnectTimer() + + if (this.connection) { + this.connection.disconnect() + this.connection = null } + logSse("Connecting to backend events stream") - this.source = serverApi.connectEvents( - (event) => this.dispatch(event), - () => this.scheduleReconnect(), - (payload) => { - void serverApi - .sendClientConnectionPong({ - ...getClientIdentity(), - pingTs: payload.ts, - }) - .catch((error) => { - log.error("Failed to send client connection pong", error) - }) - }, - ) - this.source.onopen = () => { - logSse("Events stream connected") - this.retryDelay = RETRY_BASE_DELAY - this.openHandlers.forEach((handler) => handler()) + + try { + const connection = await connectWorkspaceEvents({ + onBatch: (events) => this.dispatchBatch(events), + onError: () => { + if (generation !== this.connectGeneration) { + return + } + this.scheduleReconnect() + }, + onOpen: () => { + if (generation !== this.connectGeneration) { + return + } + logSse("Events stream connected") + this.retryDelay = RETRY_BASE_DELAY + this.openHandlers.forEach((handler) => handler()) + }, + onPing: (payload) => { + void serverApi + .sendClientConnectionPong({ + ...getClientIdentity(), + pingTs: payload.ts, + }) + .catch((error) => { + log.error("Failed to send client connection pong", error) + }) + }, + }) + + if (generation !== this.connectGeneration) { + connection.disconnect() + return + } + + this.connection = connection + } catch (error) { + if (generation !== this.connectGeneration) { + return + } + + logSse("Events stream failed to connect, scheduling reconnect", { + error: error instanceof Error ? error.message : String(error), + }) + this.scheduleReconnect() } } private scheduleReconnect() { - if (this.reconnectTimer !== null) { + if (this.retryTimer) { return } - const source = this.source - this.source = null + + if (this.connection) { + this.connection.disconnect() + this.connection = null + } + logSse("Events stream disconnected, scheduling reconnect", { delayMs: this.retryDelay }) - this.reconnectTimer = setTimeout(() => { - this.reconnectTimer = null + this.retryTimer = setTimeout(() => { + this.retryTimer = null this.retryDelay = Math.min(this.retryDelay * 2, RETRY_MAX_DELAY) - this.connect() + void this.connect() }, this.retryDelay) - source?.close() + } + + private clearReconnectTimer() { + if (!this.retryTimer) { + return + } + + clearTimeout(this.retryTimer) + this.retryTimer = null } private dispatch(event: WorkspaceEventPayload) { - logSse(`event ${event.type}`) this.handlers.get("*")?.forEach((handler) => handler(event)) this.handlers.get(event.type)?.forEach((handler) => handler(event)) } + private dispatchBatch(events: WorkspaceEventPayload[]) { + if (events.length === 0) { + return + } + + logSse("event batch", { size: events.length }) + solidBatch(() => { + for (const event of events) { + this.dispatch(event) + } + }) + } + on(type: WorkspaceEventType | "*", handler: (event: WorkspaceEventPayload) => void): () => void { if (!this.handlers.has(type)) { this.handlers.set(type, new Set()) @@ -90,6 +145,19 @@ class ServerEvents { this.openHandlers.add(handler) return () => this.openHandlers.delete(handler) } + + restart(reason = "manual restart"): void { + this.retryDelay = RETRY_BASE_DELAY + this.clearReconnectTimer() + + if (this.connection) { + this.connection.disconnect() + this.connection = null + } + + logSse("Restarting backend events stream", { reason }) + void this.connect() + } } export const serverEvents = new ServerEvents() diff --git a/packages/ui/src/lib/settings/behavior-registry.ts b/packages/ui/src/lib/settings/behavior-registry.ts index 25bd13e4..b28229a1 100644 --- a/packages/ui/src/lib/settings/behavior-registry.ts +++ b/packages/ui/src/lib/settings/behavior-registry.ts @@ -6,7 +6,7 @@ import type { } from "../../stores/preferences" import type { Command } from "../commands" import { tGlobal } from "../i18n" -import { isWebHost } from "../runtime-env" +import { isTauriHost, isWebHost } from "../runtime-env" export type BehaviorSettingKind = "toggle" | "enum" @@ -35,6 +35,8 @@ export type BehaviorSetting = BehaviorToggleSetting | BehaviorEnumSetting export type BehaviorRegistryActions = { preferences: Accessor + useTauriNativeEventTransport: Accessor + setUseTauriNativeEventTransport: (next: boolean) => void updatePreferences?: (updates: Partial) => void toggleShowThinkingBlocks: () => void toggleKeyboardShortcutHints: () => void @@ -280,6 +282,20 @@ export function getBehaviorSettings(actions: BehaviorRegistryActions): BehaviorS } }, }, + ...(isTauriHost() + ? [ + { + kind: "toggle" as const, + id: "behavior.tauriNativeEventTransport", + titleKey: "settings.behavior.tauriNativeEventTransport.title", + subtitleKey: "settings.behavior.tauriNativeEventTransport.subtitle", + get: () => actions.useTauriNativeEventTransport(), + set: (next: boolean) => { + actions.setUseTauriNativeEventTransport(next) + }, + }, + ] + : []), { kind: "toggle", id: "behavior.promptVoiceInput", diff --git a/packages/ui/src/stores/preferences.tsx b/packages/ui/src/stores/preferences.tsx index a39b0796..80882930 100644 --- a/packages/ui/src/stores/preferences.tsx +++ b/packages/ui/src/stores/preferences.tsx @@ -1,5 +1,9 @@ import { createContext, createMemo, createSignal, onMount, useContext } from "solid-js" import type { Accessor, ParentComponent } from "solid-js" +import { + readUseTauriNativeEventTransportPreference, + writeUseTauriNativeEventTransportPreference, +} from "../lib/desktop-event-transport-preference" import { storage, type OwnerBucket } from "../lib/storage" import type { RemoteServerProfile } from "../../../server/src/api-types" import { @@ -388,6 +392,9 @@ const [uiConfigBucket, setUiConfigBucket] = createSignal({}) const [serverConfigBucket, setServerConfigBucket] = createSignal({}) const [uiStateBucket, setUiStateBucket] = createSignal({}) const [isLoaded, setIsLoaded] = createSignal(false) +const [useTauriNativeEventTransport, setUseTauriNativeEventTransportSignal] = createSignal( + readUseTauriNativeEventTransportPreference(), +) const uiSettings = createMemo(() => normalizeUiSettings(uiConfigBucket().settings)) const themePreference = createMemo(() => uiConfigBucket().theme ?? "system") @@ -436,6 +443,23 @@ async function patchConfigOwner(owner: string, patch: unknown) { if (owner === "server") setServerConfigBucket(updated as any) } +function setUseTauriNativeEventTransport(enabled: boolean): void { + if (useTauriNativeEventTransport() === enabled) { + return + } + + setUseTauriNativeEventTransportSignal(enabled) + writeUseTauriNativeEventTransportPreference(enabled) + + void import("../lib/server-events") + .then(({ serverEvents }) => { + serverEvents.restart("desktop transport preference changed") + }) + .catch((error) => { + log.error("Failed to restart backend events stream after desktop transport preference change", error) + }) +} + async function patchStateOwner(owner: string, patch: unknown) { await ensureLoaded() const updated = await storage.patchStateOwner(owner, patch) @@ -714,6 +738,8 @@ void ensureLoaded().catch((error: unknown) => { interface ConfigContextValue { isLoaded: Accessor preferences: typeof preferences + useTauriNativeEventTransport: typeof useTauriNativeEventTransport + setUseTauriNativeEventTransport: typeof setUseTauriNativeEventTransport updatePreferences: typeof updatePreferences themePreference: typeof themePreference setThemePreference: typeof setThemePreference @@ -772,6 +798,8 @@ const ConfigContext = createContext() const configContextValue: ConfigContextValue = { isLoaded, preferences, + useTauriNativeEventTransport, + setUseTauriNativeEventTransport, updatePreferences, themePreference, setThemePreference, @@ -858,6 +886,8 @@ export function useConfig(): ConfigContextValue { export { preferences, + useTauriNativeEventTransport, + setUseTauriNativeEventTransport, uiState, serverSettings, recentFolders,