From 8f09f60b9e9981286afc0df66e2cc4db6186dd00 Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Wed, 2 Oct 2024 11:29:55 +0200 Subject: [PATCH] Introduce ArcVec to improve Copy performance of messages Summary: The ArcVec is just a thin wrapper around Arc<[T]> that then can be cheaply cloned. But at the same time can be seralized and most importantly deserialized when sent over the wire --- .../replicated_loglet/rpc_routers.rs | 3 +- crates/types/src/net/replicated_loglet.rs | 4 +- crates/types/src/storage.rs | 127 ++++++++++++++++++ 3 files changed, 131 insertions(+), 3 deletions(-) diff --git a/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs b/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs index 25f725223..674718672 100644 --- a/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs +++ b/crates/bifrost/src/providers/replicated_loglet/rpc_routers.rs @@ -126,7 +126,7 @@ impl SequencersRpc { loglet_id: loglet.params().loglet_id, segment_index: loglet.segment_index(), }, - payloads: Vec::from_iter(payloads.iter().cloned()), + payloads: payloads.into(), }; let commit_token = loop { @@ -137,7 +137,6 @@ impl SequencersRpc { .await .unwrap(); - // todo(azmy): avoid copying all records on retry match connection .send(loglet.params().sequencer, permits, msg.clone()) .await diff --git a/crates/types/src/net/replicated_loglet.rs b/crates/types/src/net/replicated_loglet.rs index 018a1cb37..fa538d39a 100644 --- a/crates/types/src/net/replicated_loglet.rs +++ b/crates/types/src/net/replicated_loglet.rs @@ -20,6 +20,7 @@ use crate::logs::metadata::SegmentIndex; use crate::logs::{LogId, LogletOffset, Record, SequenceNumber, TailState}; use crate::net::define_rpc; use crate::replicated_loglet::ReplicatedLogletId; +use crate::storage::ArcVec; // ----- ReplicatedLoglet Sequencer API ----- define_rpc! { @@ -69,12 +70,13 @@ impl CommonResponseHeader { pub struct Append { #[serde(flatten)] pub header: CommonRequestHeader, - pub payloads: Vec, + pub payloads: ArcVec, } impl Append { pub fn estimated_encode_size(&self) -> usize { self.payloads + .as_slice() .iter() .map(|p| p.estimated_encode_size()) .sum() diff --git a/crates/types/src/storage.rs b/crates/types/src/storage.rs index f97e5408b..4bc5c7cf8 100644 --- a/crates/types/src/storage.rs +++ b/crates/types/src/storage.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use core::fmt; +use std::marker::PhantomData; use std::mem; use std::sync::Arc; @@ -15,6 +17,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use downcast_rs::{impl_downcast, DowncastSync}; use serde::de::{DeserializeOwned, Error as DeserializationError}; use serde::ser::Error as SerializationError; +use serde::ser::SerializeSeq; use serde::{Deserialize, Serialize}; use tracing::error; @@ -395,6 +398,130 @@ pub fn decode_from_flexbuffers( } } +/// [`ArcVec`] mainly used by `message` types to improve +/// cloning of messages. +/// +/// It can replace [`Vec`] most of the time in all structures +/// that need to be serialized over the wire. +/// +/// Internally it keeps the data inside an [`Arc<[T]>`] +#[derive(Debug)] +pub struct ArcVec { + inner: Arc<[T]>, +} + +impl serde::Serialize for ArcVec +where + T: serde::Serialize, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let slice = self.as_slice(); + let mut seq = serializer.serialize_seq(Some(slice.len()))?; + for elem in slice.iter() { + seq.serialize_element(elem)?; + } + + seq.end() + } +} + +impl<'de, T> serde::Deserialize<'de> for ArcVec +where + T: serde::Deserialize<'de>, +{ + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + deserializer.deserialize_seq(ArcVecVisitor::default()) + } +} + +struct ArcVecVisitor { + _phantom: PhantomData, +} + +impl Default for ArcVecVisitor { + fn default() -> Self { + Self { + _phantom: PhantomData, + } + } +} + +impl<'de, T> serde::de::Visitor<'de> for ArcVecVisitor +where + T: serde::Deserialize<'de>, +{ + type Value = ArcVec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + write!(formatter, "expecting an array") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut vec: Vec = Vec::with_capacity(seq.size_hint().unwrap_or_default()); + loop { + let Some(value) = seq.next_element::()? else { + break; + }; + + vec.push(value); + } + + Ok(vec.into()) + } +} + +impl Clone for ArcVec { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +impl ArcVec { + pub fn as_slice(&self) -> &[T] { + &self.inner + } +} + +impl From> for Arc<[T]> { + fn from(value: ArcVec) -> Self { + value.inner + } +} + +impl From> for Vec +where + T: Clone, +{ + fn from(value: ArcVec) -> Self { + Vec::from_iter(value.as_slice().iter().cloned()) + } +} + +impl From> for ArcVec { + fn from(value: Vec) -> Self { + Self { + inner: Arc::from_iter(value), + } + } +} + +impl From> for ArcVec { + fn from(value: Arc<[T]>) -> Self { + Self { inner: value } + } +} + #[cfg(test)] mod tests { use bytes::Bytes;