Skip to content

Commit

Permalink
server: add metric for conn send/recv bytes. (tikv#1829)
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang authored May 11, 2017
1 parent 74c269e commit d3ae3ca
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
16 changes: 12 additions & 4 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use super::resolve::StoreAddrResolver;
use super::snap::Task as SnapTask;
use util::worker::Scheduler;
use util::buf::PipeBuffer;
use super::metrics::*;


#[derive(PartialEq)]
Expand Down Expand Up @@ -163,7 +164,8 @@ impl Conn {
loop {
{
let recv_buffer = self.recv_buffer.as_mut().unwrap();
try!(recv_buffer.read_from(&mut self.sock));
let n = try!(recv_buffer.read_from(&mut self.sock));
CONN_RECV_BYTES_COUNTER.inc_by(n as f64).unwrap();
// if the snapshot is too small, the default buffer may be not filled.
if !recv_buffer.is_full() && recv_buffer.len() < self.expect_size {
break;
Expand Down Expand Up @@ -197,7 +199,8 @@ impl Conn {
if self.last_msg_id.is_none() {
recv_buffer.ensure(rpc::MSG_HEADER_LEN);
if recv_buffer.len() < rpc::MSG_HEADER_LEN {
try!(recv_buffer.read_from(&mut self.sock));
let n = try!(recv_buffer.read_from(&mut self.sock));
CONN_RECV_BYTES_COUNTER.inc_by(n as f64).unwrap();
}
if recv_buffer.len() < rpc::MSG_HEADER_LEN {
// we need to read more data for header
Expand All @@ -209,7 +212,8 @@ impl Conn {
self.expect_size = payload_len;
}
recv_buffer.ensure(self.expect_size);
try!(recv_buffer.read_from(&mut self.sock));
let n = try!(recv_buffer.read_from(&mut self.sock));
CONN_RECV_BYTES_COUNTER.inc_by(n as f64).unwrap();
if recv_buffer.len() < self.expect_size {
// we need to read more data for payload
return Ok(None);
Expand Down Expand Up @@ -246,7 +250,9 @@ impl Conn {
where T: RaftStoreRouter,
S: StoreAddrResolver
{
try!(self.send_buffer.write_to(&mut self.sock));
let n = try!(self.send_buffer.write_to(&mut self.sock));
CONN_SEND_BYTES_COUNTER.inc_by(n as f64).unwrap();

if !self.send_buffer.is_empty() {
// we don't write all data, so must try later.
// we have already registered writable, no need registering again.
Expand All @@ -271,7 +277,9 @@ impl Conn {
where T: RaftStoreRouter,
S: StoreAddrResolver
{
let n = self.send_buffer.len();
msg.encode_to(&mut self.send_buffer).unwrap();
CONN_BUFFERED_SEND_BYTES_COUNTER.inc_by((self.send_buffer.len() - n) as f64).unwrap();

if !self.interest.is_writable() {
// re-register writable if we have not,
Expand Down
20 changes: 19 additions & 1 deletion src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::{Gauge, CounterVec, Histogram};
use prometheus::{Gauge, Counter, CounterVec, Histogram};

lazy_static! {
pub static ref SEND_SNAP_HISTOGRAM: Histogram =
Expand Down Expand Up @@ -53,4 +53,22 @@ lazy_static! {
"Total number of reporting failure messages",
&["type", "store_id"]
).unwrap();

pub static ref CONN_SEND_BYTES_COUNTER: Counter =
register_counter!(
"tikv_server_conn_send_bytes_total",
"Total bytes of connection send data"
).unwrap();

pub static ref CONN_BUFFERED_SEND_BYTES_COUNTER: Counter =
register_counter!(
"tikv_server_conn_buffered_send_bytes_total",
"Total bytes of connection buffered send data"
).unwrap();

pub static ref CONN_RECV_BYTES_COUNTER: Counter =
register_counter!(
"tikv_server_conn_recv_bytes_total",
"Total bytes of connection receive data"
).unwrap();
}

0 comments on commit d3ae3ca

Please sign in to comment.