From 2cef393f666c963a40bc9e6438dd57c7949ed705 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sun, 23 Mar 2025 00:58:54 +0100 Subject: [PATCH 1/6] wip --- crates/core/src/contract.rs | 264 ++++++------ crates/core/src/contract/executor.rs | 187 +++++--- crates/core/src/contract/executor/runtime.rs | 424 +++++++++++-------- crates/core/src/contract/handler.rs | 27 +- 4 files changed, 551 insertions(+), 351 deletions(-) diff --git a/crates/core/src/contract.rs b/crates/core/src/contract.rs index c8c51c480..8d255e0ba 100644 --- a/crates/core/src/contract.rs +++ b/crates/core/src/contract.rs @@ -23,64 +23,95 @@ pub(crate) use handler::{ pub use executor::{Executor, ExecutorError, OperationMode}; use executor::ContractExecutor; -use tracing::Instrument; pub(crate) async fn contract_handling(mut contract_handler: CH) -> Result<(), ContractError> where CH: ContractHandler + Send + 'static, { + use tokio::task::JoinSet; + + let mut pending_tasks = JoinSet::new(); + loop { - let (id, event) = contract_handler.channel().recv_from_sender().await?; + // Check for completed tasks and send their responses via contract_handler.channel() + while let Some(result) = pending_tasks.try_join_next() { + match result { + Ok((id, event, executor)) => { + // Return the executor back to the pool + contract_handler.executor().return_executor(executor); + + // Send the result using the contract_handler's channel + if let Err(error) = contract_handler.channel().send_to_sender(id, event).await { + tracing::debug!(%error, "shutting down contract handler"); + } + } + Err(e) => { + tracing::error!("Task error: {:?}", e); + // Create a new executor to replace the one that failed + let new_executor = contract_handler.executor().create_new_executor().await; + contract_handler.executor().return_executor(new_executor); + tracing::info!("Created replacement executor after task failure"); + } + } + } + + // Wait for next event with a timeout to allow checking pending tasks + let recv_result = tokio::time::timeout( + std::time::Duration::from_millis(100), + contract_handler.channel().recv_from_sender(), + ) + .await; + + let (id, event) = match recv_result { + Ok(Ok(result)) => result, + Ok(Err(e)) => return Err(e), + Err(_) => continue, // Timeout, continue to check pending tasks + }; + tracing::debug!(%event, "Got contract handling event"); + match event { ContractHandlerEvent::GetQuery { key, return_contract_code, } => { - match contract_handler - .executor() - .fetch_contract(key, return_contract_code) - .instrument(tracing::info_span!("fetch_contract", %key, %return_contract_code)) - .await - { - Ok((state, contract)) => { - tracing::debug!(with_contract_code = %return_contract_code, has_contract = %contract.is_some(), "Fetched contract {key}"); - contract_handler - .channel() - .send_to_sender( - id, - ContractHandlerEvent::GetResponse { - key, - response: Ok(StoreResponse { state, contract }), - }, - ) - .await - .map_err(|error| { - tracing::debug!(%error, "shutting down contract handler"); - error - })?; - } - Err(err) => { - tracing::warn!("Error while executing get contract query: {err}"); - if err.is_fatal() { - todo!("Handle fatal error; reset executor"); + // Clone needed values for the task + let fetch_contract = contract_handler.executor().fetch_contract(key, return_contract_code).await; + let id_clone = id; + + pending_tasks.spawn(async move { + let span = tracing::info_span!("fetch_contract", %key, %return_contract_code); + let _guard = span.enter(); + + let (executor, result) = fetch_contract.await; + + let response_event = match result { + Ok((state, contract)) => { + tracing::debug!(with_contract_code = %return_contract_code, + has_contract = %contract.is_some(), + "Fetched contract {key}"); + + ContractHandlerEvent::GetResponse { + key, + response: Ok(StoreResponse { state, contract }), + } } - contract_handler - .channel() - .send_to_sender( - id, - ContractHandlerEvent::GetResponse { - key, - response: Err(err), - }, - ) - .await - .map_err(|error| { - tracing::debug!(%error, "shutting down contract handler"); - error - })?; - } - } + Err(err) => { + tracing::warn!("Error while executing get contract query: {err}"); + + if err.is_fatal() { + tracing::error!("Fatal error encountered in executor"); + } + + ContractHandlerEvent::GetResponse { + key, + response: Err(err), + } + } + }; + + (id_clone, response_event, executor) + }); } ContractHandlerEvent::PutQuery { key, @@ -88,43 +119,36 @@ where related_contracts, contract, } => { - let put_result = contract_handler - .executor() - .upsert_contract_state( - key, - Either::Left(state.clone()), - related_contracts, - contract, - ) - .instrument(tracing::info_span!("upsert_contract_state", %key)) - .await; - - let event_result = match put_result { - Ok(UpsertResult::NoChange) => ContractHandlerEvent::PutResponse { - new_value: Ok(state), - }, - Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::PutResponse { - new_value: Ok(state), - }, - Err(err) => { - if err.is_fatal() { - todo!("Handle fatal error; reset executor"); - } - ContractHandlerEvent::PutResponse { - new_value: Err(err), + // Clone needed values for the task + let put_future = contract_handler.executor().upsert_contract_state(key, Either::Left(state.clone()), related_contracts, contract).await; + + pending_tasks.spawn(async move { + let span = tracing::info_span!("upsert_contract_state", %key); + let _guard = span.enter(); + + let (executor, result) = put_future.await; + + let event_result = match result { + Ok(UpsertResult::NoChange) => ContractHandlerEvent::PutResponse { + new_value: Ok(state), + }, + Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::PutResponse { + new_value: Ok(state), + }, + Err(err) => { + if err.is_fatal() { + tracing::error!("Fatal error in executor during put"); + } + ContractHandlerEvent::PutResponse { + new_value: Err(err), + } } - } - }; + }; - contract_handler - .channel() - .send_to_sender(id, event_result) - .await - .map_err(|error| { - tracing::debug!(%error, "shutting down contract handler"); - error - })?; + (id, event_result, executor) + }); } + ContractHandlerEvent::UpdateQuery { key, data, @@ -137,58 +161,62 @@ where freenet_stdlib::prelude::UpdateData::Delta(delta) => Either::Right(delta), _ => unreachable!(), }; - let update_result = contract_handler - .executor() - .upsert_contract_state(key, update_value, related_contracts, None) - .instrument(tracing::info_span!("upsert_contract_state", %key)) - .await; - - let event_result = match update_result { - Ok(UpsertResult::NoChange) => ContractHandlerEvent::UpdateNoChange { key }, - Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::UpdateResponse { - new_value: Ok(state), - }, - Err(err) => { - if err.is_fatal() { - todo!("Handle fatal error; reset executor"); - } - ContractHandlerEvent::UpdateResponse { - new_value: Err(err), + let update_future = contract_handler.executor().upsert_contract_state(key, update_value, related_contracts, None).await; + + pending_tasks.spawn(async move { + let span = tracing::info_span!("upsert_contract_state", %key); + let _guard = span.enter(); + + let (executor, result) = update_future.await; + + let event_result = match result { + Ok(UpsertResult::NoChange) => ContractHandlerEvent::UpdateNoChange { key }, + Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::UpdateResponse { + new_value: Ok(state), + }, + Err(err) => { + if err.is_fatal() { + tracing::error!("Fatal error in executor during update"); + } + ContractHandlerEvent::UpdateResponse { + new_value: Err(err), + } } - } - }; + }; - contract_handler - .channel() - .send_to_sender(id, event_result) - .await - .map_err(|error| { - tracing::debug!(%error, "shutting down contract handler"); - error - })?; + (id, event_result, executor) + }); } + ContractHandlerEvent::RegisterSubscriberListener { key, client_id, summary, subscriber_listener, } => { - let _ = contract_handler - .executor() - .register_contract_notifier(key, client_id, subscriber_listener, summary) - .inspect_err(|err| { - tracing::warn!("Error while registering subscriber listener: {err}"); - }); - - // FIXME: if there is an error senc actually an error back - contract_handler + let result = contract_handler.executor().register_contract_notifier( + key, + client_id, + subscriber_listener, + summary, + ); + + if let Err(err) = &result { + tracing::warn!("Error while registering subscriber listener: {err}"); + } + + if let Err(error) = contract_handler .channel() .send_to_sender(id, ContractHandlerEvent::RegisterSubscriberListenerResponse) .await - .inspect_err(|error| { - tracing::debug!(%error, "shutting down contract handler"); - })?; + { + tracing::debug!(%error, "shutting down contract handler"); + return Err(ContractError::ChannelDropped(Box::new( + ContractHandlerEvent::RegisterSubscriberListenerResponse, + ))); + } } + _ => unreachable!(), } } diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index c1a1fe3af..e6079b195 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -202,7 +202,7 @@ impl Display for OperationMode { } pub struct ExecutorToEventLoopChannel { - op_manager: Arc, + pub(super) op_manager: Arc, end: End, } @@ -246,31 +246,12 @@ enum CallbackError { } impl ExecutorToEventLoopChannel { - async fn send_to_event_loop(&mut self, message: T) -> anyhow::Result - where - T: ComposeNetworkMessage, - Op: Operation + Send + 'static, - { - let op = message.initiate_op(&self.op_manager); - let tx = *op.id(); - self.end.waiting_for_op_tx.send(tx).await.inspect_err(|_| { - tracing::debug!("failed to send request to executor, channel closed"); - })?; - >::resume_op(op, &self.op_manager) - .await - .map_err(|e| { - tracing::debug!("failed to resume operation: {e}"); - e - })?; - Ok(tx) - } - - async fn receive_op_result(&mut self, transaction: Transaction) -> Result - where - Op: Operation + TryFrom, - { + async fn receive_op_result( + &mut self, + transaction: Transaction, + ) -> Result { if let Some(result) = self.end.completed.remove(&transaction) { - return result.try_into().map_err(CallbackError::Conversion); + return Ok(result); } let op_result = self .end @@ -282,7 +263,54 @@ impl ExecutorToEventLoopChannel { self.end.completed.insert(*op_result.id(), op_result); return Err(CallbackError::MissingResult); } - op_result.try_into().map_err(CallbackError::Conversion) + Ok(op_result) + } + + pub async fn handle_operation_result( + mut self, + mut to_process: mpsc::Receiver<( + Transaction, + tokio::sync::oneshot::Sender>, + )>, + ) { + let mut waiting = Vec::new(); + // This loop should never exit under normal operation + loop { + tokio::select! { + // Process any new transaction request + Some((tx, cb)) = to_process.recv() => { + // Try to get the result for this transaction + let op_res = self.receive_op_result(tx).await; + if let Err(CallbackError::MissingResult) = &op_res { + waiting.push((tx, cb)); + } else { + cb.send(op_res).unwrap_or_else(|_| { + tracing::debug!("Error sending callback result to executor"); + }); + } + } + // Process any received response that might match a waiting transaction + Some(op_result) = self.end.response_for_rx.recv(), if !waiting.is_empty() => { + let tx = *op_result.id(); + // Check if this response matches any waiting transaction + if let Some(position) = waiting.iter().position(|(wait_tx, _)| *wait_tx == tx) { + let (_, cb) = waiting.swap_remove(position); + cb.send(Ok(op_result)).unwrap_or_else(|_| { + tracing::debug!("Error sending callback result for waiting transaction"); + }); + } else { + // Store the result for future requests + self.end.completed.insert(tx, op_result); + } + } + else => { + tracing::debug!("All channels closed, shutting down operation result handler"); + break; + } + } + } + + tracing::warn!("Operation result handler shutting down unexpectedly"); } } @@ -447,22 +475,43 @@ impl ComposeNetworkMessage for UpdateContract { } } +pub(crate) type FetchContractR = + Result<(Option, Option), ExecutorError>; +pub(crate) type UpsertContractR = Result; + +/// A trait for contract execution, storage, and notification. +/// +/// This trait abstracts the capabilities required for contract lifecycle management: +/// - Fetching contracts from storage +/// - Updating contract state +/// - Managing notifications to interested clients +/// - Handling executor instances +/// +/// Implementations must be thread-safe (Send) and have a static lifetime. pub(crate) trait ContractExecutor: Send + 'static { + type InnerExecutor: Send + 'static; + + /// Fetches a contract from the store. fn fetch_contract( &mut self, key: ContractKey, return_contract_code: bool, - ) -> impl Future, Option), ExecutorError>> - + Send; + ) -> impl Future< + Output = impl Future + Send + 'static, + > + Send; + /// Updates the contract state in the store. fn upsert_contract_state( &mut self, key: ContractKey, update: Either>, related_contracts: RelatedContracts<'static>, code: Option, - ) -> impl Future> + Send; + ) -> impl Future< + Output = impl Future + Send + 'static, + > + Send; + /// Registers a contract notifier for a specific contract key. fn register_contract_notifier( &mut self, key: ContractKey, @@ -470,6 +519,13 @@ pub(crate) trait ContractExecutor: Send + 'static { notification_ch: tokio::sync::mpsc::UnboundedSender, summary: Option>, ) -> Result<(), Box>; + + /// Returns the current executor instance. + fn return_executor(&mut self, executor: Self::InnerExecutor); + + /// Creates a new executor instance when an error occurs with the current one. + /// This method should never fail - it must always return a working executor. + fn create_new_executor(&mut self) -> impl Future + Send; } /// A WASM executor which will run any contracts, delegates, etc. registered. @@ -488,19 +544,24 @@ pub struct Executor { /// Attested contract instances for a given delegate. delegate_attested_ids: HashMap>, - event_loop_channel: Option>, + op_sender: mpsc::Sender<( + Transaction, + tokio::sync::oneshot::Sender>, + )>, + op_manager: Arc, } impl Executor { pub async fn new( state_store: StateStore, - ctrl_handler: impl FnOnce() -> anyhow::Result<()>, mode: OperationMode, runtime: R, - event_loop_channel: Option>, + op_sender: mpsc::Sender<( + Transaction, + tokio::sync::oneshot::Sender>, + )>, + op_manager: Arc, ) -> anyhow::Result { - ctrl_handler()?; - Ok(Self { mode, runtime, @@ -508,7 +569,8 @@ impl Executor { update_notifications: HashMap::default(), subscriber_summaries: HashMap::default(), delegate_attested_ids: HashMap::default(), - event_loop_channel, + op_sender, + op_manager, }) } @@ -547,33 +609,42 @@ impl Executor { ::Result: TryFrom, M: ComposeNetworkMessage, { - let Some(ch) = &mut self.event_loop_channel else { - return Err(ExecutorError::other(anyhow::anyhow!( - "missing event loop channel" - ))); - }; - let transaction = ch - .send_to_event_loop(request) + let op = request.initiate_op(&self.op_manager); + let tx = *op.id(); + let (cb_s, cb) = tokio::sync::oneshot::channel(); + self.op_sender + .send((tx, cb_s)) .await - .map_err(ExecutorError::other)?; - // FIXME: must add a way to suspend a request while waiting for result and resume upon getting - // an answer back so we don't block the executor itself. - // otherwise it may be possible to end up in a deadlock waiting for a tree of contract - // dependencies to be resolved - let result = loop { - match ch.receive_op_result::(transaction).await { - Ok(result) => break result, - Err(CallbackError::MissingResult) => { - tokio::time::sleep(Duration::from_secs(10)).await; - continue; - } - Err(CallbackError::Conversion(err)) => { - tracing::error!("expect message of one type but got an other: {err}"); - return Err(ExecutorError::other(err)); - } + .inspect_err(|_| { + tracing::debug!("failed to send request to executor, channel closed"); + }) + .map_err(|e| { + tracing::debug!("failed to send request to executor: {e}"); + ExecutorError::other(anyhow::anyhow!("channel closed")) + })?; + >::resume_op(op, &self.op_manager) + .await + .map_err(|e| { + tracing::debug!("failed to resume operation: {e}"); + ExecutorError::other(e) + })?; + + let result = cb.await.map_err(|_| { + tracing::debug!("failed to receive callback from executor, channel closed"); + ExecutorError::other(anyhow::anyhow!("channel closed")) + })?; + + let result: Op = { + match result { + Ok(result) => result.try_into().map_err(|err| { + tracing::debug!("failed to convert callback result: {err}"); + ExecutorError::other(err) + })?, Err(CallbackError::Err(other)) => return Err(other), + _ => unreachable!(), } }; + let result = ::try_from(result).map_err(|err| { tracing::debug!("didn't get result back: {err}"); ExecutorError::other(err) diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index 83fe4b555..092573025 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -1,18 +1,86 @@ use super::*; use super::{ - ContractExecutor, ContractRequest, ContractResponse, ExecutorError, ExecutorHalve, - ExecutorToEventLoopChannel, RequestError, Response, StateStoreError, + ContractExecutor, ContractRequest, ContractResponse, ExecutorError, RequestError, Response, + StateStoreError, }; +use std::num::NonZeroUsize; +use std::sync::Arc; +use tokio::sync::Semaphore; + +pub(in crate::contract) struct RuntimePool { + // Keeping track of available executors + runtimes: Vec>>, + // Semaphore to control access to executors + available: Semaphore, + config: Arc, + op_sender: mpsc::Sender<( + Transaction, + tokio::sync::oneshot::Sender>, + )>, + op_manager: Arc, +} + +impl RuntimePool { + /// Create a new pool with the given number of runtime executors + pub async fn new( + config: Arc, + op_sender: mpsc::Sender<( + Transaction, + tokio::sync::oneshot::Sender>, + )>, + op_manager: Arc, + pool_size: NonZeroUsize, + ) -> anyhow::Result { + let mut runtimes = Vec::with_capacity(pool_size.into()); + + for _ in 0..pool_size.into() { + let executor = + Executor::from_config(config.clone(), op_sender.clone(), op_manager.clone()) + .await?; + runtimes.push(Some(executor)); + } + + Ok(Self { + runtimes, + available: Semaphore::new(pool_size.into()), + config, + op_sender, + op_manager, + }) + } + + // Pop an executor from the pool - blocks until one is available + async fn pop_executor(&mut self) -> Executor { + // Wait for an available permit + let _ = self.available.acquire().await.expect("Semaphore is closed"); + + // Find the first available executor + for slot in &mut self.runtimes { + if let Some(executor) = slot.take() { + return executor; + } + } + + // This should never happen because of the semaphore + unreachable!("No executors available despite semaphore permit") + } +} + +impl ContractExecutor for RuntimePool { + type InnerExecutor = Executor; -impl ContractExecutor for Executor { async fn fetch_contract( &mut self, key: ContractKey, return_contract_code: bool, - ) -> Result<(Option, Option), ExecutorError> { - match self.perform_contract_get(return_contract_code, key).await { - Ok((state, code)) => Ok((state, code)), - Err(err) => Err(err), + ) -> impl Future + Send + 'static { + let mut executor = self.pop_executor().await; + + async move { + let result = executor + .perform_contract_get(return_contract_code, key) + .await; + (executor, result) } } @@ -22,136 +90,140 @@ impl ContractExecutor for Executor { update: Either>, related_contracts: RelatedContracts<'static>, code: Option, - ) -> Result { - let params = if let Some(code) = &code { - code.params() - } else { - self.state_store - .get_params(&key) - .await - .map_err(ExecutorError::other)? - .ok_or_else(|| { - ExecutorError::request(StdContractError::Put { - key, - cause: "missing contract parameters".into(), - }) - })? - }; + ) -> impl Future + Send + 'static { + let mut executor = self.pop_executor().await; + + async move { + let result = match update { + Either::Left(state) => { + // For state updates with simplified implementation + let params = if let Some(code) = &code { + code.params() + } else { + match executor.state_store.get_params(&key).await { + Ok(Some(params)) => params, + Ok(None) => { + return ( + executor, + Err(ExecutorError::request(StdContractError::Put { + key, + cause: "missing contract parameters".into(), + })), + ) + } + Err(err) => return (executor, Err(ExecutorError::other(err))), + } + }; - let remove_if_fail = if self - .runtime - .contract_store - .fetch_contract(&key, ¶ms) - .is_none() - { - let code = code.ok_or_else(|| { - ExecutorError::request(StdContractError::MissingContract { key: key.into() }) - })?; - self.runtime - .contract_store - .store_contract(code.clone()) - .map_err(ExecutorError::other)?; - true - } else { - false - }; + let remove_if_fail = if executor + .runtime + .contract_store + .fetch_contract(&key, ¶ms) + .is_none() + { + if let Some(code) = code { + if let Err(err) = + executor.runtime.contract_store.store_contract(code.clone()) + { + return (executor, Err(ExecutorError::other(err))); + } + true + } else { + return ( + executor, + Err(ExecutorError::request(StdContractError::MissingContract { + key: key.into(), + })), + ); + } + } else { + false + }; - let mut updates = match update { - Either::Left(incoming_state) => { - let result = self - .runtime - .validate_state(&key, ¶ms, &incoming_state, &related_contracts) - .map_err(|err| { - if remove_if_fail { - let _ = self.runtime.contract_store.remove_contract(&key); + // Validate the state + match executor + .runtime + .validate_state(&key, ¶ms, &state, &related_contracts) + { + Ok(validate_result) => { + match validate_result { + ValidateResult::Valid => { + // Store and send notifications + if let Err(err) = executor + .state_store + .store(key, state.clone(), params.clone()) + .await + { + return (executor, Err(ExecutorError::other(err))); + } + + // Attempt to send notifications + if let Err(err) = executor + .send_update_notification(&key, ¶ms, &state) + .await + { + tracing::error!("Failed to send notifications: {}", err); + } + + (executor, Ok(UpsertResult::Updated(state))) + } + ValidateResult::Invalid => ( + executor, + Err(ExecutorError::request(StdContractError::invalid_put(key))), + ), + ValidateResult::RequestRelated(mut related) => { + if let Some(key) = related.pop() { + ( + executor, + Err(ExecutorError::request( + StdContractError::MissingRelated { key }, + )), + ) + } else { + (executor, Err(ExecutorError::internal_error())) + } + } + } } - ExecutorError::execution(err, None) - })?; - match result { - ValidateResult::Valid => { - self.state_store - .store(key, incoming_state.clone(), params.clone()) - .await - .map_err(ExecutorError::other)?; - } - ValidateResult::Invalid => { - return Err(ExecutorError::request(StdContractError::invalid_put(key))); - } - ValidateResult::RequestRelated(mut related) => { - if let Some(key) = related.pop() { - return Err(ExecutorError::request(StdContractError::MissingRelated { - key, - })); - } else { - return Err(ExecutorError::internal_error()); + Err(err) => { + if remove_if_fail { + let _ = executor.runtime.contract_store.remove_contract(&key); + } + (executor, Err(ExecutorError::execution(err, None))) } } } - - vec![UpdateData::State(incoming_state.clone().into())] - } - Either::Right(delta) => { - // todo: forward delta like we are doing with puts - tracing::warn!("Delta updates are not yet supported"); - vec![UpdateData::Delta(delta)] - } - }; - - let current_state = match self.state_store.get(&key).await { - Ok(s) => s, - Err(StateStoreError::MissingContract(_)) => { - tracing::warn!("Missing contract {key} for upsert"); - return Err(ExecutorError::request(StdContractError::MissingContract { - key: key.into(), - })); - } - Err(StateStoreError::Any(err)) => return Err(ExecutorError::other(err)), - }; - - for (id, state) in related_contracts - .states() - .filter_map(|(id, c)| c.as_ref().map(|c| (id, c))) - { - updates.push(UpdateData::RelatedState { - related_to: *id, - state: state.clone(), - }); - } - - let updated_state = match self - .attempt_state_update(¶ms, ¤t_state, &key, &updates) - .await? - { - Either::Left(s) => s, - Either::Right(mut r) => { - let Some(c) = r.pop() else { - // this branch should be unreachable since attempt_state_update should only - return Err(ExecutorError::internal_error()); - }; - return Err(ExecutorError::request(StdContractError::MissingRelated { - key: c.contract_instance_id, - })); - } - }; - match self - .runtime - .validate_state(&key, ¶ms, &updated_state, &related_contracts) - .map_err(|e| ExecutorError::execution(e, None))? - { - ValidateResult::Valid => { - if updated_state.as_ref() == current_state.as_ref() { - Ok(UpsertResult::NoChange) - } else { - Ok(UpsertResult::Updated(updated_state)) + Either::Right(delta) => { + // For delta updates, use the full implementation + match executor.state_store.get(&key).await { + Ok(current_state) => match executor.state_store.get_params(&key).await { + Ok(Some(parameters)) => { + let updates = vec![UpdateData::Delta(delta)]; + match executor + .get_updated_state(¶meters, current_state, key, updates) + .await + { + Ok(new_state) => { + (executor, Ok(UpsertResult::Updated(new_state))) + } + Err(err) => (executor, Err(err)), + } + } + Ok(None) => ( + executor, + Err(ExecutorError::request(StdContractError::Update { + key, + cause: "missing contract parameters".into(), + })), + ), + Err(err) => (executor, Err(ExecutorError::other(err))), + }, + Err(err) => (executor, Err(ExecutorError::other(err))), + } } - } - ValidateResult::Invalid => Err(ExecutorError::request( - freenet_stdlib::client_api::ContractError::Update { - key, - cause: "invalid outcome state".into(), - }, - )), - ValidateResult::RequestRelated(_) => todo!(), + }; + + result } } @@ -162,57 +234,75 @@ impl ContractExecutor for Executor { notification_ch: tokio::sync::mpsc::UnboundedSender, summary: Option>, ) -> Result<(), Box> { - let channels = self.update_notifications.entry(key).or_default(); - if let Ok(i) = channels.binary_search_by_key(&&cli_id, |(p, _)| p) { - let (_, existing_ch) = &channels[i]; - if !existing_ch.same_channel(¬ification_ch) { - return Err(RequestError::from(StdContractError::Subscribe { - key, - cause: format!("Peer {cli_id} already subscribed").into(), - }) - .into()); + // We need to register with all executors + let mut last_error = None; + + // Temporarily collect all executors + let mut executors = Vec::new(); + for slot in &mut self.runtimes { + if let Some(executor) = slot.take() { + executors.push(executor); } - } else { - channels.push((cli_id, notification_ch)); } - if self - .subscriber_summaries - .entry(key) - .or_default() - .insert(cli_id, summary.map(StateSummary::into_owned)) - .is_some() - { - tracing::warn!( - "contract {key} already was registered for peer {cli_id}; replaced summary" - ); + // Register with each executor + // FIXME: potentially missing registers + for executor in &mut executors { + if let Err(err) = executor.register_contract_notifier( + key, + cli_id, + notification_ch.clone(), + summary.clone(), + ) { + last_error = Some(err); + } + } + + // Return executors to pool + for executor in executors { + if let Some(empty_slot) = self.runtimes.iter_mut().find(|slot| slot.is_none()) { + *empty_slot = Some(executor); + self.available.add_permits(1); + } + } + + last_error.map_or(Ok(()), Err) + } + + fn return_executor(&mut self, executor: Self::InnerExecutor) { + // Find an empty slot and return the executor + if let Some(empty_slot) = self.runtimes.iter_mut().find(|slot| slot.is_none()) { + *empty_slot = Some(executor); + self.available.add_permits(1); + } else { + unreachable!("No empty slot found in the pool"); } - Ok(()) + } + + async fn create_new_executor(&mut self) -> Self::InnerExecutor { + Executor::from_config( + self.config.clone(), + self.op_sender.clone(), + self.op_manager.clone(), + ) + .await + .expect("Failed to create new executor") } } impl Executor { pub async fn from_config( config: Arc, - event_loop_channel: Option>, + op_sender: mpsc::Sender<( + Transaction, + tokio::sync::oneshot::Sender>, + )>, + op_manager: Arc, ) -> anyhow::Result { let (contract_store, delegate_store, secret_store, state_store) = Self::get_stores(&config).await?; let rt = Runtime::build(contract_store, delegate_store, secret_store, false).unwrap(); - Executor::new( - state_store, - move || { - let _ = - crate::util::set_cleanup_on_exit(config.paths().clone()).inspect_err(|error| { - tracing::error!("Failed to set cleanup on exit: {error}"); - }); - Ok(()) - }, - OperationMode::Local, - rt, - event_loop_channel, - ) - .await + Executor::new(state_store, OperationMode::Local, rt, op_sender, op_manager).await } pub fn register_contract_notifier( diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index be5b74947..14a5d1275 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -9,12 +9,10 @@ use freenet_stdlib::prelude::*; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use super::executor::runtime::RuntimePool; use super::executor::{ExecutorHalve, ExecutorToEventLoopChannel}; use super::ExecutorError; -use super::{ - executor::{ContractExecutor, Executor}, - ContractError, -}; +use super::{executor::ContractExecutor, ContractError}; use crate::client_events::HostResult; use crate::config::Config; use crate::message::Transaction; @@ -70,13 +68,14 @@ pub(crate) trait ContractHandler { } pub(crate) struct NetworkContractHandler { - executor: Executor, + executor: RuntimePool, channel: ContractHandlerChannel, + op_res_handle: tokio::task::JoinHandle<()>, } impl ContractHandler for NetworkContractHandler { type Builder = Arc; - type ContractExecutor = Executor; + type ContractExecutor = RuntimePool; async fn build( channel: ContractHandlerChannel, @@ -86,8 +85,17 @@ impl ContractHandler for NetworkContractHandler { where Self: Sized + 'static, { - let executor = Executor::from_config(config.clone(), Some(executor_request_sender)).await?; - Ok(Self { executor, channel }) + let num_executors = std::thread::available_parallelism()?; + let (to_process_tx, to_process) = mpsc::channel(num_executors.into()); + let op_manager = executor_request_sender.op_manager.clone(); + let op_res_handle = + tokio::spawn(executor_request_sender.handle_operation_result(to_process)); + let executor = RuntimePool::new(config, to_process_tx, op_manager, num_executors).await?; + Ok(Self { + executor, + channel, + op_res_handle, + }) } fn channel(&mut self) -> &mut ContractHandlerChannel { @@ -95,6 +103,9 @@ impl ContractHandler for NetworkContractHandler { } fn executor(&mut self) -> &mut Self::ContractExecutor { + if self.op_res_handle.is_finished() { + panic!("executor handle is finished"); + } &mut self.executor } } From 82fd749cfad913a1b113d5a839fe1005f0b41dd7 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sun, 23 Mar 2025 02:52:01 +0100 Subject: [PATCH 2/6] wip --- crates/core/src/bin/freenet.rs | 2 +- crates/core/src/contract/executor.rs | 37 ++- .../src/contract/executor/mock_runtime.rs | 272 ++++++++++++------ crates/core/src/contract/executor/runtime.rs | 51 ++-- crates/core/src/contract/handler.rs | 61 +++- 5 files changed, 274 insertions(+), 149 deletions(-) diff --git a/crates/core/src/bin/freenet.rs b/crates/core/src/bin/freenet.rs index eee06f21b..f6e225721 100644 --- a/crates/core/src/bin/freenet.rs +++ b/crates/core/src/bin/freenet.rs @@ -19,7 +19,7 @@ async fn run_local(config: Config) -> anyhow::Result<()> { tracing::info!("Starting freenet node in local mode"); let socket = config.ws_api; - let executor = Executor::from_config(Arc::new(config), None) + let executor = Executor::local(Arc::new(config)) .await .map_err(anyhow::Error::msg)?; diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index e6079b195..c07bc9be4 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -236,7 +236,7 @@ pub(crate) fn executor_channel( } #[derive(thiserror::Error, Debug)] -enum CallbackError { +pub(crate) enum CallbackError { #[error(transparent)] Err(#[from] ExecutorError), #[error(transparent)] @@ -528,6 +528,11 @@ pub(crate) trait ContractExecutor: Send + 'static { fn create_new_executor(&mut self) -> impl Future + Send; } +pub(super) type OpResult = mpsc::Sender<( + Transaction, + tokio::sync::oneshot::Sender>, +)>; + /// A WASM executor which will run any contracts, delegates, etc. registered. /// /// This executor will monitor the store directories and databases to detect state changes. @@ -544,23 +549,17 @@ pub struct Executor { /// Attested contract instances for a given delegate. delegate_attested_ids: HashMap>, - op_sender: mpsc::Sender<( - Transaction, - tokio::sync::oneshot::Sender>, - )>, - op_manager: Arc, + op_sender: Option, + op_manager: Option>, } impl Executor { - pub async fn new( + pub(crate) async fn new( state_store: StateStore, mode: OperationMode, runtime: R, - op_sender: mpsc::Sender<( - Transaction, - tokio::sync::oneshot::Sender>, - )>, - op_manager: Arc, + op_sender: Option, + op_manager: Option>, ) -> anyhow::Result { Ok(Self { mode, @@ -609,10 +608,18 @@ impl Executor { ::Result: TryFrom, M: ComposeNetworkMessage, { - let op = request.initiate_op(&self.op_manager); + let op_manager = self + .op_manager + .as_ref() + .ok_or_else(|| ExecutorError::other(anyhow::anyhow!("no op manager")))?; + let op_sender = self + .op_sender + .as_ref() + .ok_or_else(|| ExecutorError::other(anyhow::anyhow!("no op sender")))?; + let op = request.initiate_op(op_manager); let tx = *op.id(); let (cb_s, cb) = tokio::sync::oneshot::channel(); - self.op_sender + op_sender .send((tx, cb_s)) .await .inspect_err(|_| { @@ -622,7 +629,7 @@ impl Executor { tracing::debug!("failed to send request to executor: {e}"); ExecutorError::other(anyhow::anyhow!("channel closed")) })?; - >::resume_op(op, &self.op_manager) + >::resume_op(op, op_manager) .await .map_err(|e| { tracing::debug!("failed to resume operation: {e}"); diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index 018c3a3f6..ee2cd07a4 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -1,40 +1,13 @@ -use super::*; -use tokio::sync::mpsc::UnboundedSender; +use super::{runtime::RuntimePool, *}; +use std::sync::Arc; +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio::sync::Semaphore; pub(crate) struct MockRuntime { pub contract_store: ContractStore, } impl Executor { - pub async fn new_mock( - identifier: &str, - event_loop_channel: ExecutorToEventLoopChannel, - ) -> anyhow::Result { - let data_dir = Self::test_data_dir(identifier); - - let contracts_data_dir = data_dir.join("contracts"); - std::fs::create_dir_all(&contracts_data_dir).expect("directory created"); - let contract_store = ContractStore::new(contracts_data_dir, u16::MAX as i64)?; - - // FIXME: if is sqlite it should be a dir, named /db - // let db_path = data_dir.join("db"); - // let state_store = StateStore::new(Storage::new(&db_path).await?, u16::MAX as u32).unwrap(); - tracing::debug!("creating state store at path: {data_dir:?}"); - std::fs::create_dir_all(&data_dir).expect("directory created"); - let state_store = StateStore::new(Storage::new(&data_dir).await?, u16::MAX as u32).unwrap(); - tracing::debug!("state store created"); - - let executor = Executor::new( - state_store, - || Ok(()), - OperationMode::Local, - MockRuntime { contract_store }, - Some(event_loop_channel), - ) - .await?; - Ok(executor) - } - pub async fn handle_request( &mut self, _id: ClientId, @@ -43,13 +16,11 @@ impl Executor { ) -> Response { unreachable!() } -} -impl ContractExecutor for Executor { - async fn fetch_contract( + async fn perform_contract_get( &mut self, - key: ContractKey, return_contract_code: bool, + key: ContractKey, ) -> Result<(Option, Option), ExecutorError> { let Some(parameters) = self .state_store @@ -61,6 +32,7 @@ impl ContractExecutor for Executor { "missing state and/or parameters for contract {key}" ))); }; + let contract = if return_contract_code { self.runtime .contract_store @@ -68,12 +40,143 @@ impl ContractExecutor for Executor { } else { None }; - let Ok(state) = self.state_store.get(&key).await else { - return Err(ExecutorError::other(anyhow::anyhow!( + + match self.state_store.get(&key).await { + Ok(state) => Ok((Some(state), contract)), + Err(_) => Err(ExecutorError::other(anyhow::anyhow!( "missing state for contract {key}" - ))); - }; - Ok((Some(state), contract)) + ))), + } + } +} + +impl RuntimePool { + pub async fn new_mock( + identifier: &str, + to_process_tx: mpsc::Sender<( + Transaction, + tokio::sync::oneshot::Sender>, + )>, + op_manager: Arc, + num_executors: std::num::NonZeroUsize, + ) -> anyhow::Result { + let data_dir = Executor::::test_data_dir(identifier); + + let contracts_data_dir = data_dir.join("contracts"); + std::fs::create_dir_all(&contracts_data_dir).expect("directory created"); + let contract_store = ContractStore::new(contracts_data_dir, u16::MAX as i64)?; + + tracing::debug!("creating state store at path: {data_dir:?}"); + std::fs::create_dir_all(&data_dir).expect("directory created"); + + let mut runtimes = Vec::with_capacity(1); + + let exec_dir = data_dir.join(format!("executor-{identifier}")); + std::fs::create_dir_all(&exec_dir).expect("directory created"); + + let state_store = StateStore::new(Storage::new(&exec_dir).await?, u16::MAX as u32).unwrap(); + + let executor = Executor::new( + state_store, + OperationMode::Local, + MockRuntime { contract_store }, + Some(to_process_tx.clone()), + Some(op_manager.clone()), + ) + .await?; + + runtimes.push(Some(executor)); + + Ok(RuntimePool { + runtimes, + available: Semaphore::new(num_executors.get()), + op_sender: to_process_tx, + op_manager, + config: identifier.to_string(), + }) + } + + // Pop an executor from the pool - blocks until one is available + async fn pop_executor(&mut self) -> Executor { + // Wait for an available permit + let _ = self.available.acquire().await.expect("Semaphore is closed"); + + // Find the first available executor + for slot in &mut self.runtimes { + if let Some(executor) = slot.take() { + return executor; + } + } + + // This should never happen because of the semaphore + unreachable!("No executors available despite semaphore permit") + } +} + +impl ContractExecutor for RuntimePool { + type InnerExecutor = Executor; + + fn return_executor(&mut self, executor: Self::InnerExecutor) { + // Find an empty slot and return the executor + if let Some(empty_slot) = self.runtimes.iter_mut().find(|slot| slot.is_none()) { + *empty_slot = Some(executor); + self.available.add_permits(1); + } else { + unreachable!("No empty slot found in the pool"); + } + } + + async fn create_new_executor(&mut self) -> Self::InnerExecutor { + // Create a new executor with a unique directory + let identifier = self.config.clone(); + let data_dir = Executor::::test_data_dir(&identifier); + let exec_dir = data_dir.join(format!( + "executor-new-{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + )); + std::fs::create_dir_all(&exec_dir).expect("directory created"); + + let contracts_data_dir = data_dir.join("contracts"); + std::fs::create_dir_all(&contracts_data_dir).expect("directory created"); + let contract_store = ContractStore::new(contracts_data_dir, u16::MAX as i64) + .expect("Failed to create contract store"); + + let state_store = + StateStore::new(Storage::new(&exec_dir).await.unwrap(), u16::MAX as u32).unwrap(); + + Executor::new( + state_store, + OperationMode::Local, + MockRuntime { contract_store }, + Some(self.op_sender.clone()), + Some(self.op_manager.clone()), + ) + .await + .expect("Failed to create new executor") + } + + async fn fetch_contract( + &mut self, + key: ContractKey, + return_contract_code: bool, + ) -> impl Future< + Output = ( + Self::InnerExecutor, + Result<(Option, Option), ExecutorError>, + ), + > + Send + + 'static { + let mut executor = self.pop_executor().await; + + async move { + let result = executor + .perform_contract_get(return_contract_code, key) + .await; + (executor, result) + } } async fn upsert_contract_state( @@ -82,30 +185,41 @@ impl ContractExecutor for Executor { state: Either>, _related_contracts: RelatedContracts<'static>, code: Option, - ) -> Result { + ) -> impl Future)> + Send + 'static + { + let mut executor = self.pop_executor().await; // todo: instead allow to perform mutations per contract based on incoming value so we can track // state values over the network - match (state, code) { - (Either::Left(incoming_state), Some(contract)) => { - self.runtime - .contract_store - .store_contract(contract.clone()) - .map_err(ExecutorError::other)?; - self.state_store - .store(key, incoming_state.clone(), contract.params().into_owned()) - .await - .map_err(ExecutorError::other)?; - Ok(UpsertResult::Updated(incoming_state)) - } - (Either::Left(incoming_state), None) => { - // update case - self.state_store - .update(&key, incoming_state.clone()) - .await - .map_err(ExecutorError::other)?; - Ok(UpsertResult::Updated(incoming_state)) - } - (update, contract) => unreachable!("{update:?}, {contract:?}"), + async move { + let r = async { + match (state, code) { + (Either::Left(incoming_state), Some(contract)) => { + executor + .runtime + .contract_store + .store_contract(contract.clone()) + .map_err(ExecutorError::other)?; + executor + .state_store + .store(key, incoming_state.clone(), contract.params().into_owned()) + .await + .map_err(ExecutorError::other)?; + Ok(UpsertResult::Updated(incoming_state)) + } + (Either::Left(incoming_state), None) => { + // update case + executor + .state_store + .update(&key, incoming_state.clone()) + .await + .map_err(ExecutorError::other)?; + Ok(UpsertResult::Updated(incoming_state)) + } + (update, contract) => unreachable!("{update:?}, {contract:?}"), + } + }; + let r = r.await; + (executor, r) } } @@ -116,39 +230,7 @@ impl ContractExecutor for Executor { _notification_ch: UnboundedSender, _summary: Option>, ) -> Result<(), Box> { - Ok(()) - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[tokio::test(flavor = "multi_thread")] - async fn local_node_handle() -> Result<(), Box> { - const MAX_SIZE: i64 = 10 * 1024 * 1024; - const MAX_MEM_CACHE: u32 = 10_000_000; - let tmp_dir = tempfile::tempdir()?; - let state_store_path = tmp_dir.path().join("state_store"); - std::fs::create_dir_all(&state_store_path)?; - let contract_store = ContractStore::new(tmp_dir.path().join("executor-test"), MAX_SIZE)?; - let state_store = - StateStore::new(Storage::new(&state_store_path).await?, MAX_MEM_CACHE).unwrap(); - let mut counter = 0; - Executor::new( - state_store, - || { - counter += 1; - Ok(()) - }, - OperationMode::Local, - MockRuntime { contract_store }, - None, - ) - .await - .expect("local node with handle"); - - assert_eq!(counter, 1); + // For mock, we just acknowledge the registration without actually implementing notification Ok(()) } } diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index 092573025..dc937118c 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -7,20 +7,20 @@ use std::num::NonZeroUsize; use std::sync::Arc; use tokio::sync::Semaphore; -pub(in crate::contract) struct RuntimePool { +pub(crate) struct RuntimePool, R = Runtime> { // Keeping track of available executors - runtimes: Vec>>, + pub runtimes: Vec>>, // Semaphore to control access to executors - available: Semaphore, - config: Arc, - op_sender: mpsc::Sender<( + pub available: Semaphore, + pub config: C, + pub op_sender: mpsc::Sender<( Transaction, tokio::sync::oneshot::Sender>, )>, - op_manager: Arc, + pub op_manager: Arc, } -impl RuntimePool { +impl RuntimePool, Runtime> { /// Create a new pool with the given number of runtime executors pub async fn new( config: Arc, @@ -34,9 +34,12 @@ impl RuntimePool { let mut runtimes = Vec::with_capacity(pool_size.into()); for _ in 0..pool_size.into() { - let executor = - Executor::from_config(config.clone(), op_sender.clone(), op_manager.clone()) - .await?; + let executor = Executor::from_config( + config.clone(), + Some(op_sender.clone()), + Some(op_manager.clone()), + ) + .await?; runtimes.push(Some(executor)); } @@ -66,7 +69,7 @@ impl RuntimePool { } } -impl ContractExecutor for RuntimePool { +impl ContractExecutor for RuntimePool, Runtime> { type InnerExecutor = Executor; async fn fetch_contract( @@ -94,7 +97,7 @@ impl ContractExecutor for RuntimePool { let mut executor = self.pop_executor().await; async move { - let result = match update { + match update { Either::Left(state) => { // For state updates with simplified implementation let params = if let Some(code) = &code { @@ -221,9 +224,7 @@ impl ContractExecutor for RuntimePool { Err(err) => (executor, Err(ExecutorError::other(err))), } } - }; - - result + } } } @@ -282,8 +283,8 @@ impl ContractExecutor for RuntimePool { async fn create_new_executor(&mut self) -> Self::InnerExecutor { Executor::from_config( self.config.clone(), - self.op_sender.clone(), - self.op_manager.clone(), + Some(self.op_sender.clone()), + Some(self.op_manager.clone()), ) .await .expect("Failed to create new executor") @@ -291,13 +292,17 @@ impl ContractExecutor for RuntimePool { } impl Executor { - pub async fn from_config( + pub async fn local(config: Arc) -> anyhow::Result { + let (contract_store, delegate_store, secret_store, state_store) = + Self::get_stores(&config).await?; + let rt = Runtime::build(contract_store, delegate_store, secret_store, false).unwrap(); + Executor::new(state_store, OperationMode::Local, rt, None, None).await + } + + async fn from_config( config: Arc, - op_sender: mpsc::Sender<( - Transaction, - tokio::sync::oneshot::Sender>, - )>, - op_manager: Arc, + op_sender: Option, + op_manager: Option>, ) -> anyhow::Result { let (contract_store, delegate_store, secret_store, state_store) = Self::get_stores(&config).await?; diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index 14a5d1275..3c04482cb 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -51,7 +51,7 @@ impl std::ops::Deref for ClientResponsesSender { } pub(crate) trait ContractHandler { - type Builder; + type Builder: Clone; type ContractExecutor: ContractExecutor; fn build( @@ -67,15 +67,15 @@ pub(crate) trait ContractHandler { fn executor(&mut self) -> &mut Self::ContractExecutor; } -pub(crate) struct NetworkContractHandler { - executor: RuntimePool, +pub(crate) struct NetworkContractHandler, R = Runtime> { + executor: RuntimePool, channel: ContractHandlerChannel, op_res_handle: tokio::task::JoinHandle<()>, } -impl ContractHandler for NetworkContractHandler { +impl ContractHandler for NetworkContractHandler, Runtime> { type Builder = Arc; - type ContractExecutor = RuntimePool; + type ContractExecutor = RuntimePool; async fn build( channel: ContractHandlerChannel, @@ -111,9 +111,9 @@ impl ContractHandler for NetworkContractHandler { } #[cfg(test)] -impl ContractHandler for NetworkContractHandler { +impl ContractHandler for NetworkContractHandler { type Builder = String; - type ContractExecutor = Executor; + type ContractExecutor = RuntimePool; async fn build( channel: ContractHandlerChannel, @@ -123,8 +123,19 @@ impl ContractHandler for NetworkContractHandler { where Self: Sized + 'static, { - let executor = Executor::new_mock(&identifier, executor_request_sender).await?; - Ok(Self { executor, channel }) + let num_executors = std::thread::available_parallelism()?; + let (to_process_tx, to_process) = mpsc::channel(num_executors.into()); + let op_manager = executor_request_sender.op_manager.clone(); + let op_res_handle = + tokio::spawn(executor_request_sender.handle_operation_result(to_process)); + let executor = + RuntimePool::new_mock(&identifier, to_process_tx, op_manager, num_executors).await?; + + Ok(Self { + executor, + channel, + op_res_handle, + }) } fn channel(&mut self) -> &mut ContractHandlerChannel { @@ -132,6 +143,9 @@ impl ContractHandler for NetworkContractHandler { } fn executor(&mut self) -> &mut Self::ContractExecutor { + if self.op_res_handle.is_finished() { + panic!("executor handle is finished"); + } &mut self.executor } } @@ -484,17 +498,22 @@ pub mod test { } pub(super) mod in_memory { + use tokio::sync::mpsc; + + use crate::contract::executor::runtime::RuntimePool; + use super::{ super::{ executor::{ExecutorHalve, ExecutorToEventLoopChannel}, - Executor, MockRuntime, + MockRuntime, }, ContractHandler, ContractHandlerChannel, ContractHandlerHalve, }; pub(crate) struct MemoryContractHandler { channel: ContractHandlerChannel, - runtime: Executor, + runtime: RuntimePool, + op_res_handle: tokio::task::JoinHandle<()>, } impl MemoryContractHandler { @@ -503,18 +522,27 @@ pub(super) mod in_memory { executor_request_sender: ExecutorToEventLoopChannel, identifier: &str, ) -> Self { + let num_executors = std::thread::available_parallelism().unwrap(); + let (to_process_tx, to_process) = mpsc::channel(num_executors.into()); + let op_manager = executor_request_sender.op_manager.clone(); + let op_res_handle = + tokio::spawn(executor_request_sender.handle_operation_result(to_process)); + let runtime = + RuntimePool::new_mock(identifier, to_process_tx, op_manager, num_executors) + .await + .unwrap(); + MemoryContractHandler { channel, - runtime: Executor::new_mock(identifier, executor_request_sender) - .await - .expect("should start mock executor"), + runtime, + op_res_handle, } } } impl ContractHandler for MemoryContractHandler { type Builder = String; - type ContractExecutor = Executor; + type ContractExecutor = RuntimePool; async fn build( channel: ContractHandlerChannel, @@ -532,6 +560,9 @@ pub(super) mod in_memory { } fn executor(&mut self) -> &mut Self::ContractExecutor { + if self.op_res_handle.is_finished() { + panic!("executor handle is finished"); + } &mut self.runtime } } From d8b39083e56f533bc46bf5f57bf19353712f31df Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sun, 23 Mar 2025 03:10:14 +0100 Subject: [PATCH 3/6] fixes --- crates/core/src/bin/freenet.rs | 3 +++ crates/core/src/config.rs | 2 +- crates/core/src/contract/executor.rs | 4 ++++ crates/core/src/contract/executor/runtime.rs | 3 ++- 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/crates/core/src/bin/freenet.rs b/crates/core/src/bin/freenet.rs index f6e225721..a11dc9e7c 100644 --- a/crates/core/src/bin/freenet.rs +++ b/crates/core/src/bin/freenet.rs @@ -19,6 +19,9 @@ async fn run_local(config: Config) -> anyhow::Result<()> { tracing::info!("Starting freenet node in local mode"); let socket = config.ws_api; + let _ = freenet::util::set_cleanup_on_exit(config.paths()).inspect_err(|error| { + tracing::error!("Failed to set cleanup on exit: {error}"); + }); let executor = Executor::local(Arc::new(config)) .await .map_err(anyhow::Error::msg)?; diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index a0fcfe275..b0e9036a9 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -425,7 +425,7 @@ impl Config { self.secrets.transport_keypair() } - pub(crate) fn paths(&self) -> Arc { + pub fn paths(&self) -> Arc { self.config_paths.clone() } } diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index c07bc9be4..853ca0d75 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -279,6 +279,10 @@ impl ExecutorToEventLoopChannel { tokio::select! { // Process any new transaction request Some((tx, cb)) = to_process.recv() => { + if self.end.waiting_for_op_tx.send(tx).await.is_err() { + tracing::debug!("failed to send request to executor, channel closed"); + break; + } // Try to get the result for this transaction let op_res = self.receive_op_result(tx).await; if let Err(CallbackError::MissingResult) = &op_res { diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index dc937118c..5f39dcf9c 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -296,6 +296,7 @@ impl Executor { let (contract_store, delegate_store, secret_store, state_store) = Self::get_stores(&config).await?; let rt = Runtime::build(contract_store, delegate_store, secret_store, false).unwrap(); + assert!(config.mode == OperationMode::Local); Executor::new(state_store, OperationMode::Local, rt, None, None).await } @@ -307,7 +308,7 @@ impl Executor { let (contract_store, delegate_store, secret_store, state_store) = Self::get_stores(&config).await?; let rt = Runtime::build(contract_store, delegate_store, secret_store, false).unwrap(); - Executor::new(state_store, OperationMode::Local, rt, op_sender, op_manager).await + Executor::new(state_store, config.mode, rt, op_sender, op_manager).await } pub fn register_contract_notifier( From 7e667ae66a7ac44f13e3406af87e12d2f733ca34 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sun, 30 Mar 2025 11:30:25 +0200 Subject: [PATCH 4/6] wip --- crates/core/src/contract/executor/runtime.rs | 284 ++++++++++--------- 1 file changed, 155 insertions(+), 129 deletions(-) diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index 5f39dcf9c..ba7e23a9a 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -95,136 +95,11 @@ impl ContractExecutor for RuntimePool, Runtime> { code: Option, ) -> impl Future + Send + 'static { let mut executor = self.pop_executor().await; - async move { - match update { - Either::Left(state) => { - // For state updates with simplified implementation - let params = if let Some(code) = &code { - code.params() - } else { - match executor.state_store.get_params(&key).await { - Ok(Some(params)) => params, - Ok(None) => { - return ( - executor, - Err(ExecutorError::request(StdContractError::Put { - key, - cause: "missing contract parameters".into(), - })), - ) - } - Err(err) => return (executor, Err(ExecutorError::other(err))), - } - }; - - let remove_if_fail = if executor - .runtime - .contract_store - .fetch_contract(&key, ¶ms) - .is_none() - { - if let Some(code) = code { - if let Err(err) = - executor.runtime.contract_store.store_contract(code.clone()) - { - return (executor, Err(ExecutorError::other(err))); - } - true - } else { - return ( - executor, - Err(ExecutorError::request(StdContractError::MissingContract { - key: key.into(), - })), - ); - } - } else { - false - }; - - // Validate the state - match executor - .runtime - .validate_state(&key, ¶ms, &state, &related_contracts) - { - Ok(validate_result) => { - match validate_result { - ValidateResult::Valid => { - // Store and send notifications - if let Err(err) = executor - .state_store - .store(key, state.clone(), params.clone()) - .await - { - return (executor, Err(ExecutorError::other(err))); - } - - // Attempt to send notifications - if let Err(err) = executor - .send_update_notification(&key, ¶ms, &state) - .await - { - tracing::error!("Failed to send notifications: {}", err); - } - - (executor, Ok(UpsertResult::Updated(state))) - } - ValidateResult::Invalid => ( - executor, - Err(ExecutorError::request(StdContractError::invalid_put(key))), - ), - ValidateResult::RequestRelated(mut related) => { - if let Some(key) = related.pop() { - ( - executor, - Err(ExecutorError::request( - StdContractError::MissingRelated { key }, - )), - ) - } else { - (executor, Err(ExecutorError::internal_error())) - } - } - } - } - Err(err) => { - if remove_if_fail { - let _ = executor.runtime.contract_store.remove_contract(&key); - } - (executor, Err(ExecutorError::execution(err, None))) - } - } - } - Either::Right(delta) => { - // For delta updates, use the full implementation - match executor.state_store.get(&key).await { - Ok(current_state) => match executor.state_store.get_params(&key).await { - Ok(Some(parameters)) => { - let updates = vec![UpdateData::Delta(delta)]; - match executor - .get_updated_state(¶meters, current_state, key, updates) - .await - { - Ok(new_state) => { - (executor, Ok(UpsertResult::Updated(new_state))) - } - Err(err) => (executor, Err(err)), - } - } - Ok(None) => ( - executor, - Err(ExecutorError::request(StdContractError::Update { - key, - cause: "missing contract parameters".into(), - })), - ), - Err(err) => (executor, Err(ExecutorError::other(err))), - }, - Err(err) => (executor, Err(ExecutorError::other(err))), - } - } - } + let res = + upsert_contract_state_inner(&mut executor, key, update, related_contracts, code) + .await; + (executor, res) } } @@ -291,6 +166,157 @@ impl ContractExecutor for RuntimePool, Runtime> { } } +async fn upsert_contract_state_inner( + executor: &mut Executor, + key: ContractKey, + update: Either>, + related_contracts: RelatedContracts<'static>, + code: Option, +) -> UpsertContractR { + let params = if let Some(code) = &code { + code.params() + } else { + executor + .state_store + .get_params(&key) + .await + .transpose() + .ok_or_else(|| { + ExecutorError::request(StdContractError::Put { + key, + cause: "missing contract parameters".into(), + }) + })? + .map_err(ExecutorError::other)? + }; + + let remove_if_fail = if executor + .runtime + .contract_store + .fetch_contract(&key, ¶ms) + .is_none() + { + let Some(code) = code else { + return Err(ExecutorError::request(StdContractError::MissingContract { + key: key.into(), + })); + }; + executor + .runtime + .contract_store + .store_contract(code.clone()) + .map_err(ExecutorError::other)?; + true + } else { + false + }; + + let mut updates = match update { + Either::Left(incoming_state) => { + let result = match executor.runtime.validate_state( + &key, + ¶ms, + &incoming_state, + &related_contracts, + ) { + Ok(result) => result, + Err(err) => { + if remove_if_fail { + let _ = executor.runtime.contract_store.remove_contract(&key); + } + return Err(ExecutorError::execution(err, None)); + } + }; + match result { + ValidateResult::Valid => { + executor + .state_store + .store(key, incoming_state.clone(), params.clone()) + .await + .map_err(ExecutorError::other)?; + } + ValidateResult::Invalid => { + return Err(ExecutorError::request(StdContractError::invalid_put(key))); + } + ValidateResult::RequestRelated(mut related) => { + if let Some(key) = related.pop() { + // TODO: support recursive related contracts + return Err(ExecutorError::request(StdContractError::MissingRelated { + key, + })); + } else { + return Err(ExecutorError::internal_error()); + } + } + } + + vec![UpdateData::State(incoming_state.clone().into())] + } + Either::Right(delta) => { + // todo: forward delta like we are doing with puts + tracing::warn!("Delta updates are not yet supported"); + vec![UpdateData::Delta(delta)] + } + }; + + let current_state = match executor.state_store.get(&key).await { + Ok(s) => s, + Err(StateStoreError::MissingContract(_)) => { + tracing::warn!("Missing contract {key} for upsert"); + return Err(ExecutorError::request(StdContractError::MissingContract { + key: key.into(), + })); + } + Err(StateStoreError::Any(err)) => return Err(ExecutorError::other(err)), + }; + + for (id, state) in related_contracts + .states() + .filter_map(|(id, c)| c.as_ref().map(|c| (id, c))) + { + updates.push(UpdateData::RelatedState { + related_to: *id, + state: state.clone(), + }); + } + + let updated_state = match executor + .attempt_state_update(¶ms, ¤t_state, &key, &updates) + .await? + { + Either::Left(s) => s, + Either::Right(mut r) => { + let Some(c) = r.pop() else { + // this branch should be unreachable since attempt_state_update should only + return Err(ExecutorError::internal_error()); + }; + return Err(ExecutorError::request(StdContractError::MissingRelated { + key: c.contract_instance_id, + })); + } + }; + match executor + .runtime + .validate_state(&key, ¶ms, &updated_state, &related_contracts) + .map_err(|e| ExecutorError::execution(e, None))? + { + ValidateResult::Valid => { + if updated_state.as_ref() == current_state.as_ref() { + Ok(UpsertResult::NoChange) + } else { + Ok(UpsertResult::Updated(updated_state)) + } + } + ValidateResult::Invalid => Err(ExecutorError::request( + freenet_stdlib::client_api::ContractError::Update { + key, + cause: "invalid outcome state".into(), + }, + )), + ValidateResult::RequestRelated(_) => todo!(), + } +} + impl Executor { pub async fn local(config: Arc) -> anyhow::Result { let (contract_store, delegate_store, secret_store, state_store) = From e5db4f9eab132f8d6a1f037138ba99ff1236b2b6 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sun, 30 Mar 2025 12:23:33 +0200 Subject: [PATCH 5/6] fix missing registrations --- crates/core/src/contract/executor.rs | 5 +- .../src/contract/executor/mock_runtime.rs | 26 ++++- crates/core/src/contract/executor/runtime.rs | 110 +++++++++++++----- 3 files changed, 111 insertions(+), 30 deletions(-) diff --git a/crates/core/src/contract/executor.rs b/crates/core/src/contract/executor.rs index 853ca0d75..b6331108f 100644 --- a/crates/core/src/contract/executor.rs +++ b/crates/core/src/contract/executor.rs @@ -16,6 +16,7 @@ use freenet_stdlib::client_api::{ RequestError, }; use freenet_stdlib::prelude::*; +use runtime::ExecutorWithId; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc::{self}; @@ -493,7 +494,7 @@ pub(crate) type UpsertContractR = Result; /// /// Implementations must be thread-safe (Send) and have a static lifetime. pub(crate) trait ContractExecutor: Send + 'static { - type InnerExecutor: Send + 'static; + type InnerExecutor: ExecutorWithId; /// Fetches a contract from the store. fn fetch_contract( @@ -543,6 +544,7 @@ pub(super) type OpResult = mpsc::Sender<( /// Consumers of the executor are required to poll for new changes in order to be notified /// of changes or can alternatively use the notification channel. pub struct Executor { + pub id: usize, mode: OperationMode, runtime: R, pub state_store: StateStore, @@ -566,6 +568,7 @@ impl Executor { op_manager: Option>, ) -> anyhow::Result { Ok(Self { + id: 0, // Default ID, will be set by the pool mode, runtime, state_store, diff --git a/crates/core/src/contract/executor/mock_runtime.rs b/crates/core/src/contract/executor/mock_runtime.rs index ee2cd07a4..c7bddb2ec 100644 --- a/crates/core/src/contract/executor/mock_runtime.rs +++ b/crates/core/src/contract/executor/mock_runtime.rs @@ -1,4 +1,5 @@ use super::{runtime::RuntimePool, *}; +use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::{self, UnboundedSender}; use tokio::sync::Semaphore; @@ -76,7 +77,7 @@ impl RuntimePool { let state_store = StateStore::new(Storage::new(&exec_dir).await?, u16::MAX as u32).unwrap(); - let executor = Executor::new( + let mut executor = Executor::new( state_store, OperationMode::Local, MockRuntime { contract_store }, @@ -85,6 +86,9 @@ impl RuntimePool { ) .await?; + // Assign ID to executor + executor.id = 0; + runtimes.push(Some(executor)); Ok(RuntimePool { @@ -93,6 +97,8 @@ impl RuntimePool { op_sender: to_process_tx, op_manager, config: identifier.to_string(), + pending_registrations: HashMap::new(), + next_executor_id: 1, // We start at 1 since the initial executor has ID 0 }) } @@ -104,6 +110,8 @@ impl RuntimePool { // Find the first available executor for slot in &mut self.runtimes { if let Some(executor) = slot.take() { + // Create an empty entry for pending registrations + self.pending_registrations.insert(executor.id(), Vec::new()); return executor; } } @@ -117,6 +125,9 @@ impl ContractExecutor for RuntimePool { type InnerExecutor = Executor; fn return_executor(&mut self, executor: Self::InnerExecutor) { + // Clear any pending registrations for this executor + self.pending_registrations.remove(&executor.id()); + // Find an empty slot and return the executor if let Some(empty_slot) = self.runtimes.iter_mut().find(|slot| slot.is_none()) { *empty_slot = Some(executor); @@ -147,7 +158,7 @@ impl ContractExecutor for RuntimePool { let state_store = StateStore::new(Storage::new(&exec_dir).await.unwrap(), u16::MAX as u32).unwrap(); - Executor::new( + let mut executor = Executor::new( state_store, OperationMode::Local, MockRuntime { contract_store }, @@ -155,7 +166,13 @@ impl ContractExecutor for RuntimePool { Some(self.op_manager.clone()), ) .await - .expect("Failed to create new executor") + .expect("Failed to create new executor"); + + // Assign a new ID to this executor + executor.id = self.next_executor_id; + self.next_executor_id += 1; + + executor } async fn fetch_contract( @@ -230,6 +247,9 @@ impl ContractExecutor for RuntimePool { _notification_ch: UnboundedSender, _summary: Option>, ) -> Result<(), Box> { + // Add to all pending registration entries + // In the mock implementation we don't need to actually store anything + // For mock, we just acknowledge the registration without actually implementing notification Ok(()) } diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs index ba7e23a9a..0f34b6ce6 100644 --- a/crates/core/src/contract/executor/runtime.rs +++ b/crates/core/src/contract/executor/runtime.rs @@ -3,10 +3,29 @@ use super::{ ContractExecutor, ContractRequest, ContractResponse, ExecutorError, RequestError, Response, StateStoreError, }; +use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::Arc; use tokio::sync::Semaphore; +/// Trait for executors that can identify themselves +pub trait ExecutorWithId: Send + 'static { + fn id(&self) -> usize; +} + +impl ExecutorWithId for Executor { + fn id(&self) -> usize { + self.id + } +} + +pub(super) struct PendingRegistration { + key: ContractKey, + cli_id: ClientId, + notification_ch: tokio::sync::mpsc::UnboundedSender, + summary: Option>, +} + pub(crate) struct RuntimePool, R = Runtime> { // Keeping track of available executors pub runtimes: Vec>>, @@ -18,6 +37,10 @@ pub(crate) struct RuntimePool, R = Runtime> { tokio::sync::oneshot::Sender>, )>, pub op_manager: Arc, + // Track pending registrations by executor index + pub(super) pending_registrations: HashMap>, + // Next executor ID to assign + pub next_executor_id: usize, } impl RuntimePool, Runtime> { @@ -32,14 +55,20 @@ impl RuntimePool, Runtime> { pool_size: NonZeroUsize, ) -> anyhow::Result { let mut runtimes = Vec::with_capacity(pool_size.into()); + let mut next_id = 0; for _ in 0..pool_size.into() { - let executor = Executor::from_config( + let mut executor = Executor::from_config( config.clone(), Some(op_sender.clone()), Some(op_manager.clone()), ) .await?; + + // Assign an ID to the executor + executor.id = next_id; + next_id += 1; + runtimes.push(Some(executor)); } @@ -49,6 +78,8 @@ impl RuntimePool, Runtime> { config, op_sender, op_manager, + pending_registrations: HashMap::new(), + next_executor_id: next_id, }) } @@ -60,6 +91,14 @@ impl RuntimePool, Runtime> { // Find the first available executor for slot in &mut self.runtimes { if let Some(executor) = slot.take() { + let id = executor.id(); + // Ensure there's an entry in pending_registrations for this executor + // This will track any registrations that happen while it's out of the pool + let existing = self.pending_registrations.insert(id, Vec::new()); + assert!( + existing.is_none(), + "Executor ID already exists in pending registrations" + ); return executor; } } @@ -110,43 +149,51 @@ impl ContractExecutor for RuntimePool, Runtime> { notification_ch: tokio::sync::mpsc::UnboundedSender, summary: Option>, ) -> Result<(), Box> { - // We need to register with all executors let mut last_error = None; + let owned_summary = summary.map(StateSummary::into_owned); - // Temporarily collect all executors - let mut executors = Vec::new(); - for slot in &mut self.runtimes { - if let Some(executor) = slot.take() { - executors.push(executor); - } - } - - // Register with each executor - // FIXME: potentially missing registers - for executor in &mut executors { + // Register with all available executors + for executor in self.runtimes.iter_mut().flatten() { if let Err(err) = executor.register_contract_notifier( key, cli_id, notification_ch.clone(), - summary.clone(), + owned_summary.clone(), ) { last_error = Some(err); } } - // Return executors to pool - for executor in executors { - if let Some(empty_slot) = self.runtimes.iter_mut().find(|slot| slot.is_none()) { - *empty_slot = Some(executor); - self.available.add_permits(1); - } + // Add to all pending registration entries + // These represent executors that are currently out of the pool + for pending in self.pending_registrations.values_mut() { + pending.push(PendingRegistration { + key, + cli_id, + notification_ch: notification_ch.clone(), + summary: owned_summary.clone(), + }); } last_error.map_or(Ok(()), Err) } - fn return_executor(&mut self, executor: Self::InnerExecutor) { - // Find an empty slot and return the executor + fn return_executor(&mut self, mut executor: Self::InnerExecutor) { + let executor_id = executor.id(); + + // Apply any pending registrations for this executor + if let Some(pending) = self.pending_registrations.remove(&executor_id) { + for registration in pending { + let _ = executor.register_contract_notifier( + registration.key, + registration.cli_id, + registration.notification_ch, + registration.summary.clone(), + ); + } + } + + // Return the executor to the pool if let Some(empty_slot) = self.runtimes.iter_mut().find(|slot| slot.is_none()) { *empty_slot = Some(executor); self.available.add_permits(1); @@ -156,13 +203,19 @@ impl ContractExecutor for RuntimePool, Runtime> { } async fn create_new_executor(&mut self) -> Self::InnerExecutor { - Executor::from_config( + let mut executor = Executor::from_config( self.config.clone(), Some(self.op_sender.clone()), Some(self.op_manager.clone()), ) .await - .expect("Failed to create new executor") + .expect("Failed to create new executor"); + + // Assign a new ID to this executor + executor.id = self.next_executor_id; + self.next_executor_id += 1; + + executor } } @@ -323,7 +376,9 @@ impl Executor { Self::get_stores(&config).await?; let rt = Runtime::build(contract_store, delegate_store, secret_store, false).unwrap(); assert!(config.mode == OperationMode::Local); - Executor::new(state_store, OperationMode::Local, rt, None, None).await + let mut executor = Executor::new(state_store, OperationMode::Local, rt, None, None).await?; + executor.id = 0; // Default ID for local executors + Ok(executor) } async fn from_config( @@ -334,7 +389,10 @@ impl Executor { let (contract_store, delegate_store, secret_store, state_store) = Self::get_stores(&config).await?; let rt = Runtime::build(contract_store, delegate_store, secret_store, false).unwrap(); - Executor::new(state_store, config.mode, rt, op_sender, op_manager).await + let mut executor = + Executor::new(state_store, config.mode, rt, op_sender, op_manager).await?; + executor.id = 0; // ID will be assigned by the pool + Ok(executor) } pub fn register_contract_notifier( From 3afe902a77ea1aac7b1af249d1a75358d57ae5c6 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Sun, 30 Mar 2025 12:31:12 +0200 Subject: [PATCH 6/6] crfo fmt --- crates/core/src/contract.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/crates/core/src/contract.rs b/crates/core/src/contract.rs index 8d255e0ba..984317227 100644 --- a/crates/core/src/contract.rs +++ b/crates/core/src/contract.rs @@ -39,7 +39,7 @@ where Ok((id, event, executor)) => { // Return the executor back to the pool contract_handler.executor().return_executor(executor); - + // Send the result using the contract_handler's channel if let Err(error) = contract_handler.channel().send_to_sender(id, event).await { tracing::debug!(%error, "shutting down contract handler"); @@ -76,7 +76,10 @@ where return_contract_code, } => { // Clone needed values for the task - let fetch_contract = contract_handler.executor().fetch_contract(key, return_contract_code).await; + let fetch_contract = contract_handler + .executor() + .fetch_contract(key, return_contract_code) + .await; let id_clone = id; pending_tasks.spawn(async move { @@ -87,8 +90,8 @@ where let response_event = match result { Ok((state, contract)) => { - tracing::debug!(with_contract_code = %return_contract_code, - has_contract = %contract.is_some(), + tracing::debug!(with_contract_code = %return_contract_code, + has_contract = %contract.is_some(), "Fetched contract {key}"); ContractHandlerEvent::GetResponse { @@ -120,7 +123,15 @@ where contract, } => { // Clone needed values for the task - let put_future = contract_handler.executor().upsert_contract_state(key, Either::Left(state.clone()), related_contracts, contract).await; + let put_future = contract_handler + .executor() + .upsert_contract_state( + key, + Either::Left(state.clone()), + related_contracts, + contract, + ) + .await; pending_tasks.spawn(async move { let span = tracing::info_span!("upsert_contract_state", %key); @@ -161,7 +172,10 @@ where freenet_stdlib::prelude::UpdateData::Delta(delta) => Either::Right(delta), _ => unreachable!(), }; - let update_future = contract_handler.executor().upsert_contract_state(key, update_value, related_contracts, None).await; + let update_future = contract_handler + .executor() + .upsert_contract_state(key, update_value, related_contracts, None) + .await; pending_tasks.spawn(async move { let span = tracing::info_span!("upsert_contract_state", %key);