Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion linera-client/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl<Env: Environment> Benchmark<Env> {
let barrier = Arc::new(Barrier::new(num_chains + 1));

let chain_listener_future = chain_listener
.run(true) // Enabling background sync for benchmarks
.run()
.await
.map_err(|_| BenchmarkError::ChainListenerStartupError)?;
let chain_listener_handle = tokio::spawn(chain_listener_future.in_current_span());
Expand Down
77 changes: 46 additions & 31 deletions linera-client/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ struct ListeningClient<C: ClientContext> {
timeout: Timestamp,
/// The mode of listening to this chain.
listening_mode: ListeningMode,
/// The cancellation token for the background sync process, if started.
maybe_sync_cancellation_token: Option<CancellationToken>,
}

impl<C: ClientContext> ListeningClient<C> {
Expand All @@ -173,6 +175,7 @@ impl<C: ClientContext> ListeningClient<C> {
join_handle: NonBlockingFuture<()>,
notification_stream: NotificationStream,
listening_mode: ListeningMode,
maybe_sync_cancellation_token: Option<CancellationToken>,
) -> Self {
Self {
client,
Expand All @@ -182,12 +185,16 @@ impl<C: ClientContext> ListeningClient<C> {
notification_stream: Arc::new(Mutex::new(notification_stream)),
timeout: Timestamp::from(u64::MAX),
listening_mode,
maybe_sync_cancellation_token,
}
}

async fn stop(self) {
// TODO(#4965): this is unnecessary: the join handle now also acts as an abort handle
drop(self.abort_handle);
if let Some(cancellation_token) = self.maybe_sync_cancellation_token {
cancellation_token.cancel();
}
self.join_handle.await;
}
}
Expand All @@ -213,6 +220,8 @@ pub struct ChainListener<C: ClientContext> {
cancellation_token: CancellationToken,
/// The channel through which the listener can receive commands.
command_receiver: UnboundedReceiver<ListenerCommand>,
/// Whether to fully sync chains in the background.
enable_background_sync: bool,
}

impl<C: ClientContext + 'static> ChainListener<C> {
Expand All @@ -223,6 +232,7 @@ impl<C: ClientContext + 'static> ChainListener<C> {
storage: <C::Environment as Environment>::Storage,
cancellation_token: CancellationToken,
command_receiver: UnboundedReceiver<ListenerCommand>,
enable_background_sync: bool,
) -> Self {
Self {
storage,
Expand All @@ -232,15 +242,13 @@ impl<C: ClientContext + 'static> ChainListener<C> {
event_subscribers: Default::default(),
cancellation_token,
command_receiver,
enable_background_sync,
}
}

/// Runs the chain listener.
#[instrument(skip(self))]
pub async fn run(
mut self,
enable_background_sync: bool,
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this change necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because background sync for a chain can now be started at a point different than when the whole Chain Listener is started - so we need to remember the setting in the struct itself, to know whether we want to start background sync for chains or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds wrong – i.e. we decide to spawn the background sync at a different point in time than when we decide whether we (will) want to run it. Shouldn't those two be done at the same time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But background sync is per chain, and we may want to start listening to a chain at any point while the listener (as in, the component) is running. So we initialize the listener at startup, but then we might want to start to listen to some chain much later, and if background sync is enabled, we will start the background sync for that chain then.

let chain_ids = {
let guard = self.context.lock().await;
let admin_chain_id = guard.admin_chain();
Expand Down Expand Up @@ -278,33 +286,6 @@ impl<C: ClientContext + 'static> ChainListener<C> {
chain_ids
};

// Start background tasks to sync received certificates for each chain,
// if enabled.
if enable_background_sync {
let context = Arc::clone(&self.context);
let cancellation_token = self.cancellation_token.clone();
for (chain_id, mode) in chain_ids.iter() {
if mode != &ListeningMode::FullChain {
continue;
}
let context = Arc::clone(&context);
let cancellation_token = cancellation_token.clone();
let chain_id = *chain_id;
linera_base::task::spawn(async move {
if let Err(e) = Self::background_sync_received_certificates(
context,
chain_id,
cancellation_token,
)
.await
{
warn!("Background sync failed for chain {chain_id}: {e}");
}
})
.forget();
}
}

Ok(async {
self.listen_recursively(chain_ids).await?;
loop {
Expand Down Expand Up @@ -495,6 +476,10 @@ impl<C: ClientContext + 'static> ChainListener<C> {
.get(&chain_id)
.map(|existing_client| existing_client.listening_mode.clone()),
);
// Start background tasks to sync received certificates, if enabled.
let maybe_sync_cancellation_token = self
.start_background_sync(chain_id, listening_mode.clone())
.await;
let client = self
.context
.lock()
Expand All @@ -510,13 +495,43 @@ impl<C: ClientContext + 'static> ChainListener<C> {
join_handle,
notification_stream,
listening_mode,
maybe_sync_cancellation_token,
);
self.listening.insert(chain_id, listening_client);
let publishing_chains = self.update_event_subscriptions(chain_id).await?;
self.maybe_process_inbox(chain_id).await?;
Ok(publishing_chains)
}

async fn start_background_sync(
&mut self,
chain_id: ChainId,
mode: ListeningMode,
) -> Option<CancellationToken> {
if !self.enable_background_sync {
return None;
}
if mode != ListeningMode::FullChain {
return None;
}
Comment on lines +511 to +516
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could add some logging for the less obvious cases:

  1. enable_background_sync == true but mode != ListeningMode::FullChain
  2. enable_background_sync == false but mode == ListeningMode::FullChain

🤔

Actually, how could we end up with FullChain listening mode but disabled background sync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we enable background sync in all production scenarios, anyway. This setting only exists to make it possible to disable starting background sync tasks in unit tests, AFAIK.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some logging regardless?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it's that useful - because both cases will be pretty common. The first one will pop up in production when listening to sparse event chains, or with followed chains. The second one will be everywhere in unit tests, where we disable background sync for convenience. Neither case is unusual in any way (but I can of course add some debug or maybe even trace logs).

let context = Arc::clone(&self.context);
let cancellation_token = CancellationToken::new();
let cancellation_token_clone = cancellation_token.clone();
linera_base::task::spawn(async move {
if let Err(e) = Self::background_sync_received_certificates(
context,
chain_id,
cancellation_token_clone,
)
.await
{
warn!("Background sync failed for chain {chain_id}: {e}");
}
})
.forget();
Some(cancellation_token)
}

/// Updates the event subscribers map, and returns all publishing chains we need to listen to.
async fn update_event_subscriptions(
&mut self,
Expand Down
9 changes: 6 additions & 3 deletions linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ async fn test_chain_listener() -> anyhow::Result<()> {
storage,
child_token,
tokio::sync::mpsc::unbounded_channel().1,
false, // Unit test doesn't need background sync
)
.run(false) // Unit test doesn't need background sync
.run()
.await
.unwrap();

Expand Down Expand Up @@ -273,8 +274,9 @@ async fn test_chain_listener_follow_only() -> anyhow::Result<()> {
storage.clone(),
child_token,
tokio::sync::mpsc::unbounded_channel().1,
false, // Unit test doesn't need background sync
)
.run(false) // Unit test doesn't need background sync
.run()
.await
.unwrap();

Expand Down Expand Up @@ -400,8 +402,9 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> {
storage.clone(),
child_token,
tokio::sync::mpsc::unbounded_channel().1,
false, // Unit test doesn't need background sync
)
.run(false) // Unit test doesn't need background sync
.run()
.await
.unwrap();

Expand Down
3 changes: 2 additions & 1 deletion linera-faucet/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,9 @@ where
self.storage,
cancellation_token.clone(),
tokio::sync::mpsc::unbounded_channel().1,
false, // Faucet doesn't receive messages, so no need for background sync
)
.run(false) // Faucet doesn't receive messages, so no need for background sync
.run()
.await?;
let batch_processor_task = batch_processor.run(cancellation_token.clone());
let tcp_listener =
Expand Down
1 change: 1 addition & 0 deletions linera-service/src/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ impl Runnable for Job {
storage.clone(),
shutdown_notifier.clone(),
mpsc::unbounded_channel().1,
true, // Enabling background sync for benchmarks
);
linera_client::benchmark::Benchmark::run_benchmark(
bps,
Expand Down
3 changes: 2 additions & 1 deletion linera-service/src/node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -983,8 +983,9 @@ where
storage,
cancellation_token.clone(),
command_receiver,
true,
)
.run(true)
.run()
.await?;
let mut chain_listener = Box::pin(chain_listener).fuse();
let tcp_listener =
Expand Down
3 changes: 2 additions & 1 deletion web/@linera/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ impl Client {
storage,
tokio_util::sync::CancellationToken::new(),
tokio::sync::mpsc::unbounded_channel().1,
true, // Enable background sync
)
.run(true) // Enable background sync
.run()
.boxed_local()
.await?
.boxed_local();
Expand Down
Loading