Skip to content

Commit

Permalink
implementing special topology for collection node to connect to peers…
Browse files Browse the repository at this point in the history
… within cluster
  • Loading branch information
vishalchangrani authored Sep 29, 2020
1 parent 5afe528 commit 03b6ce0
Show file tree
Hide file tree
Showing 16 changed files with 564 additions and 322 deletions.
14 changes: 12 additions & 2 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/onflow/flow-go/module/trace"
jsoncodec "github.com/onflow/flow-go/network/codec/json"
"github.com/onflow/flow-go/network/gossip/libp2p"
"github.com/onflow/flow-go/network/gossip/libp2p/topology"
"github.com/onflow/flow-go/network/gossip/libp2p/validators"
protocol "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/events"
Expand Down Expand Up @@ -183,9 +184,18 @@ func (fnb *FlowNodeBuilder) enqueueNetworkInit() {
return nil, fmt.Errorf("could not get node id: %w", err)
}
nodeRole := nodeID.Role
topology := libp2p.NewRandPermTopology(nodeRole)

net, err := libp2p.NewNetwork(fnb.Logger, codec, participants, fnb.Me, fnb.Middleware, 10e6, topology, fnb.Metrics.Network)
var nodeTopology topology.Topology
if nodeRole == flow.RoleCollection {
nodeTopology, err = topology.NewCollectionTopology(nodeID.NodeID, fnb.State)
} else {
nodeTopology, err = topology.NewRandPermTopology(nodeRole, nodeID.NodeID)
}
if err != nil {
return nil, fmt.Errorf("could not create topology: %w", err)
}

net, err := libp2p.NewNetwork(fnb.Logger, codec, participants, fnb.Me, fnb.Middleware, 10e6, nodeTopology, fnb.Metrics.Network)
if err != nil {
return nil, fmt.Errorf("could not initialize network: %w", err)
}
Expand Down
29 changes: 0 additions & 29 deletions engine/ghost/engine/all_connect_topology.go

This file was deleted.

6 changes: 6 additions & 0 deletions model/flow/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ func (il IdentityList) Sample(size uint) IdentityList {
return dup[:size]
}

// DeterministicSample returns deterministic random sample from the `IdentityList` using the given seed
func (il IdentityList) DeterministicSample(size uint, seed int64) IdentityList {
rand.Seed(seed)
return il.Sample(size)
}

// SamplePct returns a random sample from the receiver identity list. The
// sample contains `pct` percentage of the list. The sample is rounded up
// if `pct>0`, so this will always select at least one identity.
Expand Down
5 changes: 0 additions & 5 deletions network/gossip/libp2p/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,3 @@ type Connection interface {
Send(msg interface{}) error
Receive() (interface{}, error)
}

// Topology represents an interface to get subset of nodes which a given node should directly connect to for 1-k messaging
type Topology interface {
Subset(idList flow.IdentityList, size int, seed string) (map[flow.Identifier]flow.Identity, error)
}
21 changes: 16 additions & 5 deletions network/gossip/libp2p/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/onflow/flow-go/network/gossip/libp2p/message"
"github.com/onflow/flow-go/network/gossip/libp2p/middleware"
"github.com/onflow/flow-go/network/gossip/libp2p/queue"
"github.com/onflow/flow-go/network/gossip/libp2p/topology"
)

type identifierFilter func(ids ...flow.Identifier) ([]flow.Identifier, error)
Expand All @@ -27,7 +28,7 @@ type Network struct {
ids flow.IdentityList
me module.Local
mw middleware.Middleware
top middleware.Topology
top topology.Topology
metrics module.NetworkMetrics
rcache *cache.RcvCache // used to deduplicate incoming messages
queue queue.MessageQueue
Expand All @@ -47,7 +48,7 @@ func NewNetwork(
me module.Local,
mw middleware.Middleware,
csize int,
top middleware.Topology,
top topology.Topology,
metrics module.NetworkMetrics,
) (*Network, error) {

Expand Down Expand Up @@ -156,7 +157,17 @@ func (n *Network) Identity() (map[flow.Identifier]flow.Identity, error) {

// Topology returns the identities of a uniform subset of nodes in protocol state using the topology provided earlier
func (n *Network) Topology() (map[flow.Identifier]flow.Identity, error) {
return n.top.Subset(n.ids, n.fanout(), n.me.NodeID().String())
subset, err := n.top.Subset(n.ids, n.fanout())
if err != nil {
return nil, fmt.Errorf("failed to derive list of peer nodes to connect to: %w", err)
}

// creates a map of all the selected ids
topMap := make(map[flow.Identifier]flow.Identity)
for _, id := range subset {
topMap[id.NodeID] = *id
}
return topMap, nil
}

func (n *Network) Receive(nodeID flow.Identifier, msg *message.Message) error {
Expand All @@ -176,9 +187,9 @@ func (n *Network) SetIDs(ids flow.IdentityList) {
}

// fanout returns the node fanout derived from the identity list
func (n *Network) fanout() int {
func (n *Network) fanout() uint {
// fanout is currently set to half of the system size for connectivity assurance
return (len(n.ids) + 1) / 2
return uint(len(n.ids)+1) / 2
}

func (n *Network) processNetworkMessage(senderID flow.Identifier, message *message.Message) error {
Expand Down
118 changes: 0 additions & 118 deletions network/gossip/libp2p/randPermTopology.go

This file was deleted.

125 changes: 0 additions & 125 deletions network/gossip/libp2p/randPermTopology_test.go

This file was deleted.

Loading

0 comments on commit 03b6ce0

Please sign in to comment.