Skip to content

Commit

Permalink
fix: close consume chan when close in throttle waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Sep 3, 2024
1 parent 33d61f3 commit d9947ef
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
2 changes: 2 additions & 0 deletions lang/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func (c *channel) consume() {
// check throttle
if c.throttling(c.consumerThrottle) {
// closed
close(c.consumer) // close consumer
atomic.StoreInt32(&c.state, -2) // -2 means closed totally
return
}

Expand Down
60 changes: 54 additions & 6 deletions lang/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,40 +373,88 @@ func TestChannelProduceRateControl(t *testing.T) {
ch := New(
WithRateThrottle(produceMaxRate, 0),
)
defer ch.Close()

var wg sync.WaitGroup
const total = 300
wg.Add(1)
go func() {
defer wg.Done()
for c := range ch.Output() {
id := c.(int)
//tlogf(t, "consumed: %d", id)
_ = id
}
}()
begin := time.Now()
for i := 1; i <= 500; i++ {
for i := 1; i <= total; i++ {
ch.Input(i)
}
ch.Close() // when channel closed, ch.Output() should return
wg.Wait()
cost := time.Now().Sub(begin)
tlogf(t, "Cost %dms", cost.Milliseconds())
}

func TestChannelConsumeRateControl(t *testing.T) {
consumeRate := 100
ch := New(
WithRateThrottle(0, 100),
WithRateThrottle(0, consumeRate),
)
defer ch.Close()

var wg sync.WaitGroup
const total = 300
var counter int32
wg.Add(1)
go func() {
defer wg.Done()
for c := range ch.Output() {
id := c.(int)
//tlogf(t, "consumed: %d", id)
_ = id
if id == 0 {
t.Errorf("get zero output")
}
atomic.AddInt32(&counter, 1)
}
}()
begin := time.Now()
for i := 1; i <= total; i++ {
ch.Input(i)
}
ch.Close() // when channel closed, ch.Output() should return
wg.Wait()
assert.Equal(t, int32(total), atomic.LoadInt32(&counter))
cost := time.Now().Sub(begin)
tlogf(t, "Cost %dms", cost.Milliseconds())
}

func TestChannelProduceAndConsumeRateControl(t *testing.T) {
produceRate, consumeRate := 100, 50
ch := New(
WithRateThrottle(produceRate, consumeRate),
)

var wg sync.WaitGroup
const total = 300
var counter int32
wg.Add(1)
go func() {
defer wg.Done()
for c := range ch.Output() {
id := c.(int)
//tlogf(t, "consumed: %d", id)
if id == 0 {
t.Errorf("get zero output")
}
atomic.AddInt32(&counter, 1)
}
}()
begin := time.Now()
for i := 1; i <= 500; i++ {
for i := 1; i <= total; i++ {
ch.Input(i)
}
ch.Close() // when channel closed, ch.Output() should return
wg.Wait()
assert.Equal(t, int32(total), atomic.LoadInt32(&counter))
cost := time.Now().Sub(begin)
tlogf(t, "Cost %dms", cost.Milliseconds())
}
Expand Down

0 comments on commit d9947ef

Please sign in to comment.