Skip to content

Commit

Permalink
CAKE-63: Cleanup rkyv dependency management and cleanup dataviews (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChillFish8 authored Dec 31, 2023
1 parent e627142 commit dcfd58d
Show file tree
Hide file tree
Showing 19 changed files with 95 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ anyhow = "1"
[features]
test-utils = ["datacake-eventual-consistency/test-utils"]
rkyv = ["datacake-crdt/rkyv-support"]
rkyv-validation = ["rkyv", "datacake-crdt/rkyv-validation"]
simulation = ["datacake-rpc/simulation"]
default = [
"datacake-crdt",
Expand Down
5 changes: 3 additions & 2 deletions datacake-crdt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ readme = "README.md"
[dependencies]
thiserror = "1.0.33"

rkyv = { version = "0.7.42", features = ["validation"], optional = true }
rkyv = { version = "0.7.42", features = ["strict", "archive_le"], optional = true }

[features]
# Enables (de)serialization support for all data types.
rkyv-support = ["rkyv"]
rkyv-support = ["rkyv"]
rkyv-validation = ["rkyv-support", "rkyv/validation"]
12 changes: 9 additions & 3 deletions datacake-crdt/src/orswot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ pub struct BadState;
#[derive(Debug, Clone)]
#[repr(C)]
#[cfg_attr(feature = "rkyv", derive(Serialize, Deserialize, Archive))]
#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))]
#[cfg_attr(
all(feature = "rkyv", feature = "rkyv-validation"),
archive(compare(PartialEq), check_bytes)
)]
pub struct NodeVersions<const N: usize> {
nodes_max_stamps: [BTreeMap<u8, HLCTimestamp>; N],
safe_last_stamps: BTreeMap<u8, HLCTimestamp>,
Expand Down Expand Up @@ -137,7 +140,10 @@ impl<const N: usize> NodeVersions<N> {
#[derive(Debug, Default, Clone)]
#[repr(C)]
#[cfg_attr(feature = "rkyv", derive(Serialize, Deserialize, Archive))]
#[cfg_attr(feature = "rkyv", archive(check_bytes))]
#[cfg_attr(
all(feature = "rkyv", feature = "rkyv-validation"),
archive(check_bytes)
)]
/// A CRDT which supports purging of deleted entry tombstones.
///
/// This implementation is largely based on the Riak DB implementations
Expand Down Expand Up @@ -217,7 +223,7 @@ pub struct OrSWotSet<const N: usize = 1> {
}

impl<const N: usize> OrSWotSet<N> {
#[cfg(feature = "rkyv")]
#[cfg(all(feature = "rkyv", feature = "rkyv-validation"))]
/// Deserializes a [OrSWotSet] from a array of bytes.
pub fn from_bytes(data: &[u8]) -> Result<Self, BadState> {
let deserialized = rkyv::from_bytes::<Self>(data).map_err(|_| BadState)?;
Expand Down
15 changes: 13 additions & 2 deletions datacake-crdt/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ pub const DATACAKE_EPOCH: Duration = Duration::from_secs(1672534861);
#[derive(Debug, Hash, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
#[repr(C)]
#[cfg_attr(feature = "rkyv", derive(Serialize, Deserialize, Archive))]
#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))]
#[cfg_attr(
all(feature = "rkyv", feature = "rkyv-validation"),
archive(compare(PartialEq), check_bytes)
)]
#[cfg_attr(feature = "rkyv", archive_attr(repr(C), derive(Debug)))]
/// A HLC (Hybrid Logical Clock) timestamp implementation.
///
Expand Down Expand Up @@ -229,6 +232,14 @@ impl HLCTimestamp {
}
}

#[cfg(feature = "rkyv-support")]
impl ArchivedHLCTimestamp {
/// Casts the archived HLCTimestamp to the actual HLCTimestamp.
pub fn cast(&self) -> HLCTimestamp {
HLCTimestamp(self.0.value())
}
}

impl Display for HLCTimestamp {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
Expand Down Expand Up @@ -453,7 +464,7 @@ mod tests {
}
}

#[cfg(all(test, feature = "rkyv-support"))]
#[cfg(all(test, feature = "rkyv-support", feature = "rkyv-validation"))]
mod rkyv_tests {
use super::*;

Expand Down
50 changes: 23 additions & 27 deletions datacake-eventual-consistency/src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::net::SocketAddr;
use datacake_crdt::{HLCTimestamp, Key, OrSWotSet};
use datacake_node::{Clock, NodeId};
use datacake_rpc::{Channel, RpcClient, Status};
use rkyv::AlignedVec;

use crate::core::{Document, DocumentMetadata};
use crate::rpc::services::consistency_impl::{
Expand Down Expand Up @@ -67,8 +66,7 @@ where
timestamp,
})
.await?
.to_owned()
.map_err(Status::internal)?;
.cast();
self.clock.register_ts(ts).await;
Ok(())
}
Expand All @@ -91,8 +89,7 @@ where
timestamp,
})
.await?
.to_owned()
.map_err(Status::internal)?;
.cast();
self.clock.register_ts(ts).await;
Ok(())
}
Expand All @@ -113,8 +110,7 @@ where
timestamp,
})
.await?
.to_owned()
.map_err(Status::internal)?;
.cast();
self.clock.register_ts(ts).await;
Ok(())
}
Expand All @@ -134,19 +130,13 @@ where
timestamp,
})
.await?
.to_owned()
.map_err(Status::internal)?;
.cast();
self.clock.register_ts(ts).await;
Ok(())
}

pub async fn apply_batch(&mut self, batch: &BatchPayload) -> Result<(), Status> {
let ts = self
.inner
.send(batch)
.await?
.to_owned()
.map_err(Status::internal)?;
let ts = self.inner.send(batch).await?.cast();
self.clock.register_ts(ts).await;
Ok(())
}
Expand Down Expand Up @@ -186,7 +176,7 @@ where
.inner
.send(&PollKeyspace(timestamp))
.await?
.to_owned()
.deserialize_view()
.map_err(Status::internal)?;

self.clock.register_ts(inner.timestamp).await;
Expand All @@ -211,17 +201,23 @@ where
timestamp,
keyspace: keyspace.into(),
})
.await?
.to_owned()
.map_err(Status::internal)?;

self.clock.register_ts(inner.timestamp).await;

let mut aligned = AlignedVec::with_capacity(inner.set.len());
aligned.extend_from_slice(&inner.set);
.await?;

let state = rkyv::from_bytes(&aligned).map_err(|_| Status::invalid())?;
Ok((inner.last_updated, state))
self.clock.register_ts(inner.timestamp.cast()).await;

// SAFETY:
// Although this may seem very unsafe, we can rely on the parent type (`KeyspaceOrSwotSet`)
// to satisfy our guarantees when performing this operation.
// - Internally datacake-rpc has already validated and checked the checksum of the overall
// payload of the message when it originally deserialized `KeyspaceOrSwotSet` this ensures
// the actual layout and original data is intact.
// - The alignment issues are solved by the the fact the DataView maintains a 16 byte aligned
// buffer which the parent type maintains in its view form.
let state = unsafe {
rkyv::from_bytes_unchecked(&inner.set).map_err(|_| Status::invalid())?
};

Ok((inner.last_updated.cast(), state))
}

/// Fetches a set of documents with the provided IDs belonging to the given keyspace.
Expand All @@ -240,7 +236,7 @@ where
})
.await?;

let payload = inner.to_owned().unwrap();
let payload = inner.deserialize_view().unwrap();

self.clock.register_ts(payload.timestamp).await;
Ok(payload.documents)
Expand Down
25 changes: 20 additions & 5 deletions datacake-eventual-consistency/src/rpc/services/consistency_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ where
&self,
msg: Request<PutPayload>,
) -> Result<HLCTimestamp, Status> {
let payload = msg.into_inner().to_owned().map_err(Status::internal)?;
let payload = msg
.into_inner()
.deserialize_view()
.map_err(Status::internal)?;

let doc = payload.document;
let ctx = self.get_put_ctx(payload.ctx)?;
Expand Down Expand Up @@ -110,7 +113,10 @@ where
&self,
msg: Request<MultiPutPayload>,
) -> Result<Self::Reply, Status> {
let payload = msg.into_inner().to_owned().map_err(Status::internal)?;
let payload = msg
.into_inner()
.deserialize_view()
.map_err(Status::internal)?;

let ctx = self.get_put_ctx(payload.ctx)?;
self.group.clock().register_ts(payload.timestamp).await;
Expand Down Expand Up @@ -139,7 +145,10 @@ where
&self,
msg: Request<RemovePayload>,
) -> Result<Self::Reply, Status> {
let payload = msg.into_inner().to_owned().map_err(Status::internal)?;
let payload = msg
.into_inner()
.deserialize_view()
.map_err(Status::internal)?;

self.group.clock().register_ts(payload.timestamp).await;

Expand All @@ -166,7 +175,10 @@ where
&self,
msg: Request<MultiRemovePayload>,
) -> Result<Self::Reply, Status> {
let payload = msg.into_inner().to_owned().map_err(Status::internal)?;
let payload = msg
.into_inner()
.deserialize_view()
.map_err(Status::internal)?;

self.group.clock().register_ts(payload.timestamp).await;

Expand All @@ -193,7 +205,10 @@ where
&self,
msg: Request<BatchPayload>,
) -> Result<Self::Reply, Status> {
let msg = msg.into_inner().to_owned().map_err(Status::internal)?;
let msg = msg
.into_inner()
.deserialize_view()
.map_err(Status::internal)?;

self.group.clock().register_ts(msg.timestamp).await;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ where
&self,
msg: Request<PollKeyspace>,
) -> Result<Self::Reply, Status> {
let msg = msg.to_owned().map_err(Status::internal)?;
let msg = msg.deserialize_view().map_err(Status::internal)?;
self.group.clock().register_ts(msg.0).await;

let payload = self.group.get_keyspace_info().await;
Expand All @@ -59,7 +59,7 @@ where
type Reply = KeyspaceOrSwotSet;

async fn on_message(&self, msg: Request<GetState>) -> Result<Self::Reply, Status> {
let msg = msg.to_owned().map_err(Status::internal)?;
let msg = msg.deserialize_view().map_err(Status::internal)?;
self.group.clock().register_ts(msg.timestamp).await;

let keyspace = self.group.get_or_create_keyspace(&msg.keyspace).await;
Expand Down Expand Up @@ -87,7 +87,7 @@ where
type Reply = FetchedDocs;

async fn on_message(&self, msg: Request<FetchDocs>) -> Result<Self::Reply, Status> {
let msg = msg.to_owned().map_err(Status::internal)?;
let msg = msg.deserialize_view().map_err(Status::internal)?;
let clock = self.group.clock();
clock.register_ts(msg.timestamp).await;

Expand Down Expand Up @@ -143,7 +143,7 @@ pub struct GetState {
pub struct KeyspaceOrSwotSet {
pub timestamp: HLCTimestamp,
pub last_updated: HLCTimestamp,
#[with(rkyv::with::CopyOptimize)]
#[with(rkyv::with::Raw)]
pub set: Vec<u8>,
}

Expand All @@ -152,7 +152,7 @@ pub struct KeyspaceOrSwotSet {
#[archive(check_bytes)]
pub struct FetchDocs {
pub keyspace: String,
#[with(rkyv::with::CopyOptimize)]
#[with(rkyv::with::Raw)]
pub doc_ids: Vec<Key>,
pub timestamp: HLCTimestamp,
}
Expand Down
2 changes: 1 addition & 1 deletion datacake-node/src/rpc/services/chitchat_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Handler<ChitchatRpcMessage> for ChitchatService {
&self,
request: Request<ChitchatRpcMessage>,
) -> Result<Self::Reply, Status> {
let msg = request.to_owned().map_err(Status::internal)?;
let msg = request.deserialize_view().map_err(Status::internal)?;

let from = msg.source;
self.clock.register_ts(msg.timestamp).await;
Expand Down
6 changes: 4 additions & 2 deletions datacake-rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub type MessageReply<Svc, Msg> =
/// type Reply = MyMessage;
///
/// async fn on_message(&self, msg: Request<MyMessage>) -> Result<Self::Reply, Status> {
/// Ok(msg.to_owned().unwrap())
/// Ok(msg.deserialize_view().unwrap())
/// }
/// }
///
Expand Down Expand Up @@ -302,6 +302,8 @@ where
.await
.map_err(|e| Status::internal(e.message()))?;
let status = DataView::<Status>::using(buffer).map_err(|_| Status::invalid())?;
Err(status.to_owned().unwrap_or_else(|_| Status::invalid()))
Err(status
.deserialize_view()
.unwrap_or_else(|_| Status::invalid()))
}
}
6 changes: 3 additions & 3 deletions datacake-rpc/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub type HandlerKey = u64;
/// type Reply = MyMessage;
///
/// async fn on_message(&self, msg: Request<MyMessage>) -> Result<Self::Reply, Status> {
/// Ok(msg.to_owned().unwrap())
/// Ok(msg.deserialize_view().unwrap())
/// }
/// }
///
Expand All @@ -78,7 +78,7 @@ pub type HandlerKey = u64;
/// type Reply = MyOtherMessage;
///
/// async fn on_message(&self, msg: Request<MyOtherMessage>) -> Result<Self::Reply, Status> {
/// Ok(msg.to_owned().unwrap())
/// Ok(msg.deserialize_view().unwrap())
/// }
/// }
/// ```
Expand Down Expand Up @@ -201,7 +201,7 @@ pub trait RpcService: Sized {
/// // request buffer, you can use the `to_owned` method which will attempt to
/// // deserialize the inner message/view.
/// async fn on_message(&self, msg: Request<MyMessage>) -> Result<Self::Reply, Status> {
/// Ok(msg.to_owned().unwrap())
/// Ok(msg.deserialize_view().unwrap())
/// }
/// }
/// ```
Expand Down
2 changes: 1 addition & 1 deletion datacake-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
//! // Our `Request` gives us a zero-copy view to our message, this doesn't actually
//! // allocate the message type.
//! async fn on_message(&self, msg: Request<MyMessage>) -> Result<Self::Reply, Status> {
//! Ok(msg.to_owned().unwrap().name)
//! Ok(msg.deserialize_view().unwrap().name)
//! }
//! }
//!
Expand Down
2 changes: 1 addition & 1 deletion datacake-rpc/src/net/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ mod tests {
"Archived value and original value should match"
);

let copy: Status = view.to_owned().expect("Deserialize OK");
let copy: Status = view.deserialize_view().expect("Deserialize OK");
assert_eq!(
copy, status,
"Deserialized value and original value should match"
Expand Down
4 changes: 2 additions & 2 deletions datacake-rpc/src/rkyv_tooling/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ where
{
#[inline]
/// Deserializes the view into it's owned value T.
pub fn to_owned(&self) -> Result<T, InvalidView> {
pub fn deserialize_view(&self) -> Result<T, InvalidView> {
self.view
.deserialize(&mut SharedDeserializeMap::default())
.map_err(|_| InvalidView)
Expand Down Expand Up @@ -199,7 +199,7 @@ mod tests {
let view: DataView<Demo> = DataView::using(bytes).unwrap();
assert!(view == demo, "Original and view must match.");

let value = view.to_owned().unwrap();
let value = view.deserialize_view().unwrap();
assert_eq!(value, demo, "Deserialized and original value should match.")
}
}
Loading

0 comments on commit dcfd58d

Please sign in to comment.