diff --git a/src/openhuman/composio/ops.rs b/src/openhuman/composio/ops.rs index 70b47db692..fe62011346 100644 --- a/src/openhuman/composio/ops.rs +++ b/src/openhuman/composio/ops.rs @@ -35,7 +35,6 @@ use crate::openhuman::config::Config; use crate::openhuman::memory::MemoryClient; use crate::openhuman::memory_store::chunks::store as memory_tree_store; use crate::openhuman::memory_store::chunks::types::SourceKind; -use crate::openhuman::memory_store::content::raw::slug_account_email; use crate::rpc::RpcOutcome; /// Result alias used by every `composio_*` op in this module. @@ -549,6 +548,7 @@ pub async fn composio_delete_connection( enum MemoryCleanupTarget { Exact(SourceKind, String), Prefix(SourceKind, String), + Owner(SourceKind, String), } impl MemoryCleanupTarget { @@ -564,6 +564,9 @@ impl MemoryCleanupTarget { source_id_prefix, ) } + Self::Owner(source_kind, owner) => { + memory_tree_store::delete_chunks_by_owner(config, *source_kind, owner) + } } } @@ -575,6 +578,9 @@ impl MemoryCleanupTarget { Self::Prefix(source_kind, source_id_prefix) => { format!("{}:{source_id_prefix}*", source_kind.as_str()) } + Self::Owner(source_kind, owner) => { + format!("{}:owner:{owner}", source_kind.as_str()) + } } } } @@ -604,30 +610,12 @@ async fn composio_memory_targets_for_connection( } fn gmail_memory_sources_for_connection(connection_id: &str) -> Vec { - let normalized_connection_id = - super::providers::profile::normalize_connection_identifier(connection_id); - let mut sources = Vec::new(); - for identity in super::providers::profile::load_connected_identities() { - if identity.source != "gmail" || identity.identifier != normalized_connection_id { - continue; - } - let Some(email) = identity - .email - .as_deref() - .map(str::trim) - .filter(|s| !s.is_empty()) - else { - continue; - }; - let source = MemoryCleanupTarget::Exact( - SourceKind::Email, - format!("gmail:{}", slug_account_email(email)), - ); - if !sources.iter().any(|existing| existing == &source) { - sources.push(source); - } - } - sources + vec![ + MemoryCleanupTarget::Owner(SourceKind::Email, format!("gmail-sync:{connection_id}")), + MemoryCleanupTarget::Exact(SourceKind::Email, format!("gmail:{connection_id}")), + MemoryCleanupTarget::Prefix(SourceKind::Email, format!("gmail:{connection_id}:")), + MemoryCleanupTarget::Prefix(SourceKind::Email, format!("gmail:{connection_id}/")), + ] } async fn notion_memory_targets_for_connection( diff --git a/src/openhuman/composio/ops_test.rs b/src/openhuman/composio/ops_test.rs index 90416fa41f..73997ef0e6 100644 --- a/src/openhuman/composio/ops_test.rs +++ b/src/openhuman/composio/ops_test.rs @@ -316,16 +316,26 @@ fn config_with_backend(tmp: &tempfile::TempDir, base: String) -> Config { } fn sample_memory_chunk(source_kind: SourceKind, source_id: &str, seq: u32) -> Chunk { + sample_memory_chunk_with_owner(source_kind, source_id, "alice@example.com", seq) +} + +fn sample_memory_chunk_with_owner( + source_kind: SourceKind, + source_id: &str, + owner: &str, + seq: u32, +) -> Chunk { let ts = Utc .timestamp_millis_opt(1_700_000_000_000 + i64::from(seq)) .unwrap(); + let content = format!("composio memory {source_id} {owner} {seq}"); Chunk { - id: chunk_id(source_kind, source_id, seq, "composio memory"), - content: format!("composio memory {source_id} {seq}"), + id: chunk_id(source_kind, source_id, seq, &content), + content, metadata: Metadata { source_kind, source_id: source_id.to_string(), - owner: "alice@example.com".to_string(), + owner: owner.to_string(), timestamp: ts, time_range: (ts, ts), tags: vec!["composio".to_string()], @@ -512,6 +522,78 @@ async fn composio_delete_connection_clear_memory_deletes_slack_source() { assert_eq!(remaining[0].metadata.source_id, "slack:c2"); } +#[tokio::test] +async fn composio_delete_connection_clear_memory_keeps_other_gmail_connections() { + let app = Router::new() + .route( + "/agent-integrations/composio/connections", + get(|| async { + Json(json!({ + "success": true, + "data": {"connections": [ + {"id":"c1","toolkit":"gmail","status":"ACTIVE"}, + {"id":"c2","toolkit":"gmail","status":"ACTIVE"} + ]} + })) + }), + ) + .route( + "/agent-integrations/composio/connections/{id}", + axum::routing::delete(|Path(_id): Path| async move { + Json(json!({"success": true, "data": {"deleted": true}})) + }), + ); + let base = start_mock_backend(app).await; + let tmp = tempfile::tempdir().unwrap(); + let config = config_with_backend(&tmp, base); + let c1_account = sample_memory_chunk_with_owner( + SourceKind::Email, + "gmail:pilot-at-example-dot-com", + "gmail-sync:c1", + 0, + ); + let c2_account = sample_memory_chunk_with_owner( + SourceKind::Email, + "gmail:pilot-at-example-dot-com", + "gmail-sync:c2", + 1, + ); + let c1_connection_scoped = + sample_memory_chunk_with_owner(SourceKind::Email, "gmail:c1:thread-a", "gmail-sync:c1", 2); + let c2_connection_scoped = + sample_memory_chunk_with_owner(SourceKind::Email, "gmail:c2:thread-b", "gmail-sync:c2", 3); + memory_tree_store::upsert_chunks( + &config, + &[ + c1_account, + c2_account.clone(), + c1_connection_scoped, + c2_connection_scoped.clone(), + ], + ) + .expect("chunks should seed"); + + let outcome = composio_delete_connection(&config, "c1", true) + .await + .unwrap(); + + assert!(outcome.value.deleted); + assert_eq!(outcome.value.memory_chunks_deleted, 2); + let remaining = memory_tree_store::list_chunks( + &config, + &memory_tree_store::ListChunksQuery { + source_kind: Some(SourceKind::Email), + ..Default::default() + }, + ) + .expect("chunks should list"); + assert_eq!(remaining.len(), 2); + assert!(remaining.iter().any(|chunk| chunk.id == c2_account.id)); + assert!(remaining + .iter() + .any(|chunk| chunk.id == c2_connection_scoped.id)); +} + #[tokio::test] async fn notion_cleanup_targets_include_synced_page_sources() { let tmp = tempfile::tempdir().unwrap(); diff --git a/src/openhuman/inference/provider/factory_test.rs b/src/openhuman/inference/provider/factory_test.rs index f6e688befd..ba951c2f4f 100644 --- a/src/openhuman/inference/provider/factory_test.rs +++ b/src/openhuman/inference/provider/factory_test.rs @@ -724,6 +724,7 @@ fn known_tiers_pass() { "agentic-v1", "coding-v1", "reasoning-quick-v1", + "summarization-v1", ] { assert!( is_known_openhuman_tier(tier), @@ -738,6 +739,7 @@ fn known_hints_pass() { assert!(is_known_openhuman_tier("hint:chat")); assert!(is_known_openhuman_tier("hint:agentic")); assert!(is_known_openhuman_tier("hint:coding")); + assert!(is_known_openhuman_tier("hint:summarization")); } #[test] @@ -748,7 +750,7 @@ fn invalid_models_fail() { assert!(!is_known_openhuman_tier("")); assert!(!is_known_openhuman_tier("reasoning-v2")); // Unrecognized `hint:*` values must NOT be accepted — the factory only - // translates the four hints above, so any other `hint:*` string would + // translates the known hints above, so any other `hint:*` string would // otherwise be forwarded to the backend and rejected with HTTP 400. assert!(!is_known_openhuman_tier("hint:garbage")); assert!(!is_known_openhuman_tier("hint:reasoning-quick")); @@ -771,6 +773,14 @@ fn make_openhuman_backend_forwards_unknown_hint_verbatim() { } } +#[test] +fn make_openhuman_backend_translates_summarization_hint() { + let mut config = Config::default(); + config.default_model = Some("hint:summarization".to_string()); + let (_, model) = make_openhuman_backend(&config).expect("factory should succeed"); + assert_eq!(model, crate::openhuman::config::MODEL_SUMMARIZATION_V1); +} + #[test] fn make_openhuman_backend_falls_back_for_invalid_model() { // An invalid default_model must not be forwarded to the backend. diff --git a/src/openhuman/memory/chat.rs b/src/openhuman/memory/chat.rs index 4afa381e21..feba63eca0 100644 --- a/src/openhuman/memory/chat.rs +++ b/src/openhuman/memory/chat.rs @@ -215,6 +215,7 @@ mod tests { fn build_chat_runtime_defaults_to_openhuman_resolved_model() { let cfg = Config::default(); let (_provider, model) = build_chat_runtime(&cfg).unwrap(); + assert_eq!(model, DEFAULT_CLOUD_LLM_MODEL); // build_chat_runtime resolves the "summarization" workload role, // which routes to the dedicated `summarization-v1` tier (PR #2690) // rather than the generic `reasoning-v1` fallback. diff --git a/src/openhuman/memory/ops/files.rs b/src/openhuman/memory/ops/files.rs index fbaa426bc5..3ee5caa277 100644 --- a/src/openhuman/memory/ops/files.rs +++ b/src/openhuman/memory/ops/files.rs @@ -48,6 +48,13 @@ pub async fn ai_list_memory_files( } let file_name = entry.file_name(); let file_name = file_name.to_string_lossy(); + // Hide SQLite engine files from user-facing memory file listings. + if matches!( + file_name.as_ref(), + "memory.db" | "memory.db-shm" | "memory.db-wal" + ) { + continue; + } if !file_name.is_empty() { files.push(file_name.to_string()); } @@ -193,6 +200,40 @@ mod tests { assert_eq!(listed_data.count, 2); } + #[tokio::test] + async fn list_memory_files_skips_internal_sqlite_artifacts() { + let _guard = TEST_ENV_LOCK + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()); + let tmp = TempDir::new().expect("tempdir"); + let _workspace = WorkspaceEnvGuard::set(tmp.path()); + + let memory_root = super::super::helpers::resolve_existing_memory_path("") + .await + .expect("resolve memory root"); + tokio::fs::write(memory_root.join("memory.db"), "x") + .await + .expect("write db"); + tokio::fs::write(memory_root.join("memory.db-shm"), "x") + .await + .expect("write shm"); + tokio::fs::write(memory_root.join("memory.db-wal"), "x") + .await + .expect("write wal"); + tokio::fs::write(memory_root.join("visible.md"), "ok") + .await + .expect("write visible"); + + let listed = ai_list_memory_files(ListMemoryFilesRequest { + relative_dir: String::new(), + }) + .await + .expect("list should succeed"); + let listed_data = listed.value.data.expect("list data"); + assert_eq!(listed_data.files, vec!["visible.md"]); + assert_eq!(listed_data.count, 1); + } + #[tokio::test] async fn list_memory_files_rejects_non_directory_target() { let _guard = TEST_ENV_LOCK @@ -371,9 +412,9 @@ mod tests { .await .expect("list should succeed"); let listed_data = listed.value.data.expect("list data"); - // The test only cares about the symlink-skipping invariant — the - // listing may also include the SQLite memory store files (`memory.db`, - // `memory.db-shm`, `memory.db-wal`) that the test fixture initializes. + // This test pins the symlink-skipping invariant; SQLite engine files + // (`memory.db`, `memory.db-shm`, `memory.db-wal`) are exercised by + // `list_memory_files_skips_internal_sqlite_artifacts` above. assert!( listed_data.files.iter().any(|f| f == "real.md"), "expected real.md in listing, got {:?}", diff --git a/src/openhuman/memory_store/chunks/store.rs b/src/openhuman/memory_store/chunks/store.rs index 6d1a98c6d8..e50dd529a9 100644 --- a/src/openhuman/memory_store/chunks/store.rs +++ b/src/openhuman/memory_store/chunks/store.rs @@ -26,7 +26,7 @@ use anyhow::{Context, Result}; use chrono::{DateTime, TimeZone, Utc}; use parking_lot::Mutex as PMutex; use rusqlite::{params, Connection, OptionalExtension, Transaction}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; #[cfg(test)] @@ -35,7 +35,7 @@ use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; use crate::openhuman::config::Config; -use crate::openhuman::memory::util::redact; +use crate::openhuman::memory::util::redact::{self, redact as redact_value}; use crate::openhuman::memory_store::chunks::types::{Chunk, Metadata, SourceKind, SourceRef}; use crate::openhuman::memory_store::content::StagedChunk; @@ -730,7 +730,13 @@ pub fn delete_chunks_by_source( source_kind: SourceKind, source_id: &str, ) -> Result { - delete_chunks_by_source_filter(config, source_kind, |candidate| candidate == source_id) + delete_chunks_by_source_filter( + "delete_chunks_by_source", + config, + source_kind, + |candidate, _owner| candidate == source_id, + |candidate| candidate == source_id, + ) } /// Delete all chunk rows whose source id starts with `source_id_prefix`. @@ -742,15 +748,37 @@ pub fn delete_chunks_by_source_prefix( source_kind: SourceKind, source_id_prefix: &str, ) -> Result { - delete_chunks_by_source_filter(config, source_kind, |candidate| { - candidate.starts_with(source_id_prefix) - }) + delete_chunks_by_source_filter( + "delete_chunks_by_source_prefix", + config, + source_kind, + |candidate, _owner| candidate.starts_with(source_id_prefix), + |candidate| candidate.starts_with(source_id_prefix), + ) +} + +/// Delete all chunk rows for one exact `(source_kind, owner)` while preserving +/// source ingest gates that still have chunks owned by another connection. +pub fn delete_chunks_by_owner( + config: &Config, + source_kind: SourceKind, + owner: &str, +) -> Result { + delete_chunks_by_source_filter( + "delete_chunks_by_owner", + config, + source_kind, + |_source_id, candidate_owner| candidate_owner == owner, + |_source_id| false, + ) } fn delete_chunks_by_source_filter( + op: &str, config: &Config, source_kind: SourceKind, - matches_source: impl Fn(&str) -> bool, + matches_chunk: impl Fn(&str, &str) -> bool, + matches_ingested_source: impl Fn(&str) -> bool, ) -> Result { let mut content_paths = Vec::new(); let deleted = with_connection(config, |conn| { @@ -758,7 +786,7 @@ fn delete_chunks_by_source_filter( let chunks = { let mut stmt = tx.prepare( - "SELECT id, source_id, content_path + "SELECT id, source_id, owner, content_path FROM mem_tree_chunks WHERE source_kind = ?1", )?; @@ -766,12 +794,13 @@ fn delete_chunks_by_source_filter( Ok(( row.get::<_, String>(0)?, row.get::<_, String>(1)?, - row.get::<_, Option>(2)?, + row.get::<_, String>(2)?, + row.get::<_, Option>(3)?, )) })?; rows.filter_map(|row| match row { - Ok((id, source_id, content_path)) if matches_source(&source_id) => { - Some(Ok((id, content_path))) + Ok((id, source_id, owner, content_path)) if matches_chunk(&source_id, &owner) => { + Some(Ok((id, source_id, content_path))) } Ok(_) => None, Err(error) => Some(Err(error)), @@ -780,7 +809,12 @@ fn delete_chunks_by_source_filter( .context("Failed to collect memory_tree chunks by source")? }; - for (chunk_id, content_path) in &chunks { + let deleted_source_ids: HashSet = chunks + .iter() + .map(|(_, source_id, _)| source_id.clone()) + .collect(); + + for (chunk_id, _source_id, content_path) in &chunks { tx.execute( "DELETE FROM mem_tree_score WHERE chunk_id = ?1", params![chunk_id], @@ -806,6 +840,29 @@ fn delete_chunks_by_source_filter( } } + let mut orphaned_deleted_sources = HashSet::new(); + for source_id in &deleted_source_ids { + let remaining: i64 = tx.query_row( + "SELECT COUNT(*) + FROM mem_tree_chunks + WHERE source_kind = ?1 AND source_id = ?2", + params![source_kind.as_str(), source_id], + |row| row.get(0), + )?; + if remaining == 0 { + log::debug!( + "[memory::chunk_store] {op}: source_id_hash={} orphaned; removing ingest gate", + redact_value(source_id), + ); + orphaned_deleted_sources.insert(source_id.clone()); + } else { + log::debug!( + "[memory::chunk_store] {op}: source_id_hash={} remaining_chunks={remaining}; preserving ingest gate", + redact_value(source_id), + ); + } + } + let ingested_sources = { let mut stmt = tx.prepare( "SELECT source_id @@ -815,7 +872,12 @@ fn delete_chunks_by_source_filter( let rows = stmt.query_map(params![source_kind.as_str()], |row| row.get::<_, String>(0))?; rows.filter_map(|row| match row { - Ok(source_id) if matches_source(&source_id) => Some(Ok(source_id)), + Ok(source_id) + if matches_ingested_source(&source_id) + || orphaned_deleted_sources.contains(&source_id) => + { + Some(Ok(source_id)) + } Ok(_) => None, Err(error) => Some(Err(error)), }) diff --git a/src/openhuman/memory_store/chunks/store_tests.rs b/src/openhuman/memory_store/chunks/store_tests.rs index 2487f1558b..b6d963b26a 100644 --- a/src/openhuman/memory_store/chunks/store_tests.rs +++ b/src/openhuman/memory_store/chunks/store_tests.rs @@ -296,6 +296,55 @@ fn delete_chunks_by_source_removes_chunks_side_rows_and_ingest_gate() { .unwrap(); } +#[test] +fn delete_chunks_by_owner_preserves_other_owners_for_same_source() { + let (_tmp, cfg) = test_config(); + let mut target = sample_chunk("slack:shared", 0, 1_700_000_000_000); + target.metadata.owner = "slack-sync:c-1".to_string(); + let mut same_source_other_owner = sample_chunk("slack:shared", 1, 1_700_000_001_000); + same_source_other_owner.metadata.owner = "slack-sync:c-2".to_string(); + let mut target_other_source = sample_chunk("slack:c-1-only", 0, 1_700_000_002_000); + target_other_source.metadata.owner = "slack-sync:c-1".to_string(); + upsert_chunks( + &cfg, + &[ + target.clone(), + same_source_other_owner.clone(), + target_other_source.clone(), + ], + ) + .unwrap(); + with_connection(&cfg, |conn| { + let tx = conn.unchecked_transaction()?; + assert!(claim_source_ingest_tx( + &tx, + SourceKind::Chat, + "slack:shared", + 1_700_000_000_000 + )?); + assert!(claim_source_ingest_tx( + &tx, + SourceKind::Chat, + "slack:c-1-only", + 1_700_000_000_000 + )?); + tx.commit()?; + Ok(()) + }) + .unwrap(); + + let deleted = delete_chunks_by_owner(&cfg, SourceKind::Chat, "slack-sync:c-1").unwrap(); + + assert_eq!(deleted, 2); + assert!(get_chunk(&cfg, &target.id).unwrap().is_none()); + assert!(get_chunk(&cfg, &target_other_source.id).unwrap().is_none()); + assert!(get_chunk(&cfg, &same_source_other_owner.id) + .unwrap() + .is_some()); + assert!(is_source_ingested(&cfg, SourceKind::Chat, "slack:shared").unwrap()); + assert!(!is_source_ingested(&cfg, SourceKind::Chat, "slack:c-1-only").unwrap()); +} + #[test] fn delete_chunks_by_source_removes_safe_content_files_but_rejects_escape_paths() { let (_tmp, cfg) = test_config();