Skip to content

Commit

Permalink
refactor(xlineapi): add cache for conflict rule
Browse files Browse the repository at this point in the history
Signed-off-by: lxl66566 <[email protected]>
  • Loading branch information
lxl66566 committed Sep 20, 2024
1 parent 92de6f1 commit 2fe4785
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/xlineapi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async-trait = "0.1.81"
curp = { path = "../curp" }
curp-external-api = { path = "../curp-external-api" }
itertools = "0.13"
once_cell = "1.17.0"
prost = "0.13"
serde = { version = "1.0.204", features = ["derive"] }
thiserror = "1.0.61"
Expand Down
34 changes: 34 additions & 0 deletions crates/xlineapi/src/classifier.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,39 @@
use super::RequestWrapper;

/// because RequestWrapper do not repr u8, we need to convert it manually.
impl From<&RequestWrapper> for u8 {
fn from(value: &RequestWrapper) -> Self {
match *value {
RequestWrapper::PutRequest(_) => 0,
RequestWrapper::RangeRequest(_) => 1,
RequestWrapper::DeleteRangeRequest(_) => 2,
RequestWrapper::TxnRequest(_) => 3,
RequestWrapper::CompactionRequest(_) => 4,
RequestWrapper::AuthEnableRequest(_) => 5,
RequestWrapper::AuthDisableRequest(_) => 6,
RequestWrapper::AuthStatusRequest(_) => 7,
RequestWrapper::AuthRoleAddRequest(_) => 8,
RequestWrapper::AuthRoleDeleteRequest(_) => 9,
RequestWrapper::AuthRoleGetRequest(_) => 10,
RequestWrapper::AuthRoleGrantPermissionRequest(_) => 11,
RequestWrapper::AuthRoleListRequest(_) => 12,
RequestWrapper::AuthRoleRevokePermissionRequest(_) => 13,
RequestWrapper::AuthUserAddRequest(_) => 14,
RequestWrapper::AuthUserChangePasswordRequest(_) => 15,
RequestWrapper::AuthUserDeleteRequest(_) => 16,
RequestWrapper::AuthUserGetRequest(_) => 17,
RequestWrapper::AuthUserGrantRoleRequest(_) => 18,
RequestWrapper::AuthUserListRequest(_) => 19,
RequestWrapper::AuthUserRevokeRoleRequest(_) => 20,
RequestWrapper::AuthenticateRequest(_) => 21,
RequestWrapper::LeaseGrantRequest(_) => 22,
RequestWrapper::LeaseRevokeRequest(_) => 23,
RequestWrapper::LeaseLeasesRequest(_) => 24,
RequestWrapper::AlarmRequest(_) => 25,
}
}
}

/// Backend store of request
#[allow(missing_docs)]
#[derive(Debug, PartialEq, Eq)]
Expand Down
102 changes: 75 additions & 27 deletions crates/xlineapi/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use once_cell::sync::Lazy;
use std::{
collections::HashSet,
collections::{HashMap, HashSet},
ops::{Bound, RangeBounds},
sync::RwLock,
};

use curp::{client::ClientApi, cmd::Command as CurpCommand};
Expand All @@ -26,6 +28,10 @@ const UNBOUNDED: &[u8] = &[0_u8];
/// Range end to get one key
const ONE_KEY: &[u8] = &[];

/// A global cache to store conflict rules. The rules will not change, so it's safe to use a global cache.
static CONFLICT_RULES_CACHE: Lazy<RwLock<HashMap<(u8, u8), bool>>> =
Lazy::new(|| RwLock::new(HashMap::new()));

/// Key Range for Command
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
pub struct KeyRange {
Expand Down Expand Up @@ -224,7 +230,7 @@ pub struct Command {
auth_info: Option<AuthInfo>,
}

/// Match all Classifiers seperated by `&`
/// Match all Classifiers separated by `&`
///
/// # Returns
///
Expand Down Expand Up @@ -297,7 +303,18 @@ impl ConflictCheck for Command {
fn is_conflict(&self, other: &Self) -> bool {
let t_req = &self.request;
let o_req = &other.request;
is_conflict!(
let mut cache_key: (u8, u8) = (t_req.into(), o_req.into());
if cache_key.0 > cache_key.1 {
cache_key = (cache_key.1, cache_key.0);
}
if let Some(res) = CONFLICT_RULES_CACHE
.read()
.ok()
.and_then(|c| c.get(&cache_key).cloned())
{
return res;
}
let first_step = is_conflict!(
// auth read request will not conflict with any request except the auth write request
(
true,
Expand All @@ -316,27 +333,33 @@ impl ConflictCheck for Command {
RequestBackend::Lease & RequestRw::Write
),
(true, RequestType::Compaction, RequestType::Compaction)
)(t_req, o_req)
.or_else(|| {
swap_map!(
RequestWrapper::TxnRequest,
RequestWrapper::CompactionRequest,
|x, y| x.is_conflict_with_rev(y.revision)
)(t_req, o_req)
})
// the fallback map
.or_else(|| {
let this_lease_ids = t_req.leases().into_iter().collect::<HashSet<_>>();
let other_lease_ids = o_req.leases().into_iter().collect::<HashSet<_>>();
let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids);
let key_conflict = self
.keys()
.iter()
.cartesian_product(other.keys().iter())
.any(|(k1, k2)| k1.is_conflict(k2));
Some(lease_conflict || key_conflict)
})
.unwrap_or_default()
)(t_req, o_req);
if let Some(first_step_res) = first_step {
if let Ok(mut cache) = CONFLICT_RULES_CACHE.write() {
cache.insert((t_req.into(), o_req.into()), first_step_res);
}
}
first_step
.or_else(|| {
swap_map!(
RequestWrapper::TxnRequest,
RequestWrapper::CompactionRequest,
|x, y| x.is_conflict_with_rev(y.revision)
)(t_req, o_req)
})
// the fallback map
.or_else(|| {
let this_lease_ids = t_req.leases().into_iter().collect::<HashSet<_>>();
let other_lease_ids = o_req.leases().into_iter().collect::<HashSet<_>>();
let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids);
let key_conflict = self
.keys()
.iter()
.cartesian_product(other.keys().iter())
.any(|(k1, k2)| k1.is_conflict(k2));
Some(lease_conflict || key_conflict)
})
.unwrap_or_default()
}
}

Expand Down Expand Up @@ -570,9 +593,9 @@ impl PbCodec for Command {
mod test {
use super::*;
use crate::{
AuthEnableRequest, AuthStatusRequest, CommandAttr, CompactionRequest, Compare,
DeleteRangeRequest, LeaseGrantRequest, LeaseLeasesRequest, LeaseRevokeRequest, PutRequest,
PutResponse, RangeRequest, Request, RequestOp, TxnRequest,
AlarmRequest, AuthEnableRequest, AuthStatusRequest, CommandAttr, CompactionRequest,
Compare, DeleteRangeRequest, LeaseGrantRequest, LeaseLeasesRequest, LeaseRevokeRequest,
PutRequest, PutResponse, RangeRequest, Request, RequestOp, TxnRequest,
};

#[test]
Expand Down Expand Up @@ -607,6 +630,31 @@ mod test {
assert!(!kr4.contains_key(b"e"));
}

#[test]
fn test_cache_should_work() {
let cmd1 = Command::new(RequestWrapper::AuthStatusRequest(AuthStatusRequest {
..Default::default()
}));
let cmd2 = Command::new(RequestWrapper::AlarmRequest(AlarmRequest {
..Default::default()
}));
let cache_key = ((&cmd1.request).into(), (&cmd2.request).into());
if CONFLICT_RULES_CACHE
.read()
.unwrap()
.get(&cache_key)
.is_some()
{
return;
}
let _ig = ConflictCheck::is_conflict(&cmd1, &cmd2);
assert!(CONFLICT_RULES_CACHE
.read()
.unwrap()
.get(&cache_key)
.is_some());
}

#[test]
fn test_command_conflict() {
let cmd1 = Command::new(RequestWrapper::DeleteRangeRequest(DeleteRangeRequest {
Expand Down

0 comments on commit 2fe4785

Please sign in to comment.