Skip to content

Commit

Permalink
chore: Add test for scheduler heap sorting and remove unnecessary Res…
Browse files Browse the repository at this point in the history
…chedule as it called during Set() anyway
  • Loading branch information
driskell committed Jul 30, 2024
1 parent 20e5751 commit 9bd2550
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 6 deletions.
1 change: 0 additions & 1 deletion lc-lib/publisher/endpoint/sink_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (s *Sink) ProcessEvent(event transports.Event) (endpoint *Endpoint, err err
err = fmt.Errorf("unexpected %T message received", event)
}

s.Scheduler.Reschedule()
return
}

Expand Down
4 changes: 1 addition & 3 deletions lc-lib/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,7 @@ func (p *Publisher) runOnce() bool {
} else if p.resendList.Len() != 0 {
log.Debug("Holding %d new events until the resend queue is flushed", len(spool))
} else if p.endpointSink.CanQueue() {
_, ok := p.sendEvents(spool)
p.endpointSink.Scheduler.Reschedule()
if ok {
if _, ok := p.sendEvents(spool); ok {
break
}

Expand Down
2 changes: 0 additions & 2 deletions lc-lib/receiver/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ ReceiverLoop:
position = nextPosition
}
r.ackEventsEvent(currentContext, connection, position.nonce, position.sequence)
r.scheduler.Reschedule()
r.connectionLock.Unlock()

// If shutting down, have all acknowledgemente been handled, and all receivers closed?
Expand Down Expand Up @@ -188,7 +187,6 @@ ReceiverLoop:
// Schedule partial ack if this is first set of events
if len(connectionStatus.progress) == 0 {
r.scheduler.Set(connection, 5*time.Second)
r.scheduler.Reschedule()
}
connectionStatus.progress = append(connectionStatus.progress, &poolEventProgress{event: eventImpl, sequence: 0})
r.connectionLock.Unlock()
Expand Down
73 changes: 73 additions & 0 deletions lc-lib/scheduler/heap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package scheduler

import (
"container/heap"
"testing"
"time"
)

func TestHeapOrder(t *testing.T) {
tq := new(timerQueue)
item1 := &timerItem{
value: 1,
when: time.Now().Add(100 * time.Second),
}
item2 := &timerItem{
value: 3,
when: time.Now().Add(300 * time.Second),
}
item3 := &timerItem{
value: 2,
when: time.Now().Add(200 * time.Second),
}
heap.Push(tq, item1)
heap.Push(tq, item2)
heap.Push(tq, item3)
item := heap.Pop(tq).(*timerItem)
if item.value != 1 {
t.Error("Unexpected scheduler ordering")
}
item = heap.Pop(tq).(*timerItem)
if item.value != 2 {
t.Error("Unexpected scheduler ordering")
}
heap.Push(tq, item3)
item = heap.Pop(tq).(*timerItem)
if item.value != 2 {
t.Error("Unexpected scheduler ordering")
}
item = heap.Pop(tq).(*timerItem)
if item.value != 3 {
t.Error("Unexpected scheduler ordering")
}
}

func TestHeapSwap(t *testing.T) {
tq := new(timerQueue)
item1 := &timerItem{
value: 1,
when: time.Now().Add(100 * time.Second),
}
item2 := &timerItem{
value: 3,
when: time.Now().Add(300 * time.Second),
}
item3 := &timerItem{
value: 2,
when: time.Now().Add(200 * time.Second),
}
heap.Push(tq, item1)
heap.Push(tq, item2)
heap.Push(tq, item3)
item := heap.Pop(tq).(*timerItem)
if item.value != 1 {
t.Error("Unexpected scheduler ordering")
}
item2.when = time.Now().Add(1 * time.Second)
heap.Fix(tq, item2.index)
item = heap.Pop(tq).(*timerItem)
if item.value != 3 {
t.Error("Unexpected scheduler ordering")
}

}
2 changes: 2 additions & 0 deletions lc-lib/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (s *Scheduler) Remove(v interface{}) {
heap.Remove(s.tq, item.index)
delete(s.index, v)
}

// No need to update timer, just let it fire and handle the next item, or reschedule
}

// Next returns the next item that is due, or nil if none are due
Expand Down

0 comments on commit 9bd2550

Please sign in to comment.