Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/smfs-core/src/cache/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,13 @@ impl Db {
}
}

pub(crate) fn remote_count(&self) -> usize {
let conn = self.conn.lock();
conn.query_row("SELECT COUNT(*) FROM fs_remote", [], |r| r.get::<_, i64>(0))
.unwrap_or(0)
.max(0) as usize
}

/// Rows currently inflight — drives the inflight status poller.
#[allow(dead_code)] // kept for future deep diagnostics
pub(crate) fn push_queue_inflight(&self) -> Vec<InflightRow> {
Expand Down
9 changes: 9 additions & 0 deletions crates/smfs-core/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub fn pids_dir() -> PathBuf {
pub fn logs_dir() -> PathBuf {
cache_dir().join("logs")
}
pub fn startup_dir() -> PathBuf {
cache_dir().join("startup")
}

pub fn socket_path(tag: &str) -> PathBuf {
sockets_dir().join(format!("{tag}.sock"))
Expand All @@ -43,12 +46,16 @@ pub fn pid_path(tag: &str) -> PathBuf {
pub fn log_path(tag: &str) -> PathBuf {
logs_dir().join(format!("{tag}.log"))
}
pub fn startup_path(tag: &str) -> PathBuf {
startup_dir().join(format!("{tag}.json"))
}

/// Create `sockets/`, `pids/`, `logs/` subdirectories if missing.
pub fn ensure_dirs() -> std::io::Result<()> {
std::fs::create_dir_all(sockets_dir())?;
std::fs::create_dir_all(pids_dir())?;
std::fs::create_dir_all(logs_dir())?;
std::fs::create_dir_all(startup_dir())?;
Ok(())
}

Expand Down Expand Up @@ -83,10 +90,12 @@ pub fn cleanup_stale(tag: &str) -> bool {
Some(pid) if !pid_alive(pid) => {
let _ = std::fs::remove_file(pid_path(tag));
let _ = std::fs::remove_file(socket_path(tag));
let _ = std::fs::remove_file(startup_path(tag));
cleaned = true;
}
None if socket_path(tag).exists() => {
let _ = std::fs::remove_file(socket_path(tag));
let _ = std::fs::remove_file(startup_path(tag));
cleaned = true;
}
_ => {}
Expand Down
29 changes: 29 additions & 0 deletions crates/smfs-core/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ use tokio::task::JoinSet;

use crate::cache::SupermemoryFs;

#[derive(Debug, Clone, Copy)]
pub enum InitialPullProgress {
DeletionScan(scan::DeletionScanProgress),
Pull(pull::PullProgress),
}

/// Knobs for the sync engine. All optional — defaults are production-sane.
#[derive(Debug, Clone, Copy)]
pub struct SyncOptions {
Expand Down Expand Up @@ -60,6 +66,29 @@ impl SyncEngine {
Ok((removed, reconciled))
}

pub async fn initial_pull_with_progress<F>(
fs: &Arc<SupermemoryFs>,
mut on_progress: F,
) -> anyhow::Result<(usize, usize)>
where
F: FnMut(InitialPullProgress) + Send,
{
let removed = if fs.db().remote_count() == 0 {
0
} else {
scan::deletion_scan_with_progress(fs, |progress| {
on_progress(InitialPullProgress::DeletionScan(progress));
})
.await
.unwrap_or(0)
};
let reconciled = pull::full_pull_with_progress(fs, |progress| {
on_progress(InitialPullProgress::Pull(progress));
})
.await?;
Ok((removed, reconciled))
}

/// Spawn background loops for this mount. Push (D) and inflight status
/// poller (E) are always spawned — they are the mount's write path and
/// disabling them would defeat the purpose of running the mount.
Expand Down
33 changes: 33 additions & 0 deletions crates/smfs-core/src/sync/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ const SYNC_META_LAST_SEEN: &str = "last_seen_updated_at";
/// Per-file cap on R2 rehydration fetch (larger files stay 0-byte stubs).
const REHYDRATE_SIZE_CAP: u64 = 20 * 1024 * 1024;

#[derive(Debug, Clone, Copy)]
pub struct PullProgress {
pub page: u32,
pub total_pages: u32,
pub total_items: usize,
pub reconciled: usize,
}

/// Run one pass of the delta pull loop. Returns the number of remote docs
/// that were reconciled this pass (whether or not they produced local
/// changes).
Expand Down Expand Up @@ -88,6 +96,23 @@ async fn list_page(api: &ApiClient, page: u32) -> anyhow::Result<ListDocumentsRe
/// Full pull (no watermark). Used on mount when we have no prior state — we
/// want to catch every remote doc regardless of `updatedAt`.
pub async fn full_pull(fs: &Arc<SupermemoryFs>) -> anyhow::Result<usize> {
full_pull_inner(fs, None).await
}

pub async fn full_pull_with_progress<F>(
fs: &Arc<SupermemoryFs>,
mut on_progress: F,
) -> anyhow::Result<usize>
where
F: FnMut(PullProgress) + Send,
{
full_pull_inner(fs, Some(&mut on_progress)).await
}

async fn full_pull_inner(
fs: &Arc<SupermemoryFs>,
mut on_progress: Option<&mut (dyn FnMut(PullProgress) + Send)>,
) -> anyhow::Result<usize> {
let Some(api) = fs.api() else {
return Ok(0);
};
Expand All @@ -110,6 +135,14 @@ pub async fn full_pull(fs: &Arc<SupermemoryFs>) -> anyhow::Result<usize> {
if doc.updated_at > newest_seen {
newest_seen = doc.updated_at.clone();
}
if let Some(cb) = on_progress.as_mut() {
cb(PullProgress {
page,
total_pages: resp.pagination.total_pages,
total_items: resp.pagination.total_items as usize,
reconciled,
});
}
}
if page >= resp.pagination.total_pages {
break;
Expand Down
33 changes: 33 additions & 0 deletions crates/smfs-core/src/sync/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,35 @@ use crate::cache::SupermemoryFs;

const PAGE_SIZE: u32 = 100;

#[derive(Debug, Clone, Copy)]
pub struct DeletionScanProgress {
pub page: u32,
pub total_pages: u32,
pub total_items: usize,
pub remote_seen: usize,
}

/// Run one deletion-scan pass. Returns `Ok(removed)` where `removed` is the
/// number of local inodes that were unlinked because their remote_id
/// disappeared from the server.
pub async fn deletion_scan(fs: &Arc<SupermemoryFs>) -> anyhow::Result<usize> {
deletion_scan_inner(fs, None).await
}

pub async fn deletion_scan_with_progress<F>(
fs: &Arc<SupermemoryFs>,
mut on_progress: F,
) -> anyhow::Result<usize>
where
F: FnMut(DeletionScanProgress) + Send,
{
deletion_scan_inner(fs, Some(&mut on_progress)).await
}

async fn deletion_scan_inner(
fs: &Arc<SupermemoryFs>,
mut on_progress: Option<&mut (dyn FnMut(DeletionScanProgress) + Send)>,
) -> anyhow::Result<usize> {
let Some(api) = fs.api() else {
return Ok(0);
};
Expand All @@ -38,6 +63,14 @@ pub async fn deletion_scan(fs: &Arc<SupermemoryFs>) -> anyhow::Result<usize> {
for d in &resp.memories {
remote_ids.insert(d.id.clone());
}
if let Some(cb) = on_progress.as_mut() {
cb(DeletionScanProgress {
page,
total_pages: resp.pagination.total_pages,
total_items: resp.pagination.total_items as usize,
remote_seen: remote_ids.len(),
});
}
if page >= resp.pagination.total_pages {
break;
}
Expand Down
1 change: 1 addition & 0 deletions crates/smfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ clap = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
libc = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
directories = { workspace = true }

Expand Down
47 changes: 46 additions & 1 deletion crates/smfs/src/cmd/daemon_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::time::Instant;
use anyhow::{Context, Result};
use tokio::sync::Notify;

use super::startup::StartupReporter;
use smfs_core::cache::{Db, SupermemoryFs};
use smfs_core::daemon;
use smfs_core::mount::{mount_fs, MountBackend, MountOpts};
Expand All @@ -40,6 +41,9 @@ pub struct DaemonConfig {
}

pub async fn run(cfg: DaemonConfig) -> Result<()> {
let mut startup = StartupReporter::new(&cfg.container_tag);
startup.report("creating_mountpoint", "preparing mountpoint")?;

let created_dir = !cfg.mount_path.exists();
if created_dir {
std::fs::create_dir_all(&cfg.mount_path)?;
Expand Down Expand Up @@ -89,6 +93,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {

let opts = MountOpts::new(cfg.mount_path.clone(), cfg.backend).with_ownership(uid, gid);

startup.report("validating_key", "validating API key")?;
let session = if cfg.ephemeral {
smfs_core::api::ApiClient::validate_key(&cfg.api_url, &cfg.api_key)
.await
Expand All @@ -113,6 +118,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
"server did not return org id; cannot open cache. Run `smfs login` and retry."
)
})?;
startup.report("opening_cache", format!("opening cache for org {org_id}"))?;
let db_path = smfs_core::config::cache_db_path(org_id, &cfg.container_tag);
if let Some(parent) = db_path.parent() {
std::fs::create_dir_all(parent)?;
Expand All @@ -132,6 +138,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
Arc::new(Db::open(&db_path)?)
};

startup.report("configuring_api", "configuring API client")?;
let mut api_client =
smfs_core::api::ApiClient::new(&cfg.api_url, &cfg.api_key, &cfg.container_tag);
if let Some(uid) = session.as_ref().and_then(|s| s.user_id.clone()) {
Expand All @@ -156,9 +163,43 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {

let fs = Arc::new(SupermemoryFs::with_api(db, api));

startup.report("warming_profile", "warming profile")?;
fs.warm_profile().await;

let pull_succeeded = match smfs_core::sync::SyncEngine::initial_pull(&fs).await {
startup.report("initial_sync", "starting initial sync")?;
let pull_succeeded = match smfs_core::sync::SyncEngine::initial_pull_with_progress(
&fs,
|progress| match progress {
smfs_core::sync::InitialPullProgress::DeletionScan(progress) => {
if progress.remote_seen == 1 || progress.remote_seen % 100 == 0 {
let _ = startup.report_counts(
"initial_sync",
format!(
"deletion scan saw {} remote docs (page {}/{})",
progress.remote_seen, progress.page, progress.total_pages
),
progress.remote_seen,
progress.total_items,
);
}
}
smfs_core::sync::InitialPullProgress::Pull(progress) => {
if progress.reconciled == 1 || progress.reconciled % 100 == 0 {
let _ = startup.report_counts(
"initial_sync",
format!(
"reconciled {} docs (page {}/{})",
progress.reconciled, progress.page, progress.total_pages
),
progress.reconciled,
progress.total_items,
);
}
}
},
)
.await
{
Ok((removed, reconciled)) => {
eprintln!(
"initial sync: {reconciled} docs reconciled, {removed} stale entries removed"
Expand Down Expand Up @@ -205,6 +246,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
};
let sync_tasks = smfs_core::sync::SyncEngine::start(fs.clone(), sync_opts, shutdown_rx.clone());

startup.report("mounting_fs", "mounting filesystem")?;
let handle = mount_fs(fs.clone(), opts).await?;

// Auto-install grep wrapper on first mount.
Expand All @@ -215,6 +257,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
}

// Bring up the IPC control socket. Clients use it for status/sync/unmount.
startup.report("starting_ipc", "starting IPC socket")?;
daemon::ensure_dirs().context("creating daemon state dirs")?;
let ipc_shutdown_notify = Arc::new(Notify::new());
let state = Arc::new(smfs_core::daemon::ipc::IpcState {
Expand Down Expand Up @@ -244,6 +287,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&pid_path, std::process::id().to_string())?;
startup.report("ready", "filesystem mounted and IPC ready")?;

eprintln!(
"supermemoryfs mounted at {} (backend: {}, tag: {})",
Expand Down Expand Up @@ -340,6 +384,7 @@ pub async fn run(cfg: DaemonConfig) -> Result<()> {
}
let _ = std::fs::remove_file(&pid_path);
let _ = std::fs::remove_file(&socket_path);
let _ = std::fs::remove_file(daemon::startup_path(&cfg.container_tag));
if created_dir {
let _ = std::fs::remove_dir(&cfg.mount_path);
}
Expand Down
1 change: 1 addition & 0 deletions crates/smfs/src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod logout;
pub mod logs;
pub mod marker;
pub mod mount;
pub mod startup;
pub mod status;
pub mod sync;
pub mod unmount;
Expand Down
Loading
Loading