Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 51 additions & 0 deletions crates/core/src/metric_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,59 @@

#![allow(unused)]

use std::fmt;
use std::hash::Hash;
use std::sync::LazyLock;

use dashmap::{DashMap, Entry};
use metrics::{Unit, describe_counter};

// ---------------------------------------------------------------------------
// Metric label interning utilities
//
// These caches convert runtime values into `&'static str` for use as metric
// labels, avoiding per-emission allocations. Values are leaked and never freed.
//
// **Only use for low-cardinality dimensions** (node IDs, partition IDs,
// service names, deployment IDs, HTTP status codes). High-cardinality keys
// (request IDs, trace IDs, user-supplied strings) will leak unbounded memory.
// ---------------------------------------------------------------------------

/// Intern cache that maps values to their `Display` representation as leaked
/// `&'static str`. Cache hit = hash lookup, no formatting. Cache miss = format
/// once, leak, store.
pub struct LazyIntern<K: Hash + Eq> {
cache: LazyLock<DashMap<K, &'static str>>,
}

impl<K: Hash + Eq + fmt::Display + Clone> Default for LazyIntern<K> {
fn default() -> Self {
Self::new()
}
}

impl<K: Hash + Eq + fmt::Display + Clone> LazyIntern<K> {
pub const fn new() -> Self {
Self {
cache: LazyLock::new(DashMap::new),
}
}

#[inline]
pub fn get(&self, key: &K) -> &'static str {
if let Some(entry) = self.cache.get(key) {
return entry.value();
}
match self.cache.entry(key.clone()) {
Entry::Occupied(entry) => entry.get(),
Entry::Vacant(entry) => {
let s: &'static str = entry.key().to_string().leak();
entry.insert(s).value()
}
}
}
}

// value of label `kind` in TC_SPAWN are defined in [`crate::TaskKind`].
pub const TC_SPAWN: &str = "restate.task_center.spawned.total";
pub const TC_FINISHED: &str = "restate.task_center.finished.total";
Expand Down
29 changes: 25 additions & 4 deletions crates/core/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub use throttle::ConnectThrottle;

use std::sync::Arc;

use metrics::counter;
use tokio::sync::mpsc;
use tokio::time::Instant;
use tracing::debug;
Expand All @@ -31,7 +30,7 @@ use crate::Metadata;
use crate::TaskId;
use crate::TaskKind;
use crate::network::PeerMetadataVersion;
use crate::network::metric_definitions::NETWORK_CONNECTION_CREATED;
use crate::network::metric_definitions::{ConnectionMetrics, NetworkMetrics};

use super::ConnectError;
use super::ConnectionClosed;
Expand Down Expand Up @@ -160,6 +159,7 @@ pub struct Connection {
pub(crate) sender: EgressSender,
pub(crate) swimlane: Swimlane,
pub(crate) created: Instant,
pub(crate) metrics: ConnectionMetrics,
}

impl Connection {
Expand All @@ -168,13 +168,15 @@ impl Connection {
protocol_version: ProtocolVersion,
swimlane: Swimlane,
sender: EgressSender,
metrics: ConnectionMetrics,
) -> Self {
Self {
peer,
protocol_version,
sender,
swimlane,
created: Instant::now(),
metrics,
}
}

Expand Down Expand Up @@ -216,6 +218,7 @@ impl Connection {
conn_tracker: impl ConnectionTracking + Send + Sync + 'static,
is_dedicated: bool,
) -> Result<(Self, TaskId), ConnectError> {
let start = Instant::now();
let result = Self::connect_inner(
destination.clone(),
swimlane,
Expand All @@ -227,6 +230,10 @@ impl Connection {
is_dedicated,
)
.await;
let elapsed = start.elapsed();
NetworkMetrics::new(swimlane)
.handshake_duration(result.is_ok())
.record(elapsed);

ConnectThrottle::note_connect_status(&destination, result.is_ok());
match result {
Expand Down Expand Up @@ -345,7 +352,13 @@ impl Connection {
.into());
}

let connection = Connection::new(peer_node_id, protocol_version, swimlane, tx);
let connection = Connection::new(
peer_node_id,
protocol_version,
swimlane,
tx,
NetworkMetrics::new(swimlane).connection(peer_node_id, "outgoing"),
);

// if peer cannot respect our hello intent of direction, we are okay with registering
let is_bidi = matches!(
Expand All @@ -361,7 +374,7 @@ impl Connection {

let task_id = reactor.start(task_kind, conn_tracker, is_dedicated, incoming)?;

counter!(NETWORK_CONNECTION_CREATED, "direction" => "outgoing", "swimlane" => swimlane.as_str_name()).increment(1);
connection.metrics.record_opened();
Ok((connection, task_id))
}

Expand Down Expand Up @@ -397,7 +410,11 @@ impl Connection {
/// returns None.
#[must_use]
pub async fn reserve(&self) -> Option<SendPermit<'_>> {
let start = Instant::now();
let permit = self.sender.reserve().await.ok()?;
self.metrics
.permit_acquisition_duration()
.record(start.elapsed());
Some(SendPermit {
permit,
protocol_version: self.protocol_version,
Expand All @@ -406,7 +423,11 @@ impl Connection {

#[must_use]
pub async fn reserve_owned(&self) -> Option<OwnedSendPermit> {
let start = Instant::now();
let permit = self.sender.clone().reserve_owned().await.ok()?;
self.metrics
.permit_acquisition_duration()
.record(start.elapsed());
Some(OwnedSendPermit {
permit,
protocol_version: self.protocol_version,
Expand Down
57 changes: 44 additions & 13 deletions crates/core/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use std::sync::Arc;
use ahash::HashMap;
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, Stream};
use metrics::counter;
use parking_lot::Mutex;
use tokio::time::Instant;
use tracing::{debug, info, trace};

use restate_types::config::Configuration;
Expand All @@ -35,7 +35,7 @@ use super::{
use crate::network::PeerMetadataVersion;
use crate::network::connection::ConnectThrottle;
use crate::network::handshake::{negotiate_protocol_version, wait_for_hello};
use crate::network::metric_definitions::{NETWORK_CONNECTION_CREATED, NETWORK_CONNECTION_DROPPED};
use crate::network::metric_definitions::NetworkMetrics;
use crate::{Metadata, TaskId, TaskKind, my_node_id};

#[derive(Copy, Clone, PartialOrd, PartialEq, Default)]
Expand Down Expand Up @@ -102,6 +102,7 @@ impl ConnectionManagerInner {
restate_types::net::CURRENT_PROTOCOL_VERSION,
swimlane,
tx,
NetworkMetrics::new(swimlane).connection(my_node_id(), "loopback"),
);

let reactor = ConnectionReactor::new(connection.clone(), shared, None, self.router.clone());
Expand Down Expand Up @@ -155,6 +156,9 @@ impl ConnectionManagerInner {
match self.connection_by_gen_id.entry((peer, swimlane)) {
hash_map::Entry::Occupied(c) if c.get() == connection => {
c.remove();
NetworkMetrics::new(connection.swimlane)
.pool_connections_active()
.decrement(1.0);
}
_ => {}
}
Expand Down Expand Up @@ -366,6 +370,7 @@ impl ConnectionManager {
selected_protocol_version,
hello.swimlane(),
sender,
NetworkMetrics::new(hello.swimlane()).connection(peer_node_id, "incoming"),
);

// Register the connection.
Expand Down Expand Up @@ -405,7 +410,7 @@ impl ConnectionManager {
"Incoming connection accepted from node {}", peer_node_id
);

counter!(NETWORK_CONNECTION_CREATED, "direction" => "incoming", "swimlane" => hello.swimlane().as_str_name()).increment(1);
connection.metrics.record_opened();

// Our output stream, i.e. responses.
Ok(egress)
Expand Down Expand Up @@ -462,6 +467,7 @@ impl ConnectionManager {
where
C: TransportConnect,
{
let start = Instant::now();
let my_node_id_opt = Metadata::with_current(|m| m.my_node_id_opt());
let node_id = node_id.into();
// find latest generation if this is not generational node id
Expand All @@ -481,17 +487,26 @@ impl ConnectionManager {
return Err(DiscoveryError::NodeIsGone(node_id.into()).into());
}

let metrics = NetworkMetrics::new(swimlane);

let router = {
// -- Lock held
let mut guard = self.inner.lock();

// find a connection by node_id
if let Some(connection) = guard.get_connection(node_id, swimlane) {
metrics
.connection_acquisition_duration("hit", true)
.record(start.elapsed());
return Ok(connection);
}

if my_node_id_opt.is_some_and(|my_node| my_node == node_id) {
return guard.create_loopback_connection(swimlane, self.clone());
let result = guard.create_loopback_connection(swimlane, self.clone());
metrics
.connection_acquisition_duration("hit", result.is_ok())
.record(start.elapsed());
return result;
}

// fail if the node is seen as gone before
Expand All @@ -501,13 +516,19 @@ impl ConnectionManager {

// We have no connection. We attempt to create a new connection or latch onto an
// existing attempt.
self.create_shared_connection(
Destination::Node(node_id),
swimlane,
router,
transport_connector,
)
.await
let result = self
.create_shared_connection(
Destination::Node(node_id),
swimlane,
router,
transport_connector,
&metrics,
)
.await;
metrics
.connection_acquisition_duration("miss", result.is_ok())
.record(start.elapsed());
result
}

async fn create_shared_connection<C>(
Expand All @@ -516,6 +537,7 @@ impl ConnectionManager {
swimlane: Swimlane,
router: Arc<MessageRouter>,
transport_connector: &C,
metrics: &NetworkMetrics,
) -> Result<Connection, ConnectError>
where
C: TransportConnect,
Expand Down Expand Up @@ -568,6 +590,7 @@ impl ConnectionManager {

// Put it in the map so other concurrent callers share the same future
in_flight.insert((dest.clone(), swimlane), fut.clone());
metrics.connections_pending().increment(1.0);
fut
}
};
Expand All @@ -577,7 +600,9 @@ impl ConnectionManager {

// 5) Remove the completed future so subsequent calls can attempt a fresh connect
let mut in_flight = self.in_flight_connects.lock();
in_flight.remove(&(dest, swimlane));
if in_flight.remove(&(dest, swimlane)).is_some() {
metrics.connections_pending().decrement(1.0);
}

Ok(maybe_connection?.0)
}
Expand All @@ -594,6 +619,9 @@ impl ConnectionTracking for ConnectionManager {
fn connection_created(&self, conn: &Connection, is_dedicated: bool) {
if !is_dedicated {
self.inner.lock().register(conn.clone());
NetworkMetrics::new(conn.swimlane)
.pool_connections_active()
.increment(1.0);
}
trace!(
swimlane = %conn.swimlane,
Expand All @@ -606,8 +634,11 @@ impl ConnectionTracking for ConnectionManager {
"Connection terminated, connection lived for {:?}",
conn.created.elapsed()
);
conn.metrics
.connection_lifetime()
.record(conn.created.elapsed());
self.inner.lock().deregister(conn);
counter!(NETWORK_CONNECTION_DROPPED).increment(1);
conn.metrics.record_closed();
}

fn notify_peer_shutdown(&self, node_id: GenerationalNodeId) {
Expand Down
Loading
Loading