Skip to content

Commit

Permalink
Merge pull request #730 from bloxapp/stage
Browse files Browse the repository at this point in the history
stage to main (v0.3.4)
  • Loading branch information
nkryuchkov authored Oct 20, 2022
2 parents 1c6339e + 91ba416 commit 117d9f7
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
4 changes: 3 additions & 1 deletion protocol/v1/p2p/min_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ func WaitForMinPeers(pctx context.Context, logger *zap.Logger, subscriber Subscr
logger.Warn("could not get peers of topic", zap.Error(err))
continue
}
if len(peers) >= minPeers {
n := len(peers)
if n >= minPeers {
return nil
}
logger.Debug("looking for min peers", zap.Int("expected", minPeers), zap.Int("actual", n))
}

return ctx.Err()
Expand Down
3 changes: 1 addition & 2 deletions protocol/v1/qbft/controller/msgqueue_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,11 @@ func (c *Controller) processNoRunningInstance(
// processByState if an instance is running -> get the state and get the relevant messages
func (c *Controller) processByState(handler MessageHandler, identifier string) bool {
currentInstance := c.GetCurrentInstance()
if c.GetCurrentInstance() == nil {
if currentInstance == nil {
return false
}

var msg *spectypes.SSVMessage

currentState := currentInstance.GetState()
msg = c.getNextMsgForState(currentState, identifier)
if msg == nil {
Expand Down
22 changes: 16 additions & 6 deletions protocol/v1/qbft/msgqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package msgqueue

import (
"fmt"
spectypes "github.com/bloxapp/ssv-spec/types"
"strconv"
"sync"
"sync/atomic"

spec "github.com/attestantio/go-eth2-client/spec/phase0"
specqbft "github.com/bloxapp/ssv-spec/qbft"
spectypes "github.com/bloxapp/ssv-spec/types"
"github.com/pkg/errors"
"go.uber.org/zap"

Expand Down Expand Up @@ -187,14 +187,24 @@ func (q *queue) Peek(n int, idx Index) []*spectypes.SSVMessage {

// WithIterator looping through all indexes and return true when relevant and pop
func (q *queue) WithIterator(n int, peek bool, iterator func(index Index) bool) []*spectypes.SSVMessage {
indices := make([]Index, 0, len(q.items))

q.itemsLock.RLock()
for k := range q.items {
if iterator(k) {
if peek {
return q.Peek(n, k)
}
return q.Pop(n, k)
indices = append(indices, k)
}
q.itemsLock.RUnlock()

for _, idx := range indices {
if !iterator(idx) {
continue
}
if peek {
return q.Peek(n, idx)
}
return q.Pop(n, idx)
}

return nil
}

Expand Down

0 comments on commit 117d9f7

Please sign in to comment.