diff --git a/Cargo.lock b/Cargo.lock index 9762477..bf42cba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1465,6 +1465,7 @@ dependencies = [ "clap", "directories", "libc", + "serde", "serde_json", "smfs-core", "tempfile", diff --git a/crates/smfs-core/src/cache/db.rs b/crates/smfs-core/src/cache/db.rs index 01b9879..45dc55d 100644 --- a/crates/smfs-core/src/cache/db.rs +++ b/crates/smfs-core/src/cache/db.rs @@ -619,6 +619,13 @@ impl Db { } } + pub(crate) fn remote_count(&self) -> usize { + let conn = self.conn.lock(); + conn.query_row("SELECT COUNT(*) FROM fs_remote", [], |r| r.get::<_, i64>(0)) + .unwrap_or(0) + .max(0) as usize + } + /// Rows currently inflight — drives the inflight status poller. #[allow(dead_code)] // kept for future deep diagnostics pub(crate) fn push_queue_inflight(&self) -> Vec { diff --git a/crates/smfs-core/src/daemon/mod.rs b/crates/smfs-core/src/daemon/mod.rs index e137d7c..a22a9a2 100644 --- a/crates/smfs-core/src/daemon/mod.rs +++ b/crates/smfs-core/src/daemon/mod.rs @@ -33,6 +33,9 @@ pub fn pids_dir() -> PathBuf { pub fn logs_dir() -> PathBuf { cache_dir().join("logs") } +pub fn startup_dir() -> PathBuf { + cache_dir().join("startup") +} pub fn socket_path(tag: &str) -> PathBuf { sockets_dir().join(format!("{tag}.sock")) @@ -43,12 +46,16 @@ pub fn pid_path(tag: &str) -> PathBuf { pub fn log_path(tag: &str) -> PathBuf { logs_dir().join(format!("{tag}.log")) } +pub fn startup_path(tag: &str) -> PathBuf { + startup_dir().join(format!("{tag}.json")) +} /// Create `sockets/`, `pids/`, `logs/` subdirectories if missing. pub fn ensure_dirs() -> std::io::Result<()> { std::fs::create_dir_all(sockets_dir())?; std::fs::create_dir_all(pids_dir())?; std::fs::create_dir_all(logs_dir())?; + std::fs::create_dir_all(startup_dir())?; Ok(()) } @@ -83,10 +90,12 @@ pub fn cleanup_stale(tag: &str) -> bool { Some(pid) if !pid_alive(pid) => { let _ = std::fs::remove_file(pid_path(tag)); let _ = std::fs::remove_file(socket_path(tag)); + let _ = std::fs::remove_file(startup_path(tag)); cleaned = true; } None if socket_path(tag).exists() => { let _ = std::fs::remove_file(socket_path(tag)); + let _ = std::fs::remove_file(startup_path(tag)); cleaned = true; } _ => {} diff --git a/crates/smfs-core/src/sync/mod.rs b/crates/smfs-core/src/sync/mod.rs index a9be907..f6158a5 100644 --- a/crates/smfs-core/src/sync/mod.rs +++ b/crates/smfs-core/src/sync/mod.rs @@ -27,6 +27,12 @@ use tokio::task::JoinSet; use crate::cache::SupermemoryFs; +#[derive(Debug, Clone, Copy)] +pub enum InitialPullProgress { + DeletionScan(scan::DeletionScanProgress), + Pull(pull::PullProgress), +} + /// Knobs for the sync engine. All optional — defaults are production-sane. #[derive(Debug, Clone, Copy)] pub struct SyncOptions { @@ -60,6 +66,29 @@ impl SyncEngine { Ok((removed, reconciled)) } + pub async fn initial_pull_with_progress( + fs: &Arc, + mut on_progress: F, + ) -> anyhow::Result<(usize, usize)> + where + F: FnMut(InitialPullProgress) + Send, + { + let removed = if fs.db().remote_count() == 0 { + 0 + } else { + scan::deletion_scan_with_progress(fs, |progress| { + on_progress(InitialPullProgress::DeletionScan(progress)); + }) + .await + .unwrap_or(0) + }; + let reconciled = pull::full_pull_with_progress(fs, |progress| { + on_progress(InitialPullProgress::Pull(progress)); + }) + .await?; + Ok((removed, reconciled)) + } + /// Spawn background loops for this mount. Push (D) and inflight status /// poller (E) are always spawned — they are the mount's write path and /// disabling them would defeat the purpose of running the mount. diff --git a/crates/smfs-core/src/sync/pull.rs b/crates/smfs-core/src/sync/pull.rs index 1448551..618de8d 100644 --- a/crates/smfs-core/src/sync/pull.rs +++ b/crates/smfs-core/src/sync/pull.rs @@ -16,6 +16,14 @@ const SYNC_META_LAST_SEEN: &str = "last_seen_updated_at"; /// Per-file cap on R2 rehydration fetch (larger files stay 0-byte stubs). const REHYDRATE_SIZE_CAP: u64 = 20 * 1024 * 1024; +#[derive(Debug, Clone, Copy)] +pub struct PullProgress { + pub page: u32, + pub total_pages: u32, + pub total_items: usize, + pub reconciled: usize, +} + /// Run one pass of the delta pull loop. Returns the number of remote docs /// that were reconciled this pass (whether or not they produced local /// changes). @@ -88,6 +96,23 @@ async fn list_page(api: &ApiClient, page: u32) -> anyhow::Result) -> anyhow::Result { + full_pull_inner(fs, None).await +} + +pub async fn full_pull_with_progress( + fs: &Arc, + mut on_progress: F, +) -> anyhow::Result +where + F: FnMut(PullProgress) + Send, +{ + full_pull_inner(fs, Some(&mut on_progress)).await +} + +async fn full_pull_inner( + fs: &Arc, + mut on_progress: Option<&mut (dyn FnMut(PullProgress) + Send)>, +) -> anyhow::Result { let Some(api) = fs.api() else { return Ok(0); }; @@ -110,6 +135,14 @@ pub async fn full_pull(fs: &Arc) -> anyhow::Result { if doc.updated_at > newest_seen { newest_seen = doc.updated_at.clone(); } + if let Some(cb) = on_progress.as_mut() { + cb(PullProgress { + page, + total_pages: resp.pagination.total_pages, + total_items: resp.pagination.total_items as usize, + reconciled, + }); + } } if page >= resp.pagination.total_pages { break; diff --git a/crates/smfs-core/src/sync/scan.rs b/crates/smfs-core/src/sync/scan.rs index 8aadd01..45ce92b 100644 --- a/crates/smfs-core/src/sync/scan.rs +++ b/crates/smfs-core/src/sync/scan.rs @@ -8,10 +8,35 @@ use crate::cache::SupermemoryFs; const PAGE_SIZE: u32 = 100; +#[derive(Debug, Clone, Copy)] +pub struct DeletionScanProgress { + pub page: u32, + pub total_pages: u32, + pub total_items: usize, + pub remote_seen: usize, +} + /// Run one deletion-scan pass. Returns `Ok(removed)` where `removed` is the /// number of local inodes that were unlinked because their remote_id /// disappeared from the server. pub async fn deletion_scan(fs: &Arc) -> anyhow::Result { + deletion_scan_inner(fs, None).await +} + +pub async fn deletion_scan_with_progress( + fs: &Arc, + mut on_progress: F, +) -> anyhow::Result +where + F: FnMut(DeletionScanProgress) + Send, +{ + deletion_scan_inner(fs, Some(&mut on_progress)).await +} + +async fn deletion_scan_inner( + fs: &Arc, + mut on_progress: Option<&mut (dyn FnMut(DeletionScanProgress) + Send)>, +) -> anyhow::Result { let Some(api) = fs.api() else { return Ok(0); }; @@ -38,6 +63,14 @@ pub async fn deletion_scan(fs: &Arc) -> anyhow::Result { for d in &resp.memories { remote_ids.insert(d.id.clone()); } + if let Some(cb) = on_progress.as_mut() { + cb(DeletionScanProgress { + page, + total_pages: resp.pagination.total_pages, + total_items: resp.pagination.total_items as usize, + remote_seen: remote_ids.len(), + }); + } if page >= resp.pagination.total_pages { break; } diff --git a/crates/smfs/Cargo.toml b/crates/smfs/Cargo.toml index d796828..1df9396 100644 --- a/crates/smfs/Cargo.toml +++ b/crates/smfs/Cargo.toml @@ -20,6 +20,7 @@ clap = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } libc = { workspace = true } +serde = { workspace = true } serde_json = { workspace = true } directories = { workspace = true } diff --git a/crates/smfs/src/cmd/daemon_runtime.rs b/crates/smfs/src/cmd/daemon_runtime.rs index 4580f23..6330df8 100644 --- a/crates/smfs/src/cmd/daemon_runtime.rs +++ b/crates/smfs/src/cmd/daemon_runtime.rs @@ -15,6 +15,7 @@ use std::time::Instant; use anyhow::{Context, Result}; use tokio::sync::Notify; +use super::startup::StartupReporter; use smfs_core::cache::{Db, SupermemoryFs}; use smfs_core::daemon; use smfs_core::mount::{mount_fs, MountBackend, MountOpts}; @@ -40,6 +41,9 @@ pub struct DaemonConfig { } pub async fn run(cfg: DaemonConfig) -> Result<()> { + let mut startup = StartupReporter::new(&cfg.container_tag); + startup.report("creating_mountpoint", "preparing mountpoint")?; + let created_dir = !cfg.mount_path.exists(); if created_dir { std::fs::create_dir_all(&cfg.mount_path)?; @@ -89,6 +93,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> { let opts = MountOpts::new(cfg.mount_path.clone(), cfg.backend).with_ownership(uid, gid); + startup.report("validating_key", "validating API key")?; let session = if cfg.ephemeral { smfs_core::api::ApiClient::validate_key(&cfg.api_url, &cfg.api_key) .await @@ -113,6 +118,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> { "server did not return org id; cannot open cache. Run `smfs login` and retry." ) })?; + startup.report("opening_cache", format!("opening cache for org {org_id}"))?; let db_path = smfs_core::config::cache_db_path(org_id, &cfg.container_tag); if let Some(parent) = db_path.parent() { std::fs::create_dir_all(parent)?; @@ -132,6 +138,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> { Arc::new(Db::open(&db_path)?) }; + startup.report("configuring_api", "configuring API client")?; let mut api_client = smfs_core::api::ApiClient::new(&cfg.api_url, &cfg.api_key, &cfg.container_tag); if let Some(uid) = session.as_ref().and_then(|s| s.user_id.clone()) { @@ -156,9 +163,43 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> { let fs = Arc::new(SupermemoryFs::with_api(db, api)); + startup.report("warming_profile", "warming profile")?; fs.warm_profile().await; - let pull_succeeded = match smfs_core::sync::SyncEngine::initial_pull(&fs).await { + startup.report("initial_sync", "starting initial sync")?; + let pull_succeeded = match smfs_core::sync::SyncEngine::initial_pull_with_progress( + &fs, + |progress| match progress { + smfs_core::sync::InitialPullProgress::DeletionScan(progress) => { + if progress.remote_seen == 1 || progress.remote_seen % 100 == 0 { + let _ = startup.report_counts( + "initial_sync", + format!( + "deletion scan saw {} remote docs (page {}/{})", + progress.remote_seen, progress.page, progress.total_pages + ), + progress.remote_seen, + progress.total_items, + ); + } + } + smfs_core::sync::InitialPullProgress::Pull(progress) => { + if progress.reconciled == 1 || progress.reconciled % 100 == 0 { + let _ = startup.report_counts( + "initial_sync", + format!( + "reconciled {} docs (page {}/{})", + progress.reconciled, progress.page, progress.total_pages + ), + progress.reconciled, + progress.total_items, + ); + } + } + }, + ) + .await + { Ok((removed, reconciled)) => { eprintln!( "initial sync: {reconciled} docs reconciled, {removed} stale entries removed" @@ -205,6 +246,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> { }; let sync_tasks = smfs_core::sync::SyncEngine::start(fs.clone(), sync_opts, shutdown_rx.clone()); + startup.report("mounting_fs", "mounting filesystem")?; let handle = mount_fs(fs.clone(), opts).await?; // Auto-install grep wrapper on first mount. @@ -215,6 +257,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> { } // Bring up the IPC control socket. Clients use it for status/sync/unmount. + startup.report("starting_ipc", "starting IPC socket")?; daemon::ensure_dirs().context("creating daemon state dirs")?; let ipc_shutdown_notify = Arc::new(Notify::new()); let state = Arc::new(smfs_core::daemon::ipc::IpcState { @@ -244,6 +287,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> { std::fs::create_dir_all(parent)?; } std::fs::write(&pid_path, std::process::id().to_string())?; + startup.report("ready", "filesystem mounted and IPC ready")?; eprintln!( "supermemoryfs mounted at {} (backend: {}, tag: {})", @@ -340,6 +384,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> { } let _ = std::fs::remove_file(&pid_path); let _ = std::fs::remove_file(&socket_path); + let _ = std::fs::remove_file(daemon::startup_path(&cfg.container_tag)); if created_dir { let _ = std::fs::remove_dir(&cfg.mount_path); } diff --git a/crates/smfs/src/cmd/mod.rs b/crates/smfs/src/cmd/mod.rs index 899a652..623c8a9 100644 --- a/crates/smfs/src/cmd/mod.rs +++ b/crates/smfs/src/cmd/mod.rs @@ -23,6 +23,7 @@ pub mod logout; pub mod logs; pub mod marker; pub mod mount; +pub mod startup; pub mod status; pub mod sync; pub mod unmount; diff --git a/crates/smfs/src/cmd/mount.rs b/crates/smfs/src/cmd/mount.rs index fa499a2..fd7dd35 100644 --- a/crates/smfs/src/cmd/mount.rs +++ b/crates/smfs/src/cmd/mount.rs @@ -7,7 +7,11 @@ use anyhow::{Context, Result}; use clap::Args as ClapArgs; +use std::io::{IsTerminal, Write}; use std::path::PathBuf; +use std::time::{Duration, Instant}; + +const DEFAULT_STARTUP_INACTIVITY_TIMEOUT_SECS: u64 = 30; #[derive(ClapArgs, Debug)] pub struct Args { @@ -77,6 +81,9 @@ pub struct Args { #[arg(long, default_value_t = 30)] pub drain_timeout: u64, + #[arg(long, default_value_t = DEFAULT_STARTUP_INACTIVITY_TIMEOUT_SECS)] + pub startup_timeout: u64, + /// Internal: skip injecting the path-scoped agent hint. Used for /// baseline measurement; not part of the supported user surface. #[arg(long, hide = true)] @@ -156,6 +163,8 @@ pub async fn run(args: Args) -> Result<()> { // Clean any leftover socket/pid from a prior crash. smfs_core::daemon::cleanup_stale(&container_tag); smfs_core::daemon::ensure_dirs()?; + let startup_path = smfs_core::daemon::startup_path(&container_tag); + let _ = std::fs::remove_file(&startup_path); // Open the per-tag log file for the child's stdout/stderr. Parent // handoff: the daemon never writes to the controlling TTY again. @@ -205,13 +214,15 @@ pub async fn run(args: Args) -> Result<()> { .stdout(log_file) .stderr(log_file_err); - let child = cmd.spawn()?; + let mut child = cmd.spawn()?; let child_pid = child.id(); // The child will self-install a session via setsid; parent just waits // until the child's IPC socket comes up, then exits. let socket = smfs_core::daemon::socket_path(&container_tag); - let deadline = std::time::Instant::now() + std::time::Duration::from_secs(30); + let startup_timeout = Duration::from_secs(args.startup_timeout); + let mut wait_state = StartupWaitState::new(startup_timeout, Instant::now()); + let mut display = StartupDisplay::new(&container_tag); let mut last_err: Option = None; loop { // Ping the daemon — once it responds, we know the mount is live. @@ -231,24 +242,44 @@ pub async fn run(args: Args) -> Result<()> { } } } - if std::time::Instant::now() >= deadline { + if let Some((body, progress)) = super::startup::read_progress(&container_tag) { + let now = Instant::now(); + if progress.pid == child_pid { + display.observe_progress(&progress, now); + } + wait_state.observe_progress(body, progress, now, child_pid); + } + display.tick(Instant::now()); + if wait_state.timed_out(Instant::now()) { + display.finish(); + let _ = child.kill(); + let _ = child.wait(); + let last_progress = wait_state + .last_progress_summary() + .unwrap_or_else(|| "".to_string()); anyhow::bail!( - "daemon did not become ready within 30s (pid {}). Log: {}\nLast error: {}", + "daemon made no startup progress for {}s before becoming ready (pid {}). Log: {}\nLast progress: {}\nLast error: {}", + args.startup_timeout, child_pid, log_path.display(), + last_progress, last_err.unwrap_or_else(|| "".into()), ); } // Did the child die early? - if !smfs_core::daemon::pid_alive(child_pid) { + if let Some(status) = child.try_wait()? { anyhow::bail!( - "daemon exited before becoming ready. Log: {}", - log_path.display() + "daemon exited before becoming ready (status: {}). Log: {}", + status, + log_path.display(), ); } - tokio::time::sleep(std::time::Duration::from_millis(250)).await; + tokio::time::sleep(Duration::from_millis(250)).await; } + display.finish(); + let _ = std::fs::remove_file(&startup_path); + eprintln!( "supermemoryfs mounted at {} (tag: {}, pid: {})", mount_path.display(), @@ -265,6 +296,185 @@ pub async fn run(args: Args) -> Result<()> { Ok(()) } +#[derive(Debug)] +struct StartupDisplay { + tag: String, + enabled: bool, + target_loaded: usize, + total: Option, + displayed_loaded: f64, + last_target_loaded: usize, + last_target_at: Instant, + last_tick: Instant, + rate: f64, + rendered: bool, +} + +impl StartupDisplay { + fn new(tag: &str) -> Self { + let now = Instant::now(); + Self { + tag: tag.to_string(), + enabled: std::io::stderr().is_terminal(), + target_loaded: 0, + total: None, + displayed_loaded: 0.0, + last_target_loaded: 0, + last_target_at: now, + last_tick: now, + rate: 0.0, + rendered: false, + } + } + + fn observe_progress(&mut self, progress: &super::startup::StartupProgress, now: Instant) { + let Some(loaded) = progress.loaded else { + return; + }; + if loaded < self.target_loaded { + return; + } + if let Some(total) = progress.total { + if total > 0 { + self.total = Some(total); + } + } + if loaded > self.last_target_loaded { + let elapsed = now.duration_since(self.last_target_at).as_secs_f64(); + if elapsed > 0.0 { + let instant_rate = (loaded - self.last_target_loaded) as f64 / elapsed; + self.rate = if self.rate > 0.0 { + (self.rate * 0.7) + (instant_rate * 0.3) + } else { + instant_rate + }; + } + self.last_target_loaded = loaded; + self.last_target_at = now; + } + self.target_loaded = loaded; + } + + fn tick(&mut self, now: Instant) { + if !self.enabled || self.target_loaded == 0 { + self.last_tick = now; + return; + } + if self.displayed_loaded >= self.target_loaded as f64 { + self.last_tick = now; + return; + } + let elapsed = now.duration_since(self.last_tick).as_secs_f64(); + self.last_tick = now; + let speed = (self.rate * 0.85).clamp(25.0, 1200.0); + let next = + (self.displayed_loaded + (speed * elapsed).max(1.0)).min(self.target_loaded as f64); + if next.floor() as usize != self.displayed_loaded.floor() as usize { + self.displayed_loaded = next; + self.render(false); + } else { + self.displayed_loaded = next; + } + } + + fn finish(&mut self) { + if !self.enabled { + return; + } + if self.target_loaded > 0 { + self.displayed_loaded = self.target_loaded as f64; + self.render(true); + } else if self.rendered { + eprintln!(); + } + } + + fn render(&mut self, newline: bool) { + let line = self.render_line(); + eprint!("\r{line}\x1b[K"); + if newline { + eprintln!(); + } + let _ = std::io::stderr().flush(); + self.rendered = true; + } + + fn render_line(&self) -> String { + let loaded = self.displayed_loaded.floor() as usize; + if let Some(total) = self.total { + let display_total = total.max(loaded); + let pct = loaded + .saturating_mul(100) + .checked_div(display_total) + .unwrap_or(0) + .min(100); + if self.rate > 0.0 { + format!( + "syncing {}: {} / {} files loaded ({}%, {:.0} files/s)", + self.tag, loaded, display_total, pct, self.rate + ) + } else { + format!( + "syncing {}: {} / {} files loaded ({}%)", + self.tag, loaded, display_total, pct + ) + } + } else { + format!("syncing {}: {} files loaded", self.tag, loaded) + } + } +} + +#[derive(Debug)] +struct StartupWaitState { + inactivity_timeout: Duration, + last_activity: Instant, + last_progress_body: Option, + last_progress: Option, +} + +impl StartupWaitState { + fn new(inactivity_timeout: Duration, now: Instant) -> Self { + Self { + inactivity_timeout, + last_activity: now, + last_progress_body: None, + last_progress: None, + } + } + + fn observe_progress( + &mut self, + body: String, + progress: super::startup::StartupProgress, + now: Instant, + expected_pid: u32, + ) { + if progress.pid != expected_pid { + return; + } + if self.last_progress_body.as_deref() != Some(body.as_str()) { + self.last_activity = now; + self.last_progress_body = Some(body); + self.last_progress = Some(progress); + } + } + + fn timed_out(&self, now: Instant) -> bool { + now.duration_since(self.last_activity) >= self.inactivity_timeout + } + + fn last_progress_summary(&self) -> Option { + self.last_progress.as_ref().map(|p| { + if p.message.is_empty() { + format!("seq={} phase={}", p.seq, p.phase) + } else { + format!("seq={} phase={} message={}", p.seq, p.phase, p.message) + } + }) + } +} + /// Sweep stale hints + install the new one. Logs to stderr; never fails the /// caller. fn install_hint_best_effort(tag: &str, mount_path: &std::path::Path, skip_install: bool) { @@ -542,4 +752,106 @@ mod tests { let (tag, _) = resolve_tag_and_path("my-tag_v2:prod", None).unwrap(); assert_eq!(tag, "my-tag_v2:prod"); } + + #[test] + fn startup_wait_times_out_without_progress() { + let now = Instant::now(); + let state = StartupWaitState::new(Duration::from_secs(30), now); + + assert!(!state.timed_out(now + Duration::from_secs(29))); + assert!(state.timed_out(now + Duration::from_secs(30))); + } + + #[test] + fn startup_wait_progress_resets_inactivity_timer() { + let now = Instant::now(); + let mut state = StartupWaitState::new(Duration::from_secs(30), now); + + state.observe_progress( + "{\"seq\":1}".to_string(), + crate::cmd::startup::StartupProgress { + pid: 42, + seq: 1, + phase: "validating_key".to_string(), + message: "validating API key".to_string(), + loaded: None, + total: None, + }, + now + Duration::from_secs(25), + 42, + ); + + assert!(!state.timed_out(now + Duration::from_secs(54))); + assert!(state.timed_out(now + Duration::from_secs(55))); + assert_eq!( + state.last_progress_summary().as_deref(), + Some("seq=1 phase=validating_key message=validating API key") + ); + } + + #[test] + fn startup_wait_identical_progress_does_not_reset_timer() { + let now = Instant::now(); + let mut state = StartupWaitState::new(Duration::from_secs(30), now); + let progress = crate::cmd::startup::StartupProgress { + pid: 42, + seq: 1, + phase: "opening_cache".to_string(), + message: "opening cache".to_string(), + loaded: None, + total: None, + }; + + state.observe_progress( + "{\"seq\":1}".to_string(), + progress.clone(), + now + Duration::from_secs(10), + 42, + ); + state.observe_progress( + "{\"seq\":1}".to_string(), + progress, + now + Duration::from_secs(35), + 42, + ); + + assert!(state.timed_out(now + Duration::from_secs(40))); + } + + #[test] + fn startup_wait_ignores_progress_from_other_pid() { + let now = Instant::now(); + let mut state = StartupWaitState::new(Duration::from_secs(30), now); + + state.observe_progress( + "{\"seq\":1}".to_string(), + crate::cmd::startup::StartupProgress { + pid: 7, + seq: 1, + phase: "initial_sync".to_string(), + message: "reconciled 100 docs".to_string(), + loaded: Some(100), + total: Some(1000), + }, + now + Duration::from_secs(25), + 42, + ); + + assert!(state.timed_out(now + Duration::from_secs(30))); + assert!(state.last_progress_summary().is_none()); + } + + #[test] + fn startup_display_clamps_percent_and_total() { + let mut display = StartupDisplay::new("eval"); + display.enabled = true; + display.displayed_loaded = 120.0; + display.target_loaded = 120; + display.total = Some(100); + + assert_eq!( + display.render_line(), + "syncing eval: 120 / 120 files loaded (100%)" + ); + } } diff --git a/crates/smfs/src/cmd/startup.rs b/crates/smfs/src/cmd/startup.rs new file mode 100644 index 0000000..65ae166 --- /dev/null +++ b/crates/smfs/src/cmd/startup.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct StartupProgress { + pub pid: u32, + pub seq: u64, + pub phase: String, + pub message: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub loaded: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub total: Option, +} + +#[derive(Debug)] +pub struct StartupReporter { + path: PathBuf, + seq: u64, +} + +impl StartupReporter { + pub fn new(tag: &str) -> Self { + Self { + path: smfs_core::daemon::startup_path(tag), + seq: 0, + } + } + + pub fn report(&mut self, phase: &str, message: impl Into) -> Result<()> { + self.write(phase, message.into(), None, None) + } + + pub fn report_counts( + &mut self, + phase: &str, + message: impl Into, + loaded: usize, + total: usize, + ) -> Result<()> { + self.write(phase, message.into(), Some(loaded), Some(total)) + } + + fn write( + &mut self, + phase: &str, + message: String, + loaded: Option, + total: Option, + ) -> Result<()> { + self.seq += 1; + let progress = StartupProgress { + pid: std::process::id(), + seq: self.seq, + phase: phase.to_string(), + message, + loaded, + total, + }; + if let Some(parent) = self.path.parent() { + std::fs::create_dir_all(parent)?; + } + let tmp = self.path.with_extension("json.tmp"); + std::fs::write(&tmp, serde_json::to_vec(&progress)?)?; + std::fs::rename(tmp, &self.path)?; + Ok(()) + } +} + +pub fn read_progress(tag: &str) -> Option<(String, StartupProgress)> { + let body = std::fs::read_to_string(smfs_core::daemon::startup_path(tag)).ok()?; + let progress = serde_json::from_str(&body).ok()?; + Some((body, progress)) +}