fix: Use atomic variables to maintain thread-safe data access without locks, port unique_lock creation into constructor and only perform lock/unlock operations in callbacks.

* Both blocking read and write callbacks continue to rely on condition variables for data availability.
* The write_overwrite callback now relies only on the atomic operations, without locks, and no longer pushes the read index when overwriting.
* On timeout, the read_timeout callback now simply skips the callback instead of invoking it with a nullptr.
* Implemented truly nonblocking read_nonblock and write_nonblock callbacks, which rely only on the atomic operations and return immediately, either performing the callback or skipping it (see the usage sketch after this list).
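
A minimal usage sketch of the new interface (illustrative only, not part of the commit; it assumes the constructor takes the item size and the item count, matching the initializer list in the diff below):

#include <chrono>
#include <cstdint>
#include <cstring>

#include "thread_safe_ring_buffer.h"

int main() {
    // Hypothetical sizing: 8 slots of 1024 bytes each.
    ThreadSafeRingBuffer ring(1024, 8);

    uint8_t packet[1024] = {};
    // write_nonblock: drops the item instead of blocking when the ring is full.
    ring.write_nonblock(
        [&](uint8_t* slot) { std::memcpy(slot, packet, sizeof(packet)); });

    // read_nonblock: the callback runs only if an item is available.
    ring.read_nonblock([](uint8_t* slot) { (void)slot; /* consume the item */ });

    // read_timeout: on timeout the callback is now simply skipped
    // (previously it was invoked with a nullptr).
    ring.read_timeout([](uint8_t* slot) { (void)slot; /* consume the item */ },
                      std::chrono::seconds(1));
    return 0;
}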
Imaniac230 committed Mar 6, 2024
1 parent 874c6bf commit 1c03d79
Showing 1 changed file with 144 additions and 59 deletions.
ouster-ros/src/thread_safe_ring_buffer.h
@@ -8,6 +8,7 @@

#pragma once

+ #include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>
@@ -22,13 +23,15 @@ class ThreadSafeRingBuffer {
item_size(item_size_),
max_items_count(items_count_),
active_items_count(0),
- write_idx(0),
- read_idx(0) {}
+ write_idx(SIZE_MAX),
+ read_idx(SIZE_MAX),
+ new_data_lock(mutex, std::defer_lock),
+ free_space_lock(mutex, std::defer_lock) {}

/**
* Gets the maximum number of items that this ring buffer can hold.
*/
- size_t capacity() const { return max_items_count; }
+ [[nodiscard]] size_t capacity() const { return max_items_count; }

/**
* Gets the number of items that currently occupy the ring buffer. This
@@ -39,10 +42,7 @@
* this does not guarantee that a subsequent call to read() or write()
* wouldn't cause the calling thread to be blocked.
*/
- size_t size() const {
-     std::lock_guard<std::mutex> lock(mutex);
-     return active_items_count;
- }
+ [[nodiscard]] size_t size() const { return active_items_count.load(); }

/**
* Checks if the ring buffer is empty.
@@ -51,10 +51,7 @@
* if empty() returns true this does not guarantee that calling the write()
* operation directly right after wouldn't block the calling thread.
*/
- bool empty() const {
-     std::lock_guard<std::mutex> lock(mutex);
-     return active_items_count == 0;
- }
+ [[nodiscard]] bool empty() const { return active_items_count.load() == 0; }

/**
* Checks if the ring buffer is full.
@@ -63,84 +60,172 @@
* if full() returns true this does not guarantee that calling the read()
* operation directly right after wouldn't block the calling thread.
*/
- bool full() const {
-     std::lock_guard<std::mutex> lock(mutex);
-     return active_items_count == max_items_count;
+ [[nodiscard]] bool full() const {
+     return active_items_count.load() == capacity();
}

/**
- * Writes to the buffer safely, the method will keep blocking until the
+ * Writes to the buffer safely, this method will keep blocking until
* there is space available within the buffer.
*/
template <class BufferWriteFn>
void write(BufferWriteFn&& buffer_write) {
-     std::unique_lock<std::mutex> lock(mutex);
-     fullCondition.wait(lock,
-                        [this] { return active_items_count < capacity(); });
-     buffer_write(&buffer[write_idx * item_size]);
-     write_idx = (write_idx + 1) % capacity();
-     ++active_items_count;
-     emptyCondition.notify_one();
+     free_space_lock.lock();
+     free_space_condition.wait(free_space_lock, [this] { return !full(); });
+     free_space_lock.unlock();
+     perform_write(buffer_write);
}

/**
- * Writes to the buffer safely, if there is not space left then this method
- * will overite the last item.
+ * Writes to the buffer safely; if there is no space left, this method
+ * will overwrite the last item.
*/
template <class BufferWriteFn>
void write_overwrite(BufferWriteFn&& buffer_write) {
-     std::unique_lock<std::mutex> lock(mutex);
-     buffer_write(&buffer[write_idx * item_size]);
-     write_idx = (write_idx + 1) % capacity();
-     if (active_items_count < capacity()) {
-         ++active_items_count;
-     } else {
-         read_idx = (read_idx + 1) % capacity();
-     }
-     emptyCondition.notify_one();
+     perform_write(buffer_write);
}

+ /**
+  * Writes to the buffer safely; this method returns immediately, and if
+  * there is no space left, the data is not written (it is dropped).
+  */
+ template <class BufferWriteFn>
+ void write_nonblock(BufferWriteFn&& buffer_write) {
+     if (!full()) perform_write(buffer_write);
+ }

/**
- * Gives access to read the buffer through a callback, the method will block
- * until there is something to read is available.
+ * Gives access to read the buffer through a callback; this method will block
+ * until there is something available to read.
*/
template <typename BufferReadFn>
void read(BufferReadFn&& buffer_read) {
-     std::unique_lock<std::mutex> lock(mutex);
-     emptyCondition.wait(lock, [this] { return active_items_count > 0; });
-     buffer_read(&buffer[read_idx * item_size]);
-     read_idx = (read_idx + 1) % capacity();
-     --active_items_count;
-     fullCondition.notify_one();
+     new_data_lock.lock();
+     new_data_condition.wait(new_data_lock, [this] { return !empty(); });
+     new_data_lock.unlock();
+     perform_read(buffer_read);
}

/**
* Gives access to read the buffer through a callback; if the buffer is
- * inaccessible the method will timeout and buffer_read gets a nullptr.
+ * inaccessible, this method will time out and the callback is not performed.
*/
template <typename BufferReadFn>
void read_timeout(BufferReadFn&& buffer_read,
std::chrono::seconds timeout) {
-     std::unique_lock<std::mutex> lock(mutex);
-     if (emptyCondition.wait_for(
-             lock, timeout, [this] { return active_items_count > 0; })) {
-         buffer_read(&buffer[read_idx * item_size]);
-         read_idx = (read_idx + 1) % capacity();
-         --active_items_count;
-         fullCondition.notify_one();
-     } else {
-         buffer_read((uint8_t*)nullptr);
+     new_data_lock.lock();
+     if (new_data_condition.wait_for(
+             new_data_lock, timeout, [this] { return !empty(); })) {
+         new_data_lock.unlock();
+         perform_read(buffer_read);
+         return;
+     }
+     new_data_lock.unlock();
}

+ /**
+  * Gives access to read the buffer through a callback; this method returns
+  * immediately, and the callback is performed only if data is available.
+  */
+ template <typename BufferReadFn>
+ void read_nonblock(BufferReadFn&& buffer_read) {
+     if (!empty()) perform_read(buffer_read);
+ }

+ protected:
+ /**
+  * Resets the write_idx to an initial value.
+  * @remarks Should mostly be used by tests, to allow reading the final
+  * item left in the buffer.
+  */
+ void reset_write_idx() { write_idx = SIZE_MAX; }

private:
+ /**
+  * Performs the actual sequence of operations for writing.
+  * @tparam BufferWriteFn
+  * @param buffer_write
+  */
+ template <class BufferWriteFn>
+ void perform_write(BufferWriteFn&& buffer_write) {
+     buffer_write(&buffer[increment_with_capacity(write_idx) * item_size]);
+     push();
+     new_data_condition.notify_all();
+ }

+ /**
+  * Performs the actual sequence of operations for reading.
+  * @tparam BufferReadFn
+  * @param buffer_read
+  * @remarks
+  * If this function attempts to read using an index currently held by the
+  * writer, it will not perform the operations.
+  */
+ template <typename BufferReadFn>
+ void perform_read(BufferReadFn&& buffer_read) {
+     if (incremented_with_capacity(read_idx.load()) != write_idx.load()) {
+         buffer_read(&buffer[increment_with_capacity(read_idx) * item_size]);
+         pop();
+         free_space_condition.notify_one();
+     }
+ }

+ /**
+  * Atomically increments the given index, wrapping around at the buffer
+  * capacity, and returns the incremented value so that only a single
+  * atomic load is performed for this operation.
+  * @param idx Reference to the index to be incremented.
+  * @return The new incremented value of the index.
+  */
+ [[nodiscard]] size_t increment_with_capacity(std::atomic_size_t& idx) const {
+     const size_t incremented = (idx.load() + 1) % capacity();
+     idx = incremented;
+     return incremented;
+ }

+ /**
+  * Returns an incremented value of the given index, wrapping around at the
+  * buffer capacity. This function does not modify the given index.
+  * @param idx Current index value.
+  * @return Incremented value of the given index.
+  */
+ [[nodiscard]] size_t incremented_with_capacity(const size_t idx) const {
+     return (idx + 1) % capacity();
+ }

+ /**
+  * Atomically increments the count of active buffer items, clamping at the
+  * buffer capacity.
+  */
+ void push() {
+     size_t overflow = capacity() + 1;
+     ++active_items_count;
+     active_items_count.compare_exchange_strong(overflow, capacity());
+ }

+ /**
+  * Atomically decrements the count of active buffer items, clamping at zero.
+  */
+ void pop() {
+     size_t overflow = SIZE_MAX;
+     --active_items_count;
+     active_items_count.compare_exchange_strong(overflow, 0);
+ }

std::vector<uint8_t> buffer;
- size_t item_size;
- size_t max_items_count;
- size_t active_items_count;
- size_t write_idx;
- size_t read_idx;
- mutable std::mutex mutex;
- std::condition_variable fullCondition;
- std::condition_variable emptyCondition;

+ const size_t item_size;
+ const size_t max_items_count;

+ std::atomic_size_t active_items_count;
+ std::atomic_size_t write_idx;
+ std::atomic_size_t read_idx;

+ std::mutex mutex;
+ std::condition_variable new_data_condition;
+ std::unique_lock<std::mutex> new_data_lock;
+ std::condition_variable free_space_condition;
+ std::unique_lock<std::mutex> free_space_lock;

friend class ThreadSafeRingBufferTest;
};
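
A note on the new index scheme: write_idx and read_idx start at SIZE_MAX so that the first increment_with_capacity() call wraps around to slot 0, since unsigned overflow is well-defined in C++. A standalone sketch of the arithmetic (illustrative, not part of the commit):

#include <cstddef>
#include <cstdint>
#include <cstdio>

int main() {
    const std::size_t capacity = 4;   // example capacity
    std::size_t idx = SIZE_MAX;       // initial value used by the constructor
    for (int i = 0; i < 6; ++i) {
        idx = (idx + 1) % capacity;   // SIZE_MAX + 1 wraps to 0
        std::printf("%zu ", idx);     // prints: 0 1 2 3 0 1
    }
    return 0;
}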
