Skip to content

Commit

Permalink
Introduce ArcVec<T> to improve Copy performance of messages
Browse files Browse the repository at this point in the history
Summary:
ArcVec<T> is a thin wrapper around Arc<[T]>, so it can be cloned cheaply.
At the same time it can be serialized and, most importantly,
deserialized when sent over the wire.
  • Loading branch information
muhamadazmy committed Oct 2, 2024
1 parent 32c0d35 commit 8f09f60
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl SequencersRpc {
loglet_id: loglet.params().loglet_id,
segment_index: loglet.segment_index(),
},
payloads: Vec::from_iter(payloads.iter().cloned()),
payloads: payloads.into(),
};

let commit_token = loop {
Expand All @@ -137,7 +137,6 @@ impl SequencersRpc {
.await
.unwrap();

// todo(azmy): avoid copying all records on retry
match connection
.send(loglet.params().sequencer, permits, msg.clone())
.await
Expand Down
4 changes: 3 additions & 1 deletion crates/types/src/net/replicated_loglet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::logs::metadata::SegmentIndex;
use crate::logs::{LogId, LogletOffset, Record, SequenceNumber, TailState};
use crate::net::define_rpc;
use crate::replicated_loglet::ReplicatedLogletId;
use crate::storage::ArcVec;

// ----- ReplicatedLoglet Sequencer API -----
define_rpc! {
Expand Down Expand Up @@ -69,12 +70,13 @@ impl CommonResponseHeader {
pub struct Append {
#[serde(flatten)]
pub header: CommonRequestHeader,
pub payloads: Vec<Record>,
pub payloads: ArcVec<Record>,
}

impl Append {
pub fn estimated_encode_size(&self) -> usize {
self.payloads
.as_slice()
.iter()
.map(|p| p.estimated_encode_size())
.sum()
Expand Down
127 changes: 127 additions & 0 deletions crates/types/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use core::fmt;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;

use bytes::{Buf, BufMut, Bytes, BytesMut};
use downcast_rs::{impl_downcast, DowncastSync};
use serde::de::{DeserializeOwned, Error as DeserializationError};
use serde::ser::Error as SerializationError;
use serde::ser::SerializeSeq;
use serde::{Deserialize, Serialize};
use tracing::error;

Expand Down Expand Up @@ -395,6 +398,130 @@ pub fn decode_from_flexbuffers<T: DeserializeOwned, B: Buf>(
}
}

/// [`ArcVec`] is mainly used by `message` types to make cloning
/// of messages cheap.
///
/// It can replace [`Vec<T>`] most of the time in all structures
/// that need to be serialized over the wire: it is serialized as a
/// plain sequence of its elements and deserialized from one.
///
/// Internally it keeps the data inside an `Arc<[T]>`, so cloning an
/// [`ArcVec`] clones the `Arc` rather than the elements.
#[derive(Debug)]
pub struct ArcVec<T> {
// Shared backing storage; only exposed read-only through `as_slice`
// or by converting into `Arc<[T]>` / `Vec<T>`.
inner: Arc<[T]>,
}

impl<T> serde::Serialize for ArcVec<T>
where
    T: serde::Serialize,
{
    /// Writes the elements as a sequence whose length is announced up front,
    /// one element at a time, in slice order.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let items = self.as_slice();
        let mut state = serializer.serialize_seq(Some(items.len()))?;
        // Stop at the first element that fails to serialize.
        items
            .iter()
            .try_for_each(|item| state.serialize_element(item))?;
        state.end()
    }
}

impl<'de, T> serde::Deserialize<'de> for ArcVec<T>
where
    T: serde::Deserialize<'de>,
{
    /// Asks the deserializer for a sequence and lets [`ArcVecVisitor`]
    /// collect its elements into an [`ArcVec`].
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let visitor = ArcVecVisitor::<T>::default();
        deserializer.deserialize_seq(visitor)
    }
}

/// Serde visitor used by the `Deserialize` impl of [`ArcVec`] to build an
/// `ArcVec<T>` out of a sequence of `T`.
struct ArcVecVisitor<T> {
// The visitor stores no data; this only ties it to the element type `T`.
_phantom: PhantomData<T>,
}

impl<T> Default for ArcVecVisitor<T> {
fn default() -> Self {
Self {
_phantom: PhantomData,
}
}
}

impl<'de, T> serde::de::Visitor<'de> for ArcVecVisitor<T>
where
    T: serde::Deserialize<'de>,
{
    type Value = ArcVec<T>;

    /// Describes the expected input.
    ///
    /// Serde prefixes this text with "expected ", so it must be a noun
    /// phrase rather than a full sentence.
    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("an array")
    }

    /// Collects every element of the sequence into a `Vec<T>` and converts
    /// it into an [`ArcVec`].
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `seq` while decoding an element.
    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: serde::de::SeqAccess<'de>,
    {
        // The size hint usually comes straight from the wire and cannot be
        // trusted: a corrupt or malicious length prefix must not be able to
        // trigger an arbitrarily large up-front allocation. Pre-allocate at
        // most this many bytes and let the vector grow normally beyond that.
        const MAX_PREALLOC_BYTES: usize = 1024 * 1024;

        let elem_size = std::mem::size_of::<T>();
        let max_prealloc = if elem_size == 0 {
            // Zero-sized elements never need backing storage.
            0
        } else {
            MAX_PREALLOC_BYTES / elem_size
        };
        let capacity = seq.size_hint().unwrap_or(0).min(max_prealloc);

        let mut vec: Vec<T> = Vec::with_capacity(capacity);
        while let Some(value) = seq.next_element::<T>()? {
            vec.push(value);
        }

        Ok(vec.into())
    }
}

impl<T> Clone for ArcVec<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}

impl<T> ArcVec<T> {
    /// Borrows the elements as a plain slice.
    pub fn as_slice(&self) -> &[T] {
        &self.inner[..]
    }
}

impl<T> From<ArcVec<T>> for Arc<[T]> {
fn from(value: ArcVec<T>) -> Self {
value.inner
}
}

impl<T> From<ArcVec<T>> for Vec<T>
where
    T: Clone,
{
    /// Copies the elements out into an owned `Vec<T>`.
    ///
    /// Every element is cloned, hence the `T: Clone` bound.
    fn from(value: ArcVec<T>) -> Self {
        value.as_slice().to_vec()
    }
}

impl<T> From<Vec<T>> for ArcVec<T> {
fn from(value: Vec<T>) -> Self {
Self {
inner: Arc::from_iter(value),
}
}
}

impl<T> From<Arc<[T]>> for ArcVec<T> {
fn from(value: Arc<[T]>) -> Self {
Self { inner: value }
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
Expand Down

0 comments on commit 8f09f60

Please sign in to comment.