Skip to content

Commit

Permalink
pass addresses through the system
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Sep 8, 2023
1 parent 183fa8f commit c84468e
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 80 deletions.
28 changes: 14 additions & 14 deletions coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (c *Coordinator[K, A]) advanceBootstrap(ctx context.Context, ev routing.Boo
bstate := c.bootstrap.Advance(ctx, ev)
switch st := bstate.(type) {
case *routing.StateBootstrapMessage[K, A]:
c.sendBootstrapFindNode(ctx, st.NodeID, st.QueryID, st.Stats)
c.sendBootstrapFindNode(ctx, st.Node, st.QueryID, st.Stats)

case *routing.StateBootstrapWaiting:
// bootstrap waiting for a message response, nothing to do
Expand Down Expand Up @@ -277,7 +277,7 @@ func (c *Coordinator[K, A]) advancePool(ctx context.Context, ev query.PoolEvent)
state := c.pool.Advance(ctx, ev)
switch st := state.(type) {
case *query.StatePoolQueryMessage[K, A]:
c.sendQueryMessage(ctx, st.ProtocolID, st.NodeID, st.Message, st.QueryID, st.Stats)
c.sendQueryMessage(ctx, st.ProtocolID, st.Node, st.Message, st.QueryID, st.Stats)
case *query.StatePoolWaitingAtCapacity:
// nothing to do except wait for message response or timeout
case *query.StatePoolWaitingWithCapacity:
Expand All @@ -296,7 +296,7 @@ func (c *Coordinator[K, A]) advancePool(ctx context.Context, ev query.PoolEvent)
}
}

func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeID[K], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) {
func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID address.ProtocolID, to kad.NodeInfo[K, A], msg kad.Request[K, A], queryID query.QueryID, stats query.QueryStats) {
ctx, span := util.StartSpan(ctx, "Coordinator.sendQueryMessage")
defer span.End()

Expand All @@ -308,7 +308,7 @@ func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID addres
}

c.advancePool(ctx, &query.EventPoolMessageFailure[K]{
NodeID: to,
NodeID: to.ID(),
QueryID: queryID,
Error: err,
})
Expand Down Expand Up @@ -337,19 +337,19 @@ func (c *Coordinator[K, A]) sendQueryMessage(ctx context.Context, protoID addres
}

c.advancePool(ctx, &query.EventPoolMessageResponse[K, A]{
NodeID: to,
Node: to,
QueryID: queryID,
Response: resp,
})
}

err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse)
err := c.ep.SendRequestHandleResponse(ctx, protoID, to.ID(), msg, msg.EmptyResponse(), 0, onMessageResponse)
if err != nil {
onSendError(ctx, err)
}
}

func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.NodeID[K], queryID query.QueryID, stats query.QueryStats) {
func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.NodeInfo[K, A], queryID query.QueryID, stats query.QueryStats) {
ctx, span := util.StartSpan(ctx, "Coordinator.sendBootstrapFindNode")
defer span.End()

Expand All @@ -361,7 +361,7 @@ func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.No
}

c.advanceBootstrap(ctx, &routing.EventBootstrapMessageFailure[K]{
NodeID: to,
NodeID: to.ID(),
Error: err,
})
}
Expand Down Expand Up @@ -389,13 +389,13 @@ func (c *Coordinator[K, A]) sendBootstrapFindNode(ctx context.Context, to kad.No
}

c.advanceBootstrap(ctx, &routing.EventBootstrapMessageResponse[K, A]{
NodeID: to,
Node: to,
Response: resp,
})
}

protoID, msg := c.findNodeFn(c.self)
err := c.ep.SendRequestHandleResponse(ctx, protoID, to, msg, msg.EmptyResponse(), 0, onMessageResponse)
err := c.ep.SendRequestHandleResponse(ctx, protoID, to.ID(), msg, msg.EmptyResponse(), 0, onMessageResponse)
if err != nil {
onSendError(ctx, err)
}
Expand Down Expand Up @@ -443,14 +443,14 @@ func (c *Coordinator[K, A]) sendIncludeFindNode(ctx context.Context, to kad.Node
func (c *Coordinator[K, A]) StartQuery(ctx context.Context, queryID query.QueryID, protocolID address.ProtocolID, msg kad.Request[K, A]) error {
ctx, span := util.StartSpan(ctx, "Coordinator.StartQuery")
defer span.End()
knownClosestPeers := c.rt.NearestNodes(msg.Target(), 20)
// knownClosestPeers := c.rt.NearestNodes(msg.Target(), 20)

c.schedulePoolEvent(ctx, &query.EventPoolAddQuery[K, A]{
QueryID: queryID,
Target: msg.Target(),
ProtocolID: protocolID,
Message: msg,
KnownClosestNodes: knownClosestPeers,
KnownClosestNodes: nil,
})

return nil
Expand Down Expand Up @@ -488,7 +488,7 @@ func (c *Coordinator[K, A]) AddNodes(ctx context.Context, infos []kad.NodeInfo[K

// Bootstrap instructs the coordinator to begin bootstrapping the routing table.
// While bootstrap is in progress, no other queries will make progress.
func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeID[K]) error {
func (c *Coordinator[K, A]) Bootstrap(ctx context.Context, seeds []kad.NodeInfo[K, A]) error {
protoID, msg := c.findNodeFn(c.self)

c.scheduleBootstrapEvent(ctx, &routing.EventBootstrapStart[K, A]{
Expand All @@ -510,7 +510,7 @@ type KademliaEvent interface {
// response from a node.
type KademliaOutboundQueryProgressedEvent[K kad.Key[K], A kad.Address[A]] struct {
QueryID query.QueryID
NodeID kad.NodeID[K]
NodeID kad.NodeInfo[K, A]
Response kad.Response[K, A]
Stats query.QueryStats
}
Expand Down
4 changes: 1 addition & 3 deletions coord/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,7 @@ func TestBootstrap(t *testing.T) {

queryID := query.QueryID("bootstrap")

seeds := []kad.NodeID[key.Key8]{
nodes[1].ID(),
}
seeds := []kad.NodeInfo[key.Key8, kadtest.StrAddr]{nodes[1]}
err = c.Bootstrap(ctx, seeds)
require.NoError(t, err)

Expand Down
50 changes: 26 additions & 24 deletions query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,55 @@ package query
import (
"context"

"github.com/plprobelab/go-kademlia/internal/kadtest"

"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/key"
"github.com/plprobelab/go-kademlia/key/trie"
)

// A NodeIter iterates nodes according to some strategy.
type NodeIter[K kad.Key[K]] interface {
type NodeIter[K kad.Key[K], A kad.Address[A]] interface {
// Add adds node information to the iterator
Add(*NodeStatus[K])
Add(*NodeStatus[K, A])

// Find returns the node information corresponding to the given Kademlia key
Find(K) (*NodeStatus[K], bool)
Find(K) (*NodeStatus[K, A], bool)

// Each applies fn to each entry in the iterator in order. Each stops and returns true if fn returns true.
// Otherwise Each returns false when there are no further entries.
Each(ctx context.Context, fn func(context.Context, *NodeStatus[K]) bool) bool
Each(ctx context.Context, fn func(context.Context, *NodeStatus[K, A]) bool) bool
}

// A ClosestNodesIter iterates nodes in order of ascending distance from a key.
type ClosestNodesIter[K kad.Key[K]] struct {
type ClosestNodesIter[K kad.Key[K], A kad.Address[A]] struct {
// target is the key whose distance to a node determines the position of that node in the iterator.
target K

// nodelist holds the nodes discovered so far, ordered by increasing distance from the target.
nodes *trie.Trie[K, *NodeStatus[K]]
nodes *trie.Trie[K, *NodeStatus[K, A]]
}

var _ NodeIter[key.Key8] = (*ClosestNodesIter[key.Key8])(nil)
var _ NodeIter[key.Key8, kadtest.StrAddr] = (*ClosestNodesIter[key.Key8, kadtest.StrAddr])(nil)

// NewClosestNodesIter creates a new ClosestNodesIter
func NewClosestNodesIter[K kad.Key[K]](target K) *ClosestNodesIter[K] {
return &ClosestNodesIter[K]{
func NewClosestNodesIter[K kad.Key[K], A kad.Address[A]](target K) *ClosestNodesIter[K, A] {
return &ClosestNodesIter[K, A]{
target: target,
nodes: trie.New[K, *NodeStatus[K]](),
nodes: trie.New[K, *NodeStatus[K, A]](),
}
}

func (iter *ClosestNodesIter[K]) Add(ni *NodeStatus[K]) {
iter.nodes.Add(ni.NodeID.Key(), ni)
func (iter *ClosestNodesIter[K, A]) Add(ni *NodeStatus[K, A]) {
iter.nodes.Add(ni.Node.ID().Key(), ni)
}

func (iter *ClosestNodesIter[K]) Find(k K) (*NodeStatus[K], bool) {
func (iter *ClosestNodesIter[K, A]) Find(k K) (*NodeStatus[K, A], bool) {
found, ni := trie.Find(iter.nodes, k)
return ni, found
}

func (iter *ClosestNodesIter[K]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K]) bool) bool {
func (iter *ClosestNodesIter[K, A]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K, A]) bool) bool {
// get all the nodes in order of distance from the target
// TODO: turn this into a walk or iterator on trie.Trie
entries := trie.Closest(iter.nodes, iter.target, iter.nodes.Size())
Expand All @@ -63,37 +65,37 @@ func (iter *ClosestNodesIter[K]) Each(ctx context.Context, fn func(context.Conte
}

// A SequentialIter iterates nodes in the order they were added to the iterator.
type SequentialIter[K kad.Key[K]] struct {
type SequentialIter[K kad.Key[K], A kad.Address[A]] struct {
// nodelist holds the nodes discovered so far, ordered by increasing distance from the target.
nodes []*NodeStatus[K]
nodes []*NodeStatus[K, A]
}

var _ NodeIter[key.Key8] = (*SequentialIter[key.Key8])(nil)
var _ NodeIter[key.Key8, kadtest.StrAddr] = (*SequentialIter[key.Key8, kadtest.StrAddr])(nil)

// NewSequentialIter creates a new SequentialIter
func NewSequentialIter[K kad.Key[K]]() *SequentialIter[K] {
return &SequentialIter[K]{
nodes: make([]*NodeStatus[K], 0),
func NewSequentialIter[K kad.Key[K], A kad.Address[A]]() *SequentialIter[K, A] {
return &SequentialIter[K, A]{
nodes: make([]*NodeStatus[K, A], 0),
}
}

func (iter *SequentialIter[K]) Add(ni *NodeStatus[K]) {
func (iter *SequentialIter[K, A]) Add(ni *NodeStatus[K, A]) {
iter.nodes = append(iter.nodes, ni)
}

// Find returns the node information corresponding to the given Kademlia key. It uses a linear
// search which makes it unsuitable for large numbers of entries.
func (iter *SequentialIter[K]) Find(k K) (*NodeStatus[K], bool) {
func (iter *SequentialIter[K, A]) Find(k K) (*NodeStatus[K, A], bool) {
for i := range iter.nodes {
if key.Equal(k, iter.nodes[i].NodeID.Key()) {
if key.Equal(k, iter.nodes[i].Node.ID().Key()) {
return iter.nodes[i], true
}
}

return nil, false
}

func (iter *SequentialIter[K]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K]) bool) bool {
func (iter *SequentialIter[K, A]) Each(ctx context.Context, fn func(context.Context, *NodeStatus[K, A]) bool) bool {
for _, ns := range iter.nodes {
if fn(ctx, ns) {
return true
Expand Down
6 changes: 3 additions & 3 deletions query/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/plprobelab/go-kademlia/kad"
)

type NodeStatus[K kad.Key[K]] struct {
NodeID kad.NodeID[K]
State NodeState
type NodeStatus[K kad.Key[K], A kad.Address[A]] struct {
Node kad.NodeInfo[K, A]
State NodeState
}

type NodeState interface {
Expand Down
24 changes: 12 additions & 12 deletions query/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (p *Pool[K, A]) Advance(ctx context.Context, ev PoolEvent) PoolState {
case *EventPoolMessageResponse[K, A]:
if qry, ok := p.queryIndex[tev.QueryID]; ok {
state, terminal := p.advanceQuery(ctx, qry, &EventQueryMessageResponse[K, A]{
NodeID: tev.NodeID,
Node: tev.Node,
Response: tev.Response,
})
if terminal {
Expand Down Expand Up @@ -197,7 +197,7 @@ func (p *Pool[K, A]) advanceQuery(ctx context.Context, qry *Query[K, A], qev Que
return &StatePoolQueryMessage[K, A]{
QueryID: st.QueryID,
Stats: st.Stats,
NodeID: st.NodeID,
Node: st.Node,
ProtocolID: st.ProtocolID,
Message: st.Message,
}, true
Expand Down Expand Up @@ -247,18 +247,18 @@ func (p *Pool[K, A]) removeQuery(queryID QueryID) {

// addQuery adds a query to the pool, returning the new query id
// TODO: remove target argument and use msg.Target
func (p *Pool[K, A]) addQuery(ctx context.Context, queryID QueryID, target K, protocolID address.ProtocolID, msg kad.Request[K, A], knownClosestNodes []kad.NodeID[K]) error {
func (p *Pool[K, A]) addQuery(ctx context.Context, queryID QueryID, target K, protocolID address.ProtocolID, msg kad.Request[K, A], knownClosestNodes []kad.NodeInfo[K, A]) error {
if _, exists := p.queryIndex[queryID]; exists {
return fmt.Errorf("query id already in use")
}
iter := NewClosestNodesIter(target)
iter := NewClosestNodesIter[K, A](target)

qryCfg := DefaultQueryConfig[K]()
qryCfg.Clock = p.cfg.Clock
qryCfg.Concurrency = p.cfg.QueryConcurrency
qryCfg.RequestTimeout = p.cfg.RequestTimeout

qry, err := NewQuery[K](p.self, queryID, protocolID, msg, iter, knownClosestNodes, qryCfg)
qry, err := NewQuery[K, A](p.self, queryID, protocolID, msg, iter, knownClosestNodes, qryCfg)
if err != nil {
return fmt.Errorf("new query: %w", err)
}
Expand All @@ -281,7 +281,7 @@ type StatePoolIdle struct{}
// StatePoolQueryMessage indicates that a pool query is waiting to message a node.
type StatePoolQueryMessage[K kad.Key[K], A kad.Address[A]] struct {
QueryID QueryID
NodeID kad.NodeID[K]
Node kad.NodeInfo[K, A]
ProtocolID address.ProtocolID
Message kad.Request[K, A]
Stats QueryStats
Expand Down Expand Up @@ -322,11 +322,11 @@ type PoolEvent interface {

// EventPoolAddQuery is an event that attempts to add a new query
type EventPoolAddQuery[K kad.Key[K], A kad.Address[A]] struct {
QueryID QueryID // the id to use for the new query
Target K // the target key for the query
ProtocolID address.ProtocolID // the protocol that defines how the message should be interpreted
Message kad.Request[K, A] // the message the query should send to each node it traverses
KnownClosestNodes []kad.NodeID[K] // an initial set of close nodes the query should use
QueryID QueryID // the id to use for the new query
Target K // the target key for the query
ProtocolID address.ProtocolID // the protocol that defines how the message should be interpreted
Message kad.Request[K, A] // the message the query should send to each node it traverses
KnownClosestNodes []kad.NodeInfo[K, A] // an initial set of close nodes the query should use
}

// EventPoolStopQuery notifies a pool to stop a query.
Expand All @@ -337,7 +337,7 @@ type EventPoolStopQuery struct {
// EventPoolMessageResponse notifies a pool that a query that a sent message has received a successful response.
type EventPoolMessageResponse[K kad.Key[K], A kad.Address[A]] struct {
QueryID QueryID // the id of the query that sent the message
NodeID kad.NodeID[K] // the node the message was sent to
Node kad.NodeInfo[K, A] // the node the message was sent to
Response kad.Response[K, A] // the message response sent by the node
}

Expand Down
Loading

0 comments on commit c84468e

Please sign in to comment.