diff --git a/crates/temper-server/src/state/dispatch/actions.rs b/crates/temper-server/src/state/dispatch/actions.rs index 4b0952ce..3d99c319 100644 --- a/crates/temper-server/src/state/dispatch/actions.rs +++ b/crates/temper-server/src/state/dispatch/actions.rs @@ -672,6 +672,7 @@ impl crate::state::ServerState { entity_id, action, agent_ctx, + dispatch_idempotency_key: idempotency_key.as_deref(), action_params: &action_params, await_integration, }; diff --git a/crates/temper-server/src/state/dispatch/adapter.rs b/crates/temper-server/src/state/dispatch/adapter.rs index 02df3d44..b8177b16 100644 --- a/crates/temper-server/src/state/dispatch/adapter.rs +++ b/crates/temper-server/src/state/dispatch/adapter.rs @@ -73,6 +73,7 @@ impl crate::state::ServerState { custom_effects: &custom_effects, entity_state: &entity_state, agent_ctx: &agent_ctx, + dispatch_idempotency_key: None, action_params: &action_params, mode: WasmDispatchMode::Background, }; diff --git a/crates/temper-server/src/state/dispatch/effects.rs b/crates/temper-server/src/state/dispatch/effects.rs index 2b1d4a74..f202ed4c 100644 --- a/crates/temper-server/src/state/dispatch/effects.rs +++ b/crates/temper-server/src/state/dispatch/effects.rs @@ -24,6 +24,7 @@ pub(crate) struct PostDispatchContext<'a> { pub entity_id: &'a str, pub action: &'a str, pub agent_ctx: &'a AgentContext, + pub dispatch_idempotency_key: Option<&'a str>, pub action_params: &'a serde_json::Value, pub await_integration: bool, } @@ -500,6 +501,7 @@ impl crate::state::ServerState { custom_effects: &response.custom_effects, entity_state: &response.state, agent_ctx: ctx.agent_ctx, + dispatch_idempotency_key: ctx.dispatch_idempotency_key, action_params: ctx.action_params, mode: super::WasmDispatchMode::Inline, }; @@ -557,6 +559,7 @@ impl crate::state::ServerState { custom_effects: &response.custom_effects, entity_state: adapter_state, agent_ctx: ctx.agent_ctx, + dispatch_idempotency_key: None, action_params: ctx.action_params, mode: super::WasmDispatchMode::Inline, }; diff --git a/crates/temper-server/src/state/dispatch/mod.rs b/crates/temper-server/src/state/dispatch/mod.rs index 4a498aee..6143210d 100644 --- a/crates/temper-server/src/state/dispatch/mod.rs +++ b/crates/temper-server/src/state/dispatch/mod.rs @@ -45,6 +45,7 @@ pub(crate) struct WasmDispatchRequest<'a> { pub custom_effects: &'a [String], pub entity_state: &'a EntityState, pub agent_ctx: &'a AgentContext, + pub dispatch_idempotency_key: Option<&'a str>, pub action_params: &'a serde_json::Value, pub mode: WasmDispatchMode, } @@ -394,6 +395,7 @@ impl crate::state::ServerState { custom_effects: &custom_effects, entity_state: &entity_state, agent_ctx: &agent_ctx, + dispatch_idempotency_key: None, action_params: &action_params, mode: WasmDispatchMode::Background, }; diff --git a/crates/temper-server/src/state/dispatch/state_timeouts.rs b/crates/temper-server/src/state/dispatch/state_timeouts.rs index b79fc2a9..a6d77ef8 100644 --- a/crates/temper-server/src/state/dispatch/state_timeouts.rs +++ b/crates/temper-server/src/state/dispatch/state_timeouts.rs @@ -1157,6 +1157,7 @@ queue_timeout_seconds = 30 entity_id: "t-1", action: "__Created", agent_ctx: &agent_ctx, + dispatch_idempotency_key: None, action_params: &serde_json::json!({}), await_integration: false, }; diff --git a/crates/temper-server/src/state/dispatch/wasm.rs b/crates/temper-server/src/state/dispatch/wasm.rs index 53c70bf5..a47e6be2 100644 --- a/crates/temper-server/src/state/dispatch/wasm.rs +++ b/crates/temper-server/src/state/dispatch/wasm.rs @@ -38,9 +38,23 @@ struct WasmDispatchCtx<'a> { entity_ref: WasmEntityRef<'a>, action: &'a str, agent_ctx: &'a AgentContext, + dispatch_idempotency_key: Option<&'a str>, mode: WasmDispatchMode, } +fn agent_ctx_for_composite_wasm_result( + agent_ctx: &AgentContext, + dispatch_idempotency_key: Option<&str>, +) -> AgentContext { + let mut composite_agent_ctx = agent_ctx.clone(); + if composite_agent_ctx.idempotency_key.is_none() + && let Some(idempotency_key) = dispatch_idempotency_key + { + composite_agent_ctx.idempotency_key = Some(idempotency_key.to_string()); + } + composite_agent_ctx +} + const HTTP_CALL_AUTHZ_DENIED_PREFIX: &str = "authorization denied for http_call"; const MONTY_REPL_MODULE: &str = "monty_repl"; const WASM_DISPATCH_PHASE_MODULE_CACHE: &str = "dispatch.wasm.phase.module_cache"; @@ -355,6 +369,7 @@ impl crate::state::ServerState { }, action: req.action, agent_ctx: req.agent_ctx, + dispatch_idempotency_key: req.dispatch_idempotency_key, mode: req.mode, }; let mut last_response: Option = None; @@ -1121,6 +1136,10 @@ impl crate::state::ServerState { .await; let callback_params = strip_private_observability_params(result.callback_params); + let composite_agent_ctx = agent_ctx_for_composite_wasm_result( + ctx.agent_ctx, + ctx.dispatch_idempotency_key, + ); let composite_result_consumed = self .apply_composite_integration_result( ctx.entity_ref.tenant, @@ -1128,7 +1147,7 @@ impl crate::state::ServerState { ctx.entity_ref.entity_id, ctx.action, &callback_params, - ctx.agent_ctx, + &composite_agent_ctx, ) .await .map_err(|e| e.to_string())?; @@ -2008,6 +2027,36 @@ fn progress_emitter_fn( mod tests { use super::*; + #[test] + fn composite_wasm_result_inherits_generated_dispatch_idempotency() { + let agent = AgentContext::for_service("version-publisher"); + + let composite_agent = agent_ctx_for_composite_wasm_result( + &agent, + Some("dispatch:default:App:app:PublishNewVersion:one"), + ); + + assert_eq!( + composite_agent.idempotency_key.as_deref(), + Some("dispatch:default:App:app:PublishNewVersion:one"), + "composite sub-writes need the parent dispatch idempotency so repeated app version updates get distinct sub-write keys" + ); + } + + #[test] + fn composite_wasm_result_preserves_caller_supplied_idempotency() { + let mut agent = AgentContext::for_service("version-publisher"); + agent.idempotency_key = Some("caller-key".to_string()); + + let composite_agent = agent_ctx_for_composite_wasm_result(&agent, Some("generated-key")); + + assert_eq!( + composite_agent.idempotency_key.as_deref(), + Some("caller-key"), + "caller idempotency remains authoritative for retries" + ); + } + #[test] fn strips_private_llm_observability_params_before_callback_dispatch() { let params = json!({ @@ -2183,6 +2232,7 @@ mod tests { }, action: "ContextReady", agent_ctx: &agent_ctx, + dispatch_idempotency_key: None, mode: WasmDispatchMode::Inline, }; let span = build_llm_root_span(&ctx, &integration, &entity_state, "provider_caller");