Skip to content

Commit

Permalink
Fix latency reporting for RPC metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
slowli committed Sep 27, 2023
1 parent 41a3f88 commit 97ced60
Showing 1 changed file with 13 additions and 15 deletions.
28 changes: 13 additions & 15 deletions node/actors/network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
use self::metrics::{CallLatencyType, CallType, RPC_METRICS};
use crate::{frame, mux};
use anyhow::Context as _;
use concurrency::{ctx, io, limiter, metrics::GaugeGuard, scope};
use concurrency::{
ctx, io, limiter,
metrics::{GaugeGuard, LatencyHistogramExt as _},
scope,
};
use std::{collections::BTreeMap, sync::Arc};

pub(crate) mod consensus;
Expand Down Expand Up @@ -98,9 +102,7 @@ impl<'a, R: Rpc> ReservedCall<'a, R> {

let now = ctx.now();
let metric_labels = CallLatencyType::ClientSendRecv.to_labels::<R>(req, &res);
if let Ok(latency) = (now - send_time).try_into() {
RPC_METRICS.latency[&metric_labels].observe(latency);
}
RPC_METRICS.latency[&metric_labels].observe_latency(now - send_time);
let (res, msg_size) = res?;
RPC_METRICS.message_size[&CallType::RespRecv.to_labels::<R>(req)].observe(msg_size);
Ok(res)
Expand Down Expand Up @@ -136,9 +138,7 @@ impl<R: Rpc> Client<R> {
.reserve(ctx)
.await
.context("StreamQueue::open()")?;
if let Ok(latency) = (ctx.now() - reserve_time).try_into() {
RPC_METRICS.call_reserve_latency[&R::METHOD].observe(latency);
}
RPC_METRICS.call_reserve_latency[&R::METHOD].observe_latency(ctx.now() - reserve_time);
Ok(ReservedCall {
stream,
permit,
Expand Down Expand Up @@ -202,16 +202,14 @@ impl<R: Rpc, H: Handler<R>> ServerTrait for Server<R, H> {

let process_time = ctx.now();
let res = self.handler.handle(ctx, req).await.context(R::METHOD);
if let Ok(latency) = (ctx.now() - process_time).try_into() {
server_process_labels.set_result(&res);
RPC_METRICS.latency[&server_process_labels].observe(latency);
}
server_process_labels.set_result(&res);
RPC_METRICS.latency[&server_process_labels]
.observe_latency(ctx.now() - process_time);

let res = frame::mux_send_proto(ctx, &mut stream.write, &res?).await;
if let Ok(latency) = (ctx.now() - recv_time).try_into() {
recv_send_labels.set_result(&res);
RPC_METRICS.latency[&recv_send_labels].observe(latency);
}
recv_send_labels.set_result(&res);
RPC_METRICS.latency[&recv_send_labels]
.observe_latency(ctx.now() - recv_time);
let msg_size = res?;
RPC_METRICS.message_size[&resp_size_labels].observe(msg_size);
anyhow::Ok(())
Expand Down

0 comments on commit 97ced60

Please sign in to comment.