Skip to content

Commit

Permalink
fixed simplequery peerlist
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel committed Jul 4, 2023
1 parent eda993d commit f96ac6f
Show file tree
Hide file tree
Showing 24 changed files with 577 additions and 319 deletions.
8 changes: 3 additions & 5 deletions examples/dispatchquery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"fmt"
"log"
"time"

Expand Down Expand Up @@ -57,7 +56,7 @@ func queryTest(ctx context.Context) {
}
selfA := &peerid.PeerID{ID: pidA} // peer.ID is necessary for ipfskadv1 message format
addrA := multiaddr.StringCast("/ip4/1.1.1.1/tcp/4001/")
var naddrA address.NodeID = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfA.ID,
var naddrA address.NodeAddr = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfA.ID,
Addrs: []multiaddr.Multiaddr{addrA}})
rtA := simplert.NewSimpleRT(selfA.Key(), 2)
schedA := ss.NewSimpleScheduler(clk)
Expand All @@ -75,7 +74,7 @@ func queryTest(ctx context.Context) {
}
selfB := &peerid.PeerID{ID: pidB}
addrB := multiaddr.StringCast("/ip4/2.2.2.2/tcp/4001/")
var naddrB address.NodeID = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfB.ID,
var naddrB address.NodeAddr = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfB.ID,
Addrs: []multiaddr.Multiaddr{addrB}})
rtB := simplert.NewSimpleRT(selfB.Key(), 2)
schedB := ss.NewSimpleScheduler(clk)
Expand All @@ -93,7 +92,7 @@ func queryTest(ctx context.Context) {
}
selfC := &peerid.PeerID{ID: pidC}
addrC := multiaddr.StringCast("/ip4/3.3.3.3/tcp/4001/")
var naddrC address.NodeID = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfC.ID,
var naddrC address.NodeAddr = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfC.ID,
Addrs: []multiaddr.Multiaddr{addrC}})
rtC := simplert.NewSimpleRT(selfC.Key(), 2)
schedC := ss.NewSimpleScheduler(clk)
Expand Down Expand Up @@ -124,7 +123,6 @@ func queryTest(ctx context.Context) {
// dummy parameters
handleResp := func(ctx context.Context, _ address.NodeID,
resp message.MinKadResponseMessage) (bool, []address.NodeID) {
fmt.Println(resp.CloserNodes())
peerids := make([]address.NodeID, len(resp.CloserNodes()))
for i, p := range resp.CloserNodes() {
peerids[i] = p.(*addrinfo.AddrInfo).PeerID()
Expand Down
14 changes: 8 additions & 6 deletions examples/fullsim/findnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ const (
)

// connectNodes adds nodes to each other's peerstores and routing tables
func connectNodes(ctx context.Context, n0, n1 address.NodeID, ep0, ep1 endpoint.Endpoint,
func connectNodes(ctx context.Context, n0, n1 address.NodeAddr, ep0, ep1 endpoint.Endpoint,
rt0, rt1 routingtable.RoutingTable) {
// add n1 to n0's peerstore and routing table
ep0.MaybeAddToPeerstore(ctx, n1, peerstoreTTL)
rt0.AddPeer(ctx, n1)
rt0.AddPeer(ctx, n1.NodeID())
// add n0 to n1's peerstore and routing table
ep1.MaybeAddToPeerstore(ctx, n0, peerstoreTTL)
rt1.AddPeer(ctx, n0)
rt1.AddPeer(ctx, n0.NodeID())
}

func findNode(ctx context.Context) {
Expand Down Expand Up @@ -107,16 +107,18 @@ func findNode(ctx context.Context) {
resp := msg.(*simmessage.SimMessage)
fmt.Println("got a response from", id, "with", resp.CloserNodes())

for _, peer := range resp.CloserNodes() {
if peer.String() == ids[3].NodeID().String() {
newIds := make([]address.NodeID, len(resp.CloserNodes()))
for i, peer := range resp.CloserNodes() {
if peer.NodeID().String() == ids[3].NodeID().String() {
// the response contains the address of D (ids[3])
fmt.Println("success")
// returning true will stop the query process
return true, nil
}
newIds[i] = peer.NodeID()
}
// returning false will continue the query process
return false, resp.CloserNodes()
return false, newIds
}

// create a query on A (using A's scheduler, endpoint and routing table),
Expand Down
8 changes: 8 additions & 0 deletions network/address/address.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,13 @@ type NodeID interface {
String() string
}

type Addr any

type NodeAddr interface {
NodeID() NodeID

Addresses() []Addr
}

// ProtocolID is a protocol identifier.
type ProtocolID string
14 changes: 13 additions & 1 deletion network/address/addrinfo/addrinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type AddrInfo struct {
id *peerid.PeerID
}

var _ address.NodeID = (*AddrInfo)(nil)
var _ address.NodeAddr = (*AddrInfo)(nil)

func NewAddrInfo(ai peer.AddrInfo) *AddrInfo {
return &AddrInfo{
Expand All @@ -32,3 +32,15 @@ func (ai AddrInfo) String() string {
func (ai AddrInfo) PeerID() *peerid.PeerID {
return ai.id
}

func (ai AddrInfo) NodeID() address.NodeID {
return ai.id
}

func (ai AddrInfo) Addresses() []address.Addr {
addrs := make([]address.Addr, len(ai.Addrs))
for i, a := range ai.Addrs {
addrs[i] = a
}
return addrs
}
48 changes: 48 additions & 0 deletions network/address/kadaddr/kadaddr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package kadaddr

import (
"github.com/plprobelab/go-kademlia/network/address"
"github.com/plprobelab/go-kademlia/network/address/kadid"
)

type KadAddr struct {
id *kadid.KadID
addrs []string
}

var _ address.NodeAddr = (*KadAddr)(nil)

func NewKadAddr(id *kadid.KadID, addrs []string) *KadAddr {
return &KadAddr{
id: id,
addrs: addrs,
}
}

func (ka *KadAddr) AddAddr(addr string) {
ka.addrs = append(ka.addrs, addr)
}

func (ka *KadAddr) RemoveAddr(addr string) {
writeIndex := 0
// remove all occurrences of addr
for _, a := range ka.addrs {
if a != addr {
ka.addrs[writeIndex] = a
writeIndex++
}
}
ka.addrs = ka.addrs[:writeIndex]
}

func (ka *KadAddr) NodeID() address.NodeID {
return ka.id
}

func (ka *KadAddr) Addresses() []address.Addr {
addresses := make([]address.Addr, len(ka.addrs))
for i, a := range ka.addrs {
addresses[i] = a
}
return addresses
}
6 changes: 5 additions & 1 deletion network/address/kadid/kadid.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type KadID struct {
key.KadKey
}

var _ address.NodeID = (*KadID)(nil)
var _ address.NodeAddr = (*KadID)(nil)

func NewKadID(k key.KadKey) *KadID {
return &KadID{k}
Expand All @@ -26,3 +26,7 @@ func (k KadID) NodeID() address.NodeID {
func (k KadID) String() string {
return k.Hex()
}

func (k KadID) Addresses() []address.Addr {
return []address.Addr{k}
}
6 changes: 5 additions & 1 deletion network/address/stringid/stringid.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type StringID string

var _ address.NodeID = (*StringID)(nil)
var _ address.NodeAddr = (*StringID)(nil)

func NewStringID(s string) *StringID {
return (*StringID)(&s)
Expand All @@ -25,3 +25,7 @@ func (s StringID) Key() key.KadKey {
func (s StringID) NodeID() address.NodeID {
return &s
}

func (s StringID) Addresses() []address.Addr {
return []address.Addr{s}
}
4 changes: 2 additions & 2 deletions network/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ResponseHandlerFn func(context.Context, message.MinKadResponseMessage, erro
type Endpoint interface {
// MaybeAddToPeerstore adds the given address to the peerstore if it is
// valid and if it is not already there.
MaybeAddToPeerstore(context.Context, address.NodeID, time.Duration) error
MaybeAddToPeerstore(context.Context, address.NodeAddr, time.Duration) error
// SendRequestHandleResponse sends a request to the given peer and handles
// the response with the given handler.
SendRequestHandleResponse(context.Context, address.ProtocolID, address.NodeID,
Expand All @@ -32,7 +32,7 @@ type Endpoint interface {
// KadKey returns the KadKey of the local node.
KadKey() key.KadKey
// NetworkAddress returns the network address of the given peer (if known).
NetworkAddress(address.NodeID) (address.NodeID, error)
NetworkAddress(address.NodeID) (address.NodeAddr, error)
}

// ServerEndpoint is a Kademlia endpoint that can handle requests from remote
Expand Down
35 changes: 25 additions & 10 deletions network/endpoint/fakeendpoint/fakeendpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type FakeEndpoint struct {
self address.NodeID
sched scheduler.Scheduler // client

peerstore map[string]address.NodeID
peerstore map[string]address.NodeAddr
connStatus map[string]network.Connectedness
serverProtos map[address.ProtocolID]endpoint.RequestHandlerFn // server
streamFollowup map[endpoint.StreamID]endpoint.ResponseHandlerFn // client
Expand All @@ -41,7 +41,7 @@ func NewFakeEndpoint(self address.NodeID, sched scheduler.Scheduler, router *Fak
sched: sched,
serverProtos: make(map[address.ProtocolID]endpoint.RequestHandlerFn),

peerstore: make(map[string]address.NodeID),
peerstore: make(map[string]address.NodeAddr),
connStatus: make(map[string]network.Connectedness),

streamFollowup: make(map[endpoint.StreamID]endpoint.ResponseHandlerFn),
Expand Down Expand Up @@ -78,8 +78,8 @@ func (e *FakeEndpoint) DialPeer(ctx context.Context, id address.NodeID) error {

// MaybeAddToPeerstore adds the given address to the peerstore. FakeEndpoint
// doesn't take into account the ttl.
func (e *FakeEndpoint) MaybeAddToPeerstore(ctx context.Context, id address.NodeID, ttl time.Duration) error {
strNodeID := id.String()
func (e *FakeEndpoint) MaybeAddToPeerstore(ctx context.Context, id address.NodeAddr, ttl time.Duration) error {
strNodeID := id.NodeID().String()
_, span := util.StartSpan(ctx, "MaybeAddToPeerstore",
trace.WithAttributes(attribute.String("self", e.self.String())),
trace.WithAttributes(attribute.String("id", strNodeID)),
Expand Down Expand Up @@ -107,16 +107,28 @@ func (e *FakeEndpoint) SendRequestHandleResponse(ctx context.Context,

if err := e.DialPeer(ctx, id); err != nil {
span.RecordError(err)
handleResp(ctx, nil, err)
return err
e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) {
handleResp(ctx, nil, err)
}))
return nil
}

// send request
sid, err := e.router.SendMessage(ctx, e.self, id, protoID, 0, req)
addr, ok := e.peerstore[id.String()]
if !ok {
span.RecordError(endpoint.ErrUnknownPeer)
e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) {
handleResp(ctx, nil, endpoint.ErrUnknownPeer)
}))
return nil
}
sid, err := e.router.SendMessage(ctx, e.self, addr.NodeID(), protoID, 0, req)
if err != nil {
span.RecordError(err)
handleResp(ctx, nil, err)
return err
e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) {
handleResp(ctx, nil, err)
}))
return nil
}
e.streamFollowup[sid] = handleResp

Expand Down Expand Up @@ -151,10 +163,13 @@ func (e *FakeEndpoint) Connectedness(id address.NodeID) (network.Connectedness,
}
}

func (e *FakeEndpoint) NetworkAddress(id address.NodeID) (address.NodeID, error) {
func (e *FakeEndpoint) NetworkAddress(id address.NodeID) (address.NodeAddr, error) {
if ai, ok := e.peerstore[id.String()]; ok {
return ai, nil
}
if na, ok := id.(address.NodeAddr); ok {
return na, nil
}
return nil, endpoint.ErrUnknownPeer
}

Expand Down
Loading

0 comments on commit f96ac6f

Please sign in to comment.