Skip to content

Commit 60dfd75

Browse files
committed
Add performance counters gated by metrics feature
1 parent 02e47d9 commit 60dfd75

16 files changed

+284
-14
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pem = "3.0"
2929
percent-encoding = "2.1.0"
3030
pin-project = "1.0.2"
3131
rand = "0.8.5"
32-
serde = "1"
32+
serde = { version = "1", features = ["derive", "rc"] }
3333
serde_json = "1"
3434
socket2 = "0.5.2"
3535
thiserror = "1.0.4"
@@ -95,6 +95,7 @@ default-rustls = [
9595
"derive",
9696
"rustls-tls",
9797
]
98+
metrics = []
9899
minimal = ["flate2/zlib"]
99100
native-tls-tls = ["native-tls", "tokio-native-tls"]
100101
rustls-tls = [

src/buffer_pool.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
// option. All files in the project carrying such notice may not be copied,
77
// modified, or distributed except according to those terms.
88

9+
use crate::metrics::BufferPoolMetrics;
910
use crossbeam::queue::ArrayQueue;
1011
use std::{mem::replace, ops::Deref, sync::Arc};
1112

@@ -14,6 +15,7 @@ pub struct BufferPool {
1415
buffer_size_cap: usize,
1516
buffer_init_cap: usize,
1617
pool: ArrayQueue<Vec<u8>>,
18+
metrics: BufferPoolMetrics,
1719
}
1820

1921
impl BufferPool {
@@ -37,14 +39,21 @@ impl BufferPool {
3739
pool: ArrayQueue::new(pool_cap),
3840
buffer_size_cap,
3941
buffer_init_cap,
42+
metrics: Default::default(),
4043
}
4144
}
4245

4346
pub fn get(self: &Arc<Self>) -> PooledBuf {
44-
let buf = self
45-
.pool
46-
.pop()
47-
.unwrap_or_else(|| Vec::with_capacity(self.buffer_init_cap));
47+
let buf = match self.pool.pop() {
48+
Some(buf) => {
49+
self.metrics.reuses.incr();
50+
buf
51+
}
52+
None => {
53+
self.metrics.creations.incr();
54+
Vec::with_capacity(self.buffer_init_cap)
55+
}
56+
};
4857
debug_assert_eq!(buf.len(), 0);
4958
PooledBuf(buf, self.clone())
5059
}
@@ -64,7 +73,15 @@ impl BufferPool {
6473
buf.shrink_to(self.buffer_size_cap);
6574

6675
// ArrayQueue will make sure to drop the buffer if capacity is exceeded
67-
let _ = self.pool.push(buf);
76+
match self.pool.push(buf) {
77+
Ok(()) => self.metrics.returns.incr(),
78+
Err(_buf) => self.metrics.discards.incr(),
79+
};
80+
}
81+
82+
#[cfg(feature = "metrics")]
83+
pub(crate) fn snapshot_metrics(&self) -> BufferPoolMetrics {
84+
self.metrics.clone()
6885
}
6986
}
7087

src/conn/mod.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use crate::{
3939
consts::{CapabilityFlags, Command, StatusFlags},
4040
error::*,
4141
io::Stream,
42+
metrics::ConnMetrics,
4243
opts::Opts,
4344
queryable::{
4445
query_result::{QueryResult, ResultSetMeta},
@@ -59,6 +60,8 @@ const DEFAULT_WAIT_TIMEOUT: usize = 28800;
5960

6061
/// Helper that asynchronously disconnects the givent connection on the default tokio executor.
6162
fn disconnect(mut conn: Conn) {
63+
conn.metrics().disconnects.incr();
64+
6265
let disconnected = conn.inner.disconnected;
6366

6467
// Mark conn as disconnected.
@@ -120,6 +123,7 @@ struct ConnInner {
120123
/// One-time connection-level infile handler.
121124
infile_handler:
122125
Option<Pin<Box<dyn Future<Output = crate::Result<InfileData>> + Send + Sync + 'static>>>,
126+
conn_metrics: Arc<ConnMetrics>,
123127
}
124128

125129
impl fmt::Debug for ConnInner {
@@ -142,6 +146,7 @@ impl ConnInner {
142146
/// Constructs an empty connection.
143147
fn empty(opts: Opts) -> ConnInner {
144148
let ttl_deadline = opts.pool_opts().new_connection_ttl_deadline();
149+
let conn_metrics: Arc<ConnMetrics> = Default::default();
145150
ConnInner {
146151
capabilities: opts.get_capabilities(),
147152
status: StatusFlags::empty(),
@@ -156,7 +161,7 @@ impl ConnInner {
156161
tx_status: TxStatus::None,
157162
last_io: Instant::now(),
158163
wait_timeout: Duration::from_secs(0),
159-
stmt_cache: StmtCache::new(opts.stmt_cache_size()),
164+
stmt_cache: StmtCache::new(opts.stmt_cache_size(), conn_metrics.clone()),
160165
socket: opts.socket().map(Into::into),
161166
opts,
162167
ttl_deadline,
@@ -167,6 +172,7 @@ impl ConnInner {
167172
server_key: None,
168173
infile_handler: None,
169174
reset_upon_returning_to_a_pool: false,
175+
conn_metrics,
170176
}
171177
}
172178

@@ -178,6 +184,18 @@ impl ConnInner {
178184
.as_mut()
179185
.ok_or_else(|| DriverError::ConnectionClosed.into())
180186
}
187+
188+
fn set_pool(&mut self, pool: Option<Pool>) {
189+
let conn_metrics = if let Some(ref pool) = pool {
190+
Arc::clone(&pool.inner.metrics.conn)
191+
} else {
192+
Default::default()
193+
};
194+
self.conn_metrics = Arc::clone(&conn_metrics);
195+
self.stmt_cache.conn_metrics = conn_metrics;
196+
197+
self.pool = pool;
198+
}
181199
}
182200

183201
/// MySql server connection.
@@ -929,6 +947,8 @@ impl Conn {
929947
conn.run_init_commands().await?;
930948
conn.run_setup_commands().await?;
931949

950+
conn.metrics().connects.incr();
951+
932952
Ok(conn)
933953
}
934954
.boxed()
@@ -1170,6 +1190,10 @@ impl Conn {
11701190
self.inner.stmt_cache.clear();
11711191
self.inner.infile_handler = None;
11721192
self.run_setup_commands().await?;
1193+
// self.inner.set_pool(pool);
1194+
1195+
// TODO: clear some metrics?
1196+
11731197
Ok(())
11741198
}
11751199

@@ -1284,6 +1308,10 @@ impl Conn {
12841308

12851309
Ok(BinlogStream::new(self))
12861310
}
1311+
1312+
pub(crate) fn metrics(&self) -> &ConnMetrics {
1313+
&self.inner.conn_metrics
1314+
}
12871315
}
12881316

12891317
#[cfg(test)]

src/conn/pool/futures/get_conn.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,10 @@ impl Future for GetConn {
142142

143143
return match result {
144144
Ok(mut c) => {
145-
c.inner.pool = Some(pool);
145+
c.inner.set_pool(Some(pool));
146146
c.inner.reset_upon_returning_to_a_pool =
147147
self.reset_upon_returning_to_a_pool;
148+
c.metrics().connects.incr();
148149
Poll::Ready(Ok(c))
149150
}
150151
Err(e) => {
@@ -160,7 +161,8 @@ impl Future for GetConn {
160161
self.inner = GetConnInner::Done;
161162

162163
let pool = self.pool_take();
163-
c.inner.pool = Some(pool);
164+
pool.inner.metrics.reuses.incr();
165+
c.inner.set_pool(Some(pool));
164166
c.inner.reset_upon_returning_to_a_pool =
165167
self.reset_upon_returning_to_a_pool;
166168
return Poll::Ready(Ok(c));

src/conn/pool/mod.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::{
2525
use crate::{
2626
conn::{pool::futures::*, Conn},
2727
error::*,
28+
metrics::PoolMetrics,
2829
opts::{Opts, PoolOpts},
2930
queryable::transaction::{Transaction, TxOpts},
3031
};
@@ -133,6 +134,10 @@ impl Waitlist {
133134
self.queue.remove(&tmp);
134135
}
135136

137+
fn len(&self) -> usize {
138+
self.queue.len()
139+
}
140+
136141
fn is_empty(&self) -> bool {
137142
self.queue.is_empty()
138143
}
@@ -189,6 +194,7 @@ pub struct Inner {
189194
close: atomic::AtomicBool,
190195
closed: atomic::AtomicBool,
191196
exchange: Mutex<Exchange>,
197+
pub(crate) metrics: PoolMetrics,
192198
}
193199

194200
/// Asynchronous pool of MySql connections.
@@ -202,7 +208,7 @@ pub struct Inner {
202208
#[derive(Debug, Clone)]
203209
pub struct Pool {
204210
opts: Opts,
205-
inner: Arc<Inner>,
211+
pub(super) inner: Arc<Inner>,
206212
drop: mpsc::UnboundedSender<Option<Conn>>,
207213
}
208214

@@ -231,6 +237,7 @@ impl Pool {
231237
exist: 0,
232238
recycler: Some((rx, pool_opts)),
233239
}),
240+
metrics: Default::default(),
234241
}),
235242
drop: tx,
236243
}
@@ -244,6 +251,7 @@ impl Pool {
244251

245252
/// Async function that resolves to `Conn`.
246253
pub fn get_conn(&self) -> GetConn {
254+
self.inner.metrics.gets.incr();
247255
let reset_connection = self.opts.pool_opts().reset_connection();
248256
GetConn::new(self, reset_connection)
249257
}
@@ -262,6 +270,11 @@ impl Pool {
262270
DisconnectPool::new(self)
263271
}
264272

273+
#[cfg(feature = "metrics")]
274+
pub fn snapshot_metrics(&self) -> PoolMetrics {
275+
self.inner.metrics.clone()
276+
}
277+
265278
/// A way to return connection taken from a pool.
266279
fn return_conn(&mut self, conn: Conn) {
267280
// NOTE: we're not in async context here, so we can't block or return NotReady
@@ -270,6 +283,8 @@ impl Pool {
270283
}
271284

272285
fn send_to_recycler(&self, conn: Conn) {
286+
self.inner.metrics.recycler.recycles.incr();
287+
273288
if let Err(conn) = self.drop.send(Some(conn)) {
274289
let conn = conn.0.unwrap();
275290

@@ -366,6 +381,19 @@ impl Pool {
366381
let mut exchange = self.inner.exchange.lock().unwrap();
367382
exchange.waiting.remove(queue_id);
368383
}
384+
385+
/// Returns the number of
386+
/// - open connections,
387+
/// - idling connections in the pool and
388+
/// - tasks waiting for a connection.
389+
pub fn queue_stats(&self) -> (usize, usize, usize) {
390+
let exchange = self.inner.exchange.lock().unwrap();
391+
(
392+
exchange.exist,
393+
exchange.available.len(),
394+
exchange.waiting.len(),
395+
)
396+
}
369397
}
370398

371399
impl Drop for Conn {

src/conn/pool/recycler.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ impl Future for Recycler {
6767
let mut exchange = $self.inner.exchange.lock().unwrap();
6868
if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() {
6969
drop(exchange);
70+
$self.inner.metrics.recycler.discards.incr();
7071
$self.discard.push($conn.close_conn().boxed());
7172
} else {
73+
$self.inner.metrics.recycler.recycled_returnals.incr();
7274
exchange.available.push_back($conn.into());
7375
if let Some(w) = exchange.waiting.pop() {
7476
w.wake();
@@ -80,11 +82,14 @@ impl Future for Recycler {
8082
macro_rules! conn_decision {
8183
($self:ident, $conn:ident) => {
8284
if $conn.inner.stream.is_none() || $conn.inner.disconnected {
85+
$self.inner.metrics.recycler.discards.incr();
8386
// drop unestablished connection
8487
$self.discard.push(futures_util::future::ok(()).boxed());
8588
} else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() {
89+
$self.inner.metrics.recycler.cleans.incr();
8690
$self.cleaning.push($conn.cleanup_for_pool().boxed());
8791
} else if $conn.expired() || close {
92+
$self.inner.metrics.recycler.discards.incr();
8893
$self.discard.push($conn.close_conn().boxed());
8994
} else if $conn.inner.reset_upon_returning_to_a_pool {
9095
$self.reset.push($conn.reset_for_pool().boxed());

src/conn/routines/change_user.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ pub struct ChangeUser;
1717

1818
impl Routine<()> for ChangeUser {
1919
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
20+
conn.metrics().routines.change_user.incr();
21+
2022
#[cfg(feature = "tracing")]
2123
let span = debug_span!(
2224
"mysql_async::change_user",

src/conn/routines/exec.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ impl<'a> ExecRoutine<'a> {
2525

2626
impl Routine<()> for ExecRoutine<'_> {
2727
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
28+
conn.metrics().routines.execs.incr();
29+
2830
#[cfg(feature = "tracing")]
2931
let span = info_span!(
3032
"mysql_async::exec",

src/conn/routines/next_set.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ where
2424
P: Protocol,
2525
{
2626
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
27+
conn.metrics().routines.next_sets.incr();
28+
2729
#[cfg(feature = "tracing")]
2830
let span = debug_span!(
2931
"mysql_async::next_set",

src/conn/routines/ping.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ pub struct PingRoutine;
1414

1515
impl Routine<()> for PingRoutine {
1616
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
17+
conn.metrics().routines.pings.incr();
18+
1719
#[cfg(feature = "tracing")]
1820
let span = debug_span!("mysql_async::ping", mysql_async.connection.id = conn.id());
1921

src/conn/routines/prepare.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ impl PrepareRoutine {
2626

2727
impl Routine<Arc<StmtInner>> for PrepareRoutine {
2828
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<Arc<StmtInner>>> {
29+
conn.metrics().routines.prepares.incr();
30+
2931
#[cfg(feature = "tracing")]
3032
let span = info_span!(
3133
"mysql_async::prepare",

src/conn/routines/query.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ impl<'a, L: TracingLevel> QueryRoutine<'a, L> {
2929

3030
impl<L: TracingLevel> Routine<()> for QueryRoutine<'_, L> {
3131
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
32+
conn.metrics().routines.queries.incr();
33+
3234
#[cfg(feature = "tracing")]
3335
let span = create_span!(
3436
L::LEVEL,

src/conn/routines/reset.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ pub struct ResetRoutine;
1414

1515
impl Routine<()> for ResetRoutine {
1616
fn call<'a>(&'a mut self, conn: &'a mut Conn) -> BoxFuture<'a, crate::Result<()>> {
17+
conn.metrics().routines.resets.incr();
18+
1719
#[cfg(feature = "tracing")]
1820
let span = debug_span!("mysql_async::reset", mysql_async.connection.id = conn.id());
1921

0 commit comments

Comments
 (0)