Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion linera-client/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl<Env: Environment> Benchmark<Env> {
let barrier = Arc::new(Barrier::new(num_chains + 1));

let chain_listener_future = chain_listener
.run(true) // Enabling background sync for benchmarks
.run()
.await
.map_err(|_| BenchmarkError::ChainListenerStartupError)?;
let chain_listener_handle = tokio::spawn(chain_listener_future.in_current_span());
Expand Down
60 changes: 35 additions & 25 deletions linera-client/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ pub struct ChainListener<C: ClientContext> {
cancellation_token: CancellationToken,
/// The channel through which the listener can receive commands.
command_receiver: UnboundedReceiver<ListenerCommand>,
/// Whether to fully sync chains in the background.
enable_background_sync: bool,
}

impl<C: ClientContext + 'static> ChainListener<C> {
Expand All @@ -223,6 +225,7 @@ impl<C: ClientContext + 'static> ChainListener<C> {
storage: <C::Environment as Environment>::Storage,
cancellation_token: CancellationToken,
command_receiver: UnboundedReceiver<ListenerCommand>,
enable_background_sync: bool,
) -> Self {
Self {
storage,
Expand All @@ -232,15 +235,13 @@ impl<C: ClientContext + 'static> ChainListener<C> {
event_subscribers: Default::default(),
cancellation_token,
command_receiver,
enable_background_sync,
}
}

/// Runs the chain listener.
#[instrument(skip(self))]
pub async fn run(
mut self,
enable_background_sync: bool,
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this change necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because background sync for a chain can now be started at a point different than when the whole Chain Listener is started - so we need to remember the setting in the struct itself, to know whether we want to start background sync for chains or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds wrong – i.e. we decide to spawn the background sync at a different point in time than when we decide whether we (will) want to run it. Shouldn't those two be done at the same time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But background sync is per chain, and we may want to start listening to a chain at any point while the listener (as in, the component) is running. So we initialize the listener at startup, but then we might want to start to listen to some chain much later, and if background sync is enabled, we will start the background sync for that chain then.

let chain_ids = {
let guard = self.context.lock().await;
let admin_chain_id = guard.admin_chain();
Expand Down Expand Up @@ -280,27 +281,7 @@ impl<C: ClientContext + 'static> ChainListener<C> {

// Start background tasks to sync received certificates for each chain,
// if enabled.
if enable_background_sync {
let context = Arc::clone(&self.context);
let cancellation_token = self.cancellation_token.clone();
for chain_id in chain_ids.keys() {
let context = Arc::clone(&context);
let cancellation_token = cancellation_token.clone();
let chain_id = *chain_id;
linera_base::task::spawn(async move {
if let Err(e) = Self::background_sync_received_certificates(
context,
chain_id,
cancellation_token,
)
.await
{
warn!("Background sync failed for chain {chain_id}: {e}");
}
})
.forget();
}
}
self.start_background_sync(&chain_ids).await;

Ok(async {
self.listen_recursively(chain_ids).await?;
Expand All @@ -318,6 +299,34 @@ impl<C: ClientContext + 'static> ChainListener<C> {
})
}

async fn start_background_sync(&mut self, chain_ids: &BTreeMap<ChainId, ListeningMode>) {
if !self.enable_background_sync {
return;
}
let context = Arc::clone(&self.context);
let cancellation_token = self.cancellation_token.clone();
for (chain_id, mode) in chain_ids {
if mode != &ListeningMode::FullChain {
continue;
}
let context = Arc::clone(&context);
let cancellation_token = cancellation_token.clone();
let chain_id = *chain_id;
linera_base::task::spawn(async move {
if let Err(e) = Self::background_sync_received_certificates(
context,
chain_id,
cancellation_token,
)
.await
{
warn!("Background sync failed for chain {chain_id}: {e}");
}
})
.forget();
}
}

/// Processes a notification, updating local chains and validators as needed.
async fn process_notification(&mut self, notification: Notification) -> Result<(), Error> {
Self::sleep(self.config.delay_before_ms).await;
Expand Down Expand Up @@ -568,6 +577,7 @@ impl<C: ClientContext + 'static> ChainListener<C> {
match command {
ListenerCommand::Listen(new_chains) => {
debug!(?new_chains, "received command to listen to new chains");
self.start_background_sync(&new_chains).await;
self.listen_recursively(new_chains).await?;
}
ListenerCommand::StopListening(chains) => {
Expand Down
9 changes: 6 additions & 3 deletions linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ async fn test_chain_listener() -> anyhow::Result<()> {
storage,
child_token,
tokio::sync::mpsc::unbounded_channel().1,
false, // Unit test doesn't need background sync
)
.run(false) // Unit test doesn't need background sync
.run()
.await
.unwrap();

Expand Down Expand Up @@ -273,8 +274,9 @@ async fn test_chain_listener_follow_only() -> anyhow::Result<()> {
storage.clone(),
child_token,
tokio::sync::mpsc::unbounded_channel().1,
false, // Unit test doesn't need background sync
)
.run(false) // Unit test doesn't need background sync
.run()
.await
.unwrap();

Expand Down Expand Up @@ -400,8 +402,9 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> {
storage.clone(),
child_token,
tokio::sync::mpsc::unbounded_channel().1,
false, // Unit test doesn't need background sync
)
.run(false) // Unit test doesn't need background sync
.run()
.await
.unwrap();

Expand Down
3 changes: 2 additions & 1 deletion linera-faucet/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,9 @@ where
self.storage,
cancellation_token.clone(),
tokio::sync::mpsc::unbounded_channel().1,
false, // Faucet doesn't receive messages, so no need for background sync
)
.run(false) // Faucet doesn't receive messages, so no need for background sync
.run()
.await?;
let batch_processor_task = batch_processor.run(cancellation_token.clone());
let tcp_listener =
Expand Down
1 change: 1 addition & 0 deletions linera-service/src/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ impl Runnable for Job {
storage.clone(),
shutdown_notifier.clone(),
mpsc::unbounded_channel().1,
true, // Enabling background sync for benchmarks
);
linera_client::benchmark::Benchmark::run_benchmark(
bps,
Expand Down
3 changes: 2 additions & 1 deletion linera-service/src/node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -983,8 +983,9 @@ where
storage,
cancellation_token.clone(),
command_receiver,
true,
)
.run(true)
.run()
.await?;
let mut chain_listener = Box::pin(chain_listener).fuse();
let tcp_listener =
Expand Down
Loading