Skip to content

Commit

Permalink
Feat: Instrument await using await-tree
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Raj <[email protected]>
  • Loading branch information
Harsh1s committed Mar 4, 2024
1 parent f12ba4c commit 6988d9b
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 16 deletions.
1 change: 1 addition & 0 deletions crates/curp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ version = "0.1.0"
[dependencies]
async-stream = "0.3.4"
async-trait = "0.1.53"
await-tree = "0.1.2"
bincode = "1.3.3"
bytes = "1.4.0"
clippy-utilities = "0.2.0"
Expand Down
31 changes: 26 additions & 5 deletions crates/curp/src/rpc/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{

use async_stream::stream;
use async_trait::async_trait;
use await_tree::InstrumentAwait;
use bytes::BytesMut;
use clippy_utilities::NumericCast;
use engine::SnapshotApi;
Expand Down Expand Up @@ -396,7 +397,11 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
if let Some(token) = token {
_ = req.metadata_mut().insert("token", token.parse()?);
}
client.propose(req).await.map_err(Into::into)
client
.propose(req)
.instrument_await("client propose")
.await
.map_err(Into::into)
}

/// Send `ShutdownRequest`
Expand All @@ -410,7 +415,11 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
let mut req = tonic::Request::new(request);
req.set_timeout(timeout);
req.metadata_mut().inject_current();
client.shutdown(req).await.map_err(Into::into)
client
.shutdown(req)
.instrument_await("client shutdown")
.await
.map_err(Into::into)
}

/// Send `ProposeRequest`
Expand All @@ -424,7 +433,11 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
let mut req = tonic::Request::new(request);
req.set_timeout(timeout);
req.metadata_mut().inject_current();
client.propose_conf_change(req).await.map_err(Into::into)
client
.propose_conf_change(req)
.instrument_await("client propose conf change")
.await
.map_err(Into::into)
}

/// Send `PublishRequest`
Expand All @@ -438,7 +451,11 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
let mut req = tonic::Request::new(request);
req.set_timeout(timeout);
req.metadata_mut().inject_current();
client.publish(req).await.map_err(Into::into)
client
.publish(req)
.instrument_await("client publish")
.await
.map_err(Into::into)
}

/// Send `WaitSyncedRequest`
Expand All @@ -452,7 +469,11 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
let mut req = tonic::Request::new(request);
req.set_timeout(timeout);
req.metadata_mut().inject_current();
client.wait_synced(req).await.map_err(Into::into)
client
.wait_synced(req)
.instrument_await("client propose wait synced request")
.await
.map_err(Into::into)
}

/// Send `FetchClusterRequest`
Expand Down
46 changes: 37 additions & 9 deletions crates/curp/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{fmt::Debug, sync::Arc};

use await_tree::InstrumentAwait;
use engine::SnapshotAllocator;
use tokio::sync::broadcast;
#[cfg(not(madsim))]
Expand Down Expand Up @@ -81,7 +82,10 @@ impl<C: Command, RC: RoleChange> crate::rpc::Protocol for Rpc<C, RC> {
) -> Result<tonic::Response<ProposeResponse>, tonic::Status> {
request.metadata().extract_span();
Ok(tonic::Response::new(
self.inner.propose(request.into_inner()).await?,
self.inner
.propose(request.into_inner())
.instrument_await("curp_propose")
.await?,
))
}

Expand All @@ -92,7 +96,10 @@ impl<C: Command, RC: RoleChange> crate::rpc::Protocol for Rpc<C, RC> {
) -> Result<tonic::Response<ShutdownResponse>, tonic::Status> {
request.metadata().extract_span();
Ok(tonic::Response::new(
self.inner.shutdown(request.into_inner()).await?,
self.inner
.shutdown(request.into_inner())
.instrument_await("curp_shutdown")
.await?,
))
}

Expand All @@ -103,7 +110,10 @@ impl<C: Command, RC: RoleChange> crate::rpc::Protocol for Rpc<C, RC> {
) -> Result<tonic::Response<ProposeConfChangeResponse>, tonic::Status> {
request.metadata().extract_span();
Ok(tonic::Response::new(
self.inner.propose_conf_change(request.into_inner()).await?,
self.inner
.propose_conf_change(request.into_inner())
.instrument_await("curp_propose_conf_change")
.await?,
))
}

Expand All @@ -125,7 +135,10 @@ impl<C: Command, RC: RoleChange> crate::rpc::Protocol for Rpc<C, RC> {
) -> Result<tonic::Response<WaitSyncedResponse>, tonic::Status> {
request.metadata().extract_span();
Ok(tonic::Response::new(
self.inner.wait_synced(request.into_inner()).await?,
self.inner
.wait_synced(request.into_inner())
.instrument_await("curp_wait_synced")
.await?,
))
}

Expand Down Expand Up @@ -155,7 +168,10 @@ impl<C: Command, RC: RoleChange> crate::rpc::Protocol for Rpc<C, RC> {
request: tonic::Request<MoveLeaderRequest>,
) -> Result<tonic::Response<MoveLeaderResponse>, tonic::Status> {
Ok(tonic::Response::new(
self.inner.move_leader(request.into_inner()).await?,
self.inner
.move_leader(request.into_inner())
.instrument_await("curp_move_leader")
.await?,
))
}

Expand All @@ -167,7 +183,10 @@ impl<C: Command, RC: RoleChange> crate::rpc::Protocol for Rpc<C, RC> {
) -> Result<tonic::Response<LeaseKeepAliveMsg>, tonic::Status> {
let req_stream = request.into_inner();
Ok(tonic::Response::new(
self.inner.lease_keep_alive(req_stream).await?,
self.inner
.lease_keep_alive(req_stream)
.instrument_await("lease_keep_alive")
.await?,
))
}
}
Expand All @@ -190,7 +209,10 @@ impl<C: Command, RC: RoleChange> crate::rpc::InnerProtocol for Rpc<C, RC> {
request: tonic::Request<VoteRequest>,
) -> Result<tonic::Response<VoteResponse>, tonic::Status> {
Ok(tonic::Response::new(
self.inner.vote(request.into_inner()).await?,
self.inner
.vote(request.into_inner())
.instrument_await("curp_vote")
.await?,
))
}

Expand All @@ -211,7 +233,10 @@ impl<C: Command, RC: RoleChange> crate::rpc::InnerProtocol for Rpc<C, RC> {
) -> Result<tonic::Response<InstallSnapshotResponse>, tonic::Status> {
let req_stream = request.into_inner();
Ok(tonic::Response::new(
self.inner.install_snapshot(req_stream).await?,
self.inner
.install_snapshot(req_stream)
.instrument_await("curp_install_snapshot")
.await?,
))
}

Expand All @@ -221,7 +246,10 @@ impl<C: Command, RC: RoleChange> crate::rpc::InnerProtocol for Rpc<C, RC> {
request: tonic::Request<TryBecomeLeaderNowRequest>,
) -> Result<tonic::Response<TryBecomeLeaderNowResponse>, tonic::Status> {
Ok(tonic::Response::new(
self.inner.try_become_leader_now(request.get_ref()).await?,
self.inner
.try_become_leader_now(request.get_ref())
.instrument_await("curp_try_become_leader_now")
.await?,
))
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/xline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ categories = ["KV"]
anyhow = "1.0.57"
async-stream = "0.3.5"
async-trait = "0.1.53"
await-tree = "0.1.2"
axum = "0.6.20"
bytes = "1.4.0"
clap = { version = "4", features = ["derive"] }
Expand Down
12 changes: 10 additions & 2 deletions crates/xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
time::Duration,
};

use await_tree::InstrumentAwait;
use curp::rpc::ReadState;
use dashmap::DashMap;
use event_listener::Event;
Expand Down Expand Up @@ -223,7 +224,9 @@ where
let request = RequestWrapper::from(request.into_inner());
let cmd = Command::new_with_auth_info(request.keys(), request, auth_info);
if !is_serializable {
self.wait_read_state(&cmd).await?;
self.wait_read_state(&cmd)
.instrument_await("xline wait read state for range request")
.await?;
// Double check whether the range request is compacted or not since the compaction request
// may be executed during the process of `wait_read_state` which results in the result of
// previous `check_range_request` outdated.
Expand Down Expand Up @@ -259,6 +262,7 @@ where
let is_fast_path = true;
let (cmd_res, sync_res) = self
.propose(request.into_inner(), auth_info, is_fast_path)
.instrument_await("xline propose put request")
.await?;
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
Expand Down Expand Up @@ -291,6 +295,7 @@ where
let is_fast_path = true;
let (cmd_res, sync_res) = self
.propose(request.into_inner(), auth_info, is_fast_path)
.instrument_await("xline propose delete range request")
.await?;
let mut res = Self::parse_response_op(cmd_res.into_inner().into());
if let Some(sync_res) = sync_res {
Expand Down Expand Up @@ -331,7 +336,9 @@ where
let request = RequestWrapper::from(request.into_inner());
let cmd = Command::new_with_auth_info(request.keys(), request, auth_info);
if !is_serializable {
self.wait_read_state(&cmd).await?;
self.wait_read_state(&cmd)
.instrument_await("xline wait read state for txn request")
.await?;
}
self.do_serializable(&cmd)?
} else {
Expand Down Expand Up @@ -386,6 +393,7 @@ where
let (cmd_res, _sync_res) = self.client.propose(&cmd, None, !physical).await??;
let resp = cmd_res.into_inner();
if timeout(self.compact_timeout, compact_physical_fut)
.instrument_await("xline wait compact physical event")
.await
.is_err()
{
Expand Down

0 comments on commit 6988d9b

Please sign in to comment.