Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions crates/wal-protocol/src/invocation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use bytes::{Buf, BufMut, Bytes};

use restate_types::bilrost_storage_encode_decode;
use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId};

/// Pause an invocation rpc triggered from external sources (http ingress)
#[derive(Debug, Clone, bilrost::Message)]
pub struct PauseInvocationRpcRequest {
#[bilrost(tag(1))]
pub invocation_id: InvocationId,
#[bilrost(tag(2))]
pub request_id: PartitionProcessorRpcRequestId,
}

bilrost_storage_encode_decode!(PauseInvocationRpcRequest);

impl PauseInvocationRpcRequest {
pub fn bilrost_encode<B: BufMut>(&self, b: &mut B) -> Result<(), bilrost::EncodeError> {
bilrost::Message::encode(self, b)
}

pub fn encoded_len(&self) -> usize {
bilrost::Message::encoded_len(self)
}

pub fn bilrost_encode_to_bytes(&self) -> Bytes {
bilrost::Message::encode_to_bytes(self)
}

pub fn bilrost_decode<B: Buf>(buf: B) -> Result<Self, bilrost::DecodeError> {
bilrost::OwnedMessage::decode(buf)
}
}
1 change: 1 addition & 0 deletions crates/wal-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

pub mod control;
pub mod invocation;
pub mod timer;
pub mod v1;
pub mod v2;
Expand Down
6 changes: 6 additions & 0 deletions crates/wal-protocol/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ pub enum Command {
AttachInvocation(AttachInvocationRequest),
/// Resume an invocation
ResumeInvocation(ResumeInvocationRequest),
/// Pause an invocation
/// payload is bilrost encoded [`invocation::PauseInvocationRpcRequest`]
///
/// Introduced in v1.7.0 to support pausing invocations with vqueues
PauseInvocationRpcRequest(#[debug(skip)] Bytes),
/// Restart as new invocation from prefix
RestartAsNewInvocation(RestartAsNewInvocationRequest),

Expand Down Expand Up @@ -252,6 +257,7 @@ impl HasRecordKeys for Envelope {
Command::ProxyThrough(_) => Keys::Single(self.partition_key()),
Command::AttachInvocation(_) => Keys::Single(self.partition_key()),
Command::ResumeInvocation(req) => Keys::Single(req.invocation_id.partition_key()),
Command::PauseInvocationRpcRequest(_) => Keys::Single(self.partition_key()),
Command::RestartAsNewInvocation(req) => Keys::Single(req.invocation_id.partition_key()),
// todo: Handle journal entries that request cross-partition invocations
Command::InvokerEffect(effect) => Keys::Single(effect.invocation_id.partition_key()),
Expand Down
4 changes: 4 additions & 0 deletions crates/wal-protocol/src/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ pub enum CommandKind {
/// payload is bilrost encoded [`vqueues::VQueuesResume`]
/// *Since v1.7.0
VQueuesResume = 24,

/// Pause an invocation
/// Introduced in v1.7.0 to support pausing invocations with vqueues
PauseInvocationRpcRequest = 25,
}

mod bilrost_encoding {
Expand Down
8 changes: 7 additions & 1 deletion crates/wal-protocol/src/v2/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use restate_types::{
use super::sealed::Sealed;
use super::{Command, CommandKind};
pub use crate::control::UpsertRuleBookCommand;
pub use crate::invocation::PauseInvocationRpcRequest;
use crate::timer;
// Re-epxort vqueues commands
// Re-export vqueues commands
pub use crate::vqueues::{VQueuesPauseCommand, VQueuesResumeCommand};

pub use crate::control::{
Expand Down Expand Up @@ -467,3 +468,8 @@ command! {
@kind=CommandKind::VQueuesResume,
@command=VQueuesResumeCommand
}

command! {
@kind=CommandKind::PauseInvocationRpcRequest,
@command=PauseInvocationRpcRequest
}
6 changes: 6 additions & 0 deletions crates/wal-protocol/src/v2/compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ impl TryFrom<v1::Envelope> for v2::Envelope<Raw> {
dedup,
payload,
),
v1::Command::PauseInvocationRpcRequest(payload) => Envelope::from_bytes_unchecked(
v2::CommandKind::PauseInvocationRpcRequest,
StorageCodecKind::Bilrost,
dedup,
payload,
),
};

Ok(envelope)
Expand Down
10 changes: 10 additions & 0 deletions crates/worker/src/partition/leadership/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,16 @@ impl LeaderState {
)));
}
}
Action::ForwardPauseInvocationResponse {
request_id,
response,
} => {
if let Some(response_tx) = self.awaiting_rpc_actions.remove(&request_id) {
response_tx.send(Ok(PartitionProcessorRpcResponse::PauseInvocation(
response.into(),
)));
}
}
Action::ForwardPurgeJournalResponse {
request_id,
response,
Expand Down
7 changes: 6 additions & 1 deletion crates/worker/src/partition/state_machine/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use restate_types::identifiers::{EntryIndex, InvocationId, PartitionProcessorRpc
use restate_types::invocation::InvocationTarget;
use restate_types::invocation::client::{
CancelInvocationResponse, InvocationOutputResponse, KillInvocationResponse,
PurgeInvocationResponse, RestartAsNewInvocationResponse, ResumeInvocationResponse,
PauseInvocationResponse, PurgeInvocationResponse, RestartAsNewInvocationResponse,
ResumeInvocationResponse,
};
use restate_types::journal_v2::{CommandIndex, NotificationId};
use restate_types::message::MessageIndex;
Expand Down Expand Up @@ -105,6 +106,10 @@ pub enum Action {
request_id: PartitionProcessorRpcRequestId,
response: RestartAsNewInvocationResponse,
},
ForwardPauseInvocationResponse {
request_id: PartitionProcessorRpcRequestId,
response: PauseInvocationResponse,
},
/// Forward a batch of rule-book diff entries to the leader's
/// `UserLimiter` via the resource-manager mpsc. Emitted by
/// `Command::UpsertRuleBook` apply when the rule book version
Expand Down
2 changes: 2 additions & 0 deletions crates/worker/src/partition/state_machine/lifecycle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod notify_get_invocation_output_response;
mod notify_invocation_response;
mod notify_signal;
mod notify_sleep_completion;
mod pause_invocation_rpc;
mod paused;
mod pinned_deployment;
mod purge;
Expand All @@ -34,6 +35,7 @@ pub(super) use notify_get_invocation_output_response::OnNotifyGetInvocationOutpu
pub(super) use notify_invocation_response::OnNotifyInvocationResponse;
pub(super) use notify_signal::OnNotifySignalCommand;
pub(super) use notify_sleep_completion::OnNotifySleepCompletionCommand;
pub(super) use pause_invocation_rpc::OnInvocationPauseRpcRequest;
pub(super) use paused::OnPausedCommand;
pub(super) use pinned_deployment::OnPinnedDeploymentCommand;
pub(super) use purge::OnPurgeCommand;
Expand Down
Loading
Loading