diff --git a/lc-lib/publisher/endpoint/sink_process.go b/lc-lib/publisher/endpoint/sink_process.go index cd25a636..7b55b7d7 100644 --- a/lc-lib/publisher/endpoint/sink_process.go +++ b/lc-lib/publisher/endpoint/sink_process.go @@ -47,7 +47,6 @@ func (s *Sink) ProcessEvent(event transports.Event) (endpoint *Endpoint, err err err = fmt.Errorf("unexpected %T message received", event) } - s.Scheduler.Reschedule() return } diff --git a/lc-lib/publisher/publisher.go b/lc-lib/publisher/publisher.go index 0d860a5d..ccd54377 100644 --- a/lc-lib/publisher/publisher.go +++ b/lc-lib/publisher/publisher.go @@ -206,9 +206,7 @@ func (p *Publisher) runOnce() bool { } else if p.resendList.Len() != 0 { log.Debug("Holding %d new events until the resend queue is flushed", len(spool)) } else if p.endpointSink.CanQueue() { - _, ok := p.sendEvents(spool) - p.endpointSink.Scheduler.Reschedule() - if ok { + if _, ok := p.sendEvents(spool); ok { break } diff --git a/lc-lib/receiver/pool.go b/lc-lib/receiver/pool.go index cd5e7d55..2c52f637 100644 --- a/lc-lib/receiver/pool.go +++ b/lc-lib/receiver/pool.go @@ -159,7 +159,6 @@ ReceiverLoop: position = nextPosition } r.ackEventsEvent(currentContext, connection, position.nonce, position.sequence) - r.scheduler.Reschedule() r.connectionLock.Unlock() // If shutting down, have all acknowledgemente been handled, and all receivers closed? @@ -188,7 +187,6 @@ ReceiverLoop: // Schedule partial ack if this is first set of events if len(connectionStatus.progress) == 0 { r.scheduler.Set(connection, 5*time.Second) - r.scheduler.Reschedule() } connectionStatus.progress = append(connectionStatus.progress, &poolEventProgress{event: eventImpl, sequence: 0}) r.connectionLock.Unlock() diff --git a/lc-lib/scheduler/heap_test.go b/lc-lib/scheduler/heap_test.go new file mode 100644 index 00000000..c68480a6 --- /dev/null +++ b/lc-lib/scheduler/heap_test.go @@ -0,0 +1,73 @@ +package scheduler + +import ( + "container/heap" + "testing" + "time" +) + +func TestHeapOrder(t *testing.T) { + tq := new(timerQueue) + item1 := &timerItem{ + value: 1, + when: time.Now().Add(100 * time.Second), + } + item2 := &timerItem{ + value: 3, + when: time.Now().Add(300 * time.Second), + } + item3 := &timerItem{ + value: 2, + when: time.Now().Add(200 * time.Second), + } + heap.Push(tq, item1) + heap.Push(tq, item2) + heap.Push(tq, item3) + item := heap.Pop(tq).(*timerItem) + if item.value != 1 { + t.Error("Unexpected scheduler ordering") + } + item = heap.Pop(tq).(*timerItem) + if item.value != 2 { + t.Error("Unexpected scheduler ordering") + } + heap.Push(tq, item3) + item = heap.Pop(tq).(*timerItem) + if item.value != 2 { + t.Error("Unexpected scheduler ordering") + } + item = heap.Pop(tq).(*timerItem) + if item.value != 3 { + t.Error("Unexpected scheduler ordering") + } +} + +func TestHeapSwap(t *testing.T) { + tq := new(timerQueue) + item1 := &timerItem{ + value: 1, + when: time.Now().Add(100 * time.Second), + } + item2 := &timerItem{ + value: 3, + when: time.Now().Add(300 * time.Second), + } + item3 := &timerItem{ + value: 2, + when: time.Now().Add(200 * time.Second), + } + heap.Push(tq, item1) + heap.Push(tq, item2) + heap.Push(tq, item3) + item := heap.Pop(tq).(*timerItem) + if item.value != 1 { + t.Error("Unexpected scheduler ordering") + } + item2.when = time.Now().Add(1 * time.Second) + heap.Fix(tq, item2.index) + item = heap.Pop(tq).(*timerItem) + if item.value != 3 { + t.Error("Unexpected scheduler ordering") + } + +} diff --git a/lc-lib/scheduler/scheduler.go b/lc-lib/scheduler/scheduler.go index 9f74ef92..7a95f5f5 100644 --- a/lc-lib/scheduler/scheduler.go +++ b/lc-lib/scheduler/scheduler.go @@ -68,6 +68,8 @@ func (s *Scheduler) Remove(v interface{}) { heap.Remove(s.tq, item.index) delete(s.index, v) } + + // No need to update timer, just let it fire and handle the next item, or reschedule } // Next returns the next item that is due, or nil if none are due