Cueue

A high-performance, single-producer, single-consumer, bounded circular buffer of contiguous elements that supports lock-free atomic batch operations, suitable for inter-thread communication.

Example

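fn main() {
    // Create a queue of bytes with a requested capacity of 1 << 20 elements.
    let (mut w, mut r) = cueue::cueue(1 << 20).unwrap();

    // Check out the writable space, fill part of it, then commit 9 bytes.
    let buf = w.write_chunk();
    assert!(buf.len() >= 9);
    buf[..9].copy_from_slice(b"foobarbaz");
    w.commit(9);

    // Check out everything committed so far, then mark it as consumed.
    let read_result = r.read_chunk();
    assert_eq!(read_result, b"foobarbaz");
    r.commit();
}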

A bounded cueue of the requested capacity is referenced by a single Writer and a single Reader. The Writer can request space to write (write_chunk), limited by the queue capacity minus the already committed but unread space. The requested space can be written to, then committed (commit). A special feature of this container is that stored elements are always initialized (initially to their default value, therefore T must implement Default) and are only dropped when the queue is dropped. The writer can therefore reuse previously written and since consumed elements, which is useful if the elements own, e.g., heap-allocated memory. In those cases, contention on the producer's heap lock is avoided; that contention is otherwise present if the consumer continuously drops heap-allocated elements that the producer allocated.
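
The reuse of consumed elements can be illustrated without the queue itself. The following is a minimal sketch, not cueue's API: a plain Vec<String> stands in for the element storage, and the helper write_reusing is a hypothetical name introduced here. It overwrites a slot in place, so the String keeps its heap buffer instead of being dropped and reallocated.

```rust
/// Hypothetical helper: overwrite a slot in place, keeping its heap buffer.
fn write_reusing(slot: &mut String, msg: &str) {
    slot.clear(); // length becomes 0, capacity is retained
    slot.push_str(msg); // reallocates only if `msg` exceeds the capacity
}

fn main() {
    // Stand-in for the queue storage: every slot starts out defaulted (empty).
    let mut slots: Vec<String> = vec![String::default(); 4];

    // First pass: the slot has to allocate to hold the message.
    write_reusing(&mut slots[0], "first message, fairly long");
    let cap_after_first = slots[0].capacity();
    let ptr_after_first = slots[0].as_ptr();

    // Second pass over the same slot (after the consumer is done with it):
    // a shorter message fits in the existing buffer, so nothing is allocated.
    write_reusing(&mut slots[0], "second");
    assert_eq!(slots[0], "second");
    assert_eq!(slots[0].capacity(), cap_after_first);
    assert_eq!(slots[0].as_ptr(), ptr_after_first);
}
```

Because the consumer never drops the element, the only thread that touches the allocator for this slot is the producer.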

The Reader can check out the written elements (read_chunk), process them at will, then mark them as consumed (commit). The returned slice of elements might be the result of multiple writer commits (i.e., reading is batched), but it never includes uncommitted elements (i.e., write commits are atomic). This prevents the reader from observing partial messages.

Use-case

This data structure is designed to allow one thread (actor) to send variable-sized messages (bytes) to a different thread (actor) that processes the messages in batches (e.g., writes them to a file or sends them over the network). One example is asynchronous logging.
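
This README does not say how variable-sized messages are delimited inside the byte stream, so the sketch below makes an assumption: each message is framed with a 4-byte little-endian length prefix. The functions encode_into and decode_batch are hypothetical names, and a plain byte buffer stands in for the slices a writer and a reader would hand out.

```rust
/// Hypothetical framing: 4-byte little-endian length, then the payload.
/// Returns the number of bytes written, or None if `buf` is too small.
/// Payloads are assumed to be shorter than 4 GiB.
fn encode_into(buf: &mut [u8], msg: &[u8]) -> Option<usize> {
    let total = 4 + msg.len();
    if buf.len() < total {
        return None;
    }
    buf[..4].copy_from_slice(&(msg.len() as u32).to_le_bytes());
    buf[4..total].copy_from_slice(msg);
    Some(total)
}

/// Splits a contiguous batch of framed bytes back into messages.
/// Stops at the first incomplete frame, if there is one.
fn decode_batch(mut batch: &[u8]) -> Vec<&[u8]> {
    let mut out = Vec::new();
    while batch.len() >= 4 {
        let len = u32::from_le_bytes([batch[0], batch[1], batch[2], batch[3]]) as usize;
        if batch.len() < 4 + len {
            break;
        }
        out.push(&batch[4..4 + len]);
        batch = &batch[4 + len..];
    }
    out
}

fn main() {
    // Stand-in for the space handed out to the producer.
    let mut storage = vec![0u8; 64];
    let mut used = 0;
    for msg in [&b"hello"[..], &b"variable-sized"[..], &b""[..]] {
        used += encode_into(&mut storage[used..], msg).expect("buffer too small");
    }

    // The consumer sees one contiguous batch and walks the frames in order.
    let messages = decode_batch(&storage[..used]);
    assert_eq!(messages, vec![&b"hello"[..], &b"variable-sized"[..], &b""[..]]);
}
```

With a contiguous batch, the consumer can also pass the whole slice to a single file or socket write without decoding it first.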

Alternative options:

  • Use a standard channel of Strings (or Vec<u8>). This is slow, because strings require memory allocations, and with one thread allocating and the other deallocating, this quickly leads to contention on the heap lock.

  • Use a standard channel of fixed size arrays. Works, but bounds the size of the messages and wastes memory.

  • Use two ring buffers of Vec<u8> containers (one for sending data, one for returning the consumed vectors for reuse). Does not allow efficient reading (separate messages are not contiguous). Requires estimating the maximum number of messages in flight, instead of the maximum total size of the messages.

This data structure uses a single array of the user-specified capacity. At any given time, this array is sliced into three logical parts: allocated for writing, ready for reading, and unwritten. (At most two of the three can be zero-sized at any time.)

write_chunk joins the unwritten part to the part already allocated for writing: the result is limited by the capacity minus the space ready for reading. Writer::commit makes the written space ready for reading and shrinks the part allocated for writing to zero size. read_chunk determines the boundary of the space ready for reading, and Reader::commit marks this space unwritten. Thanks to the truly circular nature of cueue, the writer and reader can freely chase each other around.
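
The bookkeeping of the three parts can be followed with a toy model. This is a single-threaded sketch that tracks sizes only: it has no storage and no atomics, and the Model type with its field and method names is illustrative, not cueue's internals. The positions grow without bound here, which a real implementation would have to handle.

```rust
/// Toy model of the three logical parts (sizes only).
struct Model {
    cap: usize,
    read_pos: usize,  // start of the part ready for reading
    write_pos: usize, // end of the part ready for reading
    allocated: usize, // size of the part allocated for writing
}

impl Model {
    fn new(cap: usize) -> Self {
        Model { cap, read_pos: 0, write_pos: 0, allocated: 0 }
    }
    fn ready(&self) -> usize {
        self.write_pos - self.read_pos
    }
    fn unwritten(&self) -> usize {
        self.cap - self.ready() - self.allocated
    }
    /// Joins the unwritten part to the part allocated for writing.
    fn write_chunk(&mut self) -> usize {
        self.allocated = self.cap - self.ready();
        self.allocated
    }
    /// Makes `n` allocated elements ready for reading; nothing stays allocated.
    fn writer_commit(&mut self, n: usize) {
        assert!(n <= self.allocated);
        self.write_pos += n;
        self.allocated = 0;
    }
    /// Size of the part ready for reading.
    fn read_chunk(&self) -> usize {
        self.ready()
    }
    /// Marks everything that was ready for reading as unwritten.
    fn reader_commit(&mut self) {
        self.read_pos = self.write_pos;
    }
}

fn main() {
    let mut q = Model::new(16);
    assert_eq!(q.write_chunk(), 16); // everything is writable at first
    q.writer_commit(9);
    assert_eq!((q.allocated, q.ready(), q.unwritten()), (0, 9, 7));

    assert_eq!(q.write_chunk(), 7); // capacity minus the space ready for reading
    q.writer_commit(4);
    assert_eq!(q.read_chunk(), 13); // two writer commits are read as one batch

    q.reader_commit();
    assert_eq!(q.write_chunk(), 16); // the consumed space is writable again
}
```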

How Does it Work

The cueue constructor creates a memory area and maps it into virtual memory twice, with the two mappings placed next to each other. This means that for the resulting map of capacity cap, map[0] and map[cap] refer to the same byte. (In general, map[N] and map[cap+N] are the same for every index 0 <= N < cap.)
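
The aliasing can be modelled in safe Rust without mapping any memory. The sketch below is only a model: the DoubleMapModel type is a hypothetical name, and it computes a remainder on every access, whereas a real double mapping gets the same effect from the virtual memory system at no per-access cost.

```rust
/// Models the aliasing of the double map: virtual index `i` in 0..2*cap
/// refers to physical element `i % cap`.
struct DoubleMapModel {
    physical: Vec<u8>,
}

impl DoubleMapModel {
    fn new(cap: usize) -> Self {
        DoubleMapModel { physical: vec![0; cap] }
    }
    fn cap(&self) -> usize {
        self.physical.len()
    }
    fn get(&self, i: usize) -> u8 {
        assert!(i < 2 * self.cap());
        self.physical[i % self.cap()]
    }
    fn set(&mut self, i: usize, v: u8) {
        let cap = self.cap();
        assert!(i < 2 * cap);
        self.physical[i % cap] = v;
    }
    /// Writes `data` as one run of consecutive virtual indices.
    fn write_run(&mut self, start: usize, data: &[u8]) {
        assert!(start < self.cap() && data.len() <= self.cap());
        for (k, &b) in data.iter().enumerate() {
            self.set(start + k, b);
        }
    }
}

fn main() {
    let mut map = DoubleMapModel::new(8);
    map.set(0, 42);
    assert_eq!(map.get(8), 42); // map[0] and map[cap] are the same byte

    // A 5-element run starting at index 6 crosses the end of the first map,
    // yet it is addressed as one consecutive range.
    map.write_run(6, b"hello");
    assert_eq!(map.get(0), b'l'); // the tail also shows up at the start
    let run: Vec<u8> = (6..11).map(|i| map.get(i)).collect();
    assert_eq!(run, b"hello");
}
```

Any run that starts below cap and is at most cap long fits inside the doubled range, so it can be handed out as a single slice.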

With this double map, there is no need to wrap around. This maximises the useful capacity of the queue at any point of usage and simplifies the indexing logic of the code. Synchronization between writer and reader is done by atomic operations; there are no mutexes or lock ASM instruction prefixes (on the tested platforms: x86 and M1).
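
One common way to publish data with atomics only is a release store paired with an acquire load. The sketch below shows that pattern under stated assumptions and is not taken from cueue's source: the Shared type and the functions produce, consume and run_once are hypothetical, and the data slots are atomics as well, purely to keep the example in safe Rust.

```rust
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Shared state of the sketch (illustrative names).
struct Shared {
    data: Vec<AtomicU8>,
    committed: AtomicUsize, // number of elements published by the writer
}

/// Writes `msg` into the slots, then publishes it with one Release store.
fn produce(shared: &Shared, msg: &[u8]) {
    for (slot, &b) in shared.data.iter().zip(msg) {
        slot.store(b, Ordering::Relaxed);
    }
    shared.committed.store(msg.len(), Ordering::Release);
}

/// Spins until something is published, then copies the committed elements.
fn consume(shared: &Shared) -> Vec<u8> {
    loop {
        // The Acquire load pairs with the Release store in `produce`:
        // once `n` is visible, so are the slot writes that preceded it.
        let n = shared.committed.load(Ordering::Acquire);
        if n > 0 {
            return shared.data[..n]
                .iter()
                .map(|s| s.load(Ordering::Relaxed))
                .collect();
        }
        std::hint::spin_loop();
    }
}

/// Runs one producer and one consumer thread; returns what the consumer saw.
fn run_once(msg: &'static [u8]) -> Vec<u8> {
    assert!(!msg.is_empty()); // an empty message would never be observed
    let shared = Arc::new(Shared {
        data: (0..msg.len()).map(|_| AtomicU8::new(0)).collect(),
        committed: AtomicUsize::new(0),
    });
    let reader = {
        let s = Arc::clone(&shared);
        thread::spawn(move || consume(&s))
    };
    let writer = {
        let s = Arc::clone(&shared);
        thread::spawn(move || produce(&s, msg))
    };
    writer.join().unwrap();
    reader.join().unwrap()
}

fn main() {
    assert_eq!(run_once(b"foobarbaz"), b"foobarbaz");
}
```

The consumer either sees nothing or the whole message, never a prefix of it, because the count is stored only after every slot has been written.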

(Not shown here, but this structure also allows inter-process communication using shared memory, and data recovery from core dumps.)

Limitations

  • Supported platforms: Linux (3.17) and macOS
  • Rust 1.63
  • Uses unsafe operations

Build and Test

$ cargo build
$ cargo test
$ cargo run --example basics
$ cargo fmt
$ cargo clippy
$ cargo bench
$ cargo doc --open

Acknowledgments

This is a Rust port of the binlog C++ cueue. The interface names were changed to match rtrb, to make it more familiar to Rust developers.