diff --git a/Cargo.lock b/Cargo.lock index 18833bcf1c..ca8cb138f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1067,7 +1067,7 @@ dependencies = [ [[package]] name = "bifrost-benchpress" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "bytes", @@ -1674,7 +1674,7 @@ checksum = "3f88a43d011fc4a6876cb7344703e297c71dda42494fee094d5f7c76bf13f746" [[package]] name = "codederror" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "codederror-derive", "restate-workspace-hack", @@ -4915,7 +4915,7 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "logserver-bench" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "axum", @@ -5182,7 +5182,7 @@ checksum = "f2b8f3a258db515d5e91a904ce4ae3f73e091149b90cadbdb93d210bee07f63b" [[package]] name = "mock-service-endpoint" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "assert2", "async-stream", @@ -6035,7 +6035,7 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "pp-bench" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "axum", @@ -6949,7 +6949,7 @@ checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7" [[package]] name = "restate-admin" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "anyhow", @@ -7005,7 +7005,7 @@ dependencies = [ [[package]] name = "restate-admin-rest-model" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bytes", "derive_more", @@ -7024,7 +7024,7 @@ dependencies = [ [[package]] name = "restate-base64-util" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "base64 0.22.1", "restate-workspace-hack", @@ -7032,7 +7032,7 @@ dependencies = [ [[package]] name = "restate-benchmarks" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "criterion", @@ -7059,7 +7059,7 @@ dependencies = [ [[package]] name = "restate-bifrost" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "adaptive-timeout", "ahash", @@ -7115,7 +7115,7 @@ dependencies = [ [[package]] name = "restate-cli" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "arc-swap", @@ -7184,7 +7184,7 @@ dependencies = [ [[package]] name = "restate-cli-util" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "arc-swap", @@ -7213,7 +7213,7 @@ dependencies = [ [[package]] name = "restate-clock" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bilrost", "criterion", @@ -7232,7 +7232,7 @@ dependencies = [ [[package]] name = "restate-cloud-tunnel-client" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "bs58", @@ -7252,7 +7252,7 @@ dependencies = [ [[package]] name = "restate-core" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "anyhow", @@ -7320,7 +7320,7 @@ dependencies = [ [[package]] name = "restate-doctor" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "bilrost", @@ -7356,7 +7356,7 @@ dependencies = [ [[package]] name = "restate-encoding" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bilrost", "bytes", @@ -7377,7 +7377,7 @@ dependencies = [ [[package]] name = "restate-errors" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "codederror", "paste", @@ -7389,7 +7389,7 @@ dependencies = [ [[package]] name = "restate-fs-util" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "restate-workspace-hack", "tokio", @@ -7399,7 +7399,7 @@ dependencies = [ [[package]] name = "restate-futures-util" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "async-channel", @@ -7419,7 +7419,7 @@ dependencies = [ [[package]] name = "restate-hyper-uds" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "http 1.4.0", "hyper-util", @@ -7431,7 +7431,7 @@ dependencies = [ [[package]] name = "restate-ingestion-client" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bytes", "dashmap", @@ -7451,7 +7451,7 @@ dependencies = [ [[package]] name = "restate-ingress-http" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "bytes", @@ -7496,7 +7496,7 @@ dependencies = [ [[package]] name = "restate-ingress-kafka" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "base64 0.22.1", @@ -7527,7 +7527,7 @@ dependencies = [ [[package]] name = "restate-invoker-impl" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "bytes", @@ -7576,7 +7576,7 @@ dependencies = [ [[package]] name = "restate-limiter" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bilrost", "restate-limiter", @@ -7589,7 +7589,7 @@ dependencies = [ [[package]] name = "restate-lite" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "http 1.4.0", @@ -7618,7 +7618,7 @@ dependencies = [ [[package]] name = "restate-local-cluster-runner" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "arc-swap", @@ -7655,7 +7655,7 @@ dependencies = [ [[package]] name = "restate-log-server" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "anyhow", @@ -7697,7 +7697,7 @@ dependencies = [ [[package]] name = "restate-log-server-grpc" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "prost", "restate-types", @@ -7709,7 +7709,7 @@ dependencies = [ [[package]] name = "restate-memory" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bytes", "futures", @@ -7725,7 +7725,7 @@ dependencies = [ [[package]] name = "restate-metadata-providers" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "async-trait", @@ -7761,7 +7761,7 @@ dependencies = [ [[package]] name = "restate-metadata-server" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "arc-swap", @@ -7806,7 +7806,7 @@ dependencies = [ [[package]] name = "restate-metadata-server-grpc" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bytes", "bytestring", @@ -7827,7 +7827,7 @@ dependencies = [ [[package]] name = "restate-metadata-store" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "async-trait", "bytes", @@ -7849,7 +7849,7 @@ dependencies = [ [[package]] name = "restate-node" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "anyhow", @@ -7911,7 +7911,7 @@ dependencies = [ [[package]] name = "restate-object-store-util" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "aws-config", @@ -7929,7 +7929,7 @@ dependencies = [ [[package]] name = "restate-partition-store" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "anyhow", @@ -7984,7 +7984,7 @@ dependencies = [ [[package]] name = "restate-platform" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bilrost", "bytes", @@ -8000,7 +8000,7 @@ dependencies = [ [[package]] name = "restate-queue" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bincode", "criterion", @@ -8015,7 +8015,7 @@ dependencies = [ [[package]] name = "restate-rocksdb" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "bytes", @@ -8045,7 +8045,7 @@ dependencies = [ [[package]] name = "restate-serde-util" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bytes", "bytesize", @@ -8062,7 +8062,7 @@ dependencies = [ [[package]] name = "restate-server" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "bytestring", @@ -8112,7 +8112,7 @@ dependencies = [ [[package]] name = "restate-service-client" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "arc-swap", "aws-config", @@ -8163,7 +8163,7 @@ dependencies = [ [[package]] name = "restate-service-protocol" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bytes", "bytes-utils", @@ -8184,7 +8184,7 @@ dependencies = [ [[package]] name = "restate-service-protocol-v4" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "assert2", "bytes", @@ -8218,7 +8218,7 @@ dependencies = [ [[package]] name = "restate-sharding" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bilrost", "derive_more", @@ -8232,7 +8232,7 @@ dependencies = [ [[package]] name = "restate-storage-api" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "anyhow", @@ -8260,7 +8260,7 @@ dependencies = [ [[package]] name = "restate-storage-query-datafusion" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "anyhow", @@ -8301,7 +8301,7 @@ dependencies = [ [[package]] name = "restate-test-util" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "assert2", "bytes", @@ -8316,7 +8316,7 @@ dependencies = [ [[package]] name = "restate-time-util" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "jiff", "restate-workspace-hack", @@ -8329,7 +8329,7 @@ dependencies = [ [[package]] name = "restate-timer" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "futures-util", @@ -8346,7 +8346,7 @@ dependencies = [ [[package]] name = "restate-timer-queue" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "futures", "restate-workspace-hack", @@ -8355,7 +8355,7 @@ dependencies = [ [[package]] name = "restate-tracing-instrumentation" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "console-subscriber", "criterion", @@ -8385,7 +8385,7 @@ dependencies = [ [[package]] name = "restate-types" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "adaptive-timeout", "ahash", @@ -8484,7 +8484,7 @@ dependencies = [ [[package]] name = "restate-util-string" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "bilrost", @@ -8498,7 +8498,7 @@ dependencies = [ [[package]] name = "restate-utoipa" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "indexmap 2.14.0", "restate-utoipa", @@ -8509,7 +8509,7 @@ dependencies = [ [[package]] name = "restate-vqueues" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "arrayvec", "bilrost", @@ -8543,7 +8543,7 @@ dependencies = [ [[package]] name = "restate-wal-protocol" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bilrost", "bytes", @@ -8570,7 +8570,7 @@ dependencies = [ [[package]] name = "restate-worker" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "ahash", "anyhow", @@ -8636,7 +8636,7 @@ dependencies = [ [[package]] name = "restate-worker-api" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bytes", "codederror", @@ -8812,7 +8812,7 @@ dependencies = [ [[package]] name = "restatectl" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "arrow", @@ -9340,7 +9340,7 @@ dependencies = [ [[package]] name = "service-protocol-wireshark-dissector" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "bytes", "mlua", @@ -11543,7 +11543,7 @@ checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" [[package]] name = "xtask" -version = "1.6.3-dev" +version = "1.7.0-dev" dependencies = [ "anyhow", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index acc7973be2..ef7576db37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ default-members = [ resolver = "2" [workspace.package] -version = "1.6.3-dev" +version = "1.7.0-dev" authors = ["restate.dev"] edition = "2024" rust-version = "1.95.0" diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index ffbb7d9e08..99b0dca5ff 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -23,6 +23,7 @@ pub use monitoring::*; pub use runtime::*; pub use task::*; pub use task_kind::*; +use tokio::runtime::LocalOptions; use std::collections::HashMap; use std::future::Future; @@ -37,7 +38,6 @@ use futures::future::BoxFuture; use metrics::counter; use parking_lot::Mutex; use tokio::sync::oneshot; -use tokio::task::LocalSet; use tokio::task_local; use tokio::time::Instant; use tokio_util::sync::CancellationToken; @@ -47,6 +47,7 @@ use tracing::{debug, error, info, trace, warn}; use crate::metric_definitions::{STATUS_COMPLETED, STATUS_FAILED, TC_FINISHED, TC_SPAWN}; use crate::{Metadata, ShutdownError, ShutdownSourceErr}; use restate_memory::MemoryController; +use restate_platform::prelude::*; use restate_types::SharedString; use restate_types::cluster_state::ClusterState; use restate_types::config::Configuration; @@ -75,8 +76,12 @@ struct GlobalOverrides { #[derive(Debug, thiserror::Error)] pub enum RuntimeError { - #[error("Runtime with name {0} already exists")] + #[error("runtime with name {0} already exists")] AlreadyExists(String), + #[error("[io] cannot start thread:{0}")] + Io(std::io::Error), + #[error("[os] cannot start thread:{0}")] + Os(GenericError), #[error(transparent)] Shutdown(#[from] ShutdownError), } @@ -716,8 +721,17 @@ impl TaskCenterInner { if self.shutdown_requested.load(Ordering::Relaxed) { return Err(ShutdownError.into()); } + let cancel = CancellationToken::new(); + let tc = self.clone(); let runtime_name: SharedString = runtime_name.into(); + // todo: configure the runtime according to a new runtime kind perhaps? + let thread_builder = std::thread::Builder::new().name(format!("rt:{runtime_name}")); + // capture the runtime handle if it started successfully. Uses std mpsc rather than + // tokio's oneshot so we can block on recv() from inside a tokio runtime without panicking. + let (start_tx, start_rx) = std::sync::mpsc::sync_channel::(1); + + let (result_tx, result_rx) = oneshot::channel(); // hold a lock while creating the runtime to avoid concurrent runtimes with the same name let mut runtimes_guard = self.managed_runtimes.lock(); @@ -729,60 +743,77 @@ impl TaskCenterInner { return Err(RuntimeError::AlreadyExists(runtime_name.into_owned())); } - // todo: configure the runtime according to a new runtime kind perhaps? - let thread_builder = std::thread::Builder::new().name(format!("rt:{runtime_name}")); - let mut builder = tokio::runtime::Builder::new_current_thread(); - - #[cfg(any(test, feature = "test-util"))] - builder.start_paused(self.pause_time); - - let rt = builder - .enable_all() - .build() - .expect("runtime builder succeeds"); - let tc = self.clone(); - - let rt_handle = Arc::new(rt); - - runtimes_guard.insert( - runtime_name.clone(), - OwnedRuntimeHandle::new(cancel.clone(), rt_handle.clone()), - ); - - // release the lock. - drop(runtimes_guard); - - let id = TaskId::default(); - let context = TaskContext { - id, - name: runtime_name.clone(), - kind: root_task_kind, - cancellation_token: cancel.clone(), - partition_id, - }; - - let (result_tx, result_rx) = oneshot::channel(); - + let start = Arc::new(OnceLock::new()); // start the work on the runtime - let _ = thread_builder + let thread_handle: std::thread::JoinHandle<()> = thread_builder .spawn({ + let start = start.clone(); + #[cfg(any(test, feature = "test-util"))] + let pause_time = self.pause_time; + let cancel = cancel.clone(); let runtime_name = runtime_name.clone(); move || { - let local_set = LocalSet::new(); - let result = rt_handle.block_on(local_set.run_until(unmanaged_wrapper( - tc.clone(), - context, - root_future(), - ))); - - drop(rt_handle); + // todo: setup the thread (core affinity, thread-locals, etc.) + let mut builder = tokio::runtime::Builder::new_current_thread(); + + #[cfg(any(test, feature = "test-util"))] + builder.start_paused(pause_time); + + let id = TaskId::default(); + let context = TaskContext { + id, + name: runtime_name.clone(), + kind: root_task_kind, + cancellation_token: cancel.clone(), + partition_id, + }; + + // try to insert the runtime into the list of managed runtimes + let rt = builder + .enable_all() + .build_local(LocalOptions::default()) + .expect("runtime builder succeeds"); + if start_tx.send(rt.handle().clone()).is_err() { + warn!( + "Will not start runtime {runtime_name} since task center is no longer interested in it" + ); + return; + } + + // Wait until the runtime is registered in the map before doing any real work. + start.wait(); + + let result = rt.block_on(unmanaged_wrapper(tc.clone(), context, root_future())); + tc.drop_runtime(runtime_name); // need to use an oneshot here since we cannot await a thread::JoinHandle :-( let _ = result_tx.send(result); } - }) - .unwrap(); + }).map_err(RuntimeError::Io)?; + + let rt_handle = match start_rx.recv() { + Ok(rt_handle) => rt_handle, + Err(_) => { + cancel.cancel(); + if thread_handle.is_finished() + && let Err(e) = thread_handle.join() + { + std::panic::resume_unwind(e); + } else { + error!("Runtime {runtime_name} was not started successfully"); + } + return Err(RuntimeError::Shutdown(ShutdownError)); + } + }; + + runtimes_guard.insert( + runtime_name.clone(), + OwnedRuntimeHandle::new(cancel.clone(), rt_handle, thread_handle), + ); + drop(runtimes_guard); + // let the runtime go ahead. + let _ = start.set(true); Ok(RuntimeTaskHandle::new(runtime_name, cancel, result_rx)) } @@ -791,14 +822,9 @@ impl TaskCenterInner { /// the runtime handle. fn drop_runtime(self: &Arc, name: SharedString) { let mut runtimes_guard = self.managed_runtimes.lock(); - if let Some(runtime) = runtimes_guard.remove(&name) { + if runtimes_guard.remove(&name).is_some() { // We must be the only owner of runtime at this point. - debug!("Runtime {} completed", name); - let owner = Arc::into_inner(runtime.into_inner()); - if let Some(runtime) = owner { - runtime.shutdown_timeout(Duration::from_secs(2)); - trace!("Runtime {} shutdown completed", name); - } + trace!("Runtime {name} shutdown completed"); } } @@ -1217,13 +1243,9 @@ impl TaskCenterInner { fn shutdown_managed_runtimes(self: &Arc) { let mut runtimes = self.managed_runtimes.lock(); for (_, runtime) in runtimes.drain() { - if let Some(runtime) = Arc::into_inner(runtime.into_inner()) { - // This really isn't doing much, but it's left here for completion. - // The reason is: If the runtime is still running, then it'll hold the Arc until it - // finishes gracefully, yielding None here. If the runtime completed, it'll - // self-shutdown prior to reaching this point. - runtime.shutdown_background(); - } + // This won't do anything if the shutdown was already initiated via + // `initiate_managed_runtimes_shutdown` + runtime.cancel(); } } } diff --git a/crates/core/src/task_center/runtime.rs b/crates/core/src/task_center/runtime.rs index d9038be401..5fc08add7b 100644 --- a/crates/core/src/task_center/runtime.rs +++ b/crates/core/src/task_center/runtime.rs @@ -9,7 +9,6 @@ // by the Apache License, Version 2.0. use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll, ready}; use futures::FutureExt; @@ -63,23 +62,26 @@ impl std::future::Future for RuntimeTaskHandle { pub(super) struct OwnedRuntimeHandle { cancellation_token: CancellationToken, - inner: Arc, + inner: tokio::runtime::Handle, + _thread_handle: std::thread::JoinHandle<()>, } impl OwnedRuntimeHandle { pub fn new( cancellation_token: CancellationToken, - runtime: Arc, + runtime: tokio::runtime::Handle, + thread_handle: std::thread::JoinHandle<()>, ) -> Self { Self { cancellation_token, inner: runtime, + _thread_handle: thread_handle, } } // The runtime name pub fn runtime_handle(&self) -> &tokio::runtime::Handle { - self.inner.handle() + &self.inner } /// Trigger graceful shutdown of the runtime root task. Shutdown is not guaranteed, it depends @@ -87,8 +89,4 @@ impl OwnedRuntimeHandle { pub fn cancel(&self) { self.cancellation_token.cancel() } - - pub fn into_inner(self) -> Arc { - self.inner - } }