Skip to content

Commit

Permalink
feat: move token authentication to the network layer
Browse files Browse the repository at this point in the history
Signed-off-by: themanforfree <[email protected]>
  • Loading branch information
themanforfree committed Feb 22, 2024
1 parent f74fbda commit be490b9
Show file tree
Hide file tree
Showing 32 changed files with 791 additions and 752 deletions.
2 changes: 2 additions & 0 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub trait ClientApi {
async fn propose(
&self,
cmd: &Self::Cmd,
token: Option<&String>, // TODO: Allow external custom interceptors, do not pass token in parameters
use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, Self::Error>;

Expand Down Expand Up @@ -126,6 +127,7 @@ trait RepeatableClientApi: ClientApi {
&self,
propose_id: ProposeId,
cmd: &Self::Cmd,
token: Option<&String>,
use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, Self::Error>;

Expand Down
3 changes: 2 additions & 1 deletion crates/curp/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,14 @@ where
async fn propose(
&self,
cmd: &Self::Cmd,
token: Option<&String>,
use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, tonic::Status> {
let propose_id = self
.retry::<_, _>(RepeatableClientApi::gen_propose_id)
.await?;
self.retry::<_, _>(|client| {
RepeatableClientApi::propose(client, propose_id, cmd, use_fast_path)
RepeatableClientApi::propose(client, propose_id, cmd, token, use_fast_path)
})
.await
}
Expand Down
211 changes: 115 additions & 96 deletions crates/curp/src/client/tests.rs

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions crates/curp/src/client/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl<C: Command> Unary<C> {
&self,
propose_id: ProposeId,
cmd: &C,
token: Option<&String>,
) -> Result<Result<C::ER, C::Error>, CurpError> {
let req = ProposeRequest::new(propose_id, cmd, self.state.cluster_version().await);
let timeout = self.config.propose_timeout;
Expand All @@ -91,7 +92,8 @@ impl<C: Command> Unary<C> {
.state
.for_each_server(|conn| {
let req_c = req.clone();
async move { (conn.id(), conn.propose(req_c, timeout).await) }
let token_c = token.cloned();
async move { (conn.id(), conn.propose(req_c, token_c, timeout).await) }
})
.await;
let super_quorum = super_quorum(responses.len());
Expand Down Expand Up @@ -209,9 +211,14 @@ impl<C: Command> ClientApi for Unary<C> {

/// Send propose to the whole cluster, `use_fast_path` set to `false` to fallback into ordered
/// requests (event the requests are commutative).
async fn propose(&self, cmd: &C, use_fast_path: bool) -> Result<ProposeResponse<C>, CurpError> {
async fn propose(
&self,
cmd: &C,
token: Option<&String>,
use_fast_path: bool,
) -> Result<ProposeResponse<C>, CurpError> {
let propose_id = self.gen_propose_id().await?;
RepeatableClientApi::propose(self, propose_id, cmd, use_fast_path).await
RepeatableClientApi::propose(self, propose_id, cmd, token, use_fast_path).await
}

/// Send propose configuration changes to the cluster
Expand Down Expand Up @@ -393,10 +400,11 @@ impl<C: Command> RepeatableClientApi for Unary<C> {
&self,
propose_id: ProposeId,
cmd: &Self::Cmd,
token: Option<&String>,
use_fast_path: bool,
) -> Result<ProposeResponse<Self::Cmd>, Self::Error> {
tokio::pin! {
let fast_round = self.fast_round(propose_id, cmd);
let fast_round = self.fast_round(propose_id, cmd, token);
let slow_round = self.slow_round(propose_id);
}

Expand Down
9 changes: 9 additions & 0 deletions crates/curp/src/rpc/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ pub(crate) trait ConnectApi: Send + Sync + 'static {
async fn propose(
&self,
request: ProposeRequest,
token: Option<String>,
timeout: Duration,
) -> Result<tonic::Response<ProposeResponse>, CurpError>;

Expand Down Expand Up @@ -385,12 +386,16 @@ impl ConnectApi for Connect<ProtocolClient<Channel>> {
async fn propose(
&self,
request: ProposeRequest,
token: Option<String>,
timeout: Duration,
) -> Result<tonic::Response<ProposeResponse>, CurpError> {
let mut client = self.rpc_connect.clone();
let mut req = tonic::Request::new(request);
req.set_timeout(timeout);
req.metadata_mut().inject_current();
if let Some(token) = token {
_ = req.metadata_mut().insert("token", token.parse()?);
}
client.propose(req).await.map_err(Into::into)
}

Expand Down Expand Up @@ -667,11 +672,15 @@ where
async fn propose(
&self,
request: ProposeRequest,
token: Option<String>,
_timeout: Duration,
) -> Result<tonic::Response<ProposeResponse>, CurpError> {
let mut req = tonic::Request::new(request);
req.metadata_mut().inject_bypassed();
req.metadata_mut().inject_current();
if let Some(token) = token {
_ = req.metadata_mut().insert("token", token.parse()?);
}
self.server.propose(req).await.map_err(Into::into)
}

Expand Down
1 change: 1 addition & 0 deletions crates/curp/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ impl Snapshot {
}

/// Get the inner snapshot ref
#[cfg(feature = "client-metrics")]
pub(crate) fn inner(&self) -> &EngineSnapshot {
&self.inner
}
Expand Down
7 changes: 6 additions & 1 deletion crates/curp/tests/it/read_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ async fn read_state() {
let put_cmd = TestCommand::new_put(vec![0], 0).set_exe_dur(Duration::from_millis(100));
tokio::spawn(async move {
assert_eq!(
put_client.propose(&put_cmd, true).await.unwrap().unwrap().0,
put_client
.propose(&put_cmd, None, true)
.await
.unwrap()
.unwrap()
.0,
TestCommandResult::default(),
);
});
Expand Down
33 changes: 19 additions & 14 deletions crates/curp/tests/it/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ async fn basic_propose() {
init_logger();

let group = CurpGroup::new(3).await;

let client = group.new_client().await;

assert_eq!(
client
.propose(&TestCommand::new_put(vec![0], 0), true)
.propose(&TestCommand::new_put(vec![0], 0), None, true)
.await
.unwrap()
.unwrap()
Expand All @@ -40,7 +41,7 @@ async fn basic_propose() {
);
assert_eq!(
client
.propose(&TestCommand::new_get(vec![0]), true)
.propose(&TestCommand::new_get(vec![0]), None, true)
.await
.unwrap()
.unwrap()
Expand All @@ -58,7 +59,7 @@ async fn synced_propose() {
let client = group.new_client().await;
let cmd = TestCommand::new_get(vec![0]);

let (er, index) = client.propose(&cmd, false).await.unwrap().unwrap();
let (er, index) = client.propose(&cmd, None, false).await.unwrap().unwrap();
assert_eq!(er, TestCommandResult::new(vec![], vec![]));
assert_eq!(index.unwrap(), 1.into()); // log[0] is a fake one

Expand All @@ -84,7 +85,7 @@ async fn exe_exact_n_times() {
let client = group.new_client().await;
let cmd = TestCommand::new_get(vec![0]);

let er = client.propose(&cmd, true).await.unwrap().unwrap().0;
let er = client.propose(&cmd, None, true).await.unwrap().unwrap().0;
assert_eq!(er, TestCommandResult::new(vec![], vec![]));

for exe_rx in group.exe_rxs() {
Expand Down Expand Up @@ -216,7 +217,7 @@ async fn concurrent_cmd_order() {

assert_eq!(
client
.propose(&TestCommand::new_get(vec![1]), true)
.propose(&TestCommand::new_get(vec![1]), None, true)
.await
.unwrap()
.unwrap()
Expand All @@ -240,7 +241,11 @@ async fn concurrent_cmd_order_should_have_correct_revision() {
for i in sample_range.clone() {
let rand_dur = Duration::from_millis(thread_rng().gen_range(0..500).numeric_cast());
let _er = client
.propose(&TestCommand::new_put(vec![i], i).set_as_dur(rand_dur), true)
.propose(
&TestCommand::new_put(vec![i], i).set_as_dur(rand_dur),
None,
true,
)
.await
.unwrap()
.unwrap();
Expand All @@ -249,7 +254,7 @@ async fn concurrent_cmd_order_should_have_correct_revision() {
for i in sample_range {
assert_eq!(
client
.propose(&TestCommand::new_get(vec![i]), true)
.propose(&TestCommand::new_get(vec![i]), None, true)
.await
.unwrap()
.unwrap()
Expand All @@ -272,7 +277,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
let mut collection = vec![];
for i in 0..10 {
let cmd = TestCommand::new_put(vec![i], i);
let res = req_client.propose(&cmd, true).await;
let res = req_client.propose(&cmd, None, true).await;
if res.is_ok() && res.unwrap().is_ok() {
collection.push(i);
}
Expand All @@ -284,7 +289,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
client.propose_shutdown().await.unwrap();

let res = client
.propose(&TestCommand::new_put(vec![888], 1), false)
.propose(&TestCommand::new_put(vec![888], 1), None, false)
.await;
assert!(matches!(
CurpError::from(res.unwrap_err()),
Expand All @@ -299,7 +304,7 @@ async fn shutdown_rpc_should_shutdown_the_cluster() {
let client = group.new_client().await;
for i in collection {
let res = client
.propose(&TestCommand::new_get(vec![i]), true)
.propose(&TestCommand::new_get(vec![i]), None, true)
.await
.unwrap();
assert_eq!(res.unwrap().0.values, vec![i]);
Expand Down Expand Up @@ -346,7 +351,7 @@ async fn propose_remove_follower_should_success() {
.is_finished());
// check if the old client can propose to the new cluster
client
.propose(&TestCommand::new_get(vec![1]), true)
.propose(&TestCommand::new_get(vec![1]), None, true)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -375,7 +380,7 @@ async fn propose_remove_leader_should_success() {
assert_ne!(new_leader_id, leader_id);
// check if the old client can propose to the new cluster
client
.propose(&TestCommand::new_get(vec![1]), true)
.propose(&TestCommand::new_get(vec![1]), None, true)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -448,7 +453,7 @@ async fn check_new_node(is_learner: bool) {
let mut group = CurpGroup::new(3).await;
let client = group.new_client().await;
let req = TestCommand::new_put(vec![123], 123);
let _res = client.propose(&req, true).await.unwrap().unwrap();
let _res = client.propose(&req, None, true).await.unwrap().unwrap();

let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
let addr = listener.local_addr().unwrap().to_string();
Expand Down Expand Up @@ -505,7 +510,7 @@ async fn check_new_node(is_learner: bool) {

// 5. check if the old client can propose to the new cluster
client
.propose(&TestCommand::new_get(vec![1]), true)
.propose(&TestCommand::new_get(vec![1]), None, true)
.await
.unwrap()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/simulation/src/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ impl<C: Command> SimClient<C> {
) -> Result<Result<(C::ER, Option<C::ASR>), C::Error>, tonic::Status> {
let inner = self.inner.clone();
self.handle
.spawn(async move { inner.propose(&cmd, use_fast_path).await })
.spawn(async move { inner.propose(&cmd, None, use_fast_path).await })
.await
.unwrap()
}
Expand Down
24 changes: 13 additions & 11 deletions crates/xline-client/src/clients/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use pbkdf2::{
};
use tonic::transport::Channel;
use xlineapi::{
command::command_from_request_wrapper, AuthDisableResponse, AuthEnableResponse,
AuthRoleAddResponse, AuthRoleDeleteResponse, AuthRoleGetResponse,
AuthRoleGrantPermissionResponse, AuthRoleListResponse, AuthRoleRevokePermissionResponse,
AuthStatusResponse, AuthUserAddResponse, AuthUserChangePasswordResponse,
AuthUserDeleteResponse, AuthUserGetResponse, AuthUserGrantRoleResponse, AuthUserListResponse,
AuthUserRevokeRoleResponse, AuthenticateResponse, RequestWithToken, RequestWrapper,
ResponseWrapper,
command::Command, AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse,
AuthRoleDeleteResponse, AuthRoleGetResponse, AuthRoleGrantPermissionResponse,
AuthRoleListResponse, AuthRoleRevokePermissionResponse, AuthStatusResponse,
AuthUserAddResponse, AuthUserChangePasswordResponse, AuthUserDeleteResponse,
AuthUserGetResponse, AuthUserGrantRoleResponse, AuthUserListResponse,
AuthUserRevokeRoleResponse, AuthenticateResponse, RequestWrapper, ResponseWrapper,
};

use crate::{
Expand Down Expand Up @@ -720,14 +719,17 @@ impl AuthClient {
request: Req,
use_fast_path: bool,
) -> Result<Res> {
let request = RequestWithToken::new_with_token(request.into(), self.token.clone());
let cmd = command_from_request_wrapper(request);
let request = request.into();
let cmd = Command::new(request.keys(), request);

let res_wrapper = if use_fast_path {
let (cmd_res, _sync_error) = self.curp_client.propose(&cmd, true).await??;
let (cmd_res, _sync_error) = self
.curp_client
.propose(&cmd, self.token.as_ref(), true)
.await??;
cmd_res.into_inner()
} else {
let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd, false).await?? else {
let (cmd_res, Some(sync_res)) = self.curp_client.propose(&cmd,self.token.as_ref(),false).await?? else {
unreachable!("sync_res is always Some when use_fast_path is false");
};
let mut res_wrapper = cmd_res.into_inner();
Expand Down
Loading

0 comments on commit be490b9

Please sign in to comment.