Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/temper-server/src/state/dispatch/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ impl crate::state::ServerState {
entity_id,
action,
agent_ctx,
dispatch_idempotency_key: idempotency_key.as_deref(),
action_params: &action_params,
await_integration,
};
Expand Down
1 change: 1 addition & 0 deletions crates/temper-server/src/state/dispatch/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl crate::state::ServerState {
custom_effects: &custom_effects,
entity_state: &entity_state,
agent_ctx: &agent_ctx,
dispatch_idempotency_key: None,
action_params: &action_params,
mode: WasmDispatchMode::Background,
};
Expand Down
3 changes: 3 additions & 0 deletions crates/temper-server/src/state/dispatch/effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub(crate) struct PostDispatchContext<'a> {
pub entity_id: &'a str,
pub action: &'a str,
pub agent_ctx: &'a AgentContext,
pub dispatch_idempotency_key: Option<&'a str>,
pub action_params: &'a serde_json::Value,
pub await_integration: bool,
}
Expand Down Expand Up @@ -500,6 +501,7 @@ impl crate::state::ServerState {
custom_effects: &response.custom_effects,
entity_state: &response.state,
agent_ctx: ctx.agent_ctx,
dispatch_idempotency_key: ctx.dispatch_idempotency_key,
action_params: ctx.action_params,
mode: super::WasmDispatchMode::Inline,
};
Expand Down Expand Up @@ -557,6 +559,7 @@ impl crate::state::ServerState {
custom_effects: &response.custom_effects,
entity_state: adapter_state,
agent_ctx: ctx.agent_ctx,
dispatch_idempotency_key: None,
action_params: ctx.action_params,
mode: super::WasmDispatchMode::Inline,
};
Expand Down
2 changes: 2 additions & 0 deletions crates/temper-server/src/state/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub(crate) struct WasmDispatchRequest<'a> {
pub custom_effects: &'a [String],
pub entity_state: &'a EntityState,
pub agent_ctx: &'a AgentContext,
pub dispatch_idempotency_key: Option<&'a str>,
pub action_params: &'a serde_json::Value,
pub mode: WasmDispatchMode,
}
Expand Down Expand Up @@ -394,6 +395,7 @@ impl crate::state::ServerState {
custom_effects: &custom_effects,
entity_state: &entity_state,
agent_ctx: &agent_ctx,
dispatch_idempotency_key: None,
action_params: &action_params,
mode: WasmDispatchMode::Background,
};
Expand Down
1 change: 1 addition & 0 deletions crates/temper-server/src/state/dispatch/state_timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,7 @@ queue_timeout_seconds = 30
entity_id: "t-1",
action: "__Created",
agent_ctx: &agent_ctx,
dispatch_idempotency_key: None,
action_params: &serde_json::json!({}),
await_integration: false,
};
Expand Down
52 changes: 51 additions & 1 deletion crates/temper-server/src/state/dispatch/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,23 @@ struct WasmDispatchCtx<'a> {
entity_ref: WasmEntityRef<'a>,
action: &'a str,
agent_ctx: &'a AgentContext,
dispatch_idempotency_key: Option<&'a str>,
mode: WasmDispatchMode,
}

fn agent_ctx_for_composite_wasm_result(
agent_ctx: &AgentContext,
dispatch_idempotency_key: Option<&str>,
) -> AgentContext {
let mut composite_agent_ctx = agent_ctx.clone();
if composite_agent_ctx.idempotency_key.is_none()
&& let Some(idempotency_key) = dispatch_idempotency_key
{
composite_agent_ctx.idempotency_key = Some(idempotency_key.to_string());
}
composite_agent_ctx
}

const HTTP_CALL_AUTHZ_DENIED_PREFIX: &str = "authorization denied for http_call";
const MONTY_REPL_MODULE: &str = "monty_repl";
const WASM_DISPATCH_PHASE_MODULE_CACHE: &str = "dispatch.wasm.phase.module_cache";
Expand Down Expand Up @@ -355,6 +369,7 @@ impl crate::state::ServerState {
},
action: req.action,
agent_ctx: req.agent_ctx,
dispatch_idempotency_key: req.dispatch_idempotency_key,
mode: req.mode,
};
let mut last_response: Option<EntityResponse> = None;
Expand Down Expand Up @@ -1121,14 +1136,18 @@ impl crate::state::ServerState {
.await;

let callback_params = strip_private_observability_params(result.callback_params);
let composite_agent_ctx = agent_ctx_for_composite_wasm_result(
ctx.agent_ctx,
ctx.dispatch_idempotency_key,
);
let composite_result_consumed = self
.apply_composite_integration_result(
ctx.entity_ref.tenant,
ctx.entity_ref.entity_type,
ctx.entity_ref.entity_id,
ctx.action,
&callback_params,
ctx.agent_ctx,
&composite_agent_ctx,
)
.await
.map_err(|e| e.to_string())?;
Expand Down Expand Up @@ -2008,6 +2027,36 @@ fn progress_emitter_fn(
mod tests {
use super::*;

#[test]
fn composite_wasm_result_inherits_generated_dispatch_idempotency() {
let agent = AgentContext::for_service("version-publisher");

let composite_agent = agent_ctx_for_composite_wasm_result(
&agent,
Some("dispatch:default:App:app:PublishNewVersion:one"),
);

assert_eq!(
composite_agent.idempotency_key.as_deref(),
Some("dispatch:default:App:app:PublishNewVersion:one"),
"composite sub-writes need the parent dispatch idempotency so repeated app version updates get distinct sub-write keys"
);
}

#[test]
fn composite_wasm_result_preserves_caller_supplied_idempotency() {
let mut agent = AgentContext::for_service("version-publisher");
agent.idempotency_key = Some("caller-key".to_string());

let composite_agent = agent_ctx_for_composite_wasm_result(&agent, Some("generated-key"));

assert_eq!(
composite_agent.idempotency_key.as_deref(),
Some("caller-key"),
"caller idempotency remains authoritative for retries"
);
}

#[test]
fn strips_private_llm_observability_params_before_callback_dispatch() {
let params = json!({
Expand Down Expand Up @@ -2183,6 +2232,7 @@ mod tests {
},
action: "ContextReady",
agent_ctx: &agent_ctx,
dispatch_idempotency_key: None,
mode: WasmDispatchMode::Inline,
};
let span = build_llm_root_span(&ctx, &integration, &entity_state, "provider_caller");
Expand Down
Loading