Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: txn command key #472

Merged
merged 4 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion curp/src/server/cmd_worker/conflict_checked_mpmc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,6 @@ impl<C: Command, CE: CommandExecutor<C>> Filter<C, CE> {
new_vid
}
}
// since a reset is needed, all other vertices doesn't matter anymore, so delete them all
CEEvent::Reset(snapshot, finish_tx) => {
// since a reset is needed, all other vertices doesn't matter anymore, so delete them all
self.cmd_vid.clear();
Expand Down
16 changes: 8 additions & 8 deletions xline-client/src/clients/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use pbkdf2::{
};
use tonic::transport::Channel;
use xlineapi::{
command::Command, AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse,
AuthRoleDeleteResponse, AuthRoleGetResponse, AuthRoleGrantPermissionResponse,
AuthRoleListResponse, AuthRoleRevokePermissionResponse, AuthStatusResponse,
AuthUserAddResponse, AuthUserChangePasswordResponse, AuthUserDeleteResponse,
AuthUserGetResponse, AuthUserGrantRoleResponse, AuthUserListResponse,
AuthUserRevokeRoleResponse, AuthenticateResponse, RequestWithToken, RequestWrapper,
ResponseWrapper,
command::{command_from_request_wrapper, Command},
AuthDisableResponse, AuthEnableResponse, AuthRoleAddResponse, AuthRoleDeleteResponse,
AuthRoleGetResponse, AuthRoleGrantPermissionResponse, AuthRoleListResponse,
AuthRoleRevokePermissionResponse, AuthStatusResponse, AuthUserAddResponse,
AuthUserChangePasswordResponse, AuthUserDeleteResponse, AuthUserGetResponse,
AuthUserGrantRoleResponse, AuthUserListResponse, AuthUserRevokeRoleResponse,
AuthenticateResponse, RequestWithToken, RequestWrapper, ResponseWrapper,
};

use crate::{
Expand Down Expand Up @@ -713,7 +713,7 @@ impl AuthClient {
) -> Result<Res> {
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(request.into(), self.token.clone());
let cmd = Command::new(vec![], request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);

let res_wrapper = if use_fast_path {
let (cmd_res, _sync_error) = self.curp_client.propose(cmd, true).await?;
Expand Down
21 changes: 6 additions & 15 deletions xline-client/src/clients/kv.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::sync::Arc;

use curp::client::Client as CurpClient;
use xlineapi::command::{command_from_request_wrapper, Command};
use xlineapi::{
command::{Command, KeyRange},
CompactionResponse, DeleteRangeResponse, PutResponse, RangeResponse, RequestWithToken,
TxnResponse,
};
Expand Down Expand Up @@ -55,13 +55,12 @@ impl KvClient {
/// ```
#[inline]
pub async fn put(&self, request: PutRequest) -> Result<PutResponse> {
let key_ranges = vec![KeyRange::new_one_key(request.key())];
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::PutRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(key_ranges, request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down Expand Up @@ -101,13 +100,12 @@ impl KvClient {
/// ```
#[inline]
pub async fn range(&self, request: RangeRequest) -> Result<RangeResponse> {
let key_ranges = vec![KeyRange::new(request.key(), request.range_end())];
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::RangeRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(key_ranges, request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down Expand Up @@ -140,13 +138,12 @@ impl KvClient {
/// ```
#[inline]
pub async fn delete(&self, request: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
let key_ranges = vec![KeyRange::new(request.key(), request.range_end())];
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::DeleteRangeRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(key_ranges, request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down Expand Up @@ -190,18 +187,12 @@ impl KvClient {
/// ```
#[inline]
pub async fn txn(&self, request: TxnRequest) -> Result<TxnResponse> {
let key_ranges = request
.inner
.compare
.iter()
.map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
.collect();
let propose_id = self.curp_client.gen_propose_id().await?;
let request = RequestWithToken::new_with_token(
xlineapi::TxnRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(key_ranges, request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, Some(sync_res)) = self.curp_client.propose(cmd, false).await? else {
unreachable!("sync_res is always Some when use_fast_path is false");
};
Expand Down Expand Up @@ -254,7 +245,7 @@ impl KvClient {
xlineapi::CompactionRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(vec![], request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);

let res_wrapper = if use_fast_path {
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Expand Down
9 changes: 5 additions & 4 deletions xline-client/src/clients/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use curp::client::Client as CurpClient;
use futures::channel::mpsc::channel;
use tonic::{transport::Channel, Streaming};
use xlineapi::{
command::Command, LeaseGrantResponse, LeaseKeepAliveResponse, LeaseLeasesResponse,
LeaseRevokeResponse, LeaseTimeToLiveResponse, RequestWithToken,
command::{command_from_request_wrapper, Command},
LeaseGrantResponse, LeaseKeepAliveResponse, LeaseLeasesResponse, LeaseRevokeResponse,
LeaseTimeToLiveResponse, RequestWithToken,
};

use crate::{
Expand Down Expand Up @@ -89,7 +90,7 @@ impl LeaseClient {
xlineapi::LeaseGrantRequest::from(request).into(),
self.token.clone(),
);
let cmd = Command::new(vec![], request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down Expand Up @@ -265,7 +266,7 @@ impl LeaseClient {
xlineapi::LeaseLeasesRequest {}.into(),
self.token.clone(),
);
let cmd = Command::new(vec![], request, propose_id);
let cmd = command_from_request_wrapper(propose_id, request);
let (cmd_res, _sync_res) = self.curp_client.propose(cmd, true).await?;
Ok(cmd_res.into_inner().into())
}
Expand Down
26 changes: 3 additions & 23 deletions xline-client/src/clients/lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use std::{
};

use clippy_utilities::OverflowArithmetic;
use curp::{client::Client as CurpClient, cmd::ProposeId};
use curp::client::Client as CurpClient;
use futures::{Future, FutureExt};
use tonic::transport::Channel;
use xlineapi::{
command::{Command, CommandResponse, KeyRange, SyncResponse},
command::{command_from_request_wrapper, Command, CommandResponse, KeyRange, SyncResponse},
Compare, CompareResult, CompareTarget, DeleteRangeRequest, DeleteRangeResponse, EventType,
LockResponse, PutRequest, RangeRequest, RangeResponse, Request, RequestOp, RequestWithToken,
RequestWrapper, Response, ResponseHeader, SortOrder, SortTarget, TargetUnion, TxnRequest,
Expand Down Expand Up @@ -233,26 +233,6 @@ impl LockClient {
Ok(UnlockResponse { header })
}

/// Generate `Command` proposal from `Request`
fn command_from_request_wrapper(propose_id: ProposeId, wrapper: RequestWithToken) -> Command {
#[allow(clippy::wildcard_enum_match_arm)]
let keys = match wrapper.request {
RequestWrapper::DeleteRangeRequest(ref req) => {
vec![KeyRange::new_one_key(req.key.as_slice())]
}
RequestWrapper::RangeRequest(ref req) => {
vec![KeyRange::new(req.key.as_slice(), req.range_end.as_slice())]
}
RequestWrapper::TxnRequest(ref req) => req
.compare
.iter()
.map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
.collect(),
_ => vec![],
};
Command::new(keys, wrapper, propose_id)
}

/// Propose request and get result with fast/slow path
async fn propose<T>(
&self,
Expand All @@ -266,7 +246,7 @@ impl LockClient {
RequestWithToken::new_with_token(request.into(), self.token.clone());
let propose_id = self.curp_client.gen_propose_id().await?;

let cmd = Self::command_from_request_wrapper(propose_id, request_with_token);
let cmd = command_from_request_wrapper(propose_id, request_with_token);
self.curp_client
.propose(cmd, use_fast_path)
.await
Expand Down
57 changes: 20 additions & 37 deletions xline/src/server/auth_server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{marker::PhantomData, sync::Arc};
use std::sync::Arc;

use curp::client::Client;
use pbkdf2::{
Expand All @@ -8,39 +8,31 @@ use pbkdf2::{
use tonic::metadata::MetadataMap;
use tracing::debug;
use xlineapi::{
command::{Command, CommandResponse, SyncResponse},
command::{command_from_request_wrapper, Command, CommandResponse, SyncResponse},
request_validation::RequestValidator,
RequestWithToken,
};

use super::command::{client_err_to_status, command_from_request_wrapper};
use crate::{
rpc::{
Auth, AuthDisableRequest, AuthDisableResponse, AuthEnableRequest, AuthEnableResponse,
AuthRoleAddRequest, AuthRoleAddResponse, AuthRoleDeleteRequest, AuthRoleDeleteResponse,
AuthRoleGetRequest, AuthRoleGetResponse, AuthRoleGrantPermissionRequest,
AuthRoleGrantPermissionResponse, AuthRoleListRequest, AuthRoleListResponse,
AuthRoleRevokePermissionRequest, AuthRoleRevokePermissionResponse, AuthStatusRequest,
AuthStatusResponse, AuthUserAddRequest, AuthUserAddResponse, AuthUserChangePasswordRequest,
AuthUserChangePasswordResponse, AuthUserDeleteRequest, AuthUserDeleteResponse,
AuthUserGetRequest, AuthUserGetResponse, AuthUserGrantRoleRequest,
AuthUserGrantRoleResponse, AuthUserListRequest, AuthUserListResponse,
AuthUserRevokeRoleRequest, AuthUserRevokeRoleResponse, AuthenticateRequest,
AuthenticateResponse, RequestWrapper, ResponseWrapper,
},
storage::storage_api::StorageApi,
use super::command::client_err_to_status;
use crate::rpc::{
Auth, AuthDisableRequest, AuthDisableResponse, AuthEnableRequest, AuthEnableResponse,
AuthRoleAddRequest, AuthRoleAddResponse, AuthRoleDeleteRequest, AuthRoleDeleteResponse,
AuthRoleGetRequest, AuthRoleGetResponse, AuthRoleGrantPermissionRequest,
AuthRoleGrantPermissionResponse, AuthRoleListRequest, AuthRoleListResponse,
AuthRoleRevokePermissionRequest, AuthRoleRevokePermissionResponse, AuthStatusRequest,
AuthStatusResponse, AuthUserAddRequest, AuthUserAddResponse, AuthUserChangePasswordRequest,
AuthUserChangePasswordResponse, AuthUserDeleteRequest, AuthUserDeleteResponse,
AuthUserGetRequest, AuthUserGetResponse, AuthUserGrantRoleRequest, AuthUserGrantRoleResponse,
AuthUserListRequest, AuthUserListResponse, AuthUserRevokeRoleRequest,
AuthUserRevokeRoleResponse, AuthenticateRequest, AuthenticateResponse, RequestWrapper,
ResponseWrapper,
};

/// Auth Server
#[derive(Debug)]
pub(crate) struct AuthServer<S>
where
S: StorageApi,
{
pub(crate) struct AuthServer {
/// Consensus client
client: Arc<Client<Command>>,
/// Phantom
phantom: PhantomData<S>,
}

/// Get token from metadata
Expand All @@ -51,16 +43,10 @@ pub(crate) fn get_token(metadata: &MetadataMap) -> Option<String> {
.and_then(|v| v.to_str().map(String::from).ok())
}

impl<S> AuthServer<S>
where
S: StorageApi,
{
impl AuthServer {
/// New `AuthServer`
pub(crate) fn new(client: Arc<Client<Command>>) -> Self {
Self {
client,
phantom: PhantomData,
}
Self { client }
}

/// Propose request and get result with fast/slow path
Expand All @@ -79,7 +65,7 @@ where
.gen_propose_id()
.await
.map_err(client_err_to_status)?;
let cmd = command_from_request_wrapper::<S>(propose_id, wrapper, None);
let cmd = command_from_request_wrapper(propose_id, wrapper);

self.client
.propose(cmd, use_fast_path)
Expand Down Expand Up @@ -116,10 +102,7 @@ where
}

#[tonic::async_trait]
impl<S> Auth for AuthServer<S>
where
S: StorageApi,
{
impl Auth for AuthServer {
async fn auth_enable(
&self,
request: tonic::Request<AuthEnableRequest>,
Expand Down
53 changes: 7 additions & 46 deletions xline/src/server/command.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use std::sync::Arc;

use super::barriers::{IdBarrier, IndexBarrier};
use crate::{
revision_number::RevisionNumberGenerator,
rpc::RequestBackend,
storage::{db::WriteOp, storage_api::StorageApi, AuthStore, KvStore, LeaseStore},
};
use curp::{
cmd::{Command as CurpCommand, CommandExecutor as CurpCommandExecutor, ProposeId},
error::ClientError,
LogIndex,
};
use engine::Snapshot;
use xlineapi::command::{Command, KeyRange};

use super::barriers::{IdBarrier, IndexBarrier};
use crate::{
revision_number::RevisionNumberGenerator,
rpc::{RequestBackend, RequestWithToken, RequestWrapper},
storage::{db::WriteOp, storage_api::StorageApi, AuthStore, KvStore, LeaseStore},
};
use xlineapi::command::Command;

/// Meta table name
pub(crate) const META_TABLE: &str = "meta";
Expand Down Expand Up @@ -209,44 +208,6 @@ where
}
}

/// Generate `Command` proposal from `Request`
pub(super) fn command_from_request_wrapper<S>(
propose_id: ProposeId,
wrapper: RequestWithToken,
lease_storage: Option<&LeaseStore<S>>,
) -> Command
where
S: StorageApi,
{
#[allow(clippy::wildcard_enum_match_arm)]
let keys = match wrapper.request {
RequestWrapper::RangeRequest(ref req) => {
vec![KeyRange::new(req.key.as_slice(), req.range_end.as_slice())]
}
RequestWrapper::PutRequest(ref req) => vec![KeyRange::new_one_key(req.key.as_slice())],
RequestWrapper::DeleteRangeRequest(ref req) => {
vec![KeyRange::new(req.key.as_slice(), req.range_end.as_slice())]
}
RequestWrapper::TxnRequest(ref req) => req
.compare
.iter()
.map(|cmp| KeyRange::new(cmp.key.as_slice(), cmp.range_end.as_slice()))
.collect(),
RequestWrapper::LeaseRevokeRequest(ref req) => {
let Some(lease_storage) = lease_storage else {
panic!("lease_storage should be Some(_) when creating command of LeaseRevokeRequest")
};
lease_storage
.get_keys(req.id)
.into_iter()
.map(|k| KeyRange::new(k, ""))
.collect()
}
_ => vec![],
};
Command::new(keys, wrapper, propose_id)
}

/// Convert `ClientError` to `tonic::Status`
pub(super) fn client_err_to_status(err: ClientError<Command>) -> tonic::Status {
#[allow(clippy::wildcard_enum_match_arm)]
Expand Down
Loading
Loading