Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move rkyv RPC support to use CRC32 checksums rather than checkbytes and use stack allocated scratch. #62

Merged
merged 4 commits into from
Sep 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion datacake-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ async-trait = "0.1.60"
thiserror = "1"
parking_lot = "0.12.1"
tracing = "0.1.37"
crc32fast = "1.3.2"

hyper = { version = "0.14.23", features = ["full"] }
rkyv = { version = "0.7.42", features = ["strict", "validation"] }
rkyv = { version = "0.7.42", features = ["strict"] }
tokio = { version = "1", default-features = false, features = ["rt"] }

# Used for simulation
Expand All @@ -31,6 +32,7 @@ async-stream = { version = "0.3.3", optional = true }
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
test-helper = { path = "../test-helper" }
rkyv = { version = "0.7.42", features = ["strict", "validation"] }

[features]
test-utils = []
Expand Down
16 changes: 8 additions & 8 deletions datacake-rpc/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::borrow::Cow;
use std::marker::PhantomData;
use std::time::Duration;

use crate::handler::{Handler, RpcService, TryAsBody, TryIntoBody};
use crate::net::{Channel, Status};
use crate::request::{MessageMetadata, RequestContents};
use crate::Body;
use crate::{Body, DataView};

/// A type alias for the returned data view of the RPC message reply.
pub type MessageReply<Svc, Msg> =
Expand Down Expand Up @@ -140,8 +139,8 @@ where
<Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
{
let metadata = MessageMetadata {
service_name: Cow::Borrowed(<Svc as RpcService>::service_name()),
path: Cow::Borrowed(<Svc as Handler<Msg>>::path()),
service_name: <Svc as RpcService>::service_name(),
path: <Svc as Handler<Msg>>::path(),
};

let body = msg.try_as_body()?;
Expand All @@ -164,8 +163,8 @@ where
<Svc as Handler<Msg>>::Reply: RequestContents + TryIntoBody,
{
let metadata = MessageMetadata {
service_name: Cow::Borrowed(<Svc as RpcService>::service_name()),
path: Cow::Borrowed(<Svc as Handler<Msg>>::path()),
service_name: <Svc as RpcService>::service_name(),
path: <Svc as Handler<Msg>>::path(),
};

let body = msg.try_into_body()?;
Expand Down Expand Up @@ -197,8 +196,9 @@ where
match result {
Ok(body) => <<Svc as Handler<Msg>>::Reply>::from_body(body).await,
Err(buffer) => {
let status = rkyv::from_bytes(&buffer).map_err(|_| Status::invalid())?;
Err(status)
let status =
DataView::<Status>::using(buffer).map_err(|_| Status::invalid())?;
Err(status.to_owned().unwrap_or_else(|_| Status::invalid()))
},
}
}
Expand Down
30 changes: 11 additions & 19 deletions datacake-rpc/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
use std::sync::Arc;

use async_trait::async_trait;
use rkyv::ser::serializers::AllocSerializer;
use rkyv::{AlignedVec, Archive, Serialize};
use rkyv::{Archive, Serialize};

use crate::net::Status;
use crate::request::{Request, RequestContents};
use crate::{Body, SCRATCH_SPACE};
use crate::rkyv_tooling::DatacakeSerializer;
use crate::Body;

/// A specific handler key.
///
Expand Down Expand Up @@ -116,7 +116,7 @@
{
let phantom = PhantomHandler {
handler: self.service.clone(),
_msg: PhantomData::<Msg>::default(),

Check warning on line 119 in datacake-rpc/src/handler.rs

View workflow job for this annotation

GitHub Actions / clippy

use of `default` to create a unit struct

warning: use of `default` to create a unit struct --> datacake-rpc/src/handler.rs:119:37 | 119 | _msg: PhantomData::<Msg>::default(), | ^^^^^^^^^^^ help: remove this call to `default` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#default_constructed_unit_structs = note: `#[warn(clippy::default_constructed_unit_structs)]` on by default

Check warning on line 119 in datacake-rpc/src/handler.rs

View workflow job for this annotation

GitHub Actions / clippy

use of `default` to create a unit struct

warning: use of `default` to create a unit struct --> datacake-rpc/src/handler.rs:119:37 | 119 | _msg: PhantomData::<Msg>::default(), | ^^^^^^^^^^^ help: remove this call to `default` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#default_constructed_unit_structs = note: `#[warn(clippy::default_constructed_unit_structs)]` on by default
};

let uri = crate::to_uri_path(Svc::service_name(), <Svc as Handler<Msg>>::path());
Expand Down Expand Up @@ -236,7 +236,7 @@
&self,
remote_addr: SocketAddr,
data: Body,
) -> Result<Body, AlignedVec>;
) -> Result<Body, Status>;
}

struct PhantomHandler<H, Msg>
Expand All @@ -258,26 +258,15 @@
&self,
remote_addr: SocketAddr,
data: Body,
) -> Result<Body, AlignedVec> {
let view = match Msg::from_body(data).await {
Ok(view) => view,
Err(status) => {
let error = rkyv::to_bytes::<_, SCRATCH_SPACE>(&status)
.unwrap_or_else(|_| AlignedVec::new());
return Err(error);
},
};
) -> Result<Body, Status> {
let view = Msg::from_body(data).await?;

let msg = Request::new(remote_addr, view);

self.handler
.on_message(msg)
.await
.and_then(|reply| reply.try_into_body())
.map_err(|status| {
rkyv::to_bytes::<_, SCRATCH_SPACE>(&status)
.unwrap_or_else(|_| AlignedVec::new())
})
}
}

Expand Down Expand Up @@ -306,10 +295,11 @@

impl<T> TryAsBody for T
where
T: Archive + Serialize<AllocSerializer<SCRATCH_SPACE>>,
T: Archive + Serialize<DatacakeSerializer>,
{
#[inline]
fn try_as_body(&self) -> Result<Body, Status> {
rkyv::to_bytes::<_, SCRATCH_SPACE>(self)
crate::rkyv_tooling::to_view_bytes(self)
.map(|v| Body::from(v.to_vec()))
.map_err(|e| Status::internal(e.to_string()))
}
Expand All @@ -319,12 +309,14 @@
where
T: TryAsBody,
{
#[inline]
fn try_into_body(self) -> Result<Body, Status> {
<Self as TryAsBody>::try_as_body(&self)
}
}

impl TryIntoBody for Body {
#[inline]
fn try_into_body(self) -> Result<Body, Status> {
Ok(self)
}
Expand Down
26 changes: 16 additions & 10 deletions datacake-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,30 @@ mod client;
mod handler;
mod net;
mod request;
mod rkyv_tooling;
mod server;
mod utils;
mod view;

pub(crate) const SCRATCH_SPACE: usize = 4096;

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A re-export of the async-trait macro.
pub use async_trait::async_trait;
pub use body::Body;
pub use client::{MessageReply, RpcClient};
pub use handler::{Handler, RpcService, ServiceRegistry, TryAsBody, TryIntoBody};
pub use net::{ArchivedErrorCode, ArchivedStatus, Channel, Error, ErrorCode, Status};
pub use request::{Request, RequestContents};
pub use server::Server;
pub use view::{DataView, InvalidView};

pub use self::body::Body;
pub use self::client::{MessageReply, RpcClient};
pub use self::handler::{Handler, RpcService, ServiceRegistry, TryAsBody, TryIntoBody};
pub use self::net::{
ArchivedErrorCode,
ArchivedStatus,
Channel,
Error,
ErrorCode,
Status,
};
pub use self::request::{Request, RequestContents};
pub use self::rkyv_tooling::{DataView, InvalidView};
pub use self::server::Server;

pub(crate) fn hash<H: Hash + ?Sized>(v: &H) -> u64 {
let mut hasher = DefaultHasher::new();
Expand Down
6 changes: 1 addition & 5 deletions datacake-rpc/src/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,7 @@ impl Channel {
metadata: MessageMetadata,
msg: Body,
) -> Result<Result<Body, AlignedVec>, Error> {
let uri = format!(
"http://{}{}",
self.remote_addr,
crate::to_uri_path(&metadata.service_name, &metadata.path),
);
let uri = format!("http://{}{}", self.remote_addr, metadata.to_uri_path(),);
let request = Request::builder()
.method(Method::POST)
.uri(uri)
Expand Down
63 changes: 33 additions & 30 deletions datacake-rpc/src/net/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::task::JoinHandle;

use crate::body::Body;
use crate::server::ServerState;
use crate::{Status, SCRATCH_SPACE};
use crate::Status;

/// Starts the RPC server.
///
Expand Down Expand Up @@ -88,37 +88,40 @@ async fn handle_message(
state: ServerState,
remote_addr: SocketAddr,
) -> anyhow::Result<Response<hyper::Body>> {
let (req, body) = req.into_parts();
let uri = req.uri.path();
match state.get_handler(uri) {
None => {
let status = Status::unavailable(format!("Unknown service {uri}"));
let buffer =
rkyv::to_bytes::<_, SCRATCH_SPACE>(&status).unwrap_or_else(|e| {
warn!(error = ?e, "Failed to serialize error message.");
AlignedVec::new()
});

let mut response = Response::new(buffer.to_vec().into());
(*response.status_mut()) = StatusCode::BAD_REQUEST;
let reply = try_handle_request(req, state, remote_addr).await;

match reply {
Ok(body) => {
let mut response = Response::new(body.into_inner());
(*response.status_mut()) = StatusCode::OK;
Ok(response)
},
Some(handler) => {
let reply = handler.try_handle(remote_addr, Body::new(body)).await;

match reply {
Ok(body) => {
let mut response = Response::new(body.into_inner());
(*response.status_mut()) = StatusCode::OK;
Ok(response)
},
Err(buffer) => {
let mut response = Response::new(buffer.to_vec().into());
(*response.status_mut()) = StatusCode::BAD_REQUEST;
Ok(response)
},
}
},
Err(status) => Ok(create_bad_request(&status)),
}
}

async fn try_handle_request(
req: Request<hyper::Body>,
state: ServerState,
remote_addr: SocketAddr,
) -> Result<Body, Status> {
let (req, body) = req.into_parts();
let uri = req.uri.path();

let handler = state
.get_handler(uri)
.ok_or_else(|| Status::unavailable(format!("Unknown service {uri}")))?;

handler.try_handle(remote_addr, Body::new(body)).await
}

fn create_bad_request(status: &Status) -> Response<hyper::Body> {
// This should be infallible.
let buffer =
crate::rkyv_tooling::to_view_bytes(status).unwrap_or_else(|_| AlignedVec::new());

let mut response = Response::new(buffer.to_vec().into());
(*response.status_mut()) = StatusCode::BAD_REQUEST;

response
}
15 changes: 8 additions & 7 deletions datacake-rpc/src/net/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use rkyv::{Archive, Deserialize, Serialize};

#[repr(C)]
#[derive(Serialize, Deserialize, Archive, PartialEq, Eq)]
#[archive(compare(PartialEq), check_bytes)]
#[archive(compare(PartialEq))]
#[archive_attr(derive(PartialEq, Eq, Debug))]
/// Status information around the cause of a message request failing.
///
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Error for Status {}

#[repr(C)]
#[derive(Serialize, Deserialize, Archive, PartialEq, Eq, Debug)]
#[archive(compare(PartialEq), check_bytes)]
#[archive(compare(PartialEq))]
#[archive_attr(derive(Debug, PartialEq, Eq))]
/// A generic error code describing the high level reason why the request failed.
pub enum ErrorCode {
Expand All @@ -105,14 +105,15 @@ mod tests {

fn test_status_variant(status: Status) {
println!("Testing: {:?}", &status);
let bytes = rkyv::to_bytes::<_, 1024>(&status).expect("Serialize OK");
let archived =
rkyv::check_archived_root::<'_, Status>(&bytes).expect("Archive OK");
let bytes = crate::rkyv_tooling::to_view_bytes(&status).expect("Serialize OK");
let view =
crate::rkyv_tooling::DataView::<Status>::using(bytes).expect("Archive OK");
assert_eq!(
archived, &status,
view, status,
"Archived value and original value should match"
);
let copy: Status = rkyv::from_bytes(&bytes).expect("Deserialize OK");

let copy: Status = view.to_owned().expect("Deserialize OK");
assert_eq!(
copy, status,
"Deserialized value and original value should match"
Expand Down
Loading
Loading