Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 243 additions & 68 deletions crates/zeroclaw-channels/src/orchestrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,56 +657,60 @@ fn channel_delivery_instructions(channel_name: &str) -> Option<&'static str> {
}
}

fn build_channel_system_prompt(
base_prompt: &str,
channel_name: &str,
reply_target: &str,
sender: &str,
) -> String {
let mut prompt = base_prompt.to_string();

// Refresh the stale datetime in the cached system prompt
{
let now = chrono::Local::now();
let fresh = format!(
"## Current Date & Time\n\n{} ({})\n",
now.format("%Y-%m-%d %H:%M:%S"),
now.format("%Z"),
);
if let Some(start) = prompt.find("## Current Date & Time\n\n") {
// Find the end of this section (next "## " heading or end of string)
let rest = &prompt[start + 24..]; // skip past "## Current Date & Time\n\n"
let section_end = rest
.find("\n## ")
.map(|i| start + 24 + i)
.unwrap_or(prompt.len());
prompt.replace_range(start..section_end, fresh.trim_end());
}
}
/// Build the channel-specific system prompt.
///
/// The result is intentionally **stable across turns of the same channel**: it
/// is assembled only from `base_prompt` and whatever
/// `channel_delivery_instructions` returns for `channel_name`. Per-turn pieces
/// (current date/time, reply_target, sender) are kept out of the system prompt
/// entirely so that provider-side prompt caching (e.g. llama.cpp prefix cache,
/// Anthropic/OpenAI cached prefixes) can hit on the system prefix. See
/// `build_channel_turn_context_preamble` for the per-turn dynamic block that
/// rides on the user message instead.
///
/// Return value:
/// - the delivery instructions alone when `base_prompt` is empty;
/// - `base_prompt`, a blank line, then the instructions when both are present;
/// - `base_prompt` unchanged when the channel has no delivery instructions.
///
/// Issue #6360: the previous implementation refreshed `## Current Date & Time`
/// to *second* precision and appended a per-turn channel-context paragraph
/// containing `reply_target` and `sender`, which caused the entire system
/// prompt to mutate on every turn and forced a full prompt re-tokenization.
fn build_channel_system_prompt(base_prompt: &str, channel_name: &str) -> String {
    // Every arm is a pure function of the two arguments: no clock reads and no
    // per-message data, which is what keeps the output byte-stable.
    match channel_delivery_instructions(channel_name) {
        Some(instructions) if base_prompt.is_empty() => instructions.to_string(),
        Some(instructions) => format!("{base_prompt}\n\n{instructions}"),
        None => base_prompt.to_string(),
    }
}

if !reply_target.is_empty() {
let context = format!(
"\n\nChannel context: You are currently responding on channel={channel_name}, \
reply_target={reply_target}, sender={sender}. \
The sender field is the platform-specific user ID of the person who sent \
this message. Use it to distinguish between different users. \
When scheduling delayed messages or reminders \
via cron_add for this conversation, use delivery={{\"mode\":\"announce\",\
\"channel\":\"{channel_name}\",\"to\":\"{reply_target}\"}} so the message \
reaches the user."
);
prompt.push_str(&context);
/// Build the per-turn channel-context preamble that used to be appended to
/// the system prompt. Returns an empty string when there is no useful context
/// to attach so the caller can skip emitting the marker entirely.
///
/// This text is meant to ride on the **user message** of the current turn,
/// not the system prompt — it changes every turn (datetime, sender, reply
/// target) and must not invalidate the cached system prefix (#6360).
fn build_channel_turn_context_preamble(
channel_name: &str,
reply_target: &str,
sender: &str,
) -> String {
if reply_target.is_empty() {
return String::new();
}

prompt
let now = chrono::Local::now();
format!(
"[turn-context] time={} ({}) channel={channel_name} reply_target={reply_target} sender={sender}. \
The sender field is the platform-specific user ID of the person who sent \
this message. Use it to distinguish between different users. \
When scheduling delayed messages or reminders via cron_add for this \
conversation, use delivery={{\"mode\":\"announce\",\"channel\":\"{channel_name}\",\
\"to\":\"{reply_target}\"}} so the message reaches the user.",
now.format("%Y-%m-%d %H:%M:%S"),
now.format("%Z"),
)
}

fn normalize_cached_channel_turns(turns: Vec<ChatMessage>) -> Vec<ChatMessage> {
Expand Down Expand Up @@ -3021,18 +3025,29 @@ async fn process_channel_message(
} else {
refreshed_new_session_system_prompt(ctx.as_ref())
};
let mut system_prompt = build_channel_system_prompt(
&base_system_prompt,
&msg.channel,
&msg.reply_target,
&msg.sender,
);
let mut system_prompt = build_channel_system_prompt(&base_system_prompt, &msg.channel);
if !memory_context.is_empty() {
let _ = write!(system_prompt, "\n\n{memory_context}");
}
let mut history = vec![ChatMessage::system(system_prompt)];
history.extend(prior_turns);

// Prepend the per-turn channel context (datetime, reply_target, sender) to
// the *last* user turn — never to the system prompt — so that the system
// prefix stays byte-stable across turns and provider-side prompt caches
// can hit on it (#6360). The copy kept in the per-sender conversation cache is
// untouched: only the outgoing LLM `history` carries the preamble.
let turn_preamble =
build_channel_turn_context_preamble(&msg.channel, &msg.reply_target, &msg.sender);
if let Some(last) = history
.iter_mut()
.rev()
.find(|m| m.role == "user")
.filter(|m| !turn_preamble.is_empty() && !m.content.starts_with("[turn-context]"))

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Trust-boundary regression: a user message starting with [turn-context] makes this filter skip the runtime-built preamble, so the outgoing turn carries only user-controlled text instead of the authoritative reply_target/sender/cron_add hint. Cached history never contains [turn-context], so drop the raw-content guard entirely — the current outgoing turn can always receive the runtime preamble.

{
last.content = format!("{turn_preamble}\n\n{}", last.content);
}

// ── Proactive context compression ────────────────────────────
// Use the existing ContextCompressor to summarize older history
// before the LLM call, preventing context-window-exceeded errors
Expand Down Expand Up @@ -11379,8 +11394,32 @@ BTC is currently around $65,000 based on latest tool output."#
assert_eq!(calls[0][0].0, "system");
assert!(calls[0][0].1.contains(MEMORY_CONTEXT_OPEN));
assert!(calls[0][0].1.contains("Age is 45"));
// Per-turn channel-context (datetime, reply_target, sender) rides on the
// user message — NOT the system prompt — so the system prefix stays
// byte-stable and provider prompt caches hit on it (#6360).
assert!(
!calls[0][0].1.contains("reply_target="),
"system prompt must not carry per-turn reply_target",
);
assert!(
!calls[0][0].1.contains("## Current Date & Time"),
"system prompt must not carry per-second timestamp",
);
assert_eq!(calls[0][1].0, "user");
assert_eq!(calls[0][1].1, "hello");
assert!(
calls[0][1].1.starts_with("[turn-context]"),
"user turn must be prefixed with the turn-context preamble, got: {}",
calls[0][1].1
);
assert!(
calls[0][1].1.contains("reply_target=chat-ctx"),
"user turn preamble carries the reply target"
);
assert!(
calls[0][1].1.ends_with("\n\nhello"),
"original user content must be preserved verbatim after the preamble, got: {}",
calls[0][1].1
);

let histories = runtime_ctx
.conversation_histories
Expand All @@ -11390,8 +11429,10 @@ BTC is currently around $65,000 based on latest tool output."#
.peek("test-channel_chat-ctx_alice")
.expect("history should be stored for sender");
assert_eq!(turns[0].role, "user");
// The cached copy stays clean — only the outgoing wire copy is enriched.
assert_eq!(turns[0].content, "hello");
assert!(!turns[0].content.contains(MEMORY_CONTEXT_OPEN));
assert!(!turns[0].content.contains("[turn-context]"));
}

#[tokio::test]
Expand Down Expand Up @@ -11512,6 +11553,119 @@ BTC is currently around $65,000 based on latest tool output."#
assert!(!calls[0].iter().skip(1).any(|(role, _)| role == "system"));
}

/// #6360 end-to-end pin: two consecutive process_channel_message calls for
/// the same sender/room must produce byte-identical system prompts so that
/// the upstream LLM's prompt cache can reuse the prefix.
#[tokio::test]
async fn process_channel_message_telegram_system_prompt_is_byte_stable_across_turns() {
let channel_impl = Arc::new(TelegramRecordingChannel::default());
let channel: Arc<dyn Channel> = channel_impl.clone();
let mut channels_by_name = HashMap::new();
channels_by_name.insert(channel.name().to_string(), channel);
let provider_impl = Arc::new(HistoryCaptureProvider::default());

let runtime_ctx = Arc::new(ChannelRuntimeContext {
channels_by_name: Arc::new(channels_by_name),
provider: provider_impl.clone(),
default_provider: Arc::new("test-provider".to_string()),
memory: Arc::new(NoopMemory),
tools_registry: Arc::new(vec![]),
observer: Arc::new(NoopObserver),
system_prompt: Arc::new("test-system-prompt".to_string()),
model: Arc::new("test-model".to_string()),
temperature: 0.0,
auto_save_memory: false,
max_tool_iterations: 5,
min_relevance_score: 0.0,
conversation_histories: Arc::new(Mutex::new(lru::LruCache::new(
std::num::NonZeroUsize::new(MAX_CONVERSATION_SENDERS).unwrap(),
))),
pending_new_sessions: Arc::new(Mutex::new(HashSet::new())),
provider_cache: Arc::new(Mutex::new(HashMap::new())),
route_overrides: Arc::new(Mutex::new(HashMap::new())),
api_key: None,
api_url: None,
reliability: Arc::new(zeroclaw_config::schema::ReliabilityConfig::default()),
provider_runtime_options: zeroclaw_providers::ProviderRuntimeOptions::default(),
workspace_dir: Arc::new(std::env::temp_dir()),
prompt_config: Arc::new(zeroclaw_config::schema::Config::default()),
message_timeout_secs: CHANNEL_MESSAGE_TIMEOUT_SECS,
interrupt_on_new_message: InterruptOnNewMessageConfig {
telegram: false,
slack: false,
discord: false,
mattermost: false,
matrix: false,
},
multimodal: zeroclaw_config::schema::MultimodalConfig::default(),
media_pipeline: zeroclaw_config::schema::MediaPipelineConfig::default(),
transcription_config: zeroclaw_config::schema::TranscriptionConfig::default(),
hooks: None,
non_cli_excluded_tools: Arc::new(Vec::new()),
autonomy_level: AutonomyLevel::default(),
tool_call_dedup_exempt: Arc::new(Vec::new()),
model_routes: Arc::new(Vec::new()),
query_classification: zeroclaw_config::schema::QueryClassificationConfig::default(),
ack_reactions: true,
show_tool_calls: true,
session_store: None,
approval_manager: Arc::new(ApprovalManager::for_non_interactive(
&zeroclaw_config::schema::AutonomyConfig::default(),
)),
activated_tools: None,
cost_tracking: None,
pacing: zeroclaw_config::schema::PacingConfig::default(),
max_tool_result_chars: 0,
context_token_budget: 0,
debouncer: Arc::new(zeroclaw_infra::debounce::MessageDebouncer::new(
Duration::ZERO,
)),
receipt_generator: None,
show_receipts_in_response: false,
});

let mk_msg = |id: &str, content: &str| zeroclaw_api::channel::ChannelMessage {
id: id.to_string(),
sender: "alice".to_string(),
reply_target: "chat-cache".to_string(),
content: content.to_string(),
channel: "telegram".to_string(),
timestamp: 1,
thread_ts: None,
interruption_scope_id: None,
attachments: vec![],
};

process_channel_message(
runtime_ctx.clone(),
mk_msg("m1", "first"),
CancellationToken::new(),
)
.await;
// Sleep across a second boundary — under the pre-fix behavior this would
// mutate `## Current Date & Time` in the system prompt by at least one
// second and invalidate the prefix cache.
tokio::time::sleep(Duration::from_millis(1100)).await;
process_channel_message(
runtime_ctx.clone(),
mk_msg("m2", "second"),
CancellationToken::new(),
)
.await;

let calls = provider_impl
.calls
.lock()
.unwrap_or_else(|e| e.into_inner());
assert_eq!(calls.len(), 2, "expected exactly two provider invocations");
assert_eq!(calls[0][0].0, "system");
assert_eq!(calls[1][0].0, "system");
assert_eq!(
calls[0][0].1, calls[1][0].1,
"system prompt must be byte-identical across turns for prompt caching to hit (#6360)"
);
}

#[test]
fn channel_delivery_instructions_for_discord_mandates_absolute_paths() {
let block = channel_delivery_instructions("discord")
Expand Down Expand Up @@ -13423,31 +13577,52 @@ This is an example JSON object for profile settings."#;
}

#[test]
fn build_channel_system_prompt_includes_sender_id() {
let prompt = build_channel_system_prompt(
"You are a helpful assistant.",
"mattermost",
"channel123:root456",
"user_abc123",
);
assert!(prompt.contains("sender=user_abc123"));
assert!(prompt.contains("channel=mattermost"));
assert!(prompt.contains("reply_target=channel123:root456"));
fn build_channel_system_prompt_is_byte_stable_across_calls() {
// #6360: the system prompt must not embed any per-turn state. Two
// back-to-back calls with the same inputs must yield identical bytes,
// even though wall-clock time has advanced between them.
let a = build_channel_system_prompt("Base.", "telegram");
std::thread::sleep(std::time::Duration::from_millis(5));
let b = build_channel_system_prompt("Base.", "telegram");
assert_eq!(a, b, "system prompt must be byte-stable across turns");
}

#[test]
fn build_channel_system_prompt_omits_context_when_reply_target_empty() {
let prompt = build_channel_system_prompt("Base prompt.", "mattermost", "", "user_abc123");
assert!(!prompt.contains("sender="));
fn build_channel_system_prompt_excludes_per_turn_context() {
    // None of the per-turn markers — the `Channel context:` paragraph or a
    // `sender=` / `reply_target=` token — may appear in the system prompt;
    // they belong on the user turn so provider prefix caches are not
    // invalidated on every reply (#6360).
    let rendered = build_channel_system_prompt("You are helpful.", "telegram");

    for forbidden in ["Channel context:", "sender=", "reply_target="] {
        assert!(!rendered.contains(forbidden));
    }
}

#[test]
fn build_channel_system_prompt_excludes_per_second_datetime() {
    // Earlier code rewrote a `## Current Date & Time` section, at second
    // precision, inside the cached system prompt — enough on its own to defeat
    // every cache hit. Pin that the builder does not add that heading.
    let rendered = build_channel_system_prompt("Base.", "telegram");
    let has_datetime_heading = rendered.contains("## Current Date & Time");
    assert!(!has_datetime_heading);
}

#[test]
fn build_channel_turn_context_preamble_carries_dynamic_fields() {
    let preamble = build_channel_turn_context_preamble("telegram", "chat:42", "user_xyz");

    // Each per-turn field must be rendered as a `key=value` token.
    for expected in ["channel=telegram", "reply_target=chat:42", "sender=user_xyz"] {
        assert!(preamble.contains(expected));
    }

    // The `[turn-context]` marker leads the preamble; the duplicate-guard at
    // the call site relies on it to recognise already-prefixed user turns.
    assert!(preamble.starts_with("[turn-context]"));
}

#[test]
fn build_channel_system_prompt_sender_distinguishes_users() {
let prompt_a = build_channel_system_prompt("Base.", "mattermost", "ch:thread", "user_aaa");
let prompt_b = build_channel_system_prompt("Base.", "mattermost", "ch:thread", "user_bbb");
assert!(prompt_a.contains("sender=user_aaa"));
assert!(prompt_b.contains("sender=user_bbb"));
assert_ne!(prompt_a, prompt_b);
fn build_channel_turn_context_preamble_empty_when_reply_target_empty() {
    // An empty reply target must produce no preamble at all, so nothing is
    // prepended to the user turn in that case.
    let preamble = build_channel_turn_context_preamble("telegram", "", "user_xyz");
    assert_eq!(preamble, "");
}
}
Loading