Skip to content

Commit

Permalink
Merge pull request #365 from elfenpiff/iox2-337-samples-rejected-afte…
Browse files Browse the repository at this point in the history
…r-publisher-disconnected

[#337] samples rejected after publisher disconnected
  • Loading branch information
elfenpiff authored Sep 3, 2024
2 parents b67e09a + f1dd86b commit 0ace03a
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 57 deletions.
1 change: 1 addition & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ Adjusting `global` settings ensures a non-interfering setup.
* `defaults.publish-subscribe.publisher-max-loaned-samples` - [int]: Maximum samples a publisher can loan.
* `defaults.publish-subscribe.enable-safe-overflow` - [`true`|`false`]: Default overflow behavior.
* `defaults.publish-subscribe.unable-to-deliver-strategy` - [`Block`|`DiscardSample`]: Default strategy for non-overflowing setups when delivery fails.
* `defaults.publish-subscribe.subscriber-expired-connection-buffer` - [int]: Expired connection buffer size of the subscriber. Connections to publishers are expired when the publisher disconnected from the service and the connection contains unconsumed samples.
1 change: 1 addition & 0 deletions config/iceoryx2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ subscriber-max-borrowed-samples = 2
publisher-max-loaned-samples = 2
enable-safe-overflow = true
unable-to-deliver-strategy = 'Block' # or 'DiscardSample'
subscriber-expired-connection-buffer = 128

[defaults.event]
max-listeners = 2
Expand Down
1 change: 1 addition & 0 deletions doc/release-notes/iceoryx2-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
* Mem-leak in `iceoryx2-bb-posix::Directory::contents()` and skip empty file names [#287](https://github.com/eclipse-iceoryx/iceoryx2/issues/287)
* Log macros do no longer return values [#292](https://github.com/eclipse-iceoryx/iceoryx2/issues/292)
* Fix cross-compilation issue with `scandir.c` [#318](https://github.com/eclipse-iceoryx/iceoryx2/issues/318)
* Fix sample loss when publisher disconnected before subscriber called receive [#337](https://github.com/eclipse-iceoryx/iceoryx2/issues/337)
* Service-creation-timeout is considered also for the data segment and zero copy connection [#361](https://github.com/eclipse-iceoryx/iceoryx2/issues/361)

### Refactoring
Expand Down
104 changes: 88 additions & 16 deletions iceoryx2-bb/container/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,29 @@ pub mod details {
unsafe { self.clear_impl() }
}

/// Acquire an element from the queue. If the queue is empty it returns [`None`].
/// Returns a reference to the element from the beginning of the queue without removing it.
/// If the queue is empty it returns [`None`].
pub fn peek(&self) -> Option<&T> {
unsafe { self.peek_impl() }
}

/// Returns a mutable reference to the element from the beginning of the queue without removing it.
/// If the queue is empty it returns [`None`].
pub fn peek_mut(&mut self) -> Option<&mut T> {
unsafe { self.peek_mut_impl() }
}

/// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
pub fn pop(&mut self) -> Option<T> {
unsafe { self.pop_impl() }
}

/// Adds an element to the queue. If the queue is full it returns false, otherwise true.
/// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
pub fn push(&mut self, value: T) -> bool {
unsafe { self.push_impl(value) }
}

/// Adds an element to the queue. If the queue is full it returns the oldest element,
/// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
/// otherwise [`None`].
pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
unsafe { self.push_with_overflow_impl(value) }
Expand Down Expand Up @@ -259,7 +271,29 @@ pub mod details {
self.clear_impl()
}

/// Acquire an element from the queue. If the queue is empty it returns [`None`].
/// Returns a reference to the element from the beginning of the queue without removing it.
/// If the queue is empty it returns [`None`].
///
/// # Safety
///
/// * [`Queue::init()`] must have been called once before
///
pub fn peek(&self) -> Option<&T> {
unsafe { self.peek_impl() }
}

/// Returns a mutable reference to the element from the beginning of the queue without removing it.
/// If the queue is empty it returns [`None`].
///
/// # Safety
///
/// * [`Queue::init()`] must have been called once before
///
pub fn peek_mut(&mut self) -> Option<&mut T> {
unsafe { self.peek_mut_impl() }
}

/// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
///
/// # Safety
///
Expand All @@ -269,7 +303,7 @@ pub mod details {
self.pop_impl()
}

/// Adds an element to the queue. If the queue is full it returns false, otherwise true.
/// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
///
/// # Safety
///
Expand All @@ -279,7 +313,7 @@ pub mod details {
self.push_impl(value)
}

/// Adds an element to the queue. If the queue is full it returns the oldest element,
/// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
/// otherwise [`None`].
///
/// # Safety
Expand Down Expand Up @@ -331,12 +365,37 @@ pub mod details {
while self.pop_impl().is_some() {}
}

pub(crate) unsafe fn pop_impl(&mut self) -> Option<T> {
pub(crate) unsafe fn peek_mut_impl(&mut self) -> Option<&mut T> {
self.verify_init(&format!("Queue<{}>::pop()", std::any::type_name::<T>()));

if self.is_empty() {
return None;
}

let index = (self.start - self.len) % self.capacity;

Some((*self.data_ptr.as_mut_ptr().add(index)).assume_init_mut())
}

pub(crate) unsafe fn peek_impl(&self) -> Option<&T> {
self.verify_init(&format!("Queue<{}>::pop()", std::any::type_name::<T>()));

if self.is_empty() {
return None;
}

let index = (self.start - self.len) % self.capacity;

Some((*self.data_ptr.as_ptr().add(index)).assume_init_ref())
}

pub(crate) unsafe fn pop_impl(&mut self) -> Option<T> {
self.verify_init(&format!("Queue<{}>::pop()", std::any::type_name::<T>()));

if self.is_empty() {
return None;
}

let index = (self.start - self.len) % self.capacity;
self.len -= 1;
let value = std::mem::replace(
Expand All @@ -347,27 +406,28 @@ pub mod details {
}

pub(crate) unsafe fn push_impl(&mut self, value: T) -> bool {
self.verify_init(&format!("Queue<{}>::push()", std::any::type_name::<T>()));

if self.len == self.capacity {
return false;
}

self.verify_init(&format!("Queue<{}>::push()", std::any::type_name::<T>()));

self.unchecked_push(value);
true
}

pub(crate) unsafe fn push_with_overflow_impl(&mut self, value: T) -> Option<T> {
self.verify_init(&format!(
"Queue<{}>::push_with_overflow()",
std::any::type_name::<T>()
));

let overridden_value = if self.len() == self.capacity() {
self.pop_impl()
} else {
None
};

self.verify_init(&format!(
"Queue<{}>::push_with_overflow()",
std::any::type_name::<T>()
));
self.unchecked_push(value);
overridden_value
}
Expand Down Expand Up @@ -458,17 +518,29 @@ impl<T, const CAPACITY: usize> FixedSizeQueue<T, CAPACITY> {
unsafe { self.state.clear_impl() }
}

/// Acquire an element from the queue. If the queue is empty it returns [`None`].
/// Returns a reference to the element from the beginning of the queue without removing it.
/// If the queue is empty it returns [`None`].
pub fn peek(&self) -> Option<&T> {
unsafe { self.state.peek_impl() }
}

/// Returns a mutable reference to the element from the beginning of the queue without removing it.
/// If the queue is empty it returns [`None`].
pub fn peek_mut(&mut self) -> Option<&mut T> {
unsafe { self.state.peek_mut_impl() }
}

/// Removes the element from the beginning of the queue. If the queue is empty it returns [`None`].
pub fn pop(&mut self) -> Option<T> {
unsafe { self.state.pop_impl() }
}

/// Adds an element to the queue. If the queue is full it returns false, otherwise true.
/// Adds an element at the end of the queue. If the queue is full it returns false, otherwise true.
pub fn push(&mut self, value: T) -> bool {
unsafe { self.state.push_impl(value) }
}

/// Adds an element to the queue. If the queue is full it returns the oldest element,
/// Adds an element at the end of the queue. If the queue is full it returns the oldest element,
/// otherwise [`None`].
pub fn push_with_overflow(&mut self, value: T) -> Option<T> {
unsafe { self.state.push_with_overflow_impl(value) }
Expand Down
18 changes: 18 additions & 0 deletions iceoryx2-bb/container/tests/queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,22 @@ mod queue {
assert_that!(unsafe {sut.assume_init_mut()}.pop(), eq Some(123));
assert_that!(unsafe {sut.assume_init_mut()}.pop(), eq Some(456));
}

#[test]
fn peek_works() {
let mut sut = Sut::new();

assert_that!(sut.peek(), is_none);
assert_that!(sut.peek_mut(), is_none);

sut.push(8781);

assert_that!(*sut.peek().unwrap(), eq 8781);
assert_that!(*sut.peek_mut().unwrap(), eq 8781);

*sut.peek_mut().unwrap() = 99182;

assert_that!(*sut.peek().unwrap(), eq 99182);
assert_that!(*sut.peek_mut().unwrap(), eq 99182);
}
}
2 changes: 1 addition & 1 deletion iceoryx2-cal/src/static_storage/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ impl crate::static_storage::StaticStorageBuilder<Storage> for Builder {
"{} due to a failure while reading the files metadata.", msg);

if metadata.permission() != FINAL_PERMISSIONS {
if elapsed_time >= timeout {
if elapsed_time > timeout {
fail!(from origin,
with StaticStorageOpenError::InitializationNotYetFinalized,
"{} since the static storage is still being created (in locked state), try later.",
Expand Down
2 changes: 1 addition & 1 deletion iceoryx2-cal/src/static_storage/process_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ impl StaticStorageBuilder<Storage> for Builder {

let entry = entry.unwrap();
if entry.content.is_locked {
if elapsed_time >= timeout {
if elapsed_time > timeout {
fail!(from self, with StaticStorageOpenError::InitializationNotYetFinalized,
"{} since the static storage is still being created (in locked state), try later.", msg);
}
Expand Down
2 changes: 1 addition & 1 deletion iceoryx2-ffi/ffi/src/api/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl SubscriberUnion {
#[repr(C)]
#[repr(align(16))] // alignment of Option<SubscriberUnion>
pub struct iox2_subscriber_storage_t {
internal: [u8; 512], // magic number obtained with size_of::<Option<SubscriberUnion>>()
internal: [u8; 816], // magic number obtained with size_of::<Option<SubscriberUnion>>()
}

#[repr(C)]
Expand Down
7 changes: 7 additions & 0 deletions iceoryx2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ pub struct PublishSubscribe {
/// [`crate::port::publisher::Publisher`] when the [`crate::port::subscriber::Subscriber`]s
/// buffer is full.
pub unable_to_deliver_strategy: UnableToDeliverStrategy,
/// Defines the size of the internal [`Subscriber`](crate::port::subscriber::Subscriber)
/// buffer that contains expired connections. An
/// connection is expired when the [`Publisher`](crate::port::publisher::Publisher)
/// disconnected from a service and the connection
/// still contains unconsumed [`Sample`](crate::sample::Sample)s.
pub subscriber_expired_connection_buffer: usize,
}

/// Default settings for the event messaging pattern. These settings are used unless
Expand Down Expand Up @@ -312,6 +318,7 @@ impl Default for Config {
publisher_max_loaned_samples: 2,
enable_safe_overflow: true,
unable_to_deliver_strategy: UnableToDeliverStrategy::Block,
subscriber_expired_connection_buffer: 128,
},
event: Event {
max_listeners: 2,
Expand Down
8 changes: 4 additions & 4 deletions iceoryx2/src/port/details/publisher_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl<Service: service::Service> Connection<Service> {
}
#[derive(Debug)]
pub(crate) struct PublisherConnections<Service: service::Service> {
connections: Vec<UnsafeCell<Option<Connection<Service>>>>,
connections: Vec<UnsafeCell<Option<Arc<Connection<Service>>>>>,
subscriber_id: UniqueSubscriberId,
pub(crate) service_state: Arc<ServiceState<Service>>,
pub(crate) static_config: StaticConfig,
Expand Down Expand Up @@ -106,13 +106,13 @@ impl<Service: service::Service> PublisherConnections<Service> {
self.subscriber_id
}

pub(crate) fn get(&self, index: usize) -> &Option<Connection<Service>> {
pub(crate) fn get(&self, index: usize) -> &Option<Arc<Connection<Service>>> {
unsafe { &*self.connections[index].get() }
}

// only used internally as convinience function
#[allow(clippy::mut_from_ref)]
pub(crate) fn get_mut(&self, index: usize) -> &mut Option<Connection<Service>> {
pub(crate) fn get_mut(&self, index: usize) -> &mut Option<Arc<Connection<Service>>> {
#[deny(clippy::mut_from_ref)]
unsafe {
&mut *self.connections[index].get()
Expand All @@ -124,7 +124,7 @@ impl<Service: service::Service> PublisherConnections<Service> {
index: usize,
details: &PublisherDetails,
) -> Result<(), ConnectionFailure> {
*self.get_mut(index) = Some(Connection::new(self, details)?);
*self.get_mut(index) = Some(Arc::new(Connection::new(self, details)?));

Ok(())
}
Expand Down
Loading

0 comments on commit 0ace03a

Please sign in to comment.