Skip to content

Commit

Permalink
Capacity one ring (#4)
Browse files Browse the repository at this point in the history
* Implement special handling for ring of size one
  • Loading branch information
gammazero authored Aug 4, 2023
1 parent 793ec7e commit 48475bd
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 40 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Concurrently access a dynamic queue using channels.

ChannelQueue implements a queue that uses channels for input and output to provide concurrent access to a dynamically-sized queue. This allows the queue to be used like a channel, in a thread-safe manner. Closing the input channel closes the output channel when all queued items are read, consistent with channel behavior. In other words a ChannelQueue is a dynamically buffered channel with up to infinite capacity.

ChannelQueue also supports circular buffer behavior when created using `NewRing`. When the buffer is full, writing an additional item discards the oldest buffered item.

When specifying an unlimited buffer capacity use caution as the buffer is still limited by the resources available on the host system.

The ChannelQueue buffer auto-resizes according to the number of items buffered. For more information on the queue, see: https://github.com/gammazero/deque
Expand Down
73 changes: 56 additions & 17 deletions channelqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import "github.com/gammazero/deque"
type ChannelQueue[T any] struct {
input, output chan T
length chan int
buffer deque.Deque[T]
capacity int
}

Expand Down Expand Up @@ -46,7 +45,11 @@ func NewRing[T any](capacity int) *ChannelQueue[T] {
length: make(chan int),
capacity: capacity,
}
go cq.ringBufferData()
if capacity == 1 {
go cq.oneBufferData()
} else {
go cq.ringBufferData()
}
return cq
}

Expand Down Expand Up @@ -80,6 +83,7 @@ func (cq *ChannelQueue[T]) Close() {
// bufferData is the goroutine that transfers data from the In() chan to the
// buffer and from the buffer to the Out() chan.
func (cq *ChannelQueue[T]) bufferData() {
var buffer deque.Deque[T]
var output chan T
var next, zero T
inputChan := cq.input
Expand All @@ -90,31 +94,31 @@ func (cq *ChannelQueue[T]) bufferData() {
case elem, open := <-input:
if open {
// Push data from input chan to buffer.
cq.buffer.PushBack(elem)
buffer.PushBack(elem)
} else {
// Input chan closed; do not select input chan.
input = nil
inputChan = nil
}
case output <- next:
// Wrote buffered data to output chan. Remove item from buffer.
cq.buffer.PopFront()
case cq.length <- cq.buffer.Len():
buffer.PopFront()
case cq.length <- buffer.Len():
}

if cq.buffer.Len() == 0 {
if buffer.Len() == 0 {
// No buffered data; do not select output chan.
output = nil
next = zero
next = zero // set to zero to GC value
} else {
// Try to write it to output chan.
output = cq.output
next = cq.buffer.Front()
next = buffer.Front()
}

if cq.capacity != -1 {
// If buffer at capacity, then stop accepting input.
if cq.buffer.Len() >= cq.capacity {
if buffer.Len() >= cq.capacity {
input = nil
} else {
input = inputChan
Expand All @@ -130,6 +134,7 @@ func (cq *ChannelQueue[T]) bufferData() {
// the buffer and from the buffer to the Out() chan, with circular buffer
// behavior of discarding the oldest item when writing to a full buffer.
func (cq *ChannelQueue[T]) ringBufferData() {
var buffer deque.Deque[T]
var output chan T
var next, zero T
input := cq.input
Expand All @@ -139,28 +144,62 @@ func (cq *ChannelQueue[T]) ringBufferData() {
case elem, open := <-input:
if open {
// Push data from input chan to buffer.
cq.buffer.PushBack(elem)
if cq.buffer.Len() > cq.capacity {
cq.buffer.PopFront()
buffer.PushBack(elem)
if buffer.Len() > cq.capacity {
buffer.PopFront()
}
} else {
// Input chan closed; do not select input chan.
input = nil
}
case output <- next:
// Wrote buffered data to output chan. Remove item from buffer.
cq.buffer.PopFront()
case cq.length <- cq.buffer.Len():
buffer.PopFront()
case cq.length <- buffer.Len():
}

if cq.buffer.Len() == 0 {
if buffer.Len() == 0 {
// No buffered data; do not select output chan.
output = nil
next = zero
next = zero // set to zero to GC value
} else {
// Try to write it to output chan.
output = cq.output
next = cq.buffer.Front()
next = buffer.Front()
}
}

close(cq.output)
close(cq.length)
}

// oneBufferData is the same as ringBufferData, but with a buffer size of 1.
func (cq *ChannelQueue[T]) oneBufferData() {
var bufLen int
var output chan T
var next, zero T
input := cq.input

for input != nil || output != nil {
select {
case elem, open := <-input:
if open {
// Push data from input chan to buffer.
next = elem
bufLen = 1
// Try to write it to output chan.
output = cq.output
} else {
// Input chan closed; do not select input chan.
input = nil
}
case output <- next:
// Wrote buffered data to output chan. Remove item from buffer.
bufLen = 0
next = zero // set to zero to GC value
// No buffered data; do not select output chan.
output = nil
case cq.length <- bufLen:
}
}

Expand Down
43 changes: 43 additions & 0 deletions channelqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,49 @@ func TestRing(t *testing.T) {
}
}

func TestRingOne(t *testing.T) {
ch := cq.NewRing[rune](1)
for _, r := range "hello" {
ch.In() <- r
}

ch.In() <- 'w'
if ch.Len() != 1 {
t.Fatalf("expected length 1, got %d", ch.Len())
}
char := <-ch.Out()
if char != 'w' {
t.Fatal("expected 'w' but got", char)
}
if ch.Len() != 0 {
t.Fatal("expected length 0")
}

for _, r := range "abcdefghij" {
ch.In() <- r
}

ch.Close()

out := make([]rune, 0, ch.Len())
for r := range ch.Out() {
out = append(out, r)
}
if string(out) != "j" {
t.Fatalf("expected \"j\" but got %q", out)
}

defer func() {
if r := recover(); r == nil {
t.Error("expected panic from capacity 0")
}
}()
ch = cq.NewRing[rune](0)
if ch != nil {
t.Fatal("expected nil")
}
}

func BenchmarkSerial(b *testing.B) {
ch := cq.New[int](b.N)
for i := 0; i < b.N; i++ {
Expand Down
49 changes: 26 additions & 23 deletions doc.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
/*
Package channelqueue implements a queue that uses channels for input and output
to provide concurrent access to a resizable queue, and allowing the queue to be
used like a channel. Closing the input channel closes the output channel when
all queued items are read, consistent with channel behavior. In other words
channelqueue is a dynamically buffered channel with up to infinite capacity.
When specifying an unlimited buffer capacity use caution as the buffer is still
limited by the resources available on the host system.
Caution
The behavior of channelqueue differs from the behavior of a normal channel in
one important way: After writing to the In() channel, the data may not be
immediately available on the Out() channel (until the buffer goroutine is
scheduled), and may be missed by a non-blocking select.
Credits
This implementation is based on ideas/examples from:
https://github.com/eapache/channels
*/
// Package channelqueue implements a queue that uses channels for input and
// output to provide concurrent access to a re-sizable queue.
//
// This allows the queue to be used like a channel. Closing the input channel
// closes the output channel when all queued items are read, consistent with
// channel behavior. In other words channelqueue is a dynamically buffered
// channel with up to infinite capacity.
//
// ChannelQueue also supports circular buffer behavior when created using
// `NewRing`. When the buffer is full, writing an additional item discards the
// oldest buffered item.
//
// When specifying an unlimited buffer capacity use caution as the buffer is
// still limited by the resources available on the host system.
//
// # Caution
//
// The behavior of channelqueue differs from the behavior of a normal channel
// in one important way: After writing to the In() channel, the data may not be
// immediately available on the Out() channel (until the buffer goroutine is
// scheduled), and may be missed by a non-blocking select.
//
// # Credits
//
// This implementation is based on ideas/examples from:
// https://github.com/eapache/channels
package channelqueue

0 comments on commit 48475bd

Please sign in to comment.