diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index feac4c7ac8..48a5b18c42 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -1185,6 +1185,7 @@ pub async fn start_channel_bridge_with_config( dc_config.allowed_users.clone(), dc_config.ignore_bots, dc_config.intents, + dc_config.auto_thread.clone(), )); adapters.push((adapter, dc_config.default_agent.clone())); } diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index 522f4b6773..05f2c3200b 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -723,12 +723,20 @@ async fn dispatch_message( .as_ref() .map(|o| o.lifecycle_reactions) .unwrap_or(true); - let thread_id = if threading_enabled { - message.thread_id.as_deref() + + // --- Auto-thread: decide intent now, but create AFTER all policy guards --- + let auto_thread_name = if !threading_enabled && message.thread_id.is_none() { + adapter.should_auto_thread(message).await } else { None }; + // thread_id is resolved later, after all guards pass. + // Always propagate an existing thread_id (message arrived inside a thread), + // regardless of threading_enabled — that flag controls explicit threading config, + // not auto-detected thread context. + let mut effective_thread_id: Option = message.thread_id.clone(); + // --- DM/Group policy check --- if let Some(ref ov) = overrides { if message.is_group { @@ -789,12 +797,42 @@ async fn dispatch_message( if let Err(msg) = rate_limiter.check(ct_str, sender_user_id(message), ov.rate_limit_per_user) { - send_response(adapter, &message.sender, msg, thread_id, output_format).await; + // Rate-limit rejection: don't create a thread, use existing thread if any + send_response( + adapter, + &message.sender, + msg, + message.thread_id.as_deref(), + output_format, + ) + .await; return; } } } + // --- Create auto-thread NOW (after all policy guards have passed) --- + if let Some(ref thread_name) = auto_thread_name { + match adapter + .create_thread(&message.sender, &message.platform_message_id, thread_name) + .await + { + Ok(new_thread_id) => { + info!( + "Created auto-thread {} for message {}", + thread_name, message.platform_message_id + ); + effective_thread_id = Some(new_thread_id); + } + Err(e) => { + warn!("Failed to create auto-thread: {}", e); + } + } + } + + // Resolve final thread_id reference used by all downstream send_response calls + let thread_id = effective_thread_id.as_deref(); + // Handle commands first (early return) if let ChannelContent::Command { ref name, ref args } = message.content { let result = handle_command(name, args, handle, router, &message.sender).await; diff --git a/crates/openfang-channels/src/discord.rs b/crates/openfang-channels/src/discord.rs index cffa63a876..e32e4291aa 100644 --- a/crates/openfang-channels/src/discord.rs +++ b/crates/openfang-channels/src/discord.rs @@ -8,7 +8,7 @@ use crate::types::{ }; use async_trait::async_trait; use futures::{SinkExt, Stream, StreamExt}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -22,6 +22,10 @@ const DISCORD_API_BASE: &str = "https://discord.com/api/v10"; const MAX_BACKOFF: Duration = Duration::from_secs(60); const INITIAL_BACKOFF: Duration = Duration::from_secs(1); const DISCORD_MSG_LIMIT: usize = 2000; +/// Maximum number of seen message IDs kept in the dedup set. +/// MESSAGE_UPDATE (embed resolution) events arrive within seconds of the +/// original CREATE; entries older than this cap are safe to discard. +const MAX_DEDUP_MSG_IDS: usize = 2_000; /// Discord Gateway opcodes. mod opcode { @@ -56,6 +60,8 @@ pub struct DiscordAdapter { allowed_users: Vec, ignore_bots: bool, intents: u64, + /// Auto-thread behavior: "true", "false", or "smart" + auto_thread: String, shutdown_tx: Arc>, shutdown_rx: watch::Receiver, /// Bot's own user ID (populated after READY event). @@ -64,6 +70,13 @@ pub struct DiscordAdapter { session_id: Arc>>, /// Resume gateway URL. resume_gateway_url: Arc>>, + /// Thread channel IDs created by this bot (thread_id → parent_channel_id). + /// Used to detect when incoming messages are inside a bot-created thread. + created_thread_ids: Arc>>, + /// Message IDs seen via MESSAGE_CREATE (used to drop duplicate MESSAGE_UPDATE events). + /// Populated immediately when MESSAGE_CREATE is forwarded — before bridge processing — + /// to eliminate the race window where MESSAGE_UPDATE arrives before thread creation completes. + threaded_message_ids: Arc>>, } impl DiscordAdapter { @@ -73,6 +86,7 @@ impl DiscordAdapter { allowed_users: Vec, ignore_bots: bool, intents: u64, + auto_thread: String, ) -> Self { let (shutdown_tx, shutdown_rx) = watch::channel(false); Self { @@ -82,11 +96,14 @@ impl DiscordAdapter { allowed_users, ignore_bots, intents, + auto_thread, shutdown_tx: Arc::new(shutdown_tx), shutdown_rx, bot_user_id: Arc::new(RwLock::new(None)), session_id: Arc::new(RwLock::new(None)), resume_gateway_url: Arc::new(RwLock::new(None)), + created_thread_ids: Arc::new(RwLock::new(HashMap::new())), + threaded_message_ids: Arc::new(RwLock::new(HashSet::new())), } } @@ -147,6 +164,79 @@ impl DiscordAdapter { .await?; Ok(()) } + + /// Create a thread from a message in a Discord channel. + async fn api_create_thread( + &self, + channel_id: &str, + message_id: &str, + name: &str, + ) -> Result> { + let url = format!( + "{DISCORD_API_BASE}/channels/{channel_id}/messages/{message_id}/threads", + channel_id = channel_id, + message_id = message_id + ); + let body = serde_json::json!({ + "name": name, + "auto_archive_duration": 1440 // 24 hours + }); + let resp = self + .client + .post(&url) + .header("Authorization", format!("Bot {}", self.token.as_str())) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let body_text = resp.text().await.unwrap_or_default(); + return Err(format!("Discord createThread failed: {}", body_text).into()); + } + + let response: serde_json::Value = resp.json().await?; + let thread_id = response["id"].as_str().unwrap_or("").to_string(); + + // Track thread_id → parent channel_id so we can recognise messages + // that arrive inside this thread. + if !thread_id.is_empty() { + self.created_thread_ids + .write() + .await + .insert(thread_id.clone(), channel_id.to_string()); + } + + Ok(thread_id) + } + + /// Send a message to an existing thread. + /// Discord threads are channels — post directly to channels/{thread_id}/messages. + async fn api_send_thread_message( + &self, + _channel_id: &str, + thread_id: &str, + text: &str, + ) -> Result<(), Box> { + let url = format!("{DISCORD_API_BASE}/channels/{thread_id}/messages"); + let chunks = split_message(text, DISCORD_MSG_LIMIT); + + for chunk in chunks { + let body = serde_json::json!({ "content": chunk }); + let resp = self + .client + .post(&url) + .header("Authorization", format!("Bot {}", self.token.as_str())) + .json(&body) + .send() + .await?; + + if !resp.status().is_success() { + let body_text = resp.text().await.unwrap_or_default(); + warn!("Discord sendThreadMessage failed: {body_text}"); + } + } + Ok(()) + } } #[async_trait] @@ -159,6 +249,33 @@ impl ChannelAdapter for DiscordAdapter { ChannelType::Discord } + async fn should_auto_thread(&self, message: &ChannelMessage) -> Option { + // Only auto-thread in group channels (servers), not DMs + if !message.is_group { + return None; + } + + // Check auto_thread mode + match self.auto_thread.as_str() { + "true" => Some(thread_name_from_message(message)), + "false" => None, + "smart" => { + // Only create thread if bot was @mentioned + let was_mentioned = message + .metadata + .get("was_mentioned") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + if was_mentioned { + Some(thread_name_from_message(message)) + } else { + None + } + } + _ => None, + } + } + async fn start( &self, ) -> Result + Send>>, Box> @@ -176,6 +293,8 @@ impl ChannelAdapter for DiscordAdapter { let bot_user_id = self.bot_user_id.clone(); let session_id_store = self.session_id.clone(); let resume_url_store = self.resume_gateway_url.clone(); + let created_thread_ids = self.created_thread_ids.clone(); + let threaded_message_ids = self.threaded_message_ids.clone(); let mut shutdown = self.shutdown_rx.clone(); tokio::spawn(async move { @@ -414,19 +533,66 @@ impl ChannelAdapter for DiscordAdapter { &allowed_guilds, &allowed_users, ignore_bots, + &created_thread_ids, ) .await { + // MESSAGE_UPDATE must be suppressed if we already + // forwarded a MESSAGE_CREATE for this message ID. + // The check uses `seen_message_ids` (tracked below) + // which is populated the moment MESSAGE_CREATE is + // forwarded — before the bridge even processes it. + // This closes the race window where MESSAGE_UPDATE + // arrives before adapter.create_thread() completes. + if event_name == "MESSAGE_UPDATE" + && threaded_message_ids + .read() + .await + .contains(&msg.platform_message_id) + { + debug!( + "Discord MESSAGE_UPDATE skipped (already seen {})", + msg.platform_message_id + ); + continue; + } + debug!( "Discord {event_name} from {}: {:?}", msg.sender.display_name, msg.content ); + + // Mark this message as seen immediately so any + // concurrent or subsequent MESSAGE_UPDATE is dropped. + if event_name == "MESSAGE_CREATE" { + threaded_message_ids + .write() + .await + .insert(msg.platform_message_id.clone()); + } + if tx.send(msg).await.is_err() { return; } } } + "THREAD_DELETE" | "CHANNEL_DELETE" => { + // Clean up tracking when a thread is deleted so the + // next message in the parent channel is treated fresh. + if let Some(tid) = d["id"].as_str() { + created_thread_ids.write().await.remove(tid); + // Prune the dedup set to prevent unbounded growth. + // Entries older than MAX_DEDUP_MSG_IDS are safe to + // discard — embed UPDATE events arrive within seconds. + let mut ids = threaded_message_ids.write().await; + if ids.len() > MAX_DEDUP_MSG_IDS { + ids.clear(); + } + debug!("Discord thread/channel deleted: {tid}"); + } + } + "RESUMED" => { info!("Discord session resumed successfully"); } @@ -532,6 +698,46 @@ impl ChannelAdapter for DiscordAdapter { self.api_send_typing(&user.platform_id).await } + async fn send_in_thread( + &self, + user: &ChannelUser, + content: ChannelContent, + thread_id: &str, + ) -> Result<(), Box> { + let channel_id = &user.platform_id; + match content { + ChannelContent::Text(text) => { + self.api_send_thread_message(channel_id, thread_id, &text) + .await?; + } + _ => { + self.api_send_thread_message(channel_id, thread_id, "(Unsupported content type)") + .await?; + } + } + Ok(()) + } + + async fn create_thread( + &self, + user: &ChannelUser, + message_id: &str, + thread_name: &str, + ) -> Result> { + let channel_id = &user.platform_id; + let thread_id = self + .api_create_thread(channel_id, message_id, thread_name) + .await?; + // Also ensure the message_id is marked as seen (belt-and-suspenders: + // the gateway loop already inserts on MESSAGE_CREATE, but keep this + // in case create_thread is ever called from another path). + self.threaded_message_ids + .write() + .await + .insert(message_id.to_string()); + Ok(thread_id) + } + async fn stop(&self) -> Result<(), Box> { let _ = self.shutdown_tx.send(true); Ok(()) @@ -545,6 +751,7 @@ async fn parse_discord_message( allowed_guilds: &[String], allowed_users: &[String], ignore_bots: bool, + created_thread_ids: &Arc>>, ) -> Option { let author = d.get("author")?; let author_id = author["id"].as_str()?; @@ -583,6 +790,20 @@ async fn parse_discord_message( let channel_id = d["channel_id"].as_str()?; let message_id = d["id"].as_str().unwrap_or("0"); + + // Detect if this message is inside a bot-created thread. + // In Discord, a thread is its own channel — channel_id will be the thread's ID. + // If so, use the parent channel as platform_id and set thread_id so that: + // (a) auto-thread logic is skipped (message.thread_id.is_some()) + // (b) responses are sent back into the same thread + let (effective_channel_id, parsed_thread_id) = { + let threads = created_thread_ids.read().await; + if let Some(parent_channel_id) = threads.get(channel_id) { + (parent_channel_id.clone(), Some(channel_id.to_string())) + } else { + (channel_id.to_string(), None) + } + }; let username = author["username"].as_str().unwrap_or("Unknown"); let discriminator = author["discriminator"].as_str().unwrap_or("0000"); let display_name = if discriminator == "0" { @@ -641,7 +862,7 @@ async fn parse_discord_message( channel: ChannelType::Discord, platform_message_id: message_id.to_string(), sender: ChannelUser { - platform_id: channel_id.to_string(), + platform_id: effective_channel_id, display_name, openfang_user: None, }, @@ -649,15 +870,50 @@ async fn parse_discord_message( target_agent: None, timestamp, is_group, - thread_id: None, + thread_id: parsed_thread_id, metadata, }) } +/// Build a Discord thread name from the message content. +/// Strips @mention prefixes (`<@...>`), trims whitespace, and truncates to +/// Discord's 100-character thread name limit. Falls back to the sender's +/// display name if the message has no usable text (e.g. image-only). +fn thread_name_from_message(message: &ChannelMessage) -> String { + let raw = match &message.content { + ChannelContent::Text(t) => t.clone(), + ChannelContent::Image { caption, .. } => caption.clone().unwrap_or_default(), + _ => String::new(), + }; + + // Strip leading Discord mention tokens (<@id> / <@!id>) + let stripped = regex_lite::Regex::new(r"^(<@!?\d+>\s*)+") + .map(|re| re.replace(&raw, "").into_owned()) + .unwrap_or(raw); + + let trimmed = stripped.trim().to_string(); + + if trimmed.is_empty() { + return message.sender.display_name.clone(); + } + + // Truncate to Discord's 100-char limit + if trimmed.chars().count() <= 100 { + trimmed + } else { + trimmed.chars().take(97).collect::() + "…" + } +} + #[cfg(test)] mod tests { use super::*; + /// Convenience helper: empty thread-tracking map for tests that don't exercise threading. + fn empty_threads() -> Arc>> { + Arc::new(RwLock::new(HashMap::new())) + } + #[tokio::test] async fn test_parse_discord_message_basic() { let bot_id = Arc::new(RwLock::new(Some("bot123".to_string()))); @@ -674,7 +930,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[], &[], true) + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()) .await .unwrap(); assert_eq!(msg.channel, ChannelType::Discord); @@ -698,7 +954,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[], &[], true).await; + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()).await; assert!(msg.is_none()); } @@ -718,7 +974,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[], &[], true).await; + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()).await; assert!(msg.is_none()); } @@ -739,7 +995,7 @@ mod tests { }); // With ignore_bots=false, other bots' messages should be allowed - let msg = parse_discord_message(&d, &bot_id, &[], &[], false).await; + let msg = parse_discord_message(&d, &bot_id, &[], &[], false, &empty_threads()).await; assert!(msg.is_some()); let msg = msg.unwrap(); assert_eq!(msg.sender.display_name, "somebot"); @@ -763,7 +1019,7 @@ mod tests { }); // Even with ignore_bots=false, the bot's own messages must still be filtered - let msg = parse_discord_message(&d, &bot_id, &[], &[], false).await; + let msg = parse_discord_message(&d, &bot_id, &[], &[], false, &empty_threads()).await; assert!(msg.is_none()); } @@ -784,12 +1040,20 @@ mod tests { }); // Not in allowed guilds - let msg = - parse_discord_message(&d, &bot_id, &["111".into(), "222".into()], &[], true).await; + let msg = parse_discord_message( + &d, + &bot_id, + &["111".into(), "222".into()], + &[], + true, + &empty_threads(), + ) + .await; assert!(msg.is_none()); // In allowed guilds - let msg = parse_discord_message(&d, &bot_id, &["999".into()], &[], true).await; + let msg = + parse_discord_message(&d, &bot_id, &["999".into()], &[], true, &empty_threads()).await; assert!(msg.is_some()); } @@ -808,7 +1072,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[], &[], true) + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()) .await .unwrap(); match &msg.content { @@ -835,7 +1099,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[], &[], true).await; + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()).await; assert!(msg.is_none()); } @@ -854,7 +1118,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[], &[], true) + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()) .await .unwrap(); assert_eq!(msg.sender.display_name, "alice#1234"); @@ -878,7 +1142,7 @@ mod tests { }); // MESSAGE_UPDATE uses the same parse function as MESSAGE_CREATE - let msg = parse_discord_message(&d, &bot_id, &[], &[], true) + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()) .await .unwrap(); assert_eq!(msg.channel, ChannelType::Discord); @@ -909,16 +1173,25 @@ mod tests { &[], &["user111".into(), "user222".into()], true, + &empty_threads(), ) .await; assert!(msg.is_none()); // In allowed users - let msg = parse_discord_message(&d, &bot_id, &[], &["user999".into()], true).await; + let msg = parse_discord_message( + &d, + &bot_id, + &[], + &["user999".into()], + true, + &empty_threads(), + ) + .await; assert!(msg.is_some()); // Empty allowed_users = allow all - let msg = parse_discord_message(&d, &bot_id, &[], &[], true).await; + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()).await; assert!(msg.is_some()); } @@ -941,7 +1214,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[], &[], true) + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()) .await .unwrap(); assert!(msg.is_group); @@ -964,7 +1237,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg2 = parse_discord_message(&d2, &bot_id, &[], &[], true) + let msg2 = parse_discord_message(&d2, &bot_id, &[], &[], true, &empty_threads()) .await .unwrap(); assert!(msg2.is_group); @@ -986,7 +1259,7 @@ mod tests { "timestamp": "2024-01-01T00:00:00+00:00" }); - let msg = parse_discord_message(&d, &bot_id, &[], &[], true) + let msg = parse_discord_message(&d, &bot_id, &[], &[], true, &empty_threads()) .await .unwrap(); assert!(!msg.is_group); @@ -1025,6 +1298,7 @@ mod tests { vec![], true, 37376, + "true".to_string(), ); assert_eq!(adapter.name(), "discord"); assert_eq!(adapter.channel_type(), ChannelType::Discord); diff --git a/crates/openfang-channels/src/types.rs b/crates/openfang-channels/src/types.rs index 84247b5af2..626e982dca 100644 --- a/crates/openfang-channels/src/types.rs +++ b/crates/openfang-channels/src/types.rs @@ -271,6 +271,24 @@ pub trait ChannelAdapter: Send + Sync { self.send(user, content).await } + /// Determine whether to auto-create a thread for an incoming message. + /// Returns Some(thread_name) to create a thread, or None to reply directly. + /// Default implementation returns None (no auto-threading). + async fn should_auto_thread(&self, _message: &ChannelMessage) -> Option { + None + } + + /// Create a new thread (typically triggered after should_auto_thread returns Some). + /// Returns the new thread ID on success. + async fn create_thread( + &self, + _user: &ChannelUser, + _message_id: &str, + _thread_name: &str, + ) -> Result> { + Err("Thread creation not supported for this adapter".into()) + } + /// Whether this adapter should suppress sending internal agent errors back to the user. /// /// Returns `true` for public broadcast channels (e.g. Mastodon) where posting diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index 698e5f3bc0..dcc1282475 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -1360,6 +1360,10 @@ fn default_true() -> bool { true } +fn default_auto_thread() -> String { + "false".to_string() +} + fn default_thread_ttl() -> u64 { 24 } @@ -1845,6 +1849,10 @@ pub struct DiscordConfig { /// In these channels, the bot responds to all group messages without needing to be mentioned. #[serde(default, deserialize_with = "deserialize_string_or_int_vec")] pub free_response_channels: Vec, + /// Auto-thread behavior: "true" (always create thread), "false" (never), "smart" (only when @mentioned). + /// Default: "false" + #[serde(default = "default_auto_thread")] + pub auto_thread: String, /// Per-channel behavior overrides. #[serde(default)] pub overrides: ChannelOverrides, @@ -1861,6 +1869,7 @@ impl Default for DiscordConfig { ignore_bots: true, default_channel_id: None, free_response_channels: vec![], + auto_thread: "false".to_string(), overrides: ChannelOverrides::default(), } }