Skip to content

Commit

Permalink
fix: channel consume left data after close
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Apr 25, 2024
1 parent fefc805 commit 0ed54c1
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
11 changes: 6 additions & 5 deletions lang/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,7 @@ func (c *channel) Close() {
if !atomic.CompareAndSwapInt32(&c.state, 0, -1) {
return
}
// stop consumer
c.bufferLock.Lock()
c.buffer.Init() // clear buffer
c.bufferLock.Unlock()
// Close function only notify Input/consume goroutine to close gracefully
c.bufferCond.Broadcast()
}

Expand Down Expand Up @@ -253,6 +250,10 @@ func (c *channel) Input(v interface{}) {
for c.buffer.Len() >= c.size {
// wait for consuming
c.bufferCond.Wait()
if c.isClosed() {
// blocking send a closed channel should return directly
return
}
}
}
c.enqueueBuffer(it)
Expand Down Expand Up @@ -289,7 +290,7 @@ func (c *channel) consume() {
c.bufferLock.Lock()
for c.buffer.Len() == 0 {
if c.isClosed() {
close(c.consumer)
close(c.consumer) // close consumer
atomic.StoreInt32(&c.state, -2) // -2 means closed totally
c.bufferLock.Unlock()
return
Expand Down
29 changes: 29 additions & 0 deletions lang/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,3 +462,32 @@ func TestFastRecoverConsumer(t *testing.T) {
}
// all consumed
}

func TestChannelCloseThenConsume(t *testing.T) {
size := 10
ch := New(WithNonBlock(), WithSize(size))
for i := 0; i < size; i++ {
ch.Input(i)
}
ch.Close()
for i := 0; i < size; i++ {
x := <-ch.Output()
assert.NotNil(t, x)
n := x.(int)
assert.Equal(t, n, x)
}
}

func TestChannelInputAndClose(t *testing.T) {
ch := New(WithSize(1))
go func() {
time.Sleep(time.Millisecond * 100)
ch.Close()
}()
begin := time.Now()
for i := 0; i < 10; i++ {
ch.Input(1)
}
cost := time.Now().Sub(begin)
assert.True(t, cost.Milliseconds() >= 100)
}

0 comments on commit 0ed54c1

Please sign in to comment.