diff --git a/crates/invoker-api/src/effects.rs b/crates/invoker-api/src/effects.rs index a2e465c3b2..39267f0b6b 100644 --- a/crates/invoker-api/src/effects.rs +++ b/crates/invoker-api/src/effects.rs @@ -75,6 +75,12 @@ pub enum EffectKind { End, /// This is sent when the invoker exhausted all its attempts to make progress on the specific invocation. Failed(InvocationError), + /// This is sent when the invoker exhausted all its retry attempts and the on_max_attempts policy is Kill. + /// Unlike [`Self::Failed`], this carries the last error as a killed journal event so the UI can + /// distinguish a kill-after-max-retries from a plain failure. + KilledAfterMaxAttempts { + killed_event: RawEvent, + }, // New journal entry v2 which only carries the raw entry. // Introduced in v1.6.0 // Start writing in v1.7.0 diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 1127b41775..5f62a3fc54 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -55,7 +55,7 @@ use restate_types::invocation::InvocationTarget; use restate_types::journal::EntryIndex; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal_events::raw::RawEvent; -use restate_types::journal_events::{Event, PausedEvent, TransientErrorEvent}; +use restate_types::journal_events::{Event, KilledEvent, PausedEvent, TransientErrorEvent}; use restate_types::journal_v2::raw::{RawCommand, RawNotification}; use restate_types::journal_v2::{CommandIndex, EntryMetadata, NotificationId}; use restate_types::live::{Live, LiveLoad}; @@ -1661,13 +1661,45 @@ where "Error when executing the invocation, not going to retry."); self.status_store.on_end(&partition, &invocation_id); + let journal_v2_related_command_type = + if let InvokerError::SdkV2(SdkInvocationErrorV2 { + related_command: Some(ref related_entry), + .. + }) = error + { + related_entry + .related_entry_type + .and_then(|e| e.try_as_command_ref().copied()) + } else { + None + }; + let invocation_error_report = error.into_invocation_error_report(); + let killed_event = KilledEvent { + last_failure: Some(TransientErrorEvent { + error_code: invocation_error_report.err.code(), + error_message: invocation_error_report.err.message().to_owned(), + error_stacktrace: invocation_error_report + .err + .stacktrace() + .map(|s| s.to_owned()), + restate_doc_error_code: invocation_error_report + .doc_error_code + .map(|c| c.code().to_owned()), + related_command_index: invocation_error_report.related_entry_index, + related_command_name: invocation_error_report.related_entry_name.clone(), + related_command_type: journal_v2_related_command_type, + }), + }; + let _ = self .invocation_state_machine_manager .resolve_partition_sender(partition) .expect("Partition should be registered") .send(Box::new(Effect { invocation_id, - kind: EffectKind::Failed(error.into_invocation_error()), + kind: EffectKind::KilledAfterMaxAttempts { + killed_event: RawEvent::from(Event::Killed(killed_event)), + }, })) .await; } @@ -2858,7 +2890,7 @@ mod tests { *effect, pat!(Effect { invocation_id: eq(invocation_id), - kind: pat!(EffectKind::Failed(_)) + kind: pat!(EffectKind::KilledAfterMaxAttempts { .. }) }) ); } diff --git a/crates/storage-query-datafusion/src/journal_events/schema.rs b/crates/storage-query-datafusion/src/journal_events/schema.rs index ae706da47f..195d5e7f59 100644 --- a/crates/storage-query-datafusion/src/journal_events/schema.rs +++ b/crates/storage-query-datafusion/src/journal_events/schema.rs @@ -27,7 +27,7 @@ define_table!(sys_journal_events ( /// When the entry was appended to the journal. appended_at: TimestampMillisecond, - /// The event type. + /// The event type. Possible values: `TransientError`, `Paused`, `Killed`. event_type: DataType::LargeUtf8, /// The event serialized as a JSON string. diff --git a/crates/types/protobuf/restate/journal_events.proto b/crates/types/protobuf/restate/journal_events.proto index e4d7f7ccd1..e3a53100b9 100644 --- a/crates/types/protobuf/restate/journal_events.proto +++ b/crates/types/protobuf/restate/journal_events.proto @@ -47,4 +47,8 @@ message TransientErrorEvent { message PausedEvent { TransientErrorEvent last_failure = 1; +} + +message KilledEvent { + TransientErrorEvent last_failure = 1; } \ No newline at end of file diff --git a/crates/types/src/journal_events/mod.rs b/crates/types/src/journal_events/mod.rs index aca541ec78..817a7473b6 100644 --- a/crates/types/src/journal_events/mod.rs +++ b/crates/types/src/journal_events/mod.rs @@ -34,6 +34,7 @@ pub enum EventType { Unknown = 0, TransientError = 1, Paused = 2, + Killed = 3, } #[derive( @@ -43,6 +44,7 @@ pub enum EventType { pub enum Event { TransientError(TransientErrorEvent), Paused(PausedEvent), + Killed(KilledEvent), /// This is used when it's not possible to parse in this Restate version the event. Unknown, } @@ -71,3 +73,11 @@ pub struct PausedEvent { #[serde(default, skip_serializing_if = "Option::is_none")] pub last_failure: Option, } + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct KilledEvent { + /// The last transient error before being killed, if any. + /// Empty when killed via admin API without prior retry failures. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_failure: Option, +} diff --git a/crates/types/src/journal_events/raw.rs b/crates/types/src/journal_events/raw.rs index c00ef42b45..4248e96aab 100644 --- a/crates/types/src/journal_events/raw.rs +++ b/crates/types/src/journal_events/raw.rs @@ -77,6 +77,7 @@ fn decode(ty: EventType, value: Bytes) -> Result { pb::TransientErrorEvent::decode(value)?.try_into()?, )), EventType::Paused => Ok(Event::Paused(pb::PausedEvent::decode(value)?.try_into()?)), + EventType::Killed => Ok(Event::Killed(pb::KilledEvent::decode(value)?.try_into()?)), EventType::Unknown => Ok(Event::Unknown), } } @@ -91,6 +92,10 @@ fn encode(event: Event) -> RawEvent { EventType::Paused, pb::PausedEvent::from(e).encode_to_vec().into(), ), + Event::Killed(e) => RawEvent::new( + EventType::Killed, + pb::KilledEvent::from(e).encode_to_vec().into(), + ), Event::Unknown => RawEvent::unknown(), } } @@ -233,4 +238,22 @@ mod pb { }) } } + + impl From for KilledEvent { + fn from(event::KilledEvent { last_failure }: event::KilledEvent) -> Self { + KilledEvent { + last_failure: last_failure.map(Into::into), + } + } + } + + impl TryFrom for event::KilledEvent { + type Error = anyhow::Error; + + fn try_from(KilledEvent { last_failure }: KilledEvent) -> Result { + Ok(event::KilledEvent { + last_failure: last_failure.map(|f| f.try_into()).transpose()?, + }) + } + } } diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 605f0dd776..42e224e5b0 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -42,7 +42,7 @@ use restate_storage_api::invocation_status_table::{ WriteInvocationStatusTable, }; use restate_storage_api::invocation_status_table::{InvocationStatus, ScheduledInvocation}; -use restate_storage_api::journal_events::WriteJournalEventsTable; +use restate_storage_api::journal_events::{EventView, WriteJournalEventsTable}; use restate_storage_api::journal_table::ReadJournalTable; use restate_storage_api::journal_table::{JournalEntry, WriteJournalTable}; use restate_storage_api::journal_table_v2; @@ -95,6 +95,8 @@ use restate_types::journal::enriched::{ AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, }; use restate_types::journal::raw::{EntryHeader, RawEntryCodec, RawEntryCodecError}; +use restate_types::journal_events::raw::RawEvent; +use restate_types::journal_events::{Event, KilledEvent}; use restate_types::journal_v2::command::{OutputCommand, OutputResult}; use restate_types::journal_v2::raw::RawEntry; use restate_types::journal_v2::{ @@ -1980,6 +1982,12 @@ impl StateMachineApplyContext<'_, S> { + WriteVQueueTable + WriteJournalEventsTable, { + let after_journal_entry_index = metadata + .journal_metadata + .length + .checked_sub(1) + .unwrap_or_default(); + self.kill_child_invocations(&invocation_id, metadata.journal_metadata.length, &metadata) .await?; @@ -1989,6 +1997,18 @@ impl StateMachineApplyContext<'_, S> { Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)), ) .await?; + + // Write after end_invocation so journal cleanup (do_drop_journal) doesn't delete it + self.storage.put_journal_event( + invocation_id, + EventView { + append_time: self.record_created_at, + after_journal_entry_index, + event: RawEvent::from(Event::Killed(KilledEvent { last_failure: None })), + }, + self.record_lsn.as_u64(), + )?; + self.do_send_abort_invocation_to_invoker(invocation_id); Ok(()) } @@ -2016,6 +2036,12 @@ impl StateMachineApplyContext<'_, S> { + WriteVQueueTable + WriteJournalEventsTable, { + let after_journal_entry_index = metadata + .journal_metadata + .length + .checked_sub(1) + .unwrap_or_default(); + self.kill_child_invocations(&invocation_id, metadata.journal_metadata.length, &metadata) .await?; @@ -2025,6 +2051,18 @@ impl StateMachineApplyContext<'_, S> { Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)), ) .await?; + + // Write after end_invocation so journal cleanup (do_drop_journal) doesn't delete it + self.storage.put_journal_event( + invocation_id, + EventView { + append_time: self.record_created_at, + after_journal_entry_index, + event: RawEvent::from(Event::Killed(KilledEvent { last_failure: None })), + }, + self.record_lsn.as_u64(), + )?; + self.do_send_abort_invocation_to_invoker(invocation_id); Ok(()) } @@ -2556,6 +2594,32 @@ impl StateMachineApplyContext<'_, S> { ) .await?; } + InvokerEffectKind::KilledAfterMaxAttempts { killed_event } => { + let metadata = invocation_status + .into_invocation_metadata() + .expect("Must be present if status is invoked"); + let after_journal_entry_index = metadata + .journal_metadata + .length + .checked_sub(1) + .unwrap_or_default(); + self.end_invocation( + effect.invocation_id, + metadata, + Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)), + ) + .await?; + // Write after end_invocation so journal cleanup (do_drop_journal) doesn't delete it + self.storage.put_journal_event( + effect.invocation_id, + EventView { + append_time: self.record_created_at, + after_journal_entry_index, + event: killed_event, + }, + self.record_lsn.as_u64(), + )?; + } InvokerEffectKind::Yield(ref reason) => { let invocation_metadata = invocation_status .into_invocation_metadata() diff --git a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs index 1d2220ca64..758999cef2 100644 --- a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs +++ b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs @@ -9,20 +9,24 @@ // by the Apache License, Version 2.0. use super::{fixtures, matchers, *}; - use assert2::assert; use assert2::let_assert; use googletest::any; +use googletest::elements_are; use prost::Message; +use restate_invoker_api::EffectKind as InvokerEffectKind; use restate_storage_api::journal_table; use restate_storage_api::journal_table::WriteJournalTable; use restate_storage_api::timer_table::{ ReadTimerTable, Timer, TimerKey, TimerKeyKind, WriteTimerTable, }; use restate_types::deployment::PinnedDeployment; +use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::identifiers::EntryIndex; use restate_types::invocation::{IngressInvocationResponseSink, TerminationFlavor}; use restate_types::journal::enriched::EnrichedEntryHeader; +use restate_types::journal_events::raw::RawEvent; +use restate_types::journal_events::{Event, KilledEvent, TransientErrorEvent}; use restate_types::journal_v2::NotificationId; use restate_types::service_protocol; use rstest::rstest; @@ -771,3 +775,79 @@ fn create_termination_journal( )), ] } + +/// Admin API kill should write a KilledEvent with no last_failure. +#[restate_core::test] +async fn kill_invoked_writes_killed_journal_event() -> anyhow::Result<()> { + let mut test_env = TestEnv::create().await; + let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; + fixtures::mock_pinned_deployment_v5(&mut test_env, invocation_id).await; + + let _ = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: TerminationFlavor::Kill, + response_sink: None, + })) + .await; + + assert_that!( + test_env.read_journal_events(invocation_id).await, + elements_are![eq(Event::Killed(KilledEvent { last_failure: None }))] + ); + + test_env.shutdown().await; + Ok(()) +} + +/// Invoker kill-after-max-attempts should write a KilledEvent with the last error +/// and complete the invocation with KILLED_INVOCATION_ERROR (not the raw service error). +#[restate_core::test] +async fn killed_after_max_attempts_writes_killed_event_with_last_failure() -> anyhow::Result<()> { + let mut test_env = TestEnv::create().await; + let invocation_id = fixtures::mock_start_invocation(&mut test_env).await; + fixtures::mock_pinned_deployment_v5(&mut test_env, invocation_id).await; + + let last_error = TransientErrorEvent { + error_code: 500u16.into(), + error_message: "service blew up".to_string(), + error_stacktrace: None, + restate_doc_error_code: None, + related_command_index: None, + related_command_name: None, + related_command_type: None, + }; + + let _ = test_env + .apply(Command::InvokerEffect(Box::new( + restate_invoker_api::Effect { + invocation_id, + kind: InvokerEffectKind::KilledAfterMaxAttempts { + killed_event: RawEvent::from(Event::Killed(KilledEvent { + last_failure: Some(last_error.clone()), + })), + }, + }, + ))) + .await; + + // Journal event must carry the last_failure + assert_that!( + test_env.read_journal_events(invocation_id).await, + elements_are![eq(Event::Killed(KilledEvent { + last_failure: Some(last_error) + }))] + ); + + // Invocation must no longer be active + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + not(pat!(InvocationStatus::Invoked { .. })) + ); + + test_env.shutdown().await; + Ok(()) +} diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index 9e4632b8f2..a93a102efc 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -49,7 +49,7 @@ use restate_storage_api::service_status_table::{ use restate_storage_api::state_table::{ReadStateTable, WriteStateTable}; use restate_test_util::matchers::*; use restate_types::config::StorageOptions; -use restate_types::errors::{InvocationError, KILLED_INVOCATION_ERROR, codes}; +use restate_types::errors::{InvocationError, codes}; use restate_types::identifiers::{ AwakeableIdentifier, InvocationId, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, ServiceId, diff --git a/release-notes/unreleased/4425-killed-journal-event.md b/release-notes/unreleased/4425-killed-journal-event.md new file mode 100644 index 0000000000..b7841cc09c --- /dev/null +++ b/release-notes/unreleased/4425-killed-journal-event.md @@ -0,0 +1,76 @@ +# Release Notes for Issue #4425: KilledEvent in sys_journal_events + +## Breaking Change + +### What Changed + +When an invocation is killed — either via the Admin API or after exhausting all retry +attempts with `onMaxAttempts: 'kill'` — a `Killed` journal event is now written to +`sys_journal_events`. + +Previously, kill-after-max-retries surfaced as a plain failure carrying the raw service +error, making it impossible to distinguish from a genuine service error. The Admin API +kill path also wrote no journal event at all. + +### Why This Matters + +Without a `Killed` event, the UI and operators had no reliable way to tell whether an +invocation failed naturally or was deliberately killed. Querying "show me all killed +invocations" was impossible. This change makes kills a first-class observable event. + +### New Behaviour + +- **`onMaxAttempts: 'kill'`**: When retries are exhausted, a `Killed` event is written + with the last transient error attached as `last_failure`. The final invocation result + is now always `KILLED_INVOCATION_ERROR` (code `ABORTED`, message `"killed"`) instead + of the raw service error. + +- **Admin API kill**: A `Killed` event with no `last_failure` is written when an active + (invoked, suspended, or paused) invocation is killed via the Admin API. + +### Querying + +```sql +-- Find all killed invocations and their last error +SELECT id, event_json +FROM sys_journal_events +WHERE event_type = 'Killed' +``` + +The `event_json` column contains a JSON object of the form: + +```json +{ "ty": "Killed", "last_failure": { "error_code": 500, "error_message": "..." } } +``` + +`last_failure` is absent when the invocation was killed via the Admin API without any +prior retry failures. + +### Impact on Users + +- **Breaking**: The final error for `onMaxAttempts: 'kill'` invocations changes from the + raw service error to `KILLED_INVOCATION_ERROR` (code `ABORTED`, message `"killed"`). +- **UI**: The UI can now reliably display "killed after N retries" by reading the `Killed` + event. +- Existing deployments: No configuration change needed. The new event type is written + automatically from the next invocation kill onwards. + +### Migration Guidance + +If you have code that inspects the invocation error payload to detect kills (e.g. matching +on a specific error message or code from your service), migrate it to query +`sys_journal_events` instead: + +```sql +-- Before: fragile, depends on the raw service error +SELECT id FROM invocation WHERE last_failure_message = 'my-service-error' + +-- After: reliable, event_type = 'Killed' is definitive +SELECT id, event_json +FROM sys_journal_events +WHERE event_type = 'Killed' +``` + +### Related Issues + +- Issue #4425: Write KilledEvent to sys_journal_events on invocation kill