Skip to content

Commit

Permalink
Introduce ArcVec<T> to improve Copy performance of messages
Browse files Browse the repository at this point in the history
Summary:
`ArcVec<T>` is a thin wrapper around `Arc<[T]>` that can be cloned cheaply.
At the same time it can be serialized and, most importantly, deserialized
when sent over the wire.
  • Loading branch information
muhamadazmy committed Oct 2, 2024
1 parent 89d0aa5 commit 300d29b
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 1 deletion.
3 changes: 2 additions & 1 deletion crates/types/src/net/replicated_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::logs::metadata::SegmentIndex;
use crate::logs::{LogId, LogletOffset, Record, SequenceNumber, TailState};
use crate::net::define_rpc;
use crate::replicated_loglet::ReplicatedLogletId;
use crate::storage::ArcVec;

// ----- ReplicatedLoglet Sequencer API -----
define_rpc! {
Expand Down Expand Up @@ -69,7 +70,7 @@ impl CommonResponseHeader {
pub struct Append {
#[serde(flatten)]
pub header: CommonRequestHeader,
pub payloads: Vec<Record>,
pub payloads: ArcVec<Record>,
}

impl Append {
Expand Down
128 changes: 128 additions & 0 deletions crates/types/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use core::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use std::sync::Arc;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use downcast_rs::{impl_downcast, DowncastSync};
use serde::de::{DeserializeOwned, Error as DeserializationError};
use serde::ser::Error as SerializationError;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use tracing::error;

Expand Down Expand Up @@ -395,6 +399,130 @@ pub fn decode_from_flexbuffers<T: DeserializeOwned, B: Buf>(
}
}

/// A cheaply clonable, immutable sequence of `T`.
///
/// [`ArcVec`] is mainly used by `message` types so that cloning a message
/// shares the underlying elements instead of copying them.
///
/// It can stand in for [`Vec<T>`] in most structures that need to be
/// serialized over the wire.
///
/// Internally the elements live inside an `Arc<[T]>`, and the wrapper
/// dereferences to `[T]`, so read-only slice methods are available directly.
#[derive(Debug)]
pub struct ArcVec<T> {
    inner: Arc<[T]>,
}

impl<T> Deref for ArcVec<T> {
    type Target = [T];

    /// Borrows the shared elements as a plain slice.
    fn deref(&self) -> &[T] {
        &*self.inner
    }
}

impl<T> serde::Serialize for ArcVec<T>
where
    T: serde::Serialize,
{
    /// Serializes the elements as a sequence whose length is announced up front.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let items: &[T] = &self.inner;
        let mut state = serializer.serialize_seq(Some(items.len()))?;
        // Stops at the first element that fails to serialize.
        items
            .iter()
            .try_for_each(|item| state.serialize_element(item))?;
        state.end()
    }
}

impl<'de, T> serde::Deserialize<'de> for ArcVec<T>
where
    T: serde::Deserialize<'de>,
{
    /// Deserializes a sequence of `T` by driving an [`ArcVecVisitor`].
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let visitor = ArcVecVisitor::<T>::default();
        deserializer.deserialize_seq(visitor)
    }
}

/// Serde visitor that builds an `ArcVec<T>` from a sequence.
///
/// It stores no data; `PhantomData` only records the element type.
struct ArcVecVisitor<T> {
    _phantom: PhantomData<T>,
}

// Implemented by hand: `#[derive(Default)]` would add a `T: Default` bound.
impl<T> Default for ArcVecVisitor<T> {
    fn default() -> Self {
        ArcVecVisitor {
            _phantom: PhantomData,
        }
    }
}

impl<'de, T> serde::de::Visitor<'de> for ArcVecVisitor<T>
where
T: serde::Deserialize<'de>,
{
type Value = ArcVec<T>;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "expecting an array")
}

fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let mut vec: Vec<T> = Vec::with_capacity(seq.size_hint().unwrap_or_default());
loop {
let Some(value) = seq.next_element::<T>()? else {
break;
};

vec.push(value);
}

Ok(vec.into())
}
}

// Implemented by hand so that cloning does not require `T: Clone`.
impl<T> Clone for ArcVec<T> {
    /// Returns another handle to the same shared elements; no element is copied.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}

impl<T> From<ArcVec<T>> for Arc<[T]> {
fn from(value: ArcVec<T>) -> Self {
value.inner
}
}

impl<T> From<ArcVec<T>> for Vec<T>
where
    T: Clone,
{
    /// Copies the elements into a freshly allocated `Vec`.
    ///
    /// Each element is cloned, because the underlying storage may still be
    /// shared with other `ArcVec` handles.
    fn from(value: ArcVec<T>) -> Self {
        value.inner.to_vec()
    }
}

impl<T> From<Vec<T>> for ArcVec<T> {
    /// Moves the vector's elements into shared, reference-counted storage.
    fn from(value: Vec<T>) -> Self {
        let inner: Arc<[T]> = value.into();
        Self { inner }
    }
}

impl<T> From<Arc<[T]>> for ArcVec<T> {
fn from(value: Arc<[T]>) -> Self {
Self { inner: value }
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
Expand Down

0 comments on commit 300d29b

Please sign in to comment.