Skip to content
38 changes: 13 additions & 25 deletions src/openhuman/composio/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::openhuman::config::Config;
use crate::openhuman::memory::MemoryClient;
use crate::openhuman::memory_store::chunks::store as memory_tree_store;
use crate::openhuman::memory_store::chunks::types::SourceKind;
use crate::openhuman::memory_store::content::raw::slug_account_email;
use crate::rpc::RpcOutcome;

/// Result alias used by every `composio_*` op in this module.
Expand Down Expand Up @@ -526,6 +525,7 @@ pub async fn composio_delete_connection(
enum MemoryCleanupTarget {
Exact(SourceKind, String),
Prefix(SourceKind, String),
Owner(SourceKind, String),
}

impl MemoryCleanupTarget {
Expand All @@ -541,6 +541,9 @@ impl MemoryCleanupTarget {
source_id_prefix,
)
}
Self::Owner(source_kind, owner) => {
memory_tree_store::delete_chunks_by_owner(config, *source_kind, owner)
}
}
}

Expand All @@ -552,6 +555,9 @@ impl MemoryCleanupTarget {
Self::Prefix(source_kind, source_id_prefix) => {
format!("{}:{source_id_prefix}*", source_kind.as_str())
}
Self::Owner(source_kind, owner) => {
format!("{} owner:{owner}", source_kind.as_str())
}
}
}
}
Expand Down Expand Up @@ -581,30 +587,12 @@ async fn composio_memory_targets_for_connection(
}

fn gmail_memory_sources_for_connection(connection_id: &str) -> Vec<MemoryCleanupTarget> {
let normalized_connection_id =
super::providers::profile::normalize_connection_identifier(connection_id);
let mut sources = Vec::new();
for identity in super::providers::profile::load_connected_identities() {
if identity.source != "gmail" || identity.identifier != normalized_connection_id {
continue;
}
let Some(email) = identity
.email
.as_deref()
.map(str::trim)
.filter(|s| !s.is_empty())
else {
continue;
};
let source = MemoryCleanupTarget::Exact(
SourceKind::Email,
format!("gmail:{}", slug_account_email(email)),
);
if !sources.iter().any(|existing| existing == &source) {
sources.push(source);
}
}
sources
vec![
MemoryCleanupTarget::Owner(SourceKind::Email, format!("gmail-sync:{connection_id}")),
MemoryCleanupTarget::Exact(SourceKind::Email, format!("gmail:{connection_id}")),
MemoryCleanupTarget::Prefix(SourceKind::Email, format!("gmail:{connection_id}:")),
MemoryCleanupTarget::Prefix(SourceKind::Email, format!("gmail:{connection_id}/")),
]
}
Comment thread
Felyx-Fu marked this conversation as resolved.

async fn notion_memory_targets_for_connection(
Expand Down
88 changes: 85 additions & 3 deletions src/openhuman/composio/ops_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,16 +316,26 @@ fn config_with_backend(tmp: &tempfile::TempDir, base: String) -> Config {
}

fn sample_memory_chunk(source_kind: SourceKind, source_id: &str, seq: u32) -> Chunk {
sample_memory_chunk_with_owner(source_kind, source_id, "alice@example.com", seq)
}

fn sample_memory_chunk_with_owner(
source_kind: SourceKind,
source_id: &str,
owner: &str,
seq: u32,
) -> Chunk {
let ts = Utc
.timestamp_millis_opt(1_700_000_000_000 + i64::from(seq))
.unwrap();
let content = format!("composio memory {source_id} {owner} {seq}");
Chunk {
id: chunk_id(source_kind, source_id, seq, "composio memory"),
content: format!("composio memory {source_id} {seq}"),
id: chunk_id(source_kind, source_id, seq, &content),
content,
metadata: Metadata {
source_kind,
source_id: source_id.to_string(),
owner: "alice@example.com".to_string(),
owner: owner.to_string(),
timestamp: ts,
time_range: (ts, ts),
tags: vec!["composio".to_string()],
Expand Down Expand Up @@ -512,6 +522,78 @@ async fn composio_delete_connection_clear_memory_deletes_slack_source() {
assert_eq!(remaining[0].metadata.source_id, "slack:c2");
}

#[tokio::test]
async fn composio_delete_connection_clear_memory_keeps_other_gmail_connections() {
let app = Router::new()
.route(
"/agent-integrations/composio/connections",
get(|| async {
Json(json!({
"success": true,
"data": {"connections": [
{"id":"c1","toolkit":"gmail","status":"ACTIVE"},
{"id":"c2","toolkit":"gmail","status":"ACTIVE"}
]}
}))
}),
)
.route(
"/agent-integrations/composio/connections/{id}",
axum::routing::delete(|Path(_id): Path<String>| async move {
Json(json!({"success": true, "data": {"deleted": true}}))
}),
);
let base = start_mock_backend(app).await;
let tmp = tempfile::tempdir().unwrap();
let config = config_with_backend(&tmp, base);
let c1_account = sample_memory_chunk_with_owner(
SourceKind::Email,
"gmail:pilot-at-example-dot-com",
"gmail-sync:c1",
0,
);
let c2_account = sample_memory_chunk_with_owner(
SourceKind::Email,
"gmail:pilot-at-example-dot-com",
"gmail-sync:c2",
1,
);
let c1_connection_scoped =
sample_memory_chunk_with_owner(SourceKind::Email, "gmail:c1:thread-a", "gmail-sync:c1", 2);
let c2_connection_scoped =
sample_memory_chunk_with_owner(SourceKind::Email, "gmail:c2:thread-b", "gmail-sync:c2", 3);
memory_tree_store::upsert_chunks(
&config,
&[
c1_account,
c2_account.clone(),
c1_connection_scoped,
c2_connection_scoped.clone(),
],
)
.expect("chunks should seed");

let outcome = composio_delete_connection(&config, "c1", true)
.await
.unwrap();

assert!(outcome.value.deleted);
assert_eq!(outcome.value.memory_chunks_deleted, 2);
let remaining = memory_tree_store::list_chunks(
&config,
&memory_tree_store::ListChunksQuery {
source_kind: Some(SourceKind::Email),
..Default::default()
},
)
.expect("chunks should list");
assert_eq!(remaining.len(), 2);
assert!(remaining.iter().any(|chunk| chunk.id == c2_account.id));
assert!(remaining
.iter()
.any(|chunk| chunk.id == c2_connection_scoped.id));
}

#[tokio::test]
async fn notion_cleanup_targets_include_synced_page_sources() {
let tmp = tempfile::tempdir().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/openhuman/config/ops_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ async fn apply_autonomy_settings_updates_action_budget() {
&mut cfg,
AutonomySettingsPatch {
max_actions_per_hour: Some(64),
..Default::default()
},
)
.await
Expand Down
84 changes: 71 additions & 13 deletions src/openhuman/memory_store/chunks/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use anyhow::{Context, Result};
use chrono::{DateTime, TimeZone, Utc};
use parking_lot::Mutex as PMutex;
use rusqlite::{params, Connection, OptionalExtension, Transaction};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
#[cfg(test)]
Expand All @@ -35,7 +35,7 @@ use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};

use crate::openhuman::config::Config;
use crate::openhuman::memory::util::redact;
use crate::openhuman::memory::util::redact::{self, redact as redact_value};
use crate::openhuman::memory_store::chunks::types::{Chunk, Metadata, SourceKind, SourceRef};
use crate::openhuman::memory_store::content::StagedChunk;

Expand Down Expand Up @@ -730,7 +730,12 @@ pub fn delete_chunks_by_source(
source_kind: SourceKind,
source_id: &str,
) -> Result<usize> {
delete_chunks_by_source_filter(config, source_kind, |candidate| candidate == source_id)
delete_chunks_by_source_filter(
config,
source_kind,
|candidate, _owner| candidate == source_id,
|candidate| candidate == source_id,
)
}

/// Delete all chunk rows whose source id starts with `source_id_prefix`.
Expand All @@ -742,36 +747,56 @@ pub fn delete_chunks_by_source_prefix(
source_kind: SourceKind,
source_id_prefix: &str,
) -> Result<usize> {
delete_chunks_by_source_filter(config, source_kind, |candidate| {
candidate.starts_with(source_id_prefix)
})
delete_chunks_by_source_filter(
config,
source_kind,
|candidate, _owner| candidate.starts_with(source_id_prefix),
|candidate| candidate.starts_with(source_id_prefix),
)
}

/// Delete all chunk rows for one exact `(source_kind, owner)` while preserving
/// source ingest gates that still have chunks owned by another connection.
pub fn delete_chunks_by_owner(
config: &Config,
source_kind: SourceKind,
owner: &str,
) -> Result<usize> {
delete_chunks_by_source_filter(
config,
source_kind,
|_source_id, candidate_owner| candidate_owner == owner,
|_source_id| false,
)
}

fn delete_chunks_by_source_filter(
config: &Config,
source_kind: SourceKind,
matches_source: impl Fn(&str) -> bool,
matches_chunk: impl Fn(&str, &str) -> bool,
matches_ingested_source: impl Fn(&str) -> bool,
) -> Result<usize> {
let mut content_paths = Vec::new();
let deleted = with_connection(config, |conn| {
let tx = conn.unchecked_transaction()?;

let chunks = {
let mut stmt = tx.prepare(
"SELECT id, source_id, content_path
"SELECT id, source_id, owner, content_path
FROM mem_tree_chunks
WHERE source_kind = ?1",
)?;
let rows = stmt.query_map(params![source_kind.as_str()], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, String>(2)?,
row.get::<_, Option<String>>(3)?,
))
})?;
rows.filter_map(|row| match row {
Ok((id, source_id, content_path)) if matches_source(&source_id) => {
Some(Ok((id, content_path)))
Ok((id, source_id, owner, content_path)) if matches_chunk(&source_id, &owner) => {
Some(Ok((id, source_id, content_path)))
}
Ok(_) => None,
Err(error) => Some(Err(error)),
Expand All @@ -780,7 +805,12 @@ fn delete_chunks_by_source_filter(
.context("Failed to collect memory_tree chunks by source")?
};

for (chunk_id, content_path) in &chunks {
let deleted_source_ids: HashSet<String> = chunks
.iter()
.map(|(_, source_id, _)| source_id.clone())
.collect();

for (chunk_id, _source_id, content_path) in &chunks {
tx.execute(
"DELETE FROM mem_tree_score WHERE chunk_id = ?1",
params![chunk_id],
Expand All @@ -806,6 +836,29 @@ fn delete_chunks_by_source_filter(
}
}

let mut orphaned_deleted_sources = HashSet::new();
for source_id in &deleted_source_ids {
let remaining: i64 = tx.query_row(
"SELECT COUNT(*)
FROM mem_tree_chunks
WHERE source_kind = ?1 AND source_id = ?2",
params![source_kind.as_str(), source_id],
|row| row.get(0),
)?;
if remaining == 0 {
log::debug!(
"[memory::chunk_store] delete_chunks_by_owner: source_id_hash={} orphaned; removing ingest gate",
Comment thread
Felyx-Fu marked this conversation as resolved.
Outdated
redact_value(source_id),
);
orphaned_deleted_sources.insert(source_id.clone());
} else {
log::debug!(
"[memory::chunk_store] delete_chunks_by_owner: source_id_hash={} remaining_chunks={remaining}; preserving ingest gate",
redact_value(source_id),
);
}
}
Comment thread
Felyx-Fu marked this conversation as resolved.

let ingested_sources = {
let mut stmt = tx.prepare(
"SELECT source_id
Expand All @@ -815,7 +868,12 @@ fn delete_chunks_by_source_filter(
let rows =
stmt.query_map(params![source_kind.as_str()], |row| row.get::<_, String>(0))?;
rows.filter_map(|row| match row {
Ok(source_id) if matches_source(&source_id) => Some(Ok(source_id)),
Ok(source_id)
if matches_ingested_source(&source_id)
|| orphaned_deleted_sources.contains(&source_id) =>
{
Some(Ok(source_id))
}
Ok(_) => None,
Err(error) => Some(Err(error)),
})
Expand Down
Loading
Loading