Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ pub(crate) type ChainWorkerRequestReceiver<Ctx> =
mod metrics {
use std::sync::LazyLock;

use linera_base::prometheus_util::{exponential_bucket_interval, register_histogram};
use prometheus::Histogram;
use linera_base::prometheus_util::{
exponential_bucket_interval, register_histogram, register_int_gauge,
};
use prometheus::{Histogram, IntGauge};

pub static CHAIN_WORKER_REQUEST_QUEUE_WAIT_TIME: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram(
Expand All @@ -67,6 +69,22 @@ mod metrics {
exponential_bucket_interval(0.1_f64, 10_000.0),
)
});

/// Number of active chain worker actor tasks (outer loop of handle_requests).
pub static CHAIN_WORKER_ACTORS_ACTIVE: LazyLock<IntGauge> = LazyLock::new(|| {
register_int_gauge(
"chain_worker_actors_active",
"Number of active chain worker actor tasks",
)
});

/// Number of chain workers with chain state loaded in memory (inner loop of handle_requests).
pub static CHAIN_WORKER_STATES_LOADED: LazyLock<IntGauge> = LazyLock::new(|| {
register_int_gauge(
"chain_worker_states_loaded",
"Number of chain workers with chain state loaded in memory",
)
});
}

/// A request for the [`ChainWorkerActor`].
Expand Down Expand Up @@ -327,6 +345,8 @@ where
request_receiver: ChainWorkerRequestReceiver<StorageClient::Context>,
is_tracked: bool,
) {
#[cfg(with_metrics)]
metrics::CHAIN_WORKER_ACTORS_ACTIVE.inc();
let actor = ChainWorkerActor {
config,
storage,
Expand All @@ -343,6 +363,8 @@ where
{
tracing::error!("Chain actor error: {err}");
}
#[cfg(with_metrics)]
metrics::CHAIN_WORKER_ACTORS_ACTIVE.dec();
}

/// Returns the TTL timeout timestamp.
Expand Down Expand Up @@ -458,6 +480,8 @@ where
)
.instrument(span.clone())
.await?;
#[cfg(with_metrics)]
metrics::CHAIN_WORKER_STATES_LOADED.inc();

Box::pin(worker.handle_request(request))
.instrument(span)
Expand Down Expand Up @@ -488,6 +512,8 @@ where
trace!("Unloading chain state of {} ...", self.chain_id);
worker.clear_shared_chain_view().await;
drop(worker);
#[cfg(with_metrics)]
metrics::CHAIN_WORKER_STATES_LOADED.dec();
if let Some(task) = service_runtime_task {
task.await?;
}
Expand Down
27 changes: 25 additions & 2 deletions linera-core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ mod metrics {

use linera_base::prometheus_util::{
exponential_bucket_interval, register_histogram_vec, register_int_counter,
register_int_counter_vec,
register_int_counter_vec, register_int_gauge,
};
use prometheus::{HistogramVec, IntCounter, IntCounterVec};
use prometheus::{HistogramVec, IntCounter, IntCounterVec, IntGauge};

pub static NUM_ROUNDS_IN_CERTIFICATE: LazyLock<HistogramVec> = LazyLock::new(|| {
register_histogram_vec(
Expand All @@ -92,6 +92,9 @@ mod metrics {
pub static INCOMING_BUNDLE_COUNT: LazyLock<IntCounter> =
LazyLock::new(|| register_int_counter("incoming_bundle_count", "Incoming bundle count"));

pub static INCOMING_MESSAGE_COUNT: LazyLock<IntCounter> =
LazyLock::new(|| register_int_counter("incoming_message_count", "Incoming message count"));

pub static OPERATION_COUNT: LazyLock<IntCounter> =
LazyLock::new(|| register_int_counter("operation_count", "Operation count"));

Expand All @@ -113,6 +116,14 @@ mod metrics {
"Number of chain info queries processed",
)
});

/// Number of cached chain worker channel endpoints in the BTreeMap.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Number of cached chain worker channel endpoints in the BTreeMap.
/// Number of cached chain worker channel endpoints in the `BTreeMap`.

(or: "in the map"?)

pub static CHAIN_WORKER_ENDPOINTS_CACHED: LazyLock<IntGauge> = LazyLock::new(|| {
register_int_gauge(
"chain_worker_endpoints_cached",
"Number of cached chain worker channel endpoints",
)
});
}

/// Instruct the networking layer to send cross-chain requests and/or push notifications.
Expand Down Expand Up @@ -934,6 +945,8 @@ where

// Put back the sender in the cache for next time.
chain_workers.insert(chain_id, sender);
#[cfg(with_metrics)]
metrics::CHAIN_WORKER_ENDPOINTS_CACHED.set(chain_workers.len() as i64);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this decrement properly as chain workers are removed from the cache? i.e. is this the only place where workers are being evicted? Or can they be also dropped elsewhere (and we'd get outdated values in metrics).


Ok(new_channel)
}
Expand Down Expand Up @@ -1011,6 +1024,12 @@ where
certificate.round.number(),
certificate.block().body.transactions.len() as u64,
certificate.block().body.incoming_bundles().count() as u64,
certificate
.block()
.body
.incoming_bundles()
.map(|b| b.messages().count())
.sum::<usize>() as u64,
certificate.block().body.operations().count() as u64,
certificate
.signatures()
Expand All @@ -1032,6 +1051,7 @@ where
round_number,
confirmed_transactions,
confirmed_incoming_bundles,
confirmed_incoming_messages,
confirmed_operations,
validators_with_signatures,
) = metrics_data;
Expand All @@ -1046,6 +1066,9 @@ where
if confirmed_incoming_bundles > 0 {
metrics::INCOMING_BUNDLE_COUNT.inc_by(confirmed_incoming_bundles);
}
if confirmed_incoming_messages > 0 {
metrics::INCOMING_MESSAGE_COUNT.inc_by(confirmed_incoming_messages);
}
if confirmed_operations > 0 {
metrics::OPERATION_COUNT.inc_by(confirmed_operations);
}
Expand Down
25 changes: 24 additions & 1 deletion linera-rpc/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,22 @@ mod metrics {
&[],
)
});

pub static NOTIFICATIONS_SKIPPED_RECEIVER_LAG: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec(
"notifications_skipped_receiver_lag",
"Number of notifications skipped because receiver lagged behind sender",
&[],
)
});

pub static NOTIFICATIONS_DROPPED_NO_RECEIVER: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec(
"notifications_dropped_no_receiver",
"Number of notifications dropped because no receiver was available",
&[],
)
});
}

#[derive(Clone)]
Expand Down Expand Up @@ -320,6 +336,10 @@ where
nickname,
skipped_count, "notification receiver lagged, messages were skipped"
);
#[cfg(with_metrics)]
metrics::NOTIFICATIONS_SKIPPED_RECEIVER_LAG
.with_label_values(&[])
.inc_by(skipped_count);
continue;
}
Err(RecvError::Closed) => {
Expand Down Expand Up @@ -395,7 +415,10 @@ where
trace!("Scheduling notification query");
if let Err(error) = notification_sender.send(notification) {
error!(%error, "dropping notification");
break;
#[cfg(with_metrics)]
metrics::NOTIFICATIONS_DROPPED_NO_RECEIVER
.with_label_values(&[])
.inc();
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion linera-storage/src/db_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub mod metrics {
"load_chain_latency",
"The latency to load a chain state",
&[],
exponential_bucket_latencies(10.0),
exponential_bucket_latencies(1000.0),
)
});

Expand Down
31 changes: 26 additions & 5 deletions linera-views-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ fn generate_root_view_code(input: ItemStruct) -> TokenStream2 {
} = Constraints::get(&input);
let struct_name = &input.ident;

let increment_counter = if cfg!(feature = "metrics") {
let metrics_code = if cfg!(feature = "metrics") {
quote! {
#[cfg(not(target_arch = "wasm32"))]
linera_views::metrics::increment_counter(
Expand All @@ -259,6 +259,29 @@ fn generate_root_view_code(input: ItemStruct) -> TokenStream2 {
quote! {}
};

let write_batch_with_metrics = if cfg!(feature = "metrics") {
quote! {
if !batch.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
let start = std::time::Instant::now();
self.context().store().write_batch(batch).await?;
#[cfg(not(target_arch = "wasm32"))]
{
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
linera_views::metrics::SAVE_VIEW_LATENCY
.with_label_values(&[stringify!(#struct_name)])
.observe(latency_ms);
}
}
}
} else {
quote! {
if !batch.is_empty() {
self.context().store().write_batch(batch).await?;
}
}
};

quote! {
impl #impl_generics linera_views::views::RootView for #struct_name #type_generics
where
Expand All @@ -267,12 +290,10 @@ fn generate_root_view_code(input: ItemStruct) -> TokenStream2 {
{
async fn save(&mut self) -> Result<(), linera_views::ViewError> {
use linera_views::{context::Context as _, batch::Batch, store::WritableKeyValueStore as _, views::View as _};
#increment_counter
#metrics_code
let mut batch = Batch::new();
self.pre_save(&mut batch)?;
if !batch.is_empty() {
self.context().store().write_batch(batch).await?;
}
#write_batch_with_metrics
self.post_save();
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ where
let mut batch = Batch::new();
self.pre_save(&mut batch)?;
if !batch.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
let start = std::time::Instant::now();
self.context().store().write_batch(batch).await?;
#[cfg(not(target_arch = "wasm32"))]
{
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
linera_views::metrics::SAVE_VIEW_LATENCY
.with_label_values(&[stringify!(TestView)])
.observe(latency_ms);
}
}
self.post_save();
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@ where
let mut batch = Batch::new();
self.pre_save(&mut batch)?;
if !batch.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
let start = std::time::Instant::now();
self.context().store().write_batch(batch).await?;
#[cfg(not(target_arch = "wasm32"))]
{
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
linera_views::metrics::SAVE_VIEW_LATENCY
.with_label_values(&[stringify!(TestView)])
.observe(latency_ms);
}
}
self.post_save();
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ where
let mut batch = Batch::new();
self.pre_save(&mut batch)?;
if !batch.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
let start = std::time::Instant::now();
self.context().store().write_batch(batch).await?;
#[cfg(not(target_arch = "wasm32"))]
{
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
linera_views::metrics::SAVE_VIEW_LATENCY
.with_label_values(&[stringify!(TestView)])
.observe(latency_ms);
}
}
self.post_save();
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@ where
let mut batch = Batch::new();
self.pre_save(&mut batch)?;
if !batch.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
let start = std::time::Instant::now();
self.context().store().write_batch(batch).await?;
#[cfg(not(target_arch = "wasm32"))]
{
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
linera_views::metrics::SAVE_VIEW_LATENCY
.with_label_values(&[stringify!(TestView)])
.observe(latency_ms);
}
}
self.post_save();
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ where
let mut batch = Batch::new();
self.pre_save(&mut batch)?;
if !batch.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
let start = std::time::Instant::now();
self.context().store().write_batch(batch).await?;
#[cfg(not(target_arch = "wasm32"))]
{
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
linera_views::metrics::SAVE_VIEW_LATENCY
.with_label_values(&[stringify!(TestView)])
.observe(latency_ms);
}
}
self.post_save();
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@ where
let mut batch = Batch::new();
self.pre_save(&mut batch)?;
if !batch.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
let start = std::time::Instant::now();
self.context().store().write_batch(batch).await?;
#[cfg(not(target_arch = "wasm32"))]
{
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
linera_views::metrics::SAVE_VIEW_LATENCY
.with_label_values(&[stringify!(TestView)])
.observe(latency_ms);
}
}
self.post_save();
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ where
let mut batch = Batch::new();
self.pre_save(&mut batch)?;
if !batch.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
let start = std::time::Instant::now();
self.context().store().write_batch(batch).await?;
#[cfg(not(target_arch = "wasm32"))]
{
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
linera_views::metrics::SAVE_VIEW_LATENCY
.with_label_values(&[stringify!(TestView)])
.observe(latency_ms);
}
}
self.post_save();
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,16 @@ where
let mut batch = Batch::new();
self.pre_save(&mut batch)?;
if !batch.is_empty() {
#[cfg(not(target_arch = "wasm32"))]
let start = std::time::Instant::now();
self.context().store().write_batch(batch).await?;
#[cfg(not(target_arch = "wasm32"))]
{
let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
linera_views::metrics::SAVE_VIEW_LATENCY
.with_label_values(&[stringify!(TestView)])
.observe(latency_ms);
}
}
self.post_save();
Ok(())
Expand Down
Loading
Loading