diff --git a/protocol/v1/p2p/min_peers.go b/protocol/v1/p2p/min_peers.go index 76849ec8f3..20d2d6969a 100644 --- a/protocol/v1/p2p/min_peers.go +++ b/protocol/v1/p2p/min_peers.go @@ -19,9 +19,11 @@ func WaitForMinPeers(pctx context.Context, logger *zap.Logger, subscriber Subscr logger.Warn("could not get peers of topic", zap.Error(err)) continue } - if len(peers) >= minPeers { + n := len(peers) + if n >= minPeers { return nil } + logger.Debug("looking for min peers", zap.Int("expected", minPeers), zap.Int("actual", n)) } return ctx.Err() diff --git a/protocol/v1/qbft/controller/msgqueue_consumer.go b/protocol/v1/qbft/controller/msgqueue_consumer.go index 154ef3def1..d3f1760af3 100644 --- a/protocol/v1/qbft/controller/msgqueue_consumer.go +++ b/protocol/v1/qbft/controller/msgqueue_consumer.go @@ -133,12 +133,11 @@ func (c *Controller) processNoRunningInstance( // processByState if an instance is running -> get the state and get the relevant messages func (c *Controller) processByState(handler MessageHandler, identifier string) bool { currentInstance := c.GetCurrentInstance() - if c.GetCurrentInstance() == nil { + if currentInstance == nil { return false } var msg *spectypes.SSVMessage - currentState := currentInstance.GetState() msg = c.getNextMsgForState(currentState, identifier) if msg == nil { diff --git a/protocol/v1/qbft/msgqueue/queue.go b/protocol/v1/qbft/msgqueue/queue.go index 3ab5e5691b..f375829b26 100644 --- a/protocol/v1/qbft/msgqueue/queue.go +++ b/protocol/v1/qbft/msgqueue/queue.go @@ -2,13 +2,13 @@ package msgqueue import ( "fmt" - spectypes "github.com/bloxapp/ssv-spec/types" "strconv" "sync" "sync/atomic" spec "github.com/attestantio/go-eth2-client/spec/phase0" specqbft "github.com/bloxapp/ssv-spec/qbft" + spectypes "github.com/bloxapp/ssv-spec/types" "github.com/pkg/errors" "go.uber.org/zap" @@ -187,14 +187,24 @@ func (q *queue) Peek(n int, idx Index) []*spectypes.SSVMessage { // WithIterator looping through all indexes and return true when relevant and pop func (q *queue) WithIterator(n int, peek bool, iterator func(index Index) bool) []*spectypes.SSVMessage { + indices := make([]Index, 0, len(q.items)) + + q.itemsLock.RLock() for k := range q.items { - if iterator(k) { - if peek { - return q.Peek(n, k) - } - return q.Pop(n, k) + indices = append(indices, k) + } + q.itemsLock.RUnlock() + + for _, idx := range indices { + if !iterator(idx) { + continue } + if peek { + return q.Peek(n, idx) + } + return q.Pop(n, idx) } + return nil }