Skip to content

Multi rt pool exec #1494

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/core/src/bin/freenet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ async fn run_local(config: Config) -> anyhow::Result<()> {
tracing::info!("Starting freenet node in local mode");
let socket = config.ws_api;

let executor = Executor::from_config(Arc::new(config), None)
let _ = freenet::util::set_cleanup_on_exit(config.paths()).inspect_err(|error| {
tracing::error!("Failed to set cleanup on exit: {error}");
});
let executor = Executor::local(Arc::new(config))
.await
.map_err(anyhow::Error::msg)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ impl Config {
self.secrets.transport_keypair()
}

pub(crate) fn paths(&self) -> Arc<ConfigPaths> {
pub fn paths(&self) -> Arc<ConfigPaths> {
self.config_paths.clone()
}
}
Expand Down
246 changes: 144 additions & 102 deletions crates/core/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,108 +23,143 @@ pub(crate) use handler::{
pub use executor::{Executor, ExecutorError, OperationMode};

use executor::ContractExecutor;
use tracing::Instrument;

pub(crate) async fn contract_handling<CH>(mut contract_handler: CH) -> Result<(), ContractError>
where
CH: ContractHandler + Send + 'static,
{
use tokio::task::JoinSet;

let mut pending_tasks = JoinSet::new();

loop {
let (id, event) = contract_handler.channel().recv_from_sender().await?;
// Check for completed tasks and send their responses via contract_handler.channel()
while let Some(result) = pending_tasks.try_join_next() {
match result {
Ok((id, event, executor)) => {
// Return the executor back to the pool
contract_handler.executor().return_executor(executor);

// Send the result using the contract_handler's channel
if let Err(error) = contract_handler.channel().send_to_sender(id, event).await {
tracing::debug!(%error, "shutting down contract handler");
}
}
Err(e) => {
tracing::error!("Task error: {:?}", e);
// Create a new executor to replace the one that failed
let new_executor = contract_handler.executor().create_new_executor().await;
contract_handler.executor().return_executor(new_executor);
tracing::info!("Created replacement executor after task failure");
}
}
}

// Wait for next event with a timeout to allow checking pending tasks
let recv_result = tokio::time::timeout(
std::time::Duration::from_millis(100),
contract_handler.channel().recv_from_sender(),
)
.await;

let (id, event) = match recv_result {
Ok(Ok(result)) => result,
Ok(Err(e)) => return Err(e),
Err(_) => continue, // Timeout, continue to check pending tasks
};

tracing::debug!(%event, "Got contract handling event");

match event {
ContractHandlerEvent::GetQuery {
key,
return_contract_code,
} => {
match contract_handler
// Clone needed values for the task
let fetch_contract = contract_handler
.executor()
.fetch_contract(key, return_contract_code)
.instrument(tracing::info_span!("fetch_contract", %key, %return_contract_code))
.await
{
Ok((state, contract)) => {
tracing::debug!(with_contract_code = %return_contract_code, has_contract = %contract.is_some(), "Fetched contract {key}");
contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::GetResponse {
key,
response: Ok(StoreResponse { state, contract }),
},
)
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
}
Err(err) => {
tracing::warn!("Error while executing get contract query: {err}");
if err.is_fatal() {
todo!("Handle fatal error; reset executor");
.await;
let id_clone = id;

pending_tasks.spawn(async move {
let span = tracing::info_span!("fetch_contract", %key, %return_contract_code);
let _guard = span.enter();

let (executor, result) = fetch_contract.await;

let response_event = match result {
Ok((state, contract)) => {
tracing::debug!(with_contract_code = %return_contract_code,
has_contract = %contract.is_some(),
"Fetched contract {key}");

ContractHandlerEvent::GetResponse {
key,
response: Ok(StoreResponse { state, contract }),
}
}
contract_handler
.channel()
.send_to_sender(
id,
ContractHandlerEvent::GetResponse {
key,
response: Err(err),
},
)
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
}
}
Err(err) => {
tracing::warn!("Error while executing get contract query: {err}");

if err.is_fatal() {
tracing::error!("Fatal error encountered in executor");
}

ContractHandlerEvent::GetResponse {
key,
response: Err(err),
}
}
};

(id_clone, response_event, executor)
});
}
ContractHandlerEvent::PutQuery {
key,
state,
related_contracts,
contract,
} => {
let put_result = contract_handler
// Clone needed values for the task
let put_future = contract_handler
.executor()
.upsert_contract_state(
key,
Either::Left(state.clone()),
related_contracts,
contract,
)
.instrument(tracing::info_span!("upsert_contract_state", %key))
.await;

let event_result = match put_result {
Ok(UpsertResult::NoChange) => ContractHandlerEvent::PutResponse {
new_value: Ok(state),
},
Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::PutResponse {
new_value: Ok(state),
},
Err(err) => {
if err.is_fatal() {
todo!("Handle fatal error; reset executor");
}
ContractHandlerEvent::PutResponse {
new_value: Err(err),
pending_tasks.spawn(async move {
let span = tracing::info_span!("upsert_contract_state", %key);
let _guard = span.enter();

let (executor, result) = put_future.await;

let event_result = match result {
Ok(UpsertResult::NoChange) => ContractHandlerEvent::PutResponse {
new_value: Ok(state),
},
Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::PutResponse {
new_value: Ok(state),
},
Err(err) => {
if err.is_fatal() {
tracing::error!("Fatal error in executor during put");
}
ContractHandlerEvent::PutResponse {
new_value: Err(err),
}
}
}
};
};

contract_handler
.channel()
.send_to_sender(id, event_result)
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
(id, event_result, executor)
});
}

ContractHandlerEvent::UpdateQuery {
key,
data,
Expand All @@ -137,58 +172,65 @@ where
freenet_stdlib::prelude::UpdateData::Delta(delta) => Either::Right(delta),
_ => unreachable!(),
};
let update_result = contract_handler
let update_future = contract_handler
.executor()
.upsert_contract_state(key, update_value, related_contracts, None)
.instrument(tracing::info_span!("upsert_contract_state", %key))
.await;

let event_result = match update_result {
Ok(UpsertResult::NoChange) => ContractHandlerEvent::UpdateNoChange { key },
Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::UpdateResponse {
new_value: Ok(state),
},
Err(err) => {
if err.is_fatal() {
todo!("Handle fatal error; reset executor");
}
ContractHandlerEvent::UpdateResponse {
new_value: Err(err),
pending_tasks.spawn(async move {
let span = tracing::info_span!("upsert_contract_state", %key);
let _guard = span.enter();

let (executor, result) = update_future.await;

let event_result = match result {
Ok(UpsertResult::NoChange) => ContractHandlerEvent::UpdateNoChange { key },
Ok(UpsertResult::Updated(state)) => ContractHandlerEvent::UpdateResponse {
new_value: Ok(state),
},
Err(err) => {
if err.is_fatal() {
tracing::error!("Fatal error in executor during update");
}
ContractHandlerEvent::UpdateResponse {
new_value: Err(err),
}
}
}
};
};

contract_handler
.channel()
.send_to_sender(id, event_result)
.await
.map_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
error
})?;
(id, event_result, executor)
});
}

ContractHandlerEvent::RegisterSubscriberListener {
key,
client_id,
summary,
subscriber_listener,
} => {
let _ = contract_handler
.executor()
.register_contract_notifier(key, client_id, subscriber_listener, summary)
.inspect_err(|err| {
tracing::warn!("Error while registering subscriber listener: {err}");
});
let result = contract_handler.executor().register_contract_notifier(
key,
client_id,
subscriber_listener,
summary,
);

// FIXME: if there is an error senc actually an error back
contract_handler
if let Err(err) = &result {
tracing::warn!("Error while registering subscriber listener: {err}");
}

if let Err(error) = contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::RegisterSubscriberListenerResponse)
.await
.inspect_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
})?;
{
tracing::debug!(%error, "shutting down contract handler");
return Err(ContractError::ChannelDropped(Box::new(
ContractHandlerEvent::RegisterSubscriberListenerResponse,
)));
}
}

_ => unreachable!(),
}
}
Expand Down
Loading
Loading