diff --git a/Cargo.lock b/Cargo.lock index 52da195c2..3b3178165 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-lc-rs" +version = "1.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a88aab2464f1f25453baa7a07c84c5b7684e274054ba06817f382357f77a288" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45afffdee1e7c9126814751f88dddc747f41d91da16c9551a0f1e8a11e788a1" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + [[package]] name = "axum" version = "0.7.9" @@ -206,11 +228,13 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.45" +version = "1.2.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35900b6c8d709fb1d854671ae27aeaa9eec2f8b01b364e1619a40da3e6fe2afe" +checksum = "cd4932aefd12402b36c60956a4fe0035421f544799057659ff86f923657aada3" dependencies = [ "find-msvc-tools", + "jobserver", + "libc", "shlex", ] @@ -239,6 +263,15 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "cmake" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +dependencies = [ + "cc", +] + [[package]] name = "colored" version = "2.2.0" @@ -466,6 +499,12 @@ dependencies = [ "syn", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "either" version = "1.15.0" @@ -552,9 +591,9 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" +checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41" [[package]] name = "flate2" @@ -596,6 +635,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures" version = "0.3.31" @@ -1149,6 +1194,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc", +] + [[package]] name = "js-sys" version = "0.3.82" @@ -1560,6 +1615,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -1683,10 +1748,16 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry_sdk", "rand 0.9.2", + "rcgen", + "ring", + "rustls", + "rustls-pemfile", "serde", "serde_json", "thiserror 2.0.17", + "time", "tokio", + "tokio-rustls", "tokio-stream", "tokio-test", "tokio-util", @@ -2015,6 +2086,19 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rcgen" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75e669e5202259b5314d1ea5397316ad400819437857b90861765f24c4cf80a2" +dependencies = [ + "pem", + "ring", + "rustls-pki-types", + "time", + "yasna", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -2166,6 +2250,7 @@ version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring", @@ -2175,6 +2260,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.13.0" @@ -2191,6 +2285,7 @@ version = "0.103.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -3478,6 +3573,15 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" +dependencies = [ + "time", +] + [[package]] name = "yoke" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index fec96d9a8..9ef44916d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,14 @@ reqwest = { version = "0.12", default-features = false, features = [ ] } reqwest-eventsource = "0.6" +# TLS +rustls = { version = "0.23", default-features = false, features = ["ring", "std"] } +tokio-rustls = { version = "0.26" } +rcgen = { version = "0.13" } +ring = { version = "0.17" } +rustls-pemfile = { version = "2" } +time = { version = "0.3" } + # CLI (removed clap - now using Python hyperparameter) strum_macros = "0.27" diff --git a/crates/pulsing-actor/Cargo.toml b/crates/pulsing-actor/Cargo.toml index 35707fdaf..03f51aab4 100644 --- a/crates/pulsing-actor/Cargo.toml +++ b/crates/pulsing-actor/Cargo.toml @@ -9,11 +9,13 @@ repository = "https://github.com/reiase/pulsing" keywords = ["actor", "distributed", "cluster", "gossip"] [features] -default = [] +default = ["tls"] # Enable integration tests that require a running cluster integration = [] # Enable OTLP exporter for sending traces to Jaeger/Tempo otlp = ["opentelemetry-otlp"] +# Enable TLS support with passphrase-derived certificates +tls = ["dep:rustls", "dep:tokio-rustls", "dep:rcgen", "dep:ring", "dep:rustls-pemfile", "dep:time"] [dependencies] # Async runtime @@ -55,6 +57,14 @@ hyper = { workspace = true } hyper-util = { workspace = true } http-body-util = { workspace = true } +# TLS (optional) +rustls = { workspace = true, optional = true } +tokio-rustls = { workspace = true, optional = true } +rcgen = { workspace = true, optional = true } +ring = { workspace = true, optional = true } +rustls-pemfile = { workspace = true, optional = true } +time = { workspace = true, optional = true } + [dev-dependencies] tokio-test = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/pulsing-actor/src/lib.rs b/crates/pulsing-actor/src/lib.rs index 6c60e8529..37e07a810 100644 --- a/crates/pulsing-actor/src/lib.rs +++ b/crates/pulsing-actor/src/lib.rs @@ -108,6 +108,7 @@ pub mod circuit_breaker; pub mod cluster; pub mod metrics; pub mod policies; +pub mod supervision; pub mod system; pub mod system_actor; pub mod tracing; @@ -130,6 +131,7 @@ pub mod watch; /// import from `pulsing_actor::actor::*`. pub mod prelude { pub use crate::actor::{Actor, ActorContext, ActorRef, Message}; + pub use crate::supervision::{BackoffStrategy, RestartPolicy, SupervisionSpec}; pub use crate::system::{ ActorSystem, LoadBalanceStrategy, ResolveOptions, SpawnOptions, SystemConfig, }; diff --git a/crates/pulsing-actor/src/supervision.rs b/crates/pulsing-actor/src/supervision.rs new file mode 100644 index 000000000..743910e4e --- /dev/null +++ b/crates/pulsing-actor/src/supervision.rs @@ -0,0 +1,137 @@ +//! Supervision strategies for actor fault tolerance +//! +//! This module defines how actors should handle failures (panics or errors) +//! and when they should be restarted. + +use rand::Rng; +use std::time::Duration; + +/// Restart policy for an actor +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum RestartPolicy { + /// Always restart the actor, regardless of the exit reason + Always, + /// Restart the actor only if it failed (non-normal exit) + OnFailure, + /// Never restart the actor (default) + #[default] + Never, +} + +impl RestartPolicy { + pub fn should_restart(&self, is_failure: bool) -> bool { + match self { + RestartPolicy::Always => true, + RestartPolicy::OnFailure => is_failure, + RestartPolicy::Never => false, + } + } +} + +/// Backoff strategy for restarts +#[derive(Debug, Clone, Copy)] +pub struct BackoffStrategy { + /// Minimum backoff duration + pub min: Duration, + /// Maximum backoff duration + pub max: Duration, + /// Jitter factor (0.0 to 1.0) + pub jitter: f64, + /// Exponential factor (e.g., 2.0 for doubling) + pub factor: f64, +} + +impl Default for BackoffStrategy { + fn default() -> Self { + Self { + min: Duration::from_millis(100), + max: Duration::from_secs(30), + jitter: 0.2, + factor: 2.0, + } + } +} + +impl BackoffStrategy { + /// Create a new exponential backoff strategy + pub fn exponential(min: Duration, max: Duration) -> Self { + Self { + min, + max, + ..Default::default() + } + } + + /// Calculate backoff duration for a given attempt number (0-based) + pub fn duration(&self, attempt: u32) -> Duration { + let mut duration = self.min.as_secs_f64() * self.factor.powi(attempt as i32); + + // Cap at max + let max_secs = self.max.as_secs_f64(); + if duration > max_secs { + duration = max_secs; + } + + // Add jitter + if self.jitter > 0.0 { + let jitter_amount = duration * self.jitter; + let random_factor = rand::rng().random_range(-1.0..1.0); + duration += jitter_amount * random_factor; + } + + // Ensure we don't go below min (though calculation above starts at min) + if duration < 0.0 { + duration = 0.0; + } + + Duration::from_secs_f64(duration) + } +} + +/// Supervision specification for an actor +#[derive(Debug, Clone, Default)] +pub struct SupervisionSpec { + /// Restart policy + pub policy: RestartPolicy, + /// Backoff strategy + pub backoff: BackoffStrategy, + /// Maximum number of restarts allowed + pub max_restarts: u32, + /// Time window for max_restarts (optional). + /// If set, max_restarts applies only within this sliding window. + /// If None, max_restarts applies to the lifetime of the actor. + pub restart_window: Option, +} + +impl SupervisionSpec { + /// Create a spec that never restarts (default) + pub fn never() -> Self { + Self::default() + } + + /// Create a spec that always restarts + pub fn always() -> Self { + Self { + policy: RestartPolicy::Always, + ..Default::default() + } + } + + /// Create a spec that restarts on failure + pub fn on_failure() -> Self { + Self { + policy: RestartPolicy::OnFailure, + ..Default::default() + } + } + + pub fn with_backoff(mut self, backoff: BackoffStrategy) -> Self { + self.backoff = backoff; + self + } + + pub fn with_max_restarts(mut self, max: u32) -> Self { + self.max_restarts = max; + self + } +} diff --git a/crates/pulsing-actor/src/system.rs b/crates/pulsing-actor/src/system.rs index 1f11cc178..80cb82e96 100644 --- a/crates/pulsing-actor/src/system.rs +++ b/crates/pulsing-actor/src/system.rs @@ -8,6 +8,7 @@ use crate::cluster::{ GossipCluster, GossipConfig, GossipMessage, MemberInfo, MemberStatus, NamedActorInfo, }; use crate::metrics::{metrics, SystemMetrics as PrometheusMetrics}; +use crate::supervision::SupervisionSpec; use crate::system_actor::{ BoxedActorFactory, SystemActor, SystemRef, SYSTEM_ACTOR_LOCAL_NAME, SYSTEM_ACTOR_PATH, }; @@ -107,6 +108,22 @@ impl SystemConfig { self.default_mailbox_capacity = capacity; self } + + /// Enable TLS with passphrase-derived certificates + /// + /// All nodes using the same passphrase will be able to communicate securely. + /// The passphrase is used to derive a shared CA certificate, enabling + /// automatic mutual TLS authentication. + #[cfg(feature = "tls")] + pub fn with_tls(mut self, passphrase: &str) -> anyhow::Result { + self.http2_config = self.http2_config.with_tls(passphrase)?; + Ok(self) + } + + /// Check if TLS is enabled + pub fn is_tls_enabled(&self) -> bool { + self.http2_config.is_tls_enabled() + } } /// Options for spawning an actor @@ -116,6 +133,8 @@ pub struct SpawnOptions { pub mailbox_capacity: Option, /// Whether this actor is public (can be resolved by name across cluster) pub public: bool, + /// Supervision specification (restart policy) + pub supervision: SupervisionSpec, } impl SpawnOptions { @@ -135,6 +154,12 @@ impl SpawnOptions { self.public = public; self } + + /// Set supervision specification + pub fn supervision(mut self, supervision: SupervisionSpec) -> Self { + self.supervision = supervision; + self + } } /// Load balance strategy for resolving named actors with multiple instances @@ -432,6 +457,28 @@ impl ActorSystem { ) -> anyhow::Result where A: Actor, + { + // Wrap actor in a factory that only works once + let mut actor_opt = Some(actor); + let factory = move || { + actor_opt + .take() + .ok_or_else(|| anyhow::anyhow!("Actor cannot be restarted (spawned as instance)")) + }; + + self.spawn_factory(name, factory, options).await + } + + /// Spawn an actor using a factory function (enables supervision restarts) + pub async fn spawn_factory( + &self, + name: impl AsRef, + factory: F, + options: SpawnOptions, + ) -> anyhow::Result + where + F: FnMut() -> anyhow::Result + Send + 'static, + A: Actor, { let name = name.as_ref(); @@ -450,7 +497,9 @@ impl ActorSystem { let (sender, receiver) = mailbox.split(); let stats = Arc::new(ActorStats::default()); - let metadata = actor.metadata(); + // We can't get metadata from factory without creating an instance, + // so we start with empty metadata. It could be updated later if we wanted. + let metadata = HashMap::new(); // Create context let ctx = ActorContext::new(actor_id); @@ -459,8 +508,12 @@ impl ActorSystem { let stats_clone = stats.clone(); let cancel = self.cancel_token.clone(); let actor_id_for_log = actor_id; + let supervision = options.supervision.clone(); + let join_handle = tokio::spawn(async move { - let reason = run_actor_loop(actor, receiver, ctx, cancel, stats_clone).await; + let reason = + run_supervision_loop(factory, receiver, ctx, cancel, stats_clone, supervision) + .await; tracing::debug!(actor_id = ?actor_id_for_log, reason = ?reason, "Actor stopped"); }); @@ -504,6 +557,30 @@ impl ActorSystem { ) -> anyhow::Result where A: Actor, + { + // Wrap actor in a factory that only works once + let mut actor_opt = Some(actor); + let factory = move || { + actor_opt + .take() + .ok_or_else(|| anyhow::anyhow!("Actor cannot be restarted (spawned as instance)")) + }; + + self.spawn_named_factory(path, local_name, factory, options) + .await + } + + /// Spawn a named actor using a factory function + pub async fn spawn_named_factory( + &self, + path: ActorPath, + local_name: impl AsRef, + factory: F, + options: SpawnOptions, + ) -> anyhow::Result + where + F: FnMut() -> anyhow::Result + Send + 'static, + A: Actor, { let local_name = local_name.as_ref(); @@ -533,7 +610,7 @@ impl ActorSystem { let (sender, receiver) = mailbox.split(); let stats = Arc::new(ActorStats::default()); - let metadata = actor.metadata(); + let metadata = HashMap::new(); // Create context let ctx = ActorContext::new(actor_id); @@ -542,9 +619,12 @@ impl ActorSystem { let stats_clone = stats.clone(); let cancel = self.cancel_token.clone(); let actor_id_for_log = actor_id; + let supervision = options.supervision.clone(); let join_handle = tokio::spawn(async move { - let reason = run_actor_loop(actor, receiver, ctx, cancel, stats_clone).await; + let reason = + run_supervision_loop(factory, receiver, ctx, cancel, stats_clone, supervision) + .await; tracing::debug!(actor_id = ?actor_id_for_log, reason = ?reason, "Actor stopped"); }); @@ -933,16 +1013,16 @@ impl ActorSystemRef for ActorSystem { } } -/// Actor message loop - returns the reason for stopping -async fn run_actor_loop( +/// Actor instance loop - runs a single instance of an actor +async fn run_actor_instance( mut actor: A, - mut receiver: mpsc::Receiver, - mut ctx: ActorContext, + receiver: &mut mpsc::Receiver, + ctx: &mut ActorContext, cancel: CancellationToken, stats: Arc, ) -> StopReason { // Call on_start - if let Err(e) = actor.on_start(&mut ctx).await { + if let Err(e) = actor.on_start(ctx).await { tracing::error!(actor_id = ?ctx.id(), error = %e, "Actor start error"); stats.inc_stop(); return StopReason::Failed(e.to_string()); @@ -956,11 +1036,15 @@ async fn run_actor_loop( stats.inc_message(); let (message, responder) = envelope.into_parts(); - match actor.receive(message, &mut ctx).await { - Ok(response) => responder.send(Ok(response)), + match actor.receive(message, ctx).await { + Ok(response) => { + responder.send(Ok(response)); + } Err(e) => { tracing::error!(actor_id = ?ctx.id(), error = %e, "Actor error"); responder.send(Err(anyhow::anyhow!("Handler error: {}", e))); + // Actor crashes on error - supervision will decide whether to restart + return StopReason::Failed(e.to_string()); } } } @@ -978,7 +1062,7 @@ async fn run_actor_loop( // Cleanup stats.inc_stop(); - if let Err(e) = actor.on_stop(&mut ctx).await { + if let Err(e) = actor.on_stop(ctx).await { tracing::warn!(actor_id = ?ctx.id(), error = %e, "Actor stop error"); // If on_stop fails, mark as failed if matches!(stop_reason, StopReason::Normal) { @@ -989,6 +1073,89 @@ async fn run_actor_loop( stop_reason } +/// Supervision loop - manages actor restarts +async fn run_supervision_loop( + mut factory: F, + mut receiver: mpsc::Receiver, + mut ctx: ActorContext, + cancel: CancellationToken, + stats: Arc, + spec: SupervisionSpec, +) -> StopReason +where + F: FnMut() -> anyhow::Result + Send + 'static, + A: Actor, +{ + let mut restarts = 0; + // Track restarts for windowing if needed (timestamp of restart) + let mut restart_timestamps: Vec = Vec::new(); + + loop { + // Create actor instance + let actor = match factory() { + Ok(a) => a, + Err(e) => { + tracing::error!(actor_id = ?ctx.id(), error = %e, "Failed to create actor instance"); + return StopReason::Failed(format!("Factory error: {}", e)); + } + }; + + // Run actor instance + let reason = run_actor_instance( + actor, + &mut receiver, + &mut ctx, + cancel.clone(), + stats.clone(), + ) + .await; + + // Check if we should restart + let is_failure = matches!(reason, StopReason::Failed(_)); + if !spec.policy.should_restart(is_failure) { + return reason; + } + + if matches!(reason, StopReason::SystemShutdown | StopReason::Killed) { + return reason; + } + + // Check max restarts + restarts += 1; + + // Prune old timestamps if window is set + if let Some(window) = spec.restart_window { + let now = std::time::Instant::now(); + restart_timestamps.push(now); + restart_timestamps.retain(|&t| now.duration_since(t) <= window); + + if restart_timestamps.len() as u32 > spec.max_restarts { + tracing::error!(actor_id = ?ctx.id(), "Max restarts ({}) exceeded within window {:?}", spec.max_restarts, window); + return reason; + } + } else { + // Absolute count + if restarts > spec.max_restarts { + tracing::error!(actor_id = ?ctx.id(), "Max restarts ({}) exceeded", spec.max_restarts); + return reason; + } + } + + tracing::info!( + actor_id = ?ctx.id(), + reason = ?reason, + restarts = restarts, + "Restarting actor..." + ); + + // Backoff + let backoff = spec.backoff.duration(restarts - 1); + if !backoff.is_zero() { + tokio::time::sleep(backoff).await; + } + } +} + /// Unified message handler for HTTP/2 transport struct SystemMessageHandler { node_id: NodeId, diff --git a/crates/pulsing-actor/src/transport/http2/client.rs b/crates/pulsing-actor/src/transport/http2/client.rs index 1d7b2e5f7..65efb221f 100644 --- a/crates/pulsing-actor/src/transport/http2/client.rs +++ b/crates/pulsing-actor/src/transport/http2/client.rs @@ -338,11 +338,81 @@ impl Http2Client { .map_err(|_| anyhow::anyhow!("Connection timeout"))? .map_err(|e| anyhow::anyhow!("Failed to connect: {}", e))?; - let io = TokioIo::new(tcp_stream); - - // Build HTTP/2 connection with streaming body type + // Build HTTP/2 connection with streaming body type - with or without TLS type StreamingBody = StreamBody, Infallible>>>; + + #[cfg(feature = "tls")] + if let Some(ref tls_config) = self.config.tls { + // TLS mode: wrap TCP stream with TLS + let server_name = addr.ip().to_string(); + let tls_stream = tls_config.connect(tcp_stream, &server_name).await?; + let io = TokioIo::new(tls_stream); + let (mut sender, conn): (http2::SendRequest, _) = + http2::handshake(TokioExecutor::new(), io) + .await + .map_err(|e| anyhow::anyhow!("HTTP/2 TLS handshake failed: {}", e))?; + + // Spawn connection driver for TLS + let cancel = self.cancel.clone(); + tokio::spawn(async move { + tokio::select! { + result = conn => { + if let Err(e) = result { + tracing::debug!(error = %e, "TLS streaming connection ended"); + } + } + _ = cancel.cancelled() => { + tracing::debug!("TLS streaming connection cancelled"); + } + } + }); + + // Complete the streaming request (TLS path) + let (tx, rx) = tokio::sync::mpsc::channel::, Infallible>>(32); + let default_msg_type = msg_type.to_string(); + tokio::spawn(async move { + let mut stream = std::pin::pin!(stream); + while let Some(result) = stream.next().await { + let frame = match result { + Ok(msg) => StreamFrame::from_message(&msg, &default_msg_type), + Err(e) => StreamFrame::error(e.to_string()), + }; + if tx.send(Ok(Frame::data(frame.to_binary()))).await.is_err() { + break; + } + } + let _ = tx + .send(Ok(Frame::data(StreamFrame::end().to_binary()))) + .await; + }); + + let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + let body = StreamBody::new(body_stream); + + let uri = format!("http://{}{}", addr, path); + let request = Request::builder() + .method(Method::POST) + .uri(&uri) + .header(headers::MESSAGE_MODE, MessageMode::Ask.as_str()) + .header(headers::MESSAGE_TYPE, msg_type) + .header(headers::REQUEST_TYPE, RequestType::Stream.as_str()) + .header(TRACEPARENT_HEADER, trace_ctx.to_traceparent()) + .header("content-type", "application/octet-stream") + .body(body) + .map_err(|e| anyhow::anyhow!("Failed to build request: {}", e))?; + + let send_future = sender.send_request(request); + let response = tokio::time::timeout(self.config.stream_timeout, send_future) + .await + .map_err(|_| anyhow::anyhow!("Streaming request timeout"))? + .map_err(|e| anyhow::anyhow!("Streaming request failed: {}", e))?; + + return Ok(response); + } + + // Plain h2c mode (no TLS or TLS feature disabled) + let io = TokioIo::new(tcp_stream); let (mut sender, conn): (http2::SendRequest, _) = http2::handshake(TokioExecutor::new(), io) .await diff --git a/crates/pulsing-actor/src/transport/http2/config.rs b/crates/pulsing-actor/src/transport/http2/config.rs index 61792fc91..cc4097ab6 100644 --- a/crates/pulsing-actor/src/transport/http2/config.rs +++ b/crates/pulsing-actor/src/transport/http2/config.rs @@ -4,9 +4,13 @@ //! - Server settings (concurrent streams, window sizes, etc.) //! - Client settings (timeouts, connection pooling) //! - Retry policies +//! - TLS settings (when `tls` feature is enabled) use std::time::Duration; +#[cfg(feature = "tls")] +use super::tls::TlsConfig; + /// HTTP/2 transport configuration #[derive(Debug, Clone)] pub struct Http2Config { @@ -65,6 +69,12 @@ pub struct Http2Config { /// Whether to use jitter in retry delays (default: true) pub retry_use_jitter: bool, + + // ========== TLS Configuration (requires `tls` feature) ========== + /// TLS configuration for encrypted transport (default: None) + /// When set, all connections will use TLS with mutual authentication + #[cfg(feature = "tls")] + pub tls: Option, } impl Default for Http2Config { @@ -94,6 +104,10 @@ impl Default for Http2Config { retry_initial_delay: Duration::from_millis(100), retry_max_delay: Duration::from_secs(10), retry_use_jitter: true, + + // TLS defaults + #[cfg(feature = "tls")] + tls: None, } } } @@ -238,6 +252,36 @@ impl Http2Config { self } + /// Enable TLS with passphrase-derived certificates + /// + /// All nodes using the same passphrase will be able to communicate securely. + /// The passphrase is used to derive a shared CA certificate, enabling + /// automatic mutual TLS authentication. + #[cfg(feature = "tls")] + pub fn with_tls(mut self, passphrase: &str) -> anyhow::Result { + self.tls = Some(TlsConfig::from_passphrase(passphrase)?); + Ok(self) + } + + /// Set TLS configuration directly + #[cfg(feature = "tls")] + pub fn tls_config(mut self, tls: TlsConfig) -> Self { + self.tls = Some(tls); + self + } + + /// Check if TLS is enabled + #[cfg(feature = "tls")] + pub fn is_tls_enabled(&self) -> bool { + self.tls.is_some() + } + + /// Check if TLS is enabled (always false when `tls` feature is not enabled) + #[cfg(not(feature = "tls"))] + pub fn is_tls_enabled(&self) -> bool { + false + } + /// Convert to retry config pub fn to_retry_config(&self) -> super::retry::RetryConfig { super::retry::RetryConfig { diff --git a/crates/pulsing-actor/src/transport/http2/mod.rs b/crates/pulsing-actor/src/transport/http2/mod.rs index 75cff69bb..4a1982cfe 100644 --- a/crates/pulsing-actor/src/transport/http2/mod.rs +++ b/crates/pulsing-actor/src/transport/http2/mod.rs @@ -64,6 +64,9 @@ mod retry; mod server; mod stream; +#[cfg(feature = "tls")] +mod tls; + pub use client::{Http2Client, Http2ClientBuilder}; pub use config::Http2Config; pub use pool::{ConnectionPool, PoolConfig, PoolStats}; @@ -71,6 +74,9 @@ pub use retry::{RetryConfig, RetryExecutor, RetryableError}; pub use server::{Http2Server, Http2ServerHandler}; pub use stream::{BinaryFrameParser, StreamFrame, StreamHandle, FLAG_END, FLAG_ERROR}; +#[cfg(feature = "tls")] +pub use tls::TlsConfig; + use crate::actor::{ActorId, ActorPath, Message, RemoteTransport}; use crate::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig}; use std::net::SocketAddr; diff --git a/crates/pulsing-actor/src/transport/http2/pool.rs b/crates/pulsing-actor/src/transport/http2/pool.rs index 095de26eb..be53cab58 100644 --- a/crates/pulsing-actor/src/transport/http2/pool.rs +++ b/crates/pulsing-actor/src/transport/http2/pool.rs @@ -5,6 +5,7 @@ //! - Connection expiration/eviction //! - Connection reuse optimization //! - Pool statistics +//! - TLS support (when `tls` feature is enabled) use super::config::Http2Config; use bytes::Bytes; @@ -395,9 +396,31 @@ impl ConnectionPool { // Set TCP options stream.set_nodelay(true)?; - let io = TokioIo::new(stream); + // Create HTTP/2 connection - with or without TLS + #[cfg(feature = "tls")] + if let Some(ref tls_config) = self.http2_config.tls { + // TLS mode: wrap TCP stream with TLS + let server_name = addr.ip().to_string(); + let tls_stream = tls_config.connect(stream, &server_name).await?; + let io = TokioIo::new(tls_stream); + let (sender, conn) = http2::handshake(TokioExecutor::new(), io) + .await + .map_err(|e| anyhow::anyhow!("HTTP/2 TLS handshake failed with {}: {}", addr, e))?; + + // Spawn connection driver for TLS connection + tokio::spawn(async move { + if let Err(e) = conn.await { + tracing::debug!(error = %e, "HTTP/2 TLS connection closed"); + } + }); + + let mut pooled = PooledConnection::new(sender); + pooled.mark_used(); + return Ok(pooled); + } - // Create HTTP/2 connection with prior knowledge (h2c) + // Plain h2c mode (no TLS or TLS feature disabled) + let io = TokioIo::new(stream); let (sender, conn) = http2::handshake(TokioExecutor::new(), io) .await .map_err(|e| anyhow::anyhow!("HTTP/2 handshake failed with {}: {}", addr, e))?; diff --git a/crates/pulsing-actor/src/transport/http2/server.rs b/crates/pulsing-actor/src/transport/http2/server.rs index 4b66c7897..6213eb926 100644 --- a/crates/pulsing-actor/src/transport/http2/server.rs +++ b/crates/pulsing-actor/src/transport/http2/server.rs @@ -1,6 +1,7 @@ //! HTTP/2 Server implementation //! //! Supports h2c (HTTP/2 over cleartext) with optional HTTP/1.1 fallback. +//! When `tls` feature is enabled, supports TLS with passphrase-derived certificates. use super::config::Http2Config; use super::stream::{BinaryFrameParser, StreamFrame}; @@ -18,6 +19,7 @@ use hyper_util::rt::{TokioExecutor, TokioIo}; use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -173,6 +175,18 @@ impl Http2Server { config: Http2Config, cancel: CancellationToken, ) -> anyhow::Result<()> { + // Check if TLS is enabled + #[cfg(feature = "tls")] + if let Some(ref tls_config) = config.tls { + // TLS mode: accept TLS handshake first + let tls_stream = tls_config.accept(stream).await?; + let io = TokioIo::new(tls_stream); + + // TLS connections always use HTTP/2 (no HTTP/1.1 fallback) + return Self::serve_h2_generic(io, peer_addr, handler, config, cancel).await; + } + + // Plain TCP mode (h2c) let io = TokioIo::new(stream); // Try to detect HTTP/2 preface @@ -238,6 +252,20 @@ impl Http2Server { config: Http2Config, cancel: CancellationToken, ) -> anyhow::Result<()> { + Self::serve_h2_generic(io, peer_addr, handler, config, cancel).await + } + + /// Generic HTTP/2 server - works with any IO type (TCP or TLS) + async fn serve_h2_generic( + io: TokioIo, + peer_addr: SocketAddr, + handler: Arc, + config: Http2Config, + cancel: CancellationToken, + ) -> anyhow::Result<()> + where + I: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { let service = service_fn(move |req| { let handler = handler.clone(); async move { Self::handle_request(req, handler, peer_addr).await } diff --git a/crates/pulsing-actor/src/transport/http2/tls.rs b/crates/pulsing-actor/src/transport/http2/tls.rs new file mode 100644 index 000000000..ae75c9c30 --- /dev/null +++ b/crates/pulsing-actor/src/transport/http2/tls.rs @@ -0,0 +1,414 @@ +//! TLS support for HTTP/2 transport with passphrase-derived certificates +//! +//! This module provides TLS encryption using certificates derived from a shared passphrase. +//! All nodes using the same passphrase will generate identical CA certificates, enabling +//! automatic mutual TLS authentication. +//! +//! ## Security Model +//! +//! 1. A passphrase is used to deterministically derive a CA certificate +//! 2. Each node generates its own certificate signed by the shared CA +//! 3. Nodes only trust certificates signed by the same CA (same passphrase) +//! +//! ## Usage +//! +//! ```rust,ignore +//! use pulsing_actor::transport::http2::tls::TlsConfig; +//! +//! let tls = TlsConfig::from_passphrase("my-cluster-secret")?; +//! ``` + +use rcgen::{ + BasicConstraints, Certificate, CertificateParams, DnType, ExtendedKeyUsagePurpose, IsCa, + KeyPair, KeyUsagePurpose, SerialNumber, PKCS_ED25519, +}; +use ring::digest::{digest, SHA256}; +use ring::hkdf::{self, HKDF_SHA256}; +use ring::signature::{Ed25519KeyPair, KeyPair as RingKeyPair}; +use rustls::crypto::ring::default_provider; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer, ServerName}; +use rustls::server::WebPkiClientVerifier; +use rustls::{ClientConfig, RootCertStore, ServerConfig}; +use std::sync::OnceLock; + +/// Global flag to ensure crypto provider is installed only once +static CRYPTO_PROVIDER_INSTALLED: OnceLock<()> = OnceLock::new(); + +/// Install the ring crypto provider for rustls +fn ensure_crypto_provider() { + CRYPTO_PROVIDER_INSTALLED.get_or_init(|| { + let _ = default_provider().install_default(); + }); +} +use std::sync::Arc; +use tokio_rustls::{TlsAcceptor, TlsConnector}; + +/// Salt used for HKDF key derivation +const HKDF_SALT: &[u8] = b"pulsing-ca-v1"; + +/// CA certificate common name +const CA_COMMON_NAME: &str = "Pulsing Cluster CA"; + +/// Node certificate common name prefix +const NODE_CN_PREFIX: &str = "Pulsing Node"; + +/// Certificate validity period (10 years in seconds) +const CERT_VALIDITY_SECS: i64 = 10 * 365 * 24 * 60 * 60; + +/// TLS configuration for HTTP/2 transport +#[derive(Clone)] +pub struct TlsConfig { + /// TLS acceptor for server-side connections + pub acceptor: TlsAcceptor, + /// TLS connector for client-side connections + pub connector: TlsConnector, + /// The passphrase hash for debugging + passphrase_hash: String, +} + +impl TlsConfig { + /// Create TLS configuration from a passphrase + /// + /// The passphrase is used to deterministically derive a CA certificate. + /// All nodes using the same passphrase will generate identical CA certificates, + /// enabling automatic mutual TLS authentication. + pub fn from_passphrase(passphrase: &str) -> anyhow::Result { + // Ensure the ring crypto provider is installed + ensure_crypto_provider(); + + // Derive CA certificate and key from passphrase + let (ca_cert, ca_key_pair) = derive_ca_from_passphrase(passphrase)?; + + // Generate node certificate signed by CA + let (node_cert, node_key_pair) = generate_node_cert(&ca_cert, &ca_key_pair)?; + + // Convert to DER format + let ca_cert_der = CertificateDer::from(ca_cert.der().to_vec()); + let node_cert_der = CertificateDer::from(node_cert.der().to_vec()); + let node_key_der = + PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(node_key_pair.serialize_der())); + + // Build root cert store with our CA + let mut root_store = RootCertStore::empty(); + root_store.add(ca_cert_der.clone())?; + + // Build server config with client certificate verification + let client_verifier = WebPkiClientVerifier::builder(Arc::new(root_store.clone())) + .build() + .map_err(|e| anyhow::anyhow!("Failed to build client verifier: {}", e))?; + + let server_config = ServerConfig::builder() + .with_client_cert_verifier(client_verifier) + .with_single_cert(vec![node_cert_der.clone()], node_key_der.clone_key()) + .map_err(|e| anyhow::anyhow!("Failed to build server config: {}", e))?; + + // Build client config + let client_config = ClientConfig::builder() + .with_root_certificates(root_store) + .with_client_auth_cert(vec![node_cert_der], node_key_der) + .map_err(|e| anyhow::anyhow!("Failed to build client config: {}", e))?; + + // Calculate passphrase hash for debugging + let hash = digest(&SHA256, passphrase.as_bytes()); + let passphrase_hash = hex_encode(&hash.as_ref()[..8]); + + Ok(Self { + acceptor: TlsAcceptor::from(Arc::new(server_config)), + connector: TlsConnector::from(Arc::new(client_config)), + passphrase_hash, + }) + } + + /// Get the passphrase hash (for debugging/logging purposes only) + pub fn passphrase_hash(&self) -> &str { + &self.passphrase_hash + } + + /// Connect to a remote server with TLS + /// + /// Note: server_name is ignored for mTLS connections within the cluster. + /// We use a fixed server name that matches the node certificate's CN pattern. + pub async fn connect( + &self, + stream: tokio::net::TcpStream, + _server_name: &str, + ) -> anyhow::Result> { + // Use a fixed server name for internal cluster communication + // The actual authentication is done via mutual TLS (client cert verification) + let server_name = ServerName::try_from("pulsing.internal".to_string()) + .map_err(|e| anyhow::anyhow!("Invalid server name: {}", e))?; + + self.connector + .connect(server_name, stream) + .await + .map_err(|e| anyhow::anyhow!("TLS connect failed: {}", e)) + } + + /// Accept a TLS connection + pub async fn accept( + &self, + stream: tokio::net::TcpStream, + ) -> anyhow::Result> { + self.acceptor + .accept(stream) + .await + .map_err(|e| anyhow::anyhow!("TLS accept failed: {}", e)) + } +} + +impl std::fmt::Debug for TlsConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TlsConfig") + .field("passphrase_hash", &self.passphrase_hash) + .finish() + } +} + +/// Derive CA certificate and key pair from passphrase +/// +/// This function is deterministic - the same passphrase will always produce +/// the same CA certificate and key. +fn derive_ca_from_passphrase(passphrase: &str) -> anyhow::Result<(Certificate, KeyPair)> { + // Derive seed using HKDF + let seed = derive_seed(passphrase, b"ca-key")?; + + // Generate deterministic Ed25519 key pair from seed + let key_pair = generate_deterministic_key_pair(&seed)?; + + // Create CA certificate with fixed parameters + let mut params = CertificateParams::new(vec![CA_COMMON_NAME.to_string()]) + .map_err(|e| anyhow::anyhow!("Failed to create cert params: {}", e))?; + + params + .distinguished_name + .push(DnType::CommonName, CA_COMMON_NAME); + params + .distinguished_name + .push(DnType::OrganizationName, "Pulsing"); + params.is_ca = IsCa::Ca(BasicConstraints::Unconstrained); + params.key_usages = vec![ + KeyUsagePurpose::KeyCertSign, + KeyUsagePurpose::CrlSign, + KeyUsagePurpose::DigitalSignature, + ]; + + // Fixed validity period (use a fixed start time for determinism) + // We use Unix epoch + 1 year as the start time + let not_before = time::OffsetDateTime::UNIX_EPOCH + time::Duration::days(365); + let not_after = not_before + time::Duration::seconds(CERT_VALIDITY_SECS); + params.not_before = not_before; + params.not_after = not_after; + + // Derive serial number from seed for determinism + let serial_seed = derive_seed(passphrase, b"ca-serial")?; + params.serial_number = Some(SerialNumber::from_slice(&serial_seed[..20])); + + // Generate certificate + let cert = params + .self_signed(&key_pair) + .map_err(|e| anyhow::anyhow!("Failed to generate CA cert: {}", e))?; + + Ok((cert, key_pair)) +} + +/// Internal server name used for TLS verification within the cluster +const INTERNAL_SERVER_NAME: &str = "pulsing.internal"; + +/// Generate a node certificate signed by the CA +fn generate_node_cert( + ca_cert: &Certificate, + ca_key: &KeyPair, +) -> anyhow::Result<(Certificate, KeyPair)> { + // Generate a random key pair for the node (not deterministic) + let node_key = KeyPair::generate_for(&PKCS_ED25519) + .map_err(|e| anyhow::anyhow!("Failed to generate node key: {}", e))?; + + // Create node certificate + let node_id = uuid::Uuid::new_v4().to_string(); + let cn = format!("{} {}", NODE_CN_PREFIX, &node_id[..8]); + + // Include internal server name as SAN for TLS verification + let mut params = CertificateParams::new(vec![cn.clone(), INTERNAL_SERVER_NAME.to_string()]) + .map_err(|e| anyhow::anyhow!("Failed to create node cert params: {}", e))?; + + params.distinguished_name.push(DnType::CommonName, &cn); + params + .distinguished_name + .push(DnType::OrganizationName, "Pulsing"); + params.is_ca = IsCa::NoCa; + params.key_usages = vec![ + KeyUsagePurpose::DigitalSignature, + KeyUsagePurpose::KeyEncipherment, + ]; + params.extended_key_usages = vec![ + ExtendedKeyUsagePurpose::ServerAuth, + ExtendedKeyUsagePurpose::ClientAuth, + ]; + + // Use current time for validity + let not_before = time::OffsetDateTime::now_utc(); + let not_after = not_before + time::Duration::seconds(CERT_VALIDITY_SECS); + params.not_before = not_before; + params.not_after = not_after; + + // Generate certificate signed by CA + let cert = params + .signed_by(&node_key, ca_cert, ca_key) + .map_err(|e| anyhow::anyhow!("Failed to sign node cert: {}", e))?; + + Ok((cert, node_key)) +} + +/// Helper struct for HKDF output length +struct HkdfLen(usize); + +impl hkdf::KeyType for HkdfLen { + fn len(&self) -> usize { + self.0 + } +} + +/// Derive a 32-byte seed using HKDF +fn derive_seed(passphrase: &str, info: &[u8]) -> anyhow::Result<[u8; 32]> { + let salt = hkdf::Salt::new(HKDF_SHA256, HKDF_SALT); + let prk = salt.extract(passphrase.as_bytes()); + + let mut seed = [0u8; 32]; + prk.expand(&[info], HkdfLen(32)) + .map_err(|_| anyhow::anyhow!("HKDF expand failed"))? + .fill(&mut seed) + .map_err(|_| anyhow::anyhow!("HKDF fill failed"))?; + + Ok(seed) +} + +/// Generate a deterministic Ed25519 key pair from a seed +/// +/// Ed25519 natively supports deterministic key generation from a 32-byte seed. +fn generate_deterministic_key_pair(seed: &[u8; 32]) -> anyhow::Result { + // Create Ed25519 key pair from seed using ring + let ed25519_key = Ed25519KeyPair::from_seed_unchecked(seed) + .map_err(|e| anyhow::anyhow!("Failed to create Ed25519 key from seed: {}", e))?; + + // Get the PKCS#8 v2 DER encoding + let pkcs8_der = create_ed25519_pkcs8_der(seed, ed25519_key.public_key().as_ref())?; + + // Convert to PrivateKeyDer and import into rcgen + let private_key_der = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(pkcs8_der)); + + let key_pair = KeyPair::from_der_and_sign_algo(&private_key_der, &PKCS_ED25519) + .map_err(|e| anyhow::anyhow!("Failed to create key pair from DER: {}", e))?; + + Ok(key_pair) +} + +/// Create PKCS#8 v1 DER encoding for an Ed25519 private key +/// +/// RFC 8410 defines the format for Ed25519 keys: +/// - privateKey contains CurvePrivateKey which is an OCTET STRING +/// - The 32-byte seed needs to be wrapped in an OCTET STRING +fn create_ed25519_pkcs8_der(seed: &[u8; 32], _public_key: &[u8]) -> anyhow::Result> { + // OID for Ed25519: 1.3.101.112 + let ed25519_oid: &[u8] = &[0x06, 0x03, 0x2b, 0x65, 0x70]; + + // Build algorithm identifier: SEQUENCE { OID ed25519 } + let algo_seq = wrap_in_sequence(ed25519_oid); + + // CurvePrivateKey ::= OCTET STRING (the 32-byte seed) + // This needs to be wrapped in OCTET STRING for the privateKey field + let inner_private_key = wrap_in_octet_string(seed); + let private_key_octet = wrap_in_octet_string(&inner_private_key); + + // Build PKCS#8 structure (version 0) + let mut pkcs8_content = Vec::new(); + // Version INTEGER 0 + pkcs8_content.extend_from_slice(&[0x02, 0x01, 0x00]); + // Algorithm identifier + pkcs8_content.extend_from_slice(&algo_seq); + // Private key (double-wrapped OCTET STRING) + pkcs8_content.extend_from_slice(&private_key_octet); + + Ok(wrap_in_sequence(&pkcs8_content)) +} + +/// Wrap data in an ASN.1 SEQUENCE +fn wrap_in_sequence(data: &[u8]) -> Vec { + let mut result = Vec::new(); + result.push(0x30); // SEQUENCE tag + write_length(&mut result, data.len()); + result.extend_from_slice(data); + result +} + +/// Wrap data in an ASN.1 OCTET STRING +fn wrap_in_octet_string(data: &[u8]) -> Vec { + let mut result = Vec::new(); + result.push(0x04); // OCTET STRING tag + write_length(&mut result, data.len()); + result.extend_from_slice(data); + result +} + +/// Write ASN.1 DER length encoding +fn write_length(output: &mut Vec, len: usize) { + if len < 128 { + output.push(len as u8); + } else if len < 256 { + output.push(0x81); + output.push(len as u8); + } else { + output.push(0x82); + output.push((len >> 8) as u8); + output.push(len as u8); + } +} + +/// Convert bytes to hex string +fn hex_encode(bytes: &[u8]) -> String { + bytes.iter().map(|b| format!("{:02x}", b)).collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_derive_seed_deterministic() { + let seed1 = derive_seed("test-password", b"test-info").unwrap(); + let seed2 = derive_seed("test-password", b"test-info").unwrap(); + assert_eq!(seed1, seed2); + + let seed3 = derive_seed("different-password", b"test-info").unwrap(); + assert_ne!(seed1, seed3); + + let seed4 = derive_seed("test-password", b"different-info").unwrap(); + assert_ne!(seed1, seed4); + } + + #[test] + fn test_ca_derivation_deterministic() { + let (cert1, _) = derive_ca_from_passphrase("test-cluster-password").unwrap(); + let (cert2, _) = derive_ca_from_passphrase("test-cluster-password").unwrap(); + + // Same passphrase should produce same CA certificate + assert_eq!(cert1.der(), cert2.der()); + + let (cert3, _) = derive_ca_from_passphrase("different-password").unwrap(); + // Different passphrase should produce different CA certificate + assert_ne!(cert1.der(), cert3.der()); + } + + #[test] + fn test_tls_config_creation() { + let config = TlsConfig::from_passphrase("test-password"); + assert!(config.is_ok(), "TLS config creation failed: {:?}", config); + } + + #[test] + fn test_passphrase_hash() { + let config = TlsConfig::from_passphrase("test-password").unwrap(); + let hash = config.passphrase_hash(); + // Hash should be consistent + assert_eq!(hash.len(), 16); // 8 bytes = 16 hex chars + } +} diff --git a/crates/pulsing-actor/tests/actor_tests.rs b/crates/pulsing-actor/tests/actor_tests.rs index 17dc783b8..045de3bd1 100644 --- a/crates/pulsing-actor/tests/actor_tests.rs +++ b/crates/pulsing-actor/tests/actor_tests.rs @@ -254,9 +254,12 @@ mod error_tests { let result: Result = actor_ref.ask(ErrorMessage).await; assert!(result.is_err()); - // Actor should still be alive after error - let response: Pong = actor_ref.ask(Ping { value: 1 }).await.unwrap(); - assert_eq!(response.result, 1); + // With the supervision model, errors cause the actor to crash + // (unless supervision is configured to restart it) + // So subsequent messages will fail with "mailbox closed" + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + let result2: Result = actor_ref.ask(Ping { value: 1 }).await; + assert!(result2.is_err(), "Actor should be dead after error"); let _ = system.shutdown().await; } diff --git a/crates/pulsing-actor/tests/http2_transport_tests.rs b/crates/pulsing-actor/tests/http2_transport_tests.rs index 5990dd59b..6ede3dda2 100644 --- a/crates/pulsing-actor/tests/http2_transport_tests.rs +++ b/crates/pulsing-actor/tests/http2_transport_tests.rs @@ -1399,3 +1399,124 @@ mod tracing_tests { .is_empty()); } } + +// ============================================================================ +// TLS Tests (requires `tls` feature) +// ============================================================================ + +#[cfg(feature = "tls")] +mod tls_tests { + use super::*; + use pulsing_actor::transport::http2::TlsConfig; + + /// Test TLS configuration creation from passphrase + #[test] + fn test_tls_config_from_passphrase() { + let config = TlsConfig::from_passphrase("test-cluster-password"); + assert!(config.is_ok(), "TLS config creation failed: {:?}", config); + } + + /// Test that same passphrase produces deterministic CA + #[test] + fn test_tls_deterministic_ca() { + let config1 = TlsConfig::from_passphrase("deterministic-test-password").unwrap(); + let config2 = TlsConfig::from_passphrase("deterministic-test-password").unwrap(); + + // Both should have the same passphrase hash + assert_eq!(config1.passphrase_hash(), config2.passphrase_hash()); + } + + /// Test that different passphrase produces different CA + #[test] + fn test_tls_different_passphrase() { + let config1 = TlsConfig::from_passphrase("password-one").unwrap(); + let config2 = TlsConfig::from_passphrase("password-two").unwrap(); + + // Different passwords should have different hashes + assert_ne!(config1.passphrase_hash(), config2.passphrase_hash()); + } + + /// Test TLS-enabled HTTP/2 server and client communication + #[tokio::test] + async fn test_tls_server_client_communication() { + let tls_config = TlsConfig::from_passphrase("test-cluster-tls").unwrap(); + + let handler = Arc::new(TestHandler::new()); + let cancel = CancellationToken::new(); + + // Create HTTP/2 config with TLS + let http2_config = Http2Config::default().tls_config(tls_config.clone()); + + // Start TLS server + let server = Http2Server::new( + "127.0.0.1:0".parse().unwrap(), + handler.clone(), + http2_config.clone(), + cancel.clone(), + ) + .await + .unwrap(); + + let addr = server.local_addr(); + + // Create TLS client with same passphrase + let client = Http2Client::new(http2_config); + + // Send request over TLS + let response = client + .ask(addr, "/actors/test", "test-msg", b"hello tls".to_vec()) + .await; + + // Should succeed + assert!(response.is_ok(), "TLS request failed: {:?}", response); + + let body = response.unwrap(); + let response_str = String::from_utf8_lossy(&body); + assert!( + response_str.contains("hello tls"), + "Response should contain original message" + ); + + cancel.cancel(); + } + + /// Test that different passphrase fails TLS handshake + #[tokio::test] + async fn test_tls_different_passphrase_fails() { + let server_tls = TlsConfig::from_passphrase("server-password").unwrap(); + let client_tls = TlsConfig::from_passphrase("wrong-password").unwrap(); + + let handler = Arc::new(TestHandler::new()); + let cancel = CancellationToken::new(); + + // Start TLS server + let server_config = Http2Config::default().tls_config(server_tls); + let server = Http2Server::new( + "127.0.0.1:0".parse().unwrap(), + handler, + server_config, + cancel.clone(), + ) + .await + .unwrap(); + + let addr = server.local_addr(); + + // Create client with WRONG passphrase + let client_config = Http2Config::default().tls_config(client_tls); + let client = Http2Client::new(client_config); + + // Request should fail due to TLS handshake failure + let response = client + .ask(addr, "/actors/test", "test", b"test".to_vec()) + .await; + + // Should fail + assert!( + response.is_err(), + "Request with different passphrase should fail" + ); + + cancel.cancel(); + } +} diff --git a/crates/pulsing-actor/tests/integration_tests.rs b/crates/pulsing-actor/tests/integration_tests.rs index 8aed8a9ec..ce4b4c017 100644 --- a/crates/pulsing-actor/tests/integration_tests.rs +++ b/crates/pulsing-actor/tests/integration_tests.rs @@ -355,9 +355,11 @@ mod error_tests { assert!(result.is_err()); assert_eq!(crash_count.load(Ordering::SeqCst), 1); - // Actor should still respond to normal messages - let response: Pong = actor_ref.ask(Ping { value: 42 }).await.unwrap(); - assert_eq!(response.result, 42); + // With supervision model, errors cause actor to crash (unless supervision is configured) + // So subsequent messages should fail + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + let result2: Result = actor_ref.ask(Ping { value: 42 }).await; + assert!(result2.is_err(), "Actor should be dead after error"); system.shutdown().await.unwrap(); } @@ -377,16 +379,17 @@ mod error_tests { .await .unwrap(); - // Multiple crash messages - for _ in 0..5 { - let _: Result = actor_ref.ask(CrashMessage).await; - } + // First crash message crashes the actor + let _: Result = actor_ref.ask(CrashMessage).await; + assert_eq!(crash_count.load(Ordering::SeqCst), 1); - assert_eq!(crash_count.load(Ordering::SeqCst), 5); + // Actor is now dead - subsequent messages fail with mailbox closed + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + let result: Result = actor_ref.ask(CrashMessage).await; + assert!(result.is_err(), "Actor should be dead after first error"); - // Should still work - let response: Pong = actor_ref.ask(Ping { value: 100 }).await.unwrap(); - assert_eq!(response.result, 100); + // Counter doesn't increment because actor is dead + assert_eq!(crash_count.load(Ordering::SeqCst), 1); system.shutdown().await.unwrap(); } diff --git a/crates/pulsing-actor/tests/supervision_tests.rs b/crates/pulsing-actor/tests/supervision_tests.rs new file mode 100644 index 000000000..154f6919f --- /dev/null +++ b/crates/pulsing-actor/tests/supervision_tests.rs @@ -0,0 +1,138 @@ +use pulsing_actor::prelude::*; +use pulsing_actor::supervision::{BackoffStrategy, SupervisionSpec}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +struct FailingActor { + counter: Arc, + fail_at: u32, +} + +#[async_trait] +impl Actor for FailingActor { + async fn receive(&mut self, msg: Message, _ctx: &mut ActorContext) -> anyhow::Result { + let count = self.counter.fetch_add(1, Ordering::SeqCst) + 1; + + if count == self.fail_at { + return Err(anyhow::anyhow!("Boom!")); + } + + // Echo + Ok(msg) + } +} + +#[tokio::test] +async fn test_restart_on_failure() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + let counter = Arc::new(AtomicU32::new(0)); + + let counter_clone = counter.clone(); + let factory = move || { + Ok(FailingActor { + counter: counter_clone.clone(), + fail_at: 2, // Fail on 2nd message + }) + }; + + let spec = SupervisionSpec::on_failure() + .with_max_restarts(3) + .with_backoff(BackoffStrategy::exponential( + Duration::from_millis(10), + Duration::from_millis(100), + )); + + let options = SpawnOptions::new().supervision(spec); + + let actor_ref = system + .spawn_factory("failing", factory, options) + .await + .unwrap(); + + // 1st message - success + let resp = actor_ref.send(Message::single("ping", b"1")).await; + assert!(resp.is_ok()); + + // 2nd message - failure (should crash and restart) + let resp = actor_ref.send(Message::single("ping", b"2")).await; + assert!(resp.is_err()); // The ask fails because the actor crashed handling it + + // Wait a bit for restart + tokio::time::sleep(Duration::from_millis(50)).await; + + // 3rd message - success (new instance) + // Note: counter is shared, so it will continue from 2 -> 3 + let resp = actor_ref.send(Message::single("ping", b"3")).await; + assert!(resp.is_ok()); + + let msg = resp.unwrap(); + if let Message::Single { data, .. } = msg { + assert_eq!(data, b"3"); + } else { + panic!("expected single message"); + } + + assert_eq!(counter.load(Ordering::SeqCst), 3); +} + +#[tokio::test] +async fn test_max_restarts_exceeded() { + let system = ActorSystem::new(SystemConfig::standalone()).await.unwrap(); + let counter = Arc::new(AtomicU32::new(0)); + + let counter_clone = counter.clone(); + // Fail immediately + let factory = move || { + counter_clone.fetch_add(1, Ordering::SeqCst); + Ok(FailingActor { + counter: Arc::new(AtomicU32::new(0)), // Unused + fail_at: 1, // Fail immediately + }) + }; + + let spec = SupervisionSpec::on_failure() + .with_max_restarts(2) // Allow 2 restarts + .with_backoff(BackoffStrategy { + min: Duration::from_millis(1), + max: Duration::from_millis(1), + jitter: 0.0, + factor: 1.0, + }); + + let options = SpawnOptions::new().supervision(spec); + + let actor_ref = system + .spawn_factory("crashing", factory, options) + .await + .unwrap(); + + // 1st crash + let _ = actor_ref.send(Message::single("ping", b"1")).await; + tokio::time::sleep(Duration::from_millis(10)).await; + + // 2nd crash + let _ = actor_ref.send(Message::single("ping", b"2")).await; + tokio::time::sleep(Duration::from_millis(10)).await; + + // 3rd crash + let _ = actor_ref.send(Message::single("ping", b"3")).await; + tokio::time::sleep(Duration::from_millis(10)).await; + + // Should be dead now (Initial start + 2 restarts = 3 failures. Next attempt stops.) + // Wait for supervision loop to exit + tokio::time::sleep(Duration::from_millis(50)).await; + + // Send message to dead actor + let resp = actor_ref.send(Message::single("ping", b"4")).await; + assert!(resp.is_err()); // Mailbox closed + + // Check factory calls: Initial + 2 restarts = 3 calls + // Actually, if it crashes 3 times: + // 1. Start (count=1), Receive -> Crash + // 2. Restart 1 (count=2), Receive -> Crash + // 3. Restart 2 (count=3), Receive -> Crash + // 4. Max restarts exceeded -> Stop + // So factory called 3 times. + assert_eq!(counter.load(Ordering::SeqCst), 3); +} diff --git a/crates/pulsing-py/Cargo.toml b/crates/pulsing-py/Cargo.toml index b47ea9b27..b1792e6ff 100644 --- a/crates/pulsing-py/Cargo.toml +++ b/crates/pulsing-py/Cargo.toml @@ -6,6 +6,10 @@ authors.workspace = true license.workspace = true description = "Python bindings for Pulsing Actor System" +[features] +default = ["tls"] +tls = ["pulsing-actor/tls"] + [lib] name = "_core" crate-type = ["cdylib", "rlib"] diff --git a/crates/pulsing-py/src/actor.rs b/crates/pulsing-py/src/actor.rs index b5a51e368..80d5118c8 100644 --- a/crates/pulsing-py/src/actor.rs +++ b/crates/pulsing-py/src/actor.rs @@ -3,6 +3,7 @@ use futures::StreamExt; use pulsing_actor::actor::{ActorId, ActorPath, NodeId}; use pulsing_actor::prelude::*; +use pulsing_actor::supervision::{BackoffStrategy, RestartPolicy, SupervisionSpec}; use pyo3::exceptions::{PyException, PyRuntimeError, PyStopAsyncIteration, PyValueError}; use pyo3::prelude::*; use pyo3::types::PyBytes; @@ -657,8 +658,32 @@ impl PySystemConfig { }) } + /// Enable TLS with passphrase-derived certificates + /// + /// All nodes using the same passphrase will be able to communicate securely. + /// The passphrase is used to derive a shared CA certificate, enabling + /// automatic mutual TLS authentication. + /// + /// Example: + /// config = SystemConfig.with_addr("0.0.0.0:8000").with_passphrase("my-cluster-secret") + #[cfg(feature = "tls")] + fn with_passphrase(&self, passphrase: String) -> PyResult { + let new_inner = self.inner.clone().with_tls(&passphrase).map_err(to_pyerr)?; + Ok(Self { inner: new_inner }) + } + + /// Check if TLS is enabled + fn is_tls_enabled(&self) -> bool { + self.inner.is_tls_enabled() + } + fn __repr__(&self) -> String { - format!("SystemConfig(addr={})", self.inner.addr) + let tls_status = if self.inner.is_tls_enabled() { + ", tls=enabled" + } else { + "" + }; + format!("SystemConfig(addr={}{})", self.inner.addr, tls_status) } } @@ -913,28 +938,98 @@ impl PyActorSystem { self.inner.addr().to_string() } - #[pyo3(signature = (name, handler, public=false))] + #[pyo3(signature = ( + name, + handler, + public=false, + restart_policy="never", + max_restarts=3, + min_backoff=0.1, + max_backoff=30.0 + ))] + #[allow(clippy::too_many_arguments)] fn spawn<'py>( &self, py: Python<'py>, name: String, handler: PyObject, public: bool, + restart_policy: &str, + max_restarts: u32, + min_backoff: f64, + max_backoff: f64, ) -> PyResult> { let system = self.inner.clone(); let event_loop = self.event_loop.clone(); - pyo3_async_runtimes::tokio::future_into_py(py, async move { - let actor = PythonActorWrapper::new(handler, event_loop); + // Parse supervision config + let policy = match restart_policy.to_lowercase().as_str() { + "always" => RestartPolicy::Always, + "on-failure" | "on_failure" => RestartPolicy::OnFailure, + _ => RestartPolicy::Never, + }; - let actor_ref = if public { - let path = ActorPath::new(format!("actors/{}", name)).map_err(to_pyerr)?; - system - .spawn_named(path, &name, actor) - .await - .map_err(to_pyerr)? + let supervision = if matches!(policy, RestartPolicy::Never) { + SupervisionSpec::never() + } else { + SupervisionSpec { + policy, + max_restarts, + backoff: BackoffStrategy::exponential( + std::time::Duration::from_secs_f64(min_backoff), + std::time::Duration::from_secs_f64(max_backoff), + ), + ..Default::default() + } + }; + + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let options = pulsing_actor::system::SpawnOptions::new() + .public(public) + .supervision(supervision); + + let actor_ref = if matches!(policy, RestartPolicy::Never) { + // handler is the instance + let actor = PythonActorWrapper::new(handler, event_loop); + if public { + let path = ActorPath::new(format!("actors/{}", name)).map_err(to_pyerr)?; + system + .spawn_named_with_options(path, &name, actor, options) + .await + .map_err(to_pyerr)? + } else { + system + .spawn_with_options(&name, actor, options) + .await + .map_err(to_pyerr)? + } } else { - system.spawn(&name, actor).await.map_err(to_pyerr)? + // handler is a factory + let factory = move || { + let handler = handler.clone(); + let event_loop = event_loop.clone(); + + Python::with_gil(|py| -> anyhow::Result { + // Call factory to get instance + let instance = handler + .call0(py) + .map_err(|e| anyhow::anyhow!("Python factory error: {:?}", e))?; + Ok(PythonActorWrapper::new(instance, event_loop)) + }) + }; + + if public { + let path = ActorPath::new(format!("actors/{}", name)).map_err(to_pyerr)?; + system + .spawn_named_factory(path, &name, factory, options) + .await + .map_err(to_pyerr)? + } else { + system + .spawn_factory(&name, factory, options) + .await + .map_err(to_pyerr)? + } }; Ok(PyActorRef { inner: actor_ref }) diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index 6c799a809..40d5b7f53 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -89,6 +89,7 @@ plugins: Semantics: 语义与保证 Actor System: Actor 系统 Remote Actors: 远程 Actor + Security: 安全 Agent: Agent 框架 Overview: 概述 AutoGen: AutoGen @@ -123,6 +124,7 @@ nav: - Architecture: guide/architecture.md - Actor System: guide/actor_system.md - Remote Actors: guide/remote_actors.md + - Security: guide/security.md - Distributed Queue: guide/queue.md - Semantics: guide/semantics.md - Agent: diff --git a/docs/src/guide/security.md b/docs/src/guide/security.md new file mode 100644 index 000000000..cdc9544b5 --- /dev/null +++ b/docs/src/guide/security.md @@ -0,0 +1,214 @@ +# Security Guide + +Guide to securing your Pulsing cluster with TLS encryption. + +## Overview + +Pulsing supports **passphrase-based mTLS (Mutual TLS)** for secure cluster communication. This innovative design provides: + +- **Zero-configuration PKI**: No need to generate or distribute certificates manually +- **Passphrase-based access**: Nodes with the same passphrase can join the cluster +- **Cluster isolation**: Different passphrases create completely isolated clusters +- **Mutual authentication**: Both server and client verify each other's certificates + +## Enabling TLS + +### Development Mode (No TLS) + +By default, Pulsing uses cleartext HTTP/2 (h2c) for easy debugging: + +```python +from pulsing.actor import SystemConfig, create_actor_system + +# No passphrase - uses cleartext HTTP/2 +config = SystemConfig.with_addr("0.0.0.0:8000") +system = await create_actor_system(config) +``` + +### Production Mode (mTLS) + +To enable TLS encryption, simply set a passphrase: + +```python +# Set passphrase - automatically enables mTLS +config = SystemConfig.with_addr("0.0.0.0:8000").with_passphrase("my-cluster-secret") +system = await create_actor_system(config) +``` + +## Multi-Node Cluster with TLS + +All nodes in a cluster must use the **same passphrase** to communicate: + +```python +# Node 1: Seed node with TLS +config1 = SystemConfig.with_addr("0.0.0.0:8000").with_passphrase("shared-secret") +system1 = await create_actor_system(config1) + +# Node 2: Join cluster with same passphrase +config2 = ( + SystemConfig.with_addr("0.0.0.0:8001") + .with_seeds(["192.168.1.1:8000"]) + .with_passphrase("shared-secret") # Must match! +) +system2 = await create_actor_system(config2) +``` + +!!! warning "Passphrase Mismatch" + Nodes with different passphrases cannot communicate. The TLS handshake will fail. + +## Cluster Isolation + +Different passphrases create completely isolated clusters: + +```python +# Cluster A +cluster_a = SystemConfig.with_addr("0.0.0.0:8000").with_passphrase("secret-a") + +# Cluster B (different passphrase) +cluster_b = SystemConfig.with_addr("0.0.0.0:9000").with_passphrase("secret-b") + +# cluster_a and cluster_b cannot communicate +``` + +## How It Works + +Pulsing uses a **deterministic CA derivation** approach: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Passphrase (口令) │ +│ "my-secret" │ +└──────────────────────────┬──────────────────────────────────┘ + │ HKDF-SHA256 + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Deterministic CA Certificate │ +│ (Same passphrase → Same CA cert/key) │ +│ Algorithm: Ed25519 | Validity: 10 years │ +└──────────────────────────┬──────────────────────────────────┘ + │ Signs + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Node Certificate (per node) │ +│ (Each node generates its own, signed by CA) │ +│ CN: "Pulsing Node " | SAN: pulsing.internal │ +└─────────────────────────────────────────────────────────────┘ +``` + +### Key Features + +| Feature | Description | +|---------|-------------| +| **Mutual Authentication** | Both server and client present certificates | +| **Passphrase = Access** | Only nodes knowing the passphrase can join | +| **Zero-config PKI** | No manual certificate generation/distribution | +| **Deterministic CA** | Same passphrase → Same CA (all nodes trust it) | +| **Isolated Clusters** | Different passphrases = completely separate | + +## Security Best Practices + +### 1. Use Strong Passphrases + +!!! tip "Passphrase Strength" + Use high-entropy random strings for production: + ```python + # Good: High entropy + passphrase = "aX9#mK2$nL5@pQ8&" + + # Bad: Weak/predictable + passphrase = "password123" + ``` + +### 2. Environment Variables + +Store passphrases in environment variables, not code: + +```python +import os + +passphrase = os.environ.get("PULSING_PASSPHRASE") +if passphrase: + config = SystemConfig.with_addr("0.0.0.0:8000").with_passphrase(passphrase) +else: + # Development mode - no TLS + config = SystemConfig.with_addr("0.0.0.0:8000") +``` + +### 3. Rotate Passphrases + +To rotate passphrases: + +1. Deploy new nodes with the new passphrase +2. Gradually migrate actors to new nodes +3. Decommission old nodes + +!!! note "Rolling Updates" + Nodes with different passphrases cannot communicate. Plan for a brief transition period. + +### 4. Network Segmentation + +Even with TLS, use network-level security: + +- Deploy in private VPCs/subnets +- Use firewalls to restrict access +- Consider VPN for cross-datacenter communication + +## Comparison with Other Frameworks + +| Aspect | Pulsing | Ray | Traditional mTLS | +|--------|---------|-----|------------------| +| **Configuration** | 1 line of code | Multiple config files | PKI infrastructure required | +| **Certificate Management** | None needed | Need to distribute certs | Need CA + cert rotation | +| **New Node Join** | Know passphrase | Pre-configure certificates | Issue new certificates | +| **Cluster Isolation** | Different passphrase | Different cert system | Different CA | +| **Crypto Algorithm** | Ed25519 + mTLS | TLS 1.2/1.3 | Depends on config | + +## Limitations + +!!! warning "Current Limitations" + - **No authorization**: Any actor can call any actor (authentication only, not authorization) + - **Pickle serialization**: Message payloads still use Pickle (plan to replace with msgpack) + - **No cert rotation**: Changing passphrase requires cluster restart + +## Example: Secure Distributed Counter + +```python +import os +from pulsing.actor import SystemConfig, create_actor_system, as_actor + +# Get passphrase from environment +PASSPHRASE = os.environ.get("PULSING_SECRET", None) + +@as_actor +class SecureCounter: + def __init__(self, init_value: int = 0): + self.value = init_value + + def get(self) -> int: + return self.value + + def increment(self, n: int = 1) -> int: + self.value += n + return self.value + +async def main(): + # Create config with optional TLS + config = SystemConfig.with_addr("0.0.0.0:8000") + if PASSPHRASE: + config = config.with_passphrase(PASSPHRASE) + + system = await create_actor_system(config) + + # Spawn secure counter + counter = await SecureCounter.local(system, init_value=0) + await system.spawn(counter, "secure-counter", public=True) + + print("Secure counter running...") + print(f"TLS enabled: {PASSPHRASE is not None}") +``` + +## Next Steps + +- Learn about [Remote Actors](remote_actors.md) for cluster communication +- Check [HTTP2 Transport](../design/http2-transport.md) for transport details +- Read [Semantics](semantics.md) for message delivery guarantees diff --git a/docs/src/guide/security.zh.md b/docs/src/guide/security.zh.md new file mode 100644 index 000000000..46320a401 --- /dev/null +++ b/docs/src/guide/security.zh.md @@ -0,0 +1,214 @@ +# 安全指南 + +使用 TLS 加密保护 Pulsing 集群的指南。 + +## 概述 + +Pulsing 支持**基于口令的 mTLS(双向 TLS)**实现安全的集群通信。这种创新设计提供: + +- **零配置 PKI**:无需手动生成或分发证书 +- **口令即准入**:拥有相同口令的节点可以加入集群 +- **集群隔离**:不同口令创建完全隔离的集群 +- **双向认证**:服务端和客户端互相验证证书 + +## 启用 TLS + +### 开发模式(无 TLS) + +默认情况下,Pulsing 使用明文 HTTP/2 (h2c) 便于调试: + +```python +from pulsing.actor import SystemConfig, create_actor_system + +# 不设置口令 - 使用明文 HTTP/2 +config = SystemConfig.with_addr("0.0.0.0:8000") +system = await create_actor_system(config) +``` + +### 生产模式(mTLS) + +要启用 TLS 加密,只需设置口令: + +```python +# 设置口令 - 自动启用 mTLS +config = SystemConfig.with_addr("0.0.0.0:8000").with_passphrase("my-cluster-secret") +system = await create_actor_system(config) +``` + +## 多节点 TLS 集群 + +集群中的所有节点必须使用**相同口令**才能通信: + +```python +# Node 1: 带 TLS 的种子节点 +config1 = SystemConfig.with_addr("0.0.0.0:8000").with_passphrase("shared-secret") +system1 = await create_actor_system(config1) + +# Node 2: 使用相同口令加入集群 +config2 = ( + SystemConfig.with_addr("0.0.0.0:8001") + .with_seeds(["192.168.1.1:8000"]) + .with_passphrase("shared-secret") # 必须匹配! +) +system2 = await create_actor_system(config2) +``` + +!!! warning "口令不匹配" + 使用不同口令的节点无法通信。TLS 握手将失败。 + +## 集群隔离 + +不同口令创建完全隔离的集群: + +```python +# 集群 A +cluster_a = SystemConfig.with_addr("0.0.0.0:8000").with_passphrase("secret-a") + +# 集群 B(不同口令) +cluster_b = SystemConfig.with_addr("0.0.0.0:9000").with_passphrase("secret-b") + +# cluster_a 和 cluster_b 无法通信 +``` + +## 工作原理 + +Pulsing 使用**确定性 CA 派生**方法: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ 口令 (Passphrase) │ +│ "my-secret" │ +└──────────────────────────┬──────────────────────────────────┘ + │ HKDF-SHA256 + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ 确定性 CA 证书 │ +│ (相同口令 → 相同 CA 证书和私钥) │ +│ 算法: Ed25519 | 有效期: 10年 │ +└──────────────────────────┬──────────────────────────────────┘ + │ 签名 + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ 节点证书 (每个节点独立) │ +│ (每个节点独立生成,由 CA 签名) │ +│ CN: "Pulsing Node " | SAN: pulsing.internal │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 核心特性 + +| 特性 | 说明 | +|------|------| +| **双向认证** | 服务端和客户端都需要提供证书 | +| **口令即准入** | 只有知道口令的节点才能加入 | +| **零配置 PKI** | 无需手动生成/分发证书 | +| **确定性 CA** | 相同口令 → 相同 CA(所有节点信任) | +| **隔离集群** | 不同口令的集群完全隔离 | + +## 安全最佳实践 + +### 1. 使用强口令 + +!!! tip "口令强度" + 生产环境使用高熵随机字符串: + ```python + # 好:高熵值 + passphrase = "aX9#mK2$nL5@pQ8&" + + # 差:弱/可预测 + passphrase = "password123" + ``` + +### 2. 使用环境变量 + +将口令存储在环境变量中,而不是代码中: + +```python +import os + +passphrase = os.environ.get("PULSING_PASSPHRASE") +if passphrase: + config = SystemConfig.with_addr("0.0.0.0:8000").with_passphrase(passphrase) +else: + # 开发模式 - 无 TLS + config = SystemConfig.with_addr("0.0.0.0:8000") +``` + +### 3. 口令轮换 + +轮换口令的步骤: + +1. 使用新口令部署新节点 +2. 逐步将 Actor 迁移到新节点 +3. 下线旧节点 + +!!! note "滚动更新" + 使用不同口令的节点无法通信。请规划短暂的过渡期。 + +### 4. 网络隔离 + +即使启用 TLS,也应使用网络级安全措施: + +- 部署在私有 VPC/子网中 +- 使用防火墙限制访问 +- 跨数据中心通信考虑使用 VPN + +## 与其他框架对比 + +| 方面 | Pulsing | Ray | 传统 mTLS | +|------|---------|-----|-----------| +| **配置复杂度** | 1 行代码 | 多个配置文件 | 需要 PKI 基础设施 | +| **证书管理** | 无需管理 | 需要分发证书 | 需要 CA + 证书轮换 | +| **新节点加入** | 知道口令即可 | 需要预配置证书 | 需要签发新证书 | +| **集群隔离** | 不同口令 | 不同证书体系 | 不同 CA | +| **加密算法** | Ed25519 + mTLS | TLS 1.2/1.3 | 取决于配置 | + +## 当前限制 + +!!! warning "当前限制" + - **无授权机制**:任何 Actor 可以调用任何 Actor(仅认证,无授权) + - **Pickle 序列化**:消息载荷仍使用 Pickle(计划替换为 msgpack) + - **无证书轮换**:更换口令需要重启集群 + +## 示例:安全分布式计数器 + +```python +import os +from pulsing.actor import SystemConfig, create_actor_system, as_actor + +# 从环境变量获取口令 +PASSPHRASE = os.environ.get("PULSING_SECRET", None) + +@as_actor +class SecureCounter: + def __init__(self, init_value: int = 0): + self.value = init_value + + def get(self) -> int: + return self.value + + def increment(self, n: int = 1) -> int: + self.value += n + return self.value + +async def main(): + # 创建配置,可选启用 TLS + config = SystemConfig.with_addr("0.0.0.0:8000") + if PASSPHRASE: + config = config.with_passphrase(PASSPHRASE) + + system = await create_actor_system(config) + + # 生成安全计数器 + counter = await SecureCounter.local(system, init_value=0) + await system.spawn(counter, "secure-counter", public=True) + + print("安全计数器运行中...") + print(f"TLS 已启用: {PASSPHRASE is not None}") +``` + +## 下一步 + +- 了解[远程 Actor](remote_actors.zh.md) 的集群通信 +- 查看 [HTTP2 传输](../design/http2-transport.md) 了解传输细节 +- 阅读[语义与保证](semantics.zh.md) 了解消息传递保证 diff --git a/python/pulsing/actor/remote.py b/python/pulsing/actor/remote.py index 90d66a44e..ff1b4d5fb 100644 --- a/python/pulsing/actor/remote.py +++ b/python/pulsing/actor/remote.py @@ -207,6 +207,12 @@ async def _create_actor(self, data: dict) -> Message: kwargs = data.get("kwargs", {}) public = data.get("public", True) + # Supervision config + restart_policy = data.get("restart_policy", "never") + max_restarts = data.get("max_restarts", 3) + min_backoff = data.get("min_backoff", 0.1) + max_backoff = data.get("max_backoff", 30.0) + cls = _actor_class_registry.get(class_name) if cls is None: return Message.from_json( @@ -214,9 +220,26 @@ async def _create_actor(self, data: dict) -> Message: ) try: - instance = cls(*args, **kwargs) - actor = _WrappedActor(instance) - actor_ref = await self.system.spawn(actor_name, actor, public=public) + if restart_policy != "never": + # For supervision, we must provide a factory + def factory(): + instance = cls(*args, **kwargs) + return _WrappedActor(instance) + + actor_ref = await self.system.spawn( + actor_name, + factory, + public=public, + restart_policy=restart_policy, + max_restarts=max_restarts, + min_backoff=min_backoff, + max_backoff=max_backoff, + ) + else: + # Standard spawn + instance = cls(*args, **kwargs) + actor = _WrappedActor(instance) + actor_ref = await self.system.spawn(actor_name, actor, public=public) method_names = [ n @@ -240,9 +263,21 @@ async def _create_actor(self, data: dict) -> Message: class ActorClass: """Actor class wrapper""" - def __init__(self, cls: type): + def __init__( + self, + cls: type, + restart_policy: str = "never", + max_restarts: int = 3, + min_backoff: float = 0.1, + max_backoff: float = 30.0, + ): self._cls = cls self._class_name = f"{cls.__module__}.{cls.__name__}" + self._restart_policy = restart_policy + self._max_restarts = max_restarts + self._min_backoff = min_backoff + self._max_backoff = max_backoff + self._methods = [ n for n, _ in inspect.getmembers(cls, predicate=inspect.isfunction) @@ -263,10 +298,27 @@ async def local( Note: Use create_actor_system() to create ActorSystem, which automatically registers PythonActorService. """ - instance = self._cls(*args, **kwargs) - actor = _WrappedActor(instance) actor_name = name or f"{self._cls.__name__}_{uuid.uuid4().hex[:8]}" - actor_ref = await system.spawn(actor_name, actor, public=True) + + if self._restart_policy != "never": + + def factory(): + instance = self._cls(*args, **kwargs) + return _WrappedActor(instance) + + actor_ref = await system.spawn( + actor_name, + factory, + public=True, + restart_policy=self._restart_policy, + max_restarts=self._max_restarts, + min_backoff=self._min_backoff, + max_backoff=self._max_backoff, + ) + else: + instance = self._cls(*args, **kwargs) + actor = _WrappedActor(instance) + actor_ref = await system.spawn(actor_name, actor, public=True) return ActorProxy(actor_ref, self._methods) @@ -315,6 +367,11 @@ async def remote( "args": list(args), "kwargs": kwargs, "public": True, + # Supervision config + "restart_policy": self._restart_policy, + "max_restarts": self._max_restarts, + "min_backoff": self._min_backoff, + "max_backoff": self._max_backoff, }, ) ) @@ -336,31 +393,43 @@ def __call__(self, *args, **kwargs): return self._cls(*args, **kwargs) -def as_actor(cls: type[T]) -> ActorClass: +def as_actor( + cls: type[T] | None = None, + *, + restart_policy: str = "never", + max_restarts: int = 3, + min_backoff: float = 0.1, + max_backoff: float = 30.0, +) -> ActorClass: """@as_actor decorator Converts a regular class into a distributed deployable Actor. + Supports supervision configuration: + - restart_policy: "never" (default), "always", "on-failure" + - max_restarts: maximum number of restarts (default: 3) + - min_backoff: minimum backoff in seconds (default: 0.1) + - max_backoff: maximum backoff in seconds (default: 30.0) + Example: - @as_actor + @as_actor(restart_policy="on-failure", max_restarts=5) class Counter: - def __init__(self, init_value=0): - self.value = init_value - - def increment(self, n=1): - self.value += n - return self.value + ... + """ - # Local creation - counter = await Counter.local(system, init_value=10) + def wrapper(cls): + return ActorClass( + cls, + restart_policy=restart_policy, + max_restarts=max_restarts, + min_backoff=min_backoff, + max_backoff=max_backoff, + ) - # Remote creation - counter = await Counter.remote(system, init_value=10) + if cls is None: + return wrapper - # Call - result = await counter.increment(5) - """ - return ActorClass(cls) + return wrapper(cls) # ============================================================================