Skip to content

Commit

Permalink
Prevent duplicate message processing
Browse files Browse the repository at this point in the history
Handle cache to ensure that the same message is not processed twice if sent with different RaptorQ configurations
  • Loading branch information
herr-seppia committed Oct 2, 2024
1 parent 922088b commit 437f7de
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 68 deletions.
33 changes: 9 additions & 24 deletions src/transport/encoding/raptorq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ impl BroadcastPayload {
}
}
impl<'a> ChunkedPayload<'a> {
fn ray_id(&self) -> &[u8] {
&self.0.gossip_frame[0..RAY_ID_SIZE]
fn ray_id(&self) -> [u8; RAY_ID_SIZE] {
self.0.gossip_frame[0..RAY_ID_SIZE]
.try_into()
.expect("slice to be length 32")
}

fn transmission_info(
Expand All @@ -73,39 +75,22 @@ impl<'a> ChunkedPayload<'a> {
) -> Result<SafeObjectTransmissionInformation, TransmissionInformationError>
{
let slice = self.transmission_info_bytes();
let info = SafeObjectTransmissionInformation::try_from(slice)?;
let info = SafeObjectTransmissionInformation::try_from(&slice)?;
match info.inner.transfer_length() < max_udp_len {
true => Ok(info),
false => Err(TransmissionInformationError::TransferLengthExceeded),
}
}

fn transmission_info_bytes(&self) -> &[u8] {
&self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)]
fn transmission_info_bytes(&self) -> [u8; TRANSMISSION_INFO_SIZE] {
self.0.gossip_frame[RAY_ID_SIZE..(CHUNKED_HEADER_SIZE)]
.try_into()
.expect("slice to be length 12")
}

fn encoded_chunk(&self) -> &[u8] {
&self.0.gossip_frame[(CHUNKED_HEADER_SIZE)..]
}

fn header(&self) -> [u8; CHUNKED_HEADER_SIZE] {
let header = &self.0.gossip_frame[0..CHUNKED_HEADER_SIZE];

// Why do we need transmission info included into the header?
//
// Transmission info should be sent over a reliable channel, because
// it is critical to decode packets.
// Since it is sent over UDP alongside the encoded chunked bytes,
// corrupted transmission info can be received.
// If the corrupted info is part of the first received chunk, no
// message can ever be decoded.
//
// ** UPDATE:
// Since the correctness of an UDP packet is already guaranteed by OS
// checksum checks, Hashing has been removed in order to increase the
// decoding performance.
header.try_into().expect("slice to be length 44")
}
}

#[cfg(test)]
Expand Down
114 changes: 77 additions & 37 deletions src/transport/encoding/raptorq/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use raptorq::{Decoder as ExtDecoder, EncodingPacket};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace, warn};

use super::{ChunkedPayload, CHUNKED_HEADER_SIZE};
use super::{ChunkedPayload, RAY_ID_SIZE, TRANSMISSION_INFO_SIZE};
use crate::encoding::message::Message;
use crate::encoding::payload::BroadcastPayload;
use crate::transport::encoding::Configurable;
Expand All @@ -25,7 +25,7 @@ const DEFAULT_CACHE_PRUNE_EVERY: Duration = Duration::from_secs(30);
const DEFAULT_MAX_UDP_LEN: u64 = 10 * 1_024 * 1_024;

pub struct RaptorQDecoder {
cache: BTreeMap<[u8; CHUNKED_HEADER_SIZE], CacheStatus>,
cache: BTreeMap<[u8; RAY_ID_SIZE], CacheStatus>,
last_pruned: Instant,
conf: RaptorQDecoderConf,
}
Expand Down Expand Up @@ -63,15 +63,27 @@ impl Configurable for RaptorQDecoder {
}
}

struct ReceivingInfo {
expire_on: Instant,
max_kad_height: u8,
}

struct DecoderInfo {
decoder: ExtDecoder,
max_kad_blocks: usize,
}

type DecoderLists = BTreeMap<[u8; TRANSMISSION_INFO_SIZE], DecoderInfo>;

enum CacheStatus {
Receiving(ExtDecoder, Instant, u8, usize),
Receiving(ReceivingInfo, DecoderLists),
Processed(Instant),
}

impl CacheStatus {
fn expired(&self) -> bool {
let expire_on = match self {
CacheStatus::Receiving(_, expire_on, _, _) => expire_on,
CacheStatus::Receiving(info, _) => &info.expire_on,
CacheStatus::Processed(expire_on) => expire_on,
};
expire_on < &Instant::now()
Expand All @@ -85,70 +97,98 @@ impl Decoder for RaptorQDecoder {
let chunked = ChunkedPayload::try_from(&payload)?;
let ray_id = chunked.ray_id();
let encode_info = chunked.transmission_info_bytes();
let chunked_header = chunked.header();

// Perform a `match` on the cache entry against the chunked header.
let status = match self.cache.entry(chunked_header) {
// Perform a `match` on the cache entry against the ray id.
let status = match self.cache.entry(ray_id) {
// Cache status exists: return it
std::collections::btree_map::Entry::Occupied(o) => o.into_mut(),

// Cache status not found: creates a new entry with
// CacheStatus::Receiving status and binds a new Decoder with
// the received transmission information
std::collections::btree_map::Entry::Vacant(v) => {
let info = chunked.transmission_info(self.conf.max_udp_len);
match info {
Ok(safe_info) => {
debug!(
event = "Start decoding payload",
ray = hex::encode(ray_id),
encode_info = hex::encode(encode_info)
);

v.insert(CacheStatus::Receiving(
ExtDecoder::new(safe_info.inner),
Instant::now() + self.conf.cache_ttl,
payload.height,
safe_info.max_blocks,
))
}
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("Invalid transmission info {e:?}",),
));
}
}
let receiving_info = ReceivingInfo {
expire_on: Instant::now() + self.conf.cache_ttl,
max_kad_height: payload.height,
};
v.insert(CacheStatus::Receiving(
receiving_info,
BTreeMap::new(),
))
}
};

let decoded = match status {
// Avoid to repropagate already processed messages
CacheStatus::Processed(_) => None,
CacheStatus::Receiving(decoder, _, max_height, max_blocks) => {
CacheStatus::Receiving(recv, list) => {
// check right decoder according to the encoding info
let decoder_info = match list.entry(encode_info) {
// Cache status exists: return it
std::collections::btree_map::Entry::Occupied(o) => {
o.into_mut()
}

// Cache status not found: creates a new entry with
// CacheStatus::Receiving status and binds a new Decoder
// with the received
// transmission information
std::collections::btree_map::Entry::Vacant(v) => {
let info = chunked
.transmission_info(self.conf.max_udp_len);

match info {
Ok(safe_info) => {
debug!(
event = "Start decoding payload",
ray = hex::encode(ray_id),
encode_info = hex::encode(encode_info)
);

v.insert(DecoderInfo {
decoder: ExtDecoder::new(
safe_info.inner,
),
max_kad_blocks: safe_info.max_blocks,
})
}
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Invalid transmission info {e:?}",
),
));
}
}
}
};

// Depending on Beta replication, we can receive chunks of
// the same message from multiple peers.
// Those peers can send with different broadcast height.
// If those heights differs, we should check the highest one
// in order to preserve the propagation
if payload.height > *max_height {
*max_height = payload.height;
if payload.height > recv.max_kad_height {
recv.max_kad_height = payload.height;
}

let packet =
EncodingPacket::deserialize(chunked.encoded_chunk());
if packet.payload_id().source_block_number() as usize
>= *max_blocks
>= decoder_info.max_kad_blocks
{
return Ok(None);
};

decoder
decoder_info
.decoder
.decode(packet)
// If decoded successfully, create the new
// BroadcastMessage
.and_then(|decoded| {
let payload = BroadcastPayload {
height: *max_height,
height: recv.max_kad_height,
gossip_frame: decoded,
};
// Perform integrity check
Expand All @@ -171,7 +211,7 @@ impl Decoder for RaptorQDecoder {
// to propagate already processed messages
.map(|decoded| {
self.cache.insert(
chunked_header,
ray_id,
CacheStatus::Processed(
Instant::now() + self.conf.cache_ttl,
),
Expand Down
14 changes: 7 additions & 7 deletions src/transport/encoding/raptorq/safe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* limitations under the License.
*/

use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;

use raptorq::ObjectTransmissionInformation;

Expand Down Expand Up @@ -60,7 +60,6 @@ pub(crate) struct SafeObjectTransmissionInformation {

#[derive(Debug, Clone)]
pub enum TransmissionInformationError {
InvalidSize,
SourceBlocksZero,
SymbolSizeZero,
SymbolSizeGreaterThanMTU,
Expand All @@ -70,12 +69,13 @@ pub enum TransmissionInformationError {
TooManySourceSymbols,
}

impl TryFrom<&[u8]> for SafeObjectTransmissionInformation {
impl TryFrom<&[u8; TRANSMISSION_INFO_SIZE]>
for SafeObjectTransmissionInformation
{
type Error = TransmissionInformationError;
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let value: &[u8; TRANSMISSION_INFO_SIZE] = value
.try_into()
.map_err(|_| TransmissionInformationError::InvalidSize)?;
fn try_from(
value: &[u8; TRANSMISSION_INFO_SIZE],
) -> Result<Self, Self::Error> {
let config = ObjectTransmissionInformation::deserialize(value);

if config.source_blocks() == 0 {
Expand Down

0 comments on commit 437f7de

Please sign in to comment.