diff --git a/examples/dispatchquery/main.go b/examples/dispatchquery/main.go index 1cc6f00..852a983 100644 --- a/examples/dispatchquery/main.go +++ b/examples/dispatchquery/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" "log" "time" @@ -57,7 +56,7 @@ func queryTest(ctx context.Context) { } selfA := &peerid.PeerID{ID: pidA} // peer.ID is necessary for ipfskadv1 message format addrA := multiaddr.StringCast("/ip4/1.1.1.1/tcp/4001/") - var naddrA address.NodeID = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfA.ID, + var naddrA address.NodeAddr = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfA.ID, Addrs: []multiaddr.Multiaddr{addrA}}) rtA := simplert.NewSimpleRT(selfA.Key(), 2) schedA := ss.NewSimpleScheduler(clk) @@ -75,7 +74,7 @@ func queryTest(ctx context.Context) { } selfB := &peerid.PeerID{ID: pidB} addrB := multiaddr.StringCast("/ip4/2.2.2.2/tcp/4001/") - var naddrB address.NodeID = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfB.ID, + var naddrB address.NodeAddr = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfB.ID, Addrs: []multiaddr.Multiaddr{addrB}}) rtB := simplert.NewSimpleRT(selfB.Key(), 2) schedB := ss.NewSimpleScheduler(clk) @@ -93,7 +92,7 @@ func queryTest(ctx context.Context) { } selfC := &peerid.PeerID{ID: pidC} addrC := multiaddr.StringCast("/ip4/3.3.3.3/tcp/4001/") - var naddrC address.NodeID = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfC.ID, + var naddrC address.NodeAddr = addrinfo.NewAddrInfo(peer.AddrInfo{ID: selfC.ID, Addrs: []multiaddr.Multiaddr{addrC}}) rtC := simplert.NewSimpleRT(selfC.Key(), 2) schedC := ss.NewSimpleScheduler(clk) @@ -124,7 +123,6 @@ func queryTest(ctx context.Context) { // dummy parameters handleResp := func(ctx context.Context, _ address.NodeID, resp message.MinKadResponseMessage) (bool, []address.NodeID) { - fmt.Println(resp.CloserNodes()) peerids := make([]address.NodeID, len(resp.CloserNodes())) for i, p := range resp.CloserNodes() { peerids[i] = p.(*addrinfo.AddrInfo).PeerID() diff --git a/examples/fullsim/findnode.go b/examples/fullsim/findnode.go index 2615332..bbf2753 100644 --- a/examples/fullsim/findnode.go +++ b/examples/fullsim/findnode.go @@ -33,14 +33,14 @@ const ( ) // connectNodes adds nodes to each other's peerstores and routing tables -func connectNodes(ctx context.Context, n0, n1 address.NodeID, ep0, ep1 endpoint.Endpoint, +func connectNodes(ctx context.Context, n0, n1 address.NodeAddr, ep0, ep1 endpoint.Endpoint, rt0, rt1 routingtable.RoutingTable) { // add n1 to n0's peerstore and routing table ep0.MaybeAddToPeerstore(ctx, n1, peerstoreTTL) - rt0.AddPeer(ctx, n1) + rt0.AddPeer(ctx, n1.NodeID()) // add n0 to n1's peerstore and routing table ep1.MaybeAddToPeerstore(ctx, n0, peerstoreTTL) - rt1.AddPeer(ctx, n0) + rt1.AddPeer(ctx, n0.NodeID()) } func findNode(ctx context.Context) { @@ -107,16 +107,18 @@ func findNode(ctx context.Context) { resp := msg.(*simmessage.SimMessage) fmt.Println("got a response from", id, "with", resp.CloserNodes()) - for _, peer := range resp.CloserNodes() { - if peer.String() == ids[3].NodeID().String() { + newIds := make([]address.NodeID, len(resp.CloserNodes())) + for i, peer := range resp.CloserNodes() { + if peer.NodeID().String() == ids[3].NodeID().String() { // the response contains the address of D (ids[3]) fmt.Println("success") // returning true will stop the query process return true, nil } + newIds[i] = peer.NodeID() } // returning false will continue the query process - return false, resp.CloserNodes() + return false, newIds } // create a query on A (using A's scheduler, endpoint and 
routing table), diff --git a/network/address/address.go b/network/address/address.go index 409e8f0..6e233a7 100644 --- a/network/address/address.go +++ b/network/address/address.go @@ -12,5 +12,13 @@ type NodeID interface { String() string } +type Addr any + +type NodeAddr interface { + NodeID() NodeID + + Addresses() []Addr +} + // ProtocolID is a protocol identifier. type ProtocolID string diff --git a/network/address/addrinfo/addrinfo.go b/network/address/addrinfo/addrinfo.go index 724ac98..c82e988 100644 --- a/network/address/addrinfo/addrinfo.go +++ b/network/address/addrinfo/addrinfo.go @@ -12,7 +12,7 @@ type AddrInfo struct { id *peerid.PeerID } -var _ address.NodeID = (*AddrInfo)(nil) +var _ address.NodeAddr = (*AddrInfo)(nil) func NewAddrInfo(ai peer.AddrInfo) *AddrInfo { return &AddrInfo{ @@ -32,3 +32,15 @@ func (ai AddrInfo) String() string { func (ai AddrInfo) PeerID() *peerid.PeerID { return ai.id } + +func (ai AddrInfo) NodeID() address.NodeID { + return ai.id +} + +func (ai AddrInfo) Addresses() []address.Addr { + addrs := make([]address.Addr, len(ai.Addrs)) + for i, a := range ai.Addrs { + addrs[i] = a + } + return addrs +} diff --git a/network/address/kadaddr/kadaddr.go b/network/address/kadaddr/kadaddr.go new file mode 100644 index 0000000..1aa4cb0 --- /dev/null +++ b/network/address/kadaddr/kadaddr.go @@ -0,0 +1,48 @@ +package kadaddr + +import ( + "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/network/address/kadid" +) + +type KadAddr struct { + id *kadid.KadID + addrs []string +} + +var _ address.NodeAddr = (*KadAddr)(nil) + +func NewKadAddr(id *kadid.KadID, addrs []string) *KadAddr { + return &KadAddr{ + id: id, + addrs: addrs, + } +} + +func (ka *KadAddr) AddAddr(addr string) { + ka.addrs = append(ka.addrs, addr) +} + +func (ka *KadAddr) RemoveAddr(addr string) { + writeIndex := 0 + // remove all occurrences of addr + for _, a := range ka.addrs { + if a != addr { + ka.addrs[writeIndex] = a + writeIndex++ + } + } + ka.addrs = ka.addrs[:writeIndex] +} + +func (ka *KadAddr) NodeID() address.NodeID { + return ka.id +} + +func (ka *KadAddr) Addresses() []address.Addr { + addresses := make([]address.Addr, len(ka.addrs)) + for i, a := range ka.addrs { + addresses[i] = a + } + return addresses +} diff --git a/network/address/kadid/kadid.go b/network/address/kadid/kadid.go index 9a6c9e2..c97892e 100644 --- a/network/address/kadid/kadid.go +++ b/network/address/kadid/kadid.go @@ -9,7 +9,7 @@ type KadID struct { key.KadKey } -var _ address.NodeID = (*KadID)(nil) +var _ address.NodeAddr = (*KadID)(nil) func NewKadID(k key.KadKey) *KadID { return &KadID{k} @@ -26,3 +26,7 @@ func (k KadID) NodeID() address.NodeID { func (k KadID) String() string { return k.Hex() } + +func (k KadID) Addresses() []address.Addr { + return []address.Addr{k} +} diff --git a/network/address/stringid/stringid.go b/network/address/stringid/stringid.go index 059dd0f..17d7f32 100644 --- a/network/address/stringid/stringid.go +++ b/network/address/stringid/stringid.go @@ -8,7 +8,7 @@ import ( type StringID string -var _ address.NodeID = (*StringID)(nil) +var _ address.NodeAddr = (*StringID)(nil) func NewStringID(s string) *StringID { return (*StringID)(&s) @@ -25,3 +25,7 @@ func (s StringID) Key() key.KadKey { func (s StringID) NodeID() address.NodeID { return &s } + +func (s StringID) Addresses() []address.Addr { + return []address.Addr{s} +} diff --git a/network/endpoint/endpoint.go b/network/endpoint/endpoint.go index a2c8191..495d2fe 100644 --- 
a/network/endpoint/endpoint.go +++ b/network/endpoint/endpoint.go @@ -22,7 +22,7 @@ type ResponseHandlerFn func(context.Context, message.MinKadResponseMessage, erro type Endpoint interface { // MaybeAddToPeerstore adds the given address to the peerstore if it is // valid and if it is not already there. - MaybeAddToPeerstore(context.Context, address.NodeID, time.Duration) error + MaybeAddToPeerstore(context.Context, address.NodeAddr, time.Duration) error // SendRequestHandleResponse sends a request to the given peer and handles // the response with the given handler. SendRequestHandleResponse(context.Context, address.ProtocolID, address.NodeID, @@ -32,7 +32,7 @@ type Endpoint interface { // KadKey returns the KadKey of the local node. KadKey() key.KadKey // NetworkAddress returns the network address of the given peer (if known). - NetworkAddress(address.NodeID) (address.NodeID, error) + NetworkAddress(address.NodeID) (address.NodeAddr, error) } // ServerEndpoint is a Kademlia endpoint that can handle requests from remote diff --git a/network/endpoint/fakeendpoint/fakeendpoint.go b/network/endpoint/fakeendpoint/fakeendpoint.go index bf0c799..ac8d08c 100644 --- a/network/endpoint/fakeendpoint/fakeendpoint.go +++ b/network/endpoint/fakeendpoint/fakeendpoint.go @@ -23,7 +23,7 @@ type FakeEndpoint struct { self address.NodeID sched scheduler.Scheduler // client - peerstore map[string]address.NodeID + peerstore map[string]address.NodeAddr connStatus map[string]network.Connectedness serverProtos map[address.ProtocolID]endpoint.RequestHandlerFn // server streamFollowup map[endpoint.StreamID]endpoint.ResponseHandlerFn // client @@ -41,7 +41,7 @@ func NewFakeEndpoint(self address.NodeID, sched scheduler.Scheduler, router *Fak sched: sched, serverProtos: make(map[address.ProtocolID]endpoint.RequestHandlerFn), - peerstore: make(map[string]address.NodeID), + peerstore: make(map[string]address.NodeAddr), connStatus: make(map[string]network.Connectedness), streamFollowup: make(map[endpoint.StreamID]endpoint.ResponseHandlerFn), @@ -78,8 +78,8 @@ func (e *FakeEndpoint) DialPeer(ctx context.Context, id address.NodeID) error { // MaybeAddToPeerstore adds the given address to the peerstore. FakeEndpoint // doesn't take into account the ttl. 
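// Aside: a minimal usage sketch of the reworked endpoint API from endpoint.go
// above, in which peers are stored as NodeAddr records (an ID plus its
// addresses) and resolved back through NetworkAddress. It assumes the
// fakeendpoint, kadaddr and kadid packages from this change set; the IDs and
// the "addr0" string are illustrative, not part of the diff.
package main

import (
	"context"
	"fmt"

	"github.com/plprobelab/go-kademlia/network/address/kadaddr"
	"github.com/plprobelab/go-kademlia/network/address/kadid"
	"github.com/plprobelab/go-kademlia/network/endpoint/fakeendpoint"
)

func main() {
	ctx := context.Background()
	self := kadid.NewKadID([]byte{0x00})
	ep := fakeendpoint.NewFakeEndpoint(self, nil, fakeendpoint.NewFakeRouter())

	// MaybeAddToPeerstore now takes a NodeAddr (ID plus addresses) rather
	// than a bare NodeID; the TTL is ignored by FakeEndpoint.
	other := kadaddr.NewKadAddr(kadid.NewKadID([]byte{0x80}), []string{"addr0"})
	if err := ep.MaybeAddToPeerstore(ctx, other, 0); err != nil {
		panic(err)
	}

	// NetworkAddress resolves a NodeID back to the stored NodeAddr record.
	na, err := ep.NetworkAddress(other.NodeID())
	if err != nil {
		panic(err)
	}
	fmt.Println(na.NodeID(), na.Addresses())
}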
-func (e *FakeEndpoint) MaybeAddToPeerstore(ctx context.Context, id address.NodeID, ttl time.Duration) error { - strNodeID := id.String() +func (e *FakeEndpoint) MaybeAddToPeerstore(ctx context.Context, id address.NodeAddr, ttl time.Duration) error { + strNodeID := id.NodeID().String() _, span := util.StartSpan(ctx, "MaybeAddToPeerstore", trace.WithAttributes(attribute.String("self", e.self.String())), trace.WithAttributes(attribute.String("id", strNodeID)), @@ -107,16 +107,28 @@ func (e *FakeEndpoint) SendRequestHandleResponse(ctx context.Context, if err := e.DialPeer(ctx, id); err != nil { span.RecordError(err) - handleResp(ctx, nil, err) - return err + e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) { + handleResp(ctx, nil, err) + })) + return nil } // send request - sid, err := e.router.SendMessage(ctx, e.self, id, protoID, 0, req) + addr, ok := e.peerstore[id.String()] + if !ok { + span.RecordError(endpoint.ErrUnknownPeer) + e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) { + handleResp(ctx, nil, endpoint.ErrUnknownPeer) + })) + return nil + } + sid, err := e.router.SendMessage(ctx, e.self, addr.NodeID(), protoID, 0, req) if err != nil { span.RecordError(err) - handleResp(ctx, nil, err) - return err + e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) { + handleResp(ctx, nil, err) + })) + return nil } e.streamFollowup[sid] = handleResp @@ -151,10 +163,13 @@ func (e *FakeEndpoint) Connectedness(id address.NodeID) (network.Connectedness, } } -func (e *FakeEndpoint) NetworkAddress(id address.NodeID) (address.NodeID, error) { +func (e *FakeEndpoint) NetworkAddress(id address.NodeID) (address.NodeAddr, error) { if ai, ok := e.peerstore[id.String()]; ok { return ai, nil } + if na, ok := id.(address.NodeAddr); ok { + return na, nil + } return nil, endpoint.ErrUnknownPeer } diff --git a/network/endpoint/fakeendpoint/fakeendpoint_test.go b/network/endpoint/fakeendpoint/fakeendpoint_test.go index 290f26d..defc086 100644 --- a/network/endpoint/fakeendpoint/fakeendpoint_test.go +++ b/network/endpoint/fakeendpoint/fakeendpoint_test.go @@ -27,14 +27,14 @@ func TestFakeEndpoint(t *testing.T) { ctx := context.Background() clk := clock.NewMock() - kadid := si.StringID("self") + selfID := si.StringID("self") router := NewFakeRouter() sched := simplescheduler.NewSimpleScheduler(clk) - fakeEndpoint := NewFakeEndpoint(kadid, sched, router) + fakeEndpoint := NewFakeEndpoint(selfID, sched, router) - b, err := kadid.Key().Equal(fakeEndpoint.KadKey()) + b, err := selfID.Key().Equal(fakeEndpoint.KadKey()) require.NoError(t, err) require.True(t, b) @@ -46,10 +46,11 @@ func TestFakeEndpoint(t *testing.T) { require.NoError(t, err) require.Equal(t, network.NotConnected, connectedness) - _, err = fakeEndpoint.NetworkAddress(node0) - require.Equal(t, endpoint.ErrUnknownPeer, err) + na, err := fakeEndpoint.NetworkAddress(node0) + require.NoError(t, err) + require.Equal(t, na, node0) - req := simmessage.NewSimRequest(kadid.Key()) + req := simmessage.NewSimRequest(selfID.Key()) resp := &simmessage.SimMessage{} var runCheck bool @@ -58,6 +59,8 @@ func TestFakeEndpoint(t *testing.T) { runCheck = true } fakeEndpoint.SendRequestHandleResponse(ctx, protoID, node0, req, resp, 0, respHandler) + require.True(t, sched.RunOne(ctx)) + require.False(t, sched.RunOne(ctx)) require.True(t, runCheck) err = fakeEndpoint.MaybeAddToPeerstore(ctx, node0, peerstoreTTL) @@ -67,12 +70,15 @@ func TestFakeEndpoint(t *testing.T) { require.NoError(t, err) require.Equal(t, 
network.CanConnect, connectedness) - na, err := fakeEndpoint.NetworkAddress(node0) + na, err = fakeEndpoint.NetworkAddress(node0) require.NoError(t, err) require.Equal(t, node0, na) // it will still be an ErrUnknownPeer because we haven't added node0 to the router - fakeEndpoint.SendRequestHandleResponse(ctx, protoID, node0, req, resp, 0, respHandler) + err = fakeEndpoint.SendRequestHandleResponse(ctx, protoID, node0, req, resp, 0, respHandler) + require.NoError(t, err) + require.True(t, sched.RunOne(ctx)) + require.False(t, sched.RunOne(ctx)) sched0 := simplescheduler.NewSimpleScheduler(clk) fakeEndpoint0 := NewFakeEndpoint(node0, sched0, router) @@ -90,7 +96,9 @@ func TestFakeEndpoint(t *testing.T) { require.NoError(t, err) runCheck = true } - fakeEndpoint.SendRequestHandleResponse(ctx, protoID, node0, req, resp, 0, respHandler) + + err = fakeEndpoint.SendRequestHandleResponse(ctx, protoID, node0, req, resp, 0, respHandler) + require.NoError(t, err) require.True(t, sched0.RunOne(ctx)) require.False(t, sched0.RunOne(ctx)) @@ -134,12 +142,12 @@ func TestRequestTimeout(t *testing.T) { nPeers := 2 scheds := make([]scheduler.AwareScheduler, nPeers) - ids := make([]address.NodeID, nPeers) + ids := make([]address.NodeAddr, nPeers) fakeEndpoints := make([]*FakeEndpoint, nPeers) for i := 0; i < nPeers; i++ { ids[i] = kadid.NewKadID([]byte{byte(i)}) scheds[i] = simplescheduler.NewSimpleScheduler(clk) - fakeEndpoints[i] = NewFakeEndpoint(ids[i], scheds[i], router) + fakeEndpoints[i] = NewFakeEndpoint(ids[i].NodeID(), scheds[i], router) } // connect the peers to each other @@ -160,7 +168,7 @@ func TestRequestTimeout(t *testing.T) { fakeEndpoints[1].AddRequestHandler(protoID, nil, dropRequestHandler) // fakeEndpoints[0] will send a request to fakeEndpoints[1], but the request // will timeout (because fakeEndpoints[1] will not respond) - fakeEndpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1], nil, nil, + fakeEndpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1].NodeID(), nil, nil, time.Second, func(ctx context.Context, msg message.MinKadResponseMessage, err error) { timeoutExecuted = true @@ -181,7 +189,7 @@ func TestRequestTimeout(t *testing.T) { require.True(t, timeoutExecuted) // timeout without followup action - fakeEndpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1], nil, nil, + fakeEndpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1].NodeID(), nil, nil, time.Second, nil) // peer[1] has 1 action to run: message handling and not responding require.True(t, scheds[1].RunOne(ctx)) @@ -202,10 +210,10 @@ func TestRequestTimeout(t *testing.T) { return req, nil } // create valid message - msg := simmessage.NewSimResponse(make([]address.NodeID, 0)) + msg := simmessage.NewSimResponse([]address.NodeAddr{}) // overwrite request handler fakeEndpoints[1].AddRequestHandler(protoID, nil, dumbResponseHandler) - fakeEndpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1], msg, nil, + fakeEndpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1].NodeID(), msg, nil, time.Second, func(ctx context.Context, msg message.MinKadResponseMessage, err error) { require.NoError(t, err) diff --git a/network/endpoint/libp2pendpoint/libp2pendpoint.go b/network/endpoint/libp2pendpoint/libp2pendpoint.go index e42a23b..0afe2e5 100644 --- a/network/endpoint/libp2pendpoint/libp2pendpoint.go +++ b/network/endpoint/libp2pendpoint/libp2pendpoint.go @@ -128,9 +128,9 @@ func (e *Libp2pEndpoint) DialPeer(ctx context.Context, id address.NodeID) error } func (e *Libp2pEndpoint) MaybeAddToPeerstore(ctx 
context.Context, - id address.NodeID, ttl time.Duration) error { + id address.NodeAddr, ttl time.Duration) error { _, span := util.StartSpan(ctx, "Libp2pEndpoint.MaybeAddToPeerstore", - trace.WithAttributes(attribute.String("PeerID", id.String()))) + trace.WithAttributes(attribute.String("PeerID", id.NodeID().String()))) defer span.End() ai, ok := id.(*addrinfo.AddrInfo) @@ -139,11 +139,11 @@ func (e *Libp2pEndpoint) MaybeAddToPeerstore(ctx context.Context, } // Don't add addresses for self or our connected peers. We have better ones. - if ai.ID == e.host.ID() || - e.host.Network().Connectedness(ai.ID) == network.Connected { + if ai.PeerID().ID == e.host.ID() || + e.host.Network().Connectedness(ai.PeerID().ID) == network.Connected { return nil } - e.host.Peerstore().AddAddrs(ai.ID, ai.Addrs, ttl) + e.host.Peerstore().AddAddrs(ai.PeerID().ID, ai.Addrs, ttl) return nil } @@ -261,7 +261,7 @@ func (e *Libp2pEndpoint) KadKey() key.KadKey { return peerid.PeerID{ID: e.host.ID()}.Key() } -func (e *Libp2pEndpoint) NetworkAddress(n address.NodeID) (address.NodeID, error) { +func (e *Libp2pEndpoint) NetworkAddress(n address.NodeID) (address.NodeAddr, error) { ai, err := e.PeerInfo(n) if err != nil { return nil, err @@ -305,7 +305,9 @@ func (e *Libp2pEndpoint) AddRequestHandler(protoID address.ProtocolID, return } - requester := peerid.NewPeerID(s.Conn().RemotePeer()) + requester := addrinfo.NewAddrInfo( + e.host.Peerstore().PeerInfo(s.Conn().RemotePeer()), + ) resp, err := reqHandler(ctx, requester, req) if err != nil { span.RecordError(err) diff --git a/network/endpoint/libp2pendpoint/libp2pendpoint_test.go b/network/endpoint/libp2pendpoint/libp2pendpoint_test.go index 521d8df..ce99571 100644 --- a/network/endpoint/libp2pendpoint/libp2pendpoint_test.go +++ b/network/endpoint/libp2pendpoint/libp2pendpoint_test.go @@ -93,7 +93,7 @@ func TestLibp2pEndpoint(t *testing.T) { require.NoError(t, err) ai, ok := netAddr.(*addrinfo.AddrInfo) require.True(t, ok) - require.Equal(t, ids[1].ID, ai.ID) + require.Equal(t, ids[1].ID, ai.PeerID().ID) require.Len(t, ai.Addrs, len(addrinfos[1].Addrs)) for _, addr := range ai.Addrs { require.Contains(t, addrinfos[1].Addrs, addr) @@ -102,7 +102,6 @@ func TestLibp2pEndpoint(t *testing.T) { netAddr, err = endpoints[0].NetworkAddress(invalidID) require.Equal(t, endpoint.ErrInvalidPeer, err) require.Nil(t, netAddr) - // dial from 0 to 1 err = endpoints[0].DialPeer(ctx, ids[1]) require.NoError(t, err) diff --git a/network/message/ipfsv1/helpers.go b/network/message/ipfsv1/helpers.go index 66ec3bc..3af9fd9 100644 --- a/network/message/ipfsv1/helpers.go +++ b/network/message/ipfsv1/helpers.go @@ -48,10 +48,10 @@ func (msg *Message) EmptyResponse() message.MinKadResponseMessage { return &Message{} } -func (msg *Message) CloserNodes() []address.NodeID { +func (msg *Message) CloserNodes() []address.NodeAddr { closerPeers := msg.GetCloserPeers() if closerPeers == nil { - return []address.NodeID{} + return []address.NodeAddr{} } return ParsePeers(closerPeers) } @@ -74,8 +74,8 @@ func PBPeerToPeerInfo(pbp *Message_Peer) (*addrinfo.AddrInfo, error) { }), nil } -func ParsePeers(pbps []*Message_Peer) []address.NodeID { - peers := make([]address.NodeID, 0, len(pbps)) +func ParsePeers(pbps []*Message_Peer) []address.NodeAddr { + peers := make([]address.NodeAddr, 0, len(pbps)) for _, p := range pbps { pi, err := PBPeerToPeerInfo(p) if err == nil { diff --git a/network/message/ipfsv1/helpers_test.go b/network/message/ipfsv1/helpers_test.go index 415f236..d114136 100644 --- 
a/network/message/ipfsv1/helpers_test.go +++ b/network/message/ipfsv1/helpers_test.go @@ -56,7 +56,7 @@ func TestFindPeerResponse(t *testing.T) { fakeEndpoint := fakeendpoint.NewFakeEndpoint(selfAddr, nil, nil) nPeers := 5 - closerPeers := make([]address.NodeID, nPeers) + closerPeers := make([]address.NodeAddr, nPeers) closerIds := make([]address.NodeID, nPeers) for i := 0; i < nPeers; i++ { s := strconv.Itoa(2 + i) @@ -86,7 +86,7 @@ func TestCornerCases(t *testing.T) { require.Nil(t, resp.Target()) require.Equal(t, 0, len(resp.CloserNodes())) - fakeEndpoint := fakeendpoint.NewFakeEndpoint(peerid.PeerID{}, nil, nil) + fakeEndpoint := fakeendpoint.NewFakeEndpoint(addrinfo.AddrInfo{}, nil, nil) n0, err := peer.Decode("1D3oooUnknownPeer") require.NoError(t, err) ids = append(ids, &peerid.PeerID{ID: n0}) diff --git a/network/message/message.go b/network/message/message.go index efccbbe..7068a84 100644 --- a/network/message/message.go +++ b/network/message/message.go @@ -19,7 +19,7 @@ type MinKadRequestMessage interface { type MinKadResponseMessage interface { MinKadMessage - CloserNodes() []address.NodeID + CloserNodes() []address.NodeAddr } type ProtoKadMessage interface { diff --git a/network/message/simmessage/simmessage.go b/network/message/simmessage/simmessage.go index b2dea5c..f5d1739 100644 --- a/network/message/simmessage/simmessage.go +++ b/network/message/simmessage/simmessage.go @@ -8,7 +8,7 @@ import ( type SimMessage struct { target key.KadKey - closerPeers []address.NodeID + closerPeers []address.NodeAddr } var _ message.MinKadRequestMessage = (*SimMessage)(nil) @@ -20,7 +20,7 @@ func NewSimRequest(target key.KadKey) *SimMessage { } } -func NewSimResponse(closerPeers []address.NodeID) *SimMessage { +func NewSimResponse(closerPeers []address.NodeAddr) *SimMessage { return &SimMessage{ closerPeers: closerPeers, } @@ -34,6 +34,6 @@ func (m *SimMessage) EmptyResponse() message.MinKadResponseMessage { return &SimMessage{} } -func (m *SimMessage) CloserNodes() []address.NodeID { +func (m *SimMessage) CloserNodes() []address.NodeAddr { return m.closerPeers } diff --git a/network/message/simmessage/simmessage_test.go b/network/message/simmessage/simmessage_test.go index 83e6510..951702c 100644 --- a/network/message/simmessage/simmessage_test.go +++ b/network/message/simmessage/simmessage_test.go @@ -23,7 +23,7 @@ func TestSimRequest(t *testing.T) { } func TestSimResponse(t *testing.T) { - closerPeers := []address.NodeID{si.StringID("peer1"), si.StringID("peer2")} + closerPeers := []address.NodeAddr{si.StringID("peer1"), si.StringID("peer2")} msg := NewSimResponse(closerPeers) require.Nil(t, msg.Target()) diff --git a/query/simplequery/options.go b/query/simplequery/options.go index 1d58d1b..d78cdb1 100644 --- a/query/simplequery/options.go +++ b/query/simplequery/options.go @@ -80,7 +80,11 @@ var DefaultConfig = func(cfg *Config) error { cfg.HandleResultsFunc = func(ctx context.Context, id address.NodeID, resp message.MinKadResponseMessage) (bool, []address.NodeID) { - return false, resp.CloserNodes() + ids := make([]address.NodeID, len(resp.CloserNodes())) + for i, n := range resp.CloserNodes() { + ids[i] = n.NodeID() + } + return false, ids } cfg.NotifyFailureFunc = func(context.Context) {} diff --git a/query/simplequery/peerlist.go b/query/simplequery/peerlist.go index a4b4766..8f7d3f3 100644 --- a/query/simplequery/peerlist.go +++ b/query/simplequery/peerlist.go @@ -5,6 +5,7 @@ import ( "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/network/address" + 
"github.com/plprobelab/go-kademlia/network/endpoint" ) type peerStatus uint8 @@ -17,15 +18,18 @@ const ( ) type peerInfo struct { - distance key.KadKey - status peerStatus - id address.NodeID + distance key.KadKey + status peerStatus + id address.NodeID + addrs []address.Addr + tryAgainOnFailure bool next *peerInfo } type peerList struct { - target key.KadKey + target key.KadKey + endpoint endpoint.NetworkedEndpoint closest *peerInfo closestQueued *peerInfo @@ -33,10 +37,14 @@ type peerList struct { queuedCount int } -func newPeerList(target key.KadKey) *peerList { +func newPeerList(target key.KadKey, ep endpoint.Endpoint) *peerList { + nep, ok := ep.(endpoint.NetworkedEndpoint) + if !ok { + nep = nil + } return &peerList{ - target: target, - queuedCount: 0, + target: target, + endpoint: nep, } } @@ -65,7 +73,7 @@ func (pl *peerList) addToPeerlist(ids []address.NodeID) { // merge the new sorted list into the existing sorted list var prev *peerInfo - currOld := true + currOld := true // current element is from old list closestQueuedReached := false r, _ := oldHead.distance.Compare(newHead.distance) @@ -106,12 +114,40 @@ func (pl *peerList) addToPeerlist(ids []address.NodeID) { prev.next = oldHead currOld = true } - prev = oldHead - oldHead = oldHead.next if r == 0 { // newHead is a duplicate of oldHead, discard newHead newHead = newHead.next + + if pl.endpoint != nil && (oldHead.status == unreachable || + oldHead.status == waiting) { + // If oldHead is unreachable, and new addresses are discovered + // for this NodeID, set status to queued as we will try again. + // If oldHead is waiting, set tryAgainOnFailure to true + // so that we will try again with the newly discovered + // addresses upon failure. + na, err := pl.endpoint.NetworkAddress(oldHead.id) + if err == nil { + for _, addr := range na.Addresses() { + found := false + for _, oldAddr := range oldHead.addrs { + if addr == oldAddr { + found = true + break + } + } + if !found { + if oldHead.status == unreachable { + pl.enqueueUnreachablePeer(oldHead) + } else if oldHead.status == waiting { + oldHead.tryAgainOnFailure = true + } + } + } + } + } } + prev = oldHead + oldHead = oldHead.next } // we are done when we reach the end of either list if oldHead == nil || newHead == nil { @@ -122,6 +158,9 @@ func (pl *peerList) addToPeerlist(ids []address.NodeID) { // append the remaining list to the end if oldHead == nil { + if !closestQueuedReached { + pl.closestQueued = newHead + } prev.next = newHead // if there are still new peers to be appended, increase queued count @@ -179,41 +218,46 @@ func addrInfoToPeerInfo(target key.KadKey, id address.NodeID) *peerInfo { } } -func (pl *peerList) updatePeerStatusInPeerlist(id address.NodeID, newStatus peerStatus) { - curr := pl.closest - for curr != nil && curr.id.String() != id.String() { - curr = curr.next - } - if curr != nil { - if curr.status == queued && newStatus != queued { - pl.queuedCount-- - } else if curr.status != queued && newStatus == queued { - pl.queuedCount++ +func (pl *peerList) enqueueUnreachablePeer(pi *peerInfo) { + if pi != nil { + // curr is the id we are looking for + if pi.status != queued { + pi.tryAgainOnFailure = false + pi.status = queued - for curr := pl.closest; curr != nil; curr = curr.next { - // if a peer is set to queued, we may need to update closestQueued - if curr.id.String() == id.String() { - pl.closestQueued = curr - break - } else if curr == pl.closestQueued { - break - } + pl.queuedCount++ + // if curr is closer to target than closestQueued, update 
closestQueued + if r, _ := pi.distance.Compare(pl.closestQueued.distance); r < 0 { + pl.closestQueued = pi } } - - curr.status = newStatus - - if curr == pl.closestQueued && newStatus != queued { - pl.closestQueued = findNextQueued(curr) - } } + } +// popClosestQueued sets the status of the closest "queued" peer to "waiting" +// and returns its NodeID. It records the addresses associated with id in the +// peerstore at the time of the request to curr.addrs. func (pl *peerList) popClosestQueued() address.NodeID { if pl.closestQueued == nil { return nil } pi := pl.closestQueued + if pl.endpoint != nil { + na, _ := pl.endpoint.NetworkAddress(pi.id) + for na == nil { + // if peer doesn't have addresses, set status to unreachable + pi.status = unreachable + pl.queuedCount-- + + pi = findNextQueued(pi) + if pi == nil { + return nil + } + na, _ = pl.endpoint.NetworkAddress(pi.id) + } + pi.addrs = na.Addresses() + } pi.status = waiting pl.queuedCount-- @@ -221,6 +265,37 @@ func (pl *peerList) popClosestQueued() address.NodeID { return pi.id } +func (pl *peerList) queriedPeer(id address.NodeID) { + curr := pl.closest + for curr != nil && curr.id.String() != id.String() { + curr = curr.next + } + if curr != nil { + // curr is the id we are looking for + if curr.status == waiting { + curr.status = queried + } + } +} + +// unreachablePeer sets the status of a "waiting" peer to "unreachable". +func (pl *peerList) unreachablePeer(id address.NodeID) { + curr := pl.closest + for curr != nil && curr.id.String() != id.String() { + curr = curr.next + } + if curr != nil { + // curr is the id we are looking for + if curr.status == waiting { + if curr.tryAgainOnFailure { + pl.enqueueUnreachablePeer(curr) + } else { + curr.status = unreachable + } + } + } +} + func findNextQueued(pi *peerInfo) *peerInfo { curr := pi for curr != nil && curr.status != queued { diff --git a/query/simplequery/peerlist_test.go b/query/simplequery/peerlist_test.go index b3b1661..9d0ed4a 100644 --- a/query/simplequery/peerlist_test.go +++ b/query/simplequery/peerlist_test.go @@ -1,254 +1,298 @@ package simplequery import ( + "context" "testing" "github.com/libp2p/go-libp2p/core/peer" "github.com/plprobelab/go-kademlia/network/address" + "github.com/plprobelab/go-kademlia/network/address/kadaddr" + "github.com/plprobelab/go-kademlia/network/address/kadid" "github.com/plprobelab/go-kademlia/network/address/peerid" + "github.com/plprobelab/go-kademlia/network/endpoint/fakeendpoint" "github.com/stretchr/testify/require" ) func TestAddPeers(t *testing.T) { + target := kadid.NewKadID([]byte{0x00}) + // create empty peer list - pl := newPeerList(make([]byte, 32)) + pl := newPeerList(target.Key(), nil) require.Nil(t, pl.closest) require.Nil(t, pl.closestQueued) - // add initial peers - nPeers := 3 - peerids := make([]address.NodeID, nPeers+1) - for i := 0; i < nPeers; i++ { - peerids[i] = peerid.PeerID{ID: peer.ID(byte(i))} + initialIds := []address.NodeID{ + kadid.NewKadID([]byte{0xa0}), + kadid.NewKadID([]byte{0x08}), + kadid.NewKadID([]byte{0x20}), + kadid.NewKadID([]byte{0xa0}), // duplicate with initialIds[0] } - peerids[nPeers] = peerid.PeerID{ID: peer.ID(byte(0))} // duplicate with peerids[0] - - // distances - // peerids[0]: 6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d - // peerids[1]: 4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a - // peerids[2]: dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986 - // peerids[3]: 6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d // add 4 peers (incl. 
1 duplicate) - pl.addToPeerlist(peerids) - - require.Equal(t, nPeers, pl.queuedCount) + pl.addToPeerlist(initialIds) + require.Equal(t, len(initialIds)-1, pl.queuedCount) curr := pl.closest - // verify that closest peer is peerids[1] - require.Equal(t, peerids[1], curr.id) + // verify that closest peer is initialIds[1] + require.Equal(t, initialIds[1], curr.id) + // verify that closestQueued peer is initialIds[1] + require.Equal(t, initialIds[1], pl.closestQueued.id) curr = curr.next - // second closest peer should be peerids[0] - require.Equal(t, peerids[0], curr.id) + // verify that next peer is initialIds[2] + require.Equal(t, initialIds[2], curr.id) curr = curr.next - // third closest peer should be peerids[2] - require.Equal(t, peerids[2], curr.id) - + // verify that next peer is initialIds[0] + require.Equal(t, initialIds[0], curr.id) // end of the list require.Nil(t, curr.next) - // verify that closestQueued peer is peerids[0] - require.Equal(t, peerids[1], pl.closestQueued.id) - - // add more peers - nPeers = 5 - newPeerids := make([]address.NodeID, nPeers+2) - for i := 0; i < nPeers; i++ { - newPeerids[i] = peerid.PeerID{ID: peer.ID(byte(10 + i))} + additionalIds := []address.NodeID{ + kadid.NewKadID([]byte{0x40}), + kadid.NewKadID([]byte{0x20}), // duplicate with initialIds[2] + kadid.NewKadID([]byte{0x60}), + kadid.NewKadID([]byte{0x80}), + kadid.NewKadID([]byte{0x60}), // duplicate additionalIds[2] + kadid.NewKadID([]byte{0x18}), + kadid.NewKadID([]byte{0xf0}), } - newPeerids[nPeers] = peerid.PeerID{ID: peer.ID(byte(10))} // duplicate with newPeerids[0] - newPeerids[nPeers+1] = peerid.PeerID{ID: peer.ID(byte(1))} // duplicate with peerids[1] - // distances - // newPeerids[0]: 01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b - // newPeerids[1]: e7cf46a078fed4fafd0b5e3aff144802b853f8ae459a4f0c14add3314b7cc3a6 - // newPeerids[2]: ef6cbd2161eaea7943ce8693b9824d23d1793ffb1c0fca05b600d3899b44c977 - // newPeerids[3]: 9d1e0e2d9459d06523ad13e28a4093c2316baafe7aec5b25f30eba2e113599c4 - // newPeerids[4]: 4d7b3ef7300acf70c892d8327db8272f54434adbc61a4e130a563cb59a0d0f47 - // newPeerids[5]: 01ba4719c80b6fe911b091a7c05124b64eeece964e09c058ef8f9805daca546b - // newPeerids[6]: 4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a - - // add 7 peers (incl. 2 duplicates) - pl.addToPeerlist(newPeerids) - - require.Equal(t, 8, pl.queuedCount) - - // order is now as follows: - order := []address.NodeID{newPeerids[0], peerids[1], newPeerids[4], peerids[0], newPeerids[3], - peerids[2], newPeerids[1], newPeerids[2]} + // add 7 more peers (incl. 
2 duplicates) + pl.addToPeerlist(additionalIds) + require.Equal(t, len(initialIds)-1+len(additionalIds)-2, pl.queuedCount) curr = pl.closest - for _, p := range order { - require.Equal(t, p, curr.id) - curr = curr.next - } - require.Nil(t, curr) - - // verify that closestQueued peer is peerids[0] - require.Equal(t, newPeerids[0], pl.closestQueued.id) + // verify that closest peer is initialIds[1] 0x08 + require.Equal(t, initialIds[1], curr.id) + // verify that closestQueued peer is initialIds[1] 0x08 + require.Equal(t, initialIds[1], pl.closestQueued.id) + curr = curr.next + // verify that next peer is additionalIds[5] 0x18 + require.Equal(t, additionalIds[5], curr.id) + curr = curr.next + // verify that next peer is initialIds[2] 0x20 + require.Equal(t, initialIds[2], curr.id) + curr = curr.next + // verify that next peer is additionalIds[0] 0x40 + require.Equal(t, additionalIds[0], curr.id) + curr = curr.next + // verify that next peer is additionalIds[2] 0x60 + require.Equal(t, additionalIds[2], curr.id) + curr = curr.next + // verify that next peer is additionalIds[3] 0x80 + require.Equal(t, additionalIds[3], curr.id) + curr = curr.next + // verify that next peer is initialIds[0] 0xa0 + require.Equal(t, initialIds[0], curr.id) + curr = curr.next + // verify that next peer is additionalIds[6] 0xf0 + require.Equal(t, additionalIds[6], curr.id) + // end of the list + require.Nil(t, curr.next) - // add a single peer that isn't the closest one - newPeer := peerid.PeerID{ID: peer.ID(byte(20))} + // add 1 more peer, the closest to target + newId := kadid.NewKadID([]byte{0x00}) + pl.addToPeerlist([]address.NodeID{newId}) + require.Equal(t, len(initialIds)-1+len(additionalIds)-2+1, pl.queuedCount) - pl.addToPeerlist([]address.NodeID{newPeer}) - order = append(order[:5], order[4:]...) 
- order[4] = newPeer + // it must be the closest peer + require.Equal(t, newId, pl.closest.id) + require.Equal(t, newId, pl.closestQueued.id) - curr = pl.closest - for _, p := range order { - require.Equal(t, p, curr.id) - curr = curr.next - } + // add empty list + pl.addToPeerlist([]address.NodeID{}) + require.Equal(t, len(initialIds)-1+len(additionalIds)-2+1, pl.queuedCount) - require.Nil(t, curr) + // add list containing nil element + pl.addToPeerlist([]address.NodeID{nil}) + require.Equal(t, len(initialIds)-1+len(additionalIds)-2+1, pl.queuedCount) } -func TestPeerlistCornerCases(t *testing.T) { - // different empty peerid lists - emptyPeeridLists := [][]address.NodeID{ - {}, - {peerid.PeerID{ID: peer.ID("")}}, - make([]address.NodeID, 3), - } - - for _, peerids := range emptyPeeridLists { - // adding them to a peerlist should result in an empty list - require.Nil(t, sliceToPeerInfos(make([]byte, 32), peerids)) - - pl := newPeerList(make([]byte, 32)) - pl.addToPeerlist(peerids) - require.Nil(t, pl.closest) - require.Nil(t, pl.closestQueued) - require.Equal(t, 0, pl.queuedCount) - } - - pl := newPeerList(make([]byte, 32)) +func TestChangeStatus(t *testing.T) { + target := kadid.NewKadID([]byte{0x00}) - singlePeerList0 := []address.NodeID{peerid.PeerID{ID: peer.ID(byte(0))}} - // 6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d - pl.addToPeerlist(singlePeerList0) - require.Equal(t, singlePeerList0[0], pl.closest.id) - require.Equal(t, singlePeerList0[0], pl.closestQueued.id) - require.Equal(t, 1, pl.queuedCount) - - singlePeerList1 := []address.NodeID{peerid.PeerID{ID: peer.ID(byte(1))}} - // 4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a - pl.addToPeerlist(singlePeerList1) - require.Equal(t, singlePeerList1[0], pl.closest.id) - require.Equal(t, singlePeerList1[0], pl.closest.id) - require.Equal(t, 2, pl.queuedCount) - - singlePeerList2 := []address.NodeID{peerid.PeerID{ID: peer.ID(byte(2))}} - // dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986 - pl.addToPeerlist(singlePeerList2) - require.Equal(t, 3, pl.queuedCount) - - curr := pl.closest - require.Equal(t, singlePeerList1[0], curr.id) - curr = curr.next - require.Equal(t, singlePeerList0[0], curr.id) - curr = curr.next - require.Equal(t, singlePeerList2[0], curr.id) - curr = curr.next - require.Nil(t, curr) -} - -func TestUpdatePeerStatusInPeerlist(t *testing.T) { // create empty peer list - pl := newPeerList(make([]byte, 32)) + pl := newPeerList(target.Key(), nil) - // add initial peers - nPeers := 3 - peerids := make([]address.NodeID, nPeers) + require.Nil(t, pl.closest) + require.Nil(t, pl.closestQueued) + require.Nil(t, pl.popClosestQueued()) + + nPeers := 32 + ids := make([]address.NodeID, nPeers) for i := 0; i < nPeers; i++ { - peerids[i] = peerid.PeerID{ID: peer.ID(byte(i))} + ids[i] = kadid.NewKadID([]byte{byte(4 * i)}) } - pl.addToPeerlist(peerids) - - require.Equal(t, 3, pl.queuedCount) - - // initial queue state - // peerids[1], queued, 4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a - // peerids[0], queued, 6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d - // peerids[2], queued, dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986 - - pl.updatePeerStatusInPeerlist(peerids[0], waiting) - require.Equal(t, 2, pl.queuedCount) - pl.updatePeerStatusInPeerlist(peerids[1], unreachable) - require.Equal(t, 1, pl.queuedCount) + // add peers + pl.addToPeerlist(ids) + require.Equal(t, nPeers, pl.queuedCount) - curr := pl.closest - 
require.Equal(t, curr.status, unreachable) - curr = curr.next - require.Equal(t, curr.status, waiting) - curr = curr.next - require.Equal(t, curr.status, queued) - curr = curr.next - require.Nil(t, curr) + require.Equal(t, ids[0], pl.closest.id) + require.Equal(t, ids[0], pl.closestQueued.id) + + var popCounter int + // change status of the closest peer + require.Equal(t, ids[popCounter], pl.popClosestQueued()) + popCounter++ + // ids[0] is still the closest + require.Equal(t, ids[0], pl.closest.id) + // ids[1] is now the closestQueued + require.Equal(t, ids[popCounter], pl.closestQueued.id) + require.Equal(t, ids[popCounter], pl.popClosestQueued()) + popCounter++ + // ids[2] is now the closestQueued + require.Equal(t, ids[popCounter], pl.closestQueued.id) + + // mark ids[1] as queried + pl.queriedPeer(ids[1]) + require.Equal(t, waiting, pl.closest.status) + require.Equal(t, queried, pl.closest.next.status) + + // ids[2] cannot be set as unreachable (hasn't been waiting yet) + pl.unreachablePeer(ids[2]) + require.Equal(t, queued, pl.closest.next.next.status) + // set ids[2] as waiting + require.Equal(t, ids[popCounter], pl.popClosestQueued()) + popCounter++ + require.Equal(t, waiting, pl.closest.next.next.status) + // set ids[2] as unreachable + pl.unreachablePeer(ids[2]) + require.Equal(t, unreachable, pl.closest.next.next.status) + + // inserting a new peer that isn't the absolute closest, but is now the + // closest queued peer + newID := kadid.NewKadID([]byte{0x05}) + pl.addToPeerlist([]address.NodeID{newID}) + require.Equal(t, newID, pl.closestQueued.id) + require.Equal(t, ids[0], pl.closest.id) } -func TestPopClosestQueued(t *testing.T) { +func TestMultiAddrs(t *testing.T) { + ctx := context.Background() + keylen := 32 + self := kadid.NewKadID(append([]byte{0x80}, make([]byte, keylen-1)...)) + ep := fakeendpoint.NewFakeEndpoint(self, nil, nil) + // create empty peer list - pl := newPeerList(make([]byte, 32)) + pl := newPeerList(make([]byte, keylen), ep) - // add initial peers - nPeers := 3 - peerids := make([]address.NodeID, nPeers) + require.Nil(t, pl.closest) + require.Nil(t, pl.closestQueued) + + // create initial peers + nPeers := 5 + ids := make([]address.NodeID, nPeers) + addrs := make([]*kadaddr.KadAddr, nPeers) for i := 0; i < nPeers; i++ { - peerids[i] = peerid.PeerID{ID: peer.ID(byte(i))} + id := kadid.NewKadID(append([]byte{byte(16 * i)}, make([]byte, keylen-1)...)) + ids[i] = id + addrs[i] = kadaddr.NewKadAddr(id, []string{}) + ep.MaybeAddToPeerstore(ctx, addrs[i], 1) } - pl.addToPeerlist(peerids) - require.Equal(t, 3, pl.queuedCount) - - // initial queue state - // peerids[1], queued, 4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a - // peerids[0], queued, 6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d - // peerids[2], queued, dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986 - - require.Equal(t, peerids[1], pl.popClosestQueued()) - require.Equal(t, peerids[1], pl.closest.id) - require.Equal(t, peerids[0], pl.closestQueued.id) - require.Equal(t, 2, pl.queuedCount) - require.Equal(t, peerids[0], pl.popClosestQueued()) - require.Equal(t, peerids[1], pl.closest.id) - require.Equal(t, peerids[2], pl.closestQueued.id) - require.Equal(t, 1, pl.queuedCount) - require.Equal(t, peerids[2], pl.popClosestQueued()) - require.Equal(t, 0, pl.queuedCount) - require.Equal(t, nil, pl.popClosestQueued()) - require.Equal(t, 0, pl.queuedCount) - - pl = newPeerList(make([]byte, 32)) - - pl.addToPeerlist(peerids) - require.Equal(t, 3, 
pl.queuedCount) - - // mark second item (peerids[0]) as waiting - pl.updatePeerStatusInPeerlist(peerids[0], waiting) - require.Equal(t, peerids[1], pl.closest.id) - require.Equal(t, peerids[1], pl.closestQueued.id) - require.Equal(t, 2, pl.queuedCount) - - // pop closest queued (peerids[1]) - require.Equal(t, peerids[1], pl.popClosestQueued()) - require.Equal(t, peerids[1], pl.closest.id) - // peerids[2] is now closestQueued - require.Equal(t, peerids[2], pl.closestQueued.id) - require.Equal(t, 1, pl.queuedCount) - - pl.updatePeerStatusInPeerlist(peerids[2], unreachable) - require.Equal(t, peerids[1], pl.closest.id) - require.Equal(t, 0, pl.queuedCount) + // ids[0]: 0000... + // ids[1]: 1000... + // ids[2]: 2000... + // etc. + + // add peers + pl.addToPeerlist(ids) + require.Equal(t, nPeers, pl.queuedCount) + + var popCounter int + // change status of the closest peer + require.Equal(t, ids[popCounter], pl.popClosestQueued()) + popCounter++ + require.Equal(t, ids[popCounter], pl.popClosestQueued()) + popCounter++ + + // change ids[1] from waiting to unreachable + require.Equal(t, waiting, pl.closest.next.status) + pl.unreachablePeer(ids[1]) + require.Equal(t, unreachable, pl.closest.next.status) + + // ids[1] is added again, with the same address, it should be ignored + pl.addToPeerlist([]address.NodeID{ids[1]}) + require.Equal(t, unreachable, pl.closest.next.status) + + // ids[1] added again, with a different address, its status should be queued + addrs[1].AddAddr("0") + ep.MaybeAddToPeerstore(ctx, addrs[1], 1) + pl.addToPeerlist([]address.NodeID{ids[1]}) + require.Equal(t, queued, pl.closest.next.status) + + // change ids[1] to waiting again + require.Equal(t, ids[1], pl.popClosestQueued()) + + // mark ids[1] as unreachable again + pl.unreachablePeer(ids[1]) + require.Equal(t, unreachable, pl.closest.next.status) + + // ids[1] added again with same address as before, should remain unreachable + pl.addToPeerlist([]address.NodeID{ids[1]}) + require.Equal(t, unreachable, pl.closest.next.status) + + // add new addr to ids[1] + addrs[1].AddAddr("1") + ep.MaybeAddToPeerstore(ctx, addrs[1], 1) + // add ids[1] again to peerlist, this time it should be queued + pl.addToPeerlist([]address.NodeID{ids[1]}) + require.Equal(t, queued, pl.closest.next.status) + require.Equal(t, ids[1], pl.popClosestQueued()) + + // ids[0] is still waiting + addrs[0].AddAddr("0") + ep.MaybeAddToPeerstore(ctx, addrs[0], 1) + // add ids[0] new address to peerlist + pl.addToPeerlist([]address.NodeID{ids[0]}) + require.Equal(t, waiting, pl.closest.status) + require.True(t, pl.closest.tryAgainOnFailure) + + // ids[0] is now unreachable (on the tried address) + pl.unreachablePeer(ids[0]) + require.Equal(t, queued, pl.closest.status) + + // set ids[0] as waiting again + require.Equal(t, ids[0], pl.popClosestQueued()) + // set ids[2] as waiting + require.Equal(t, ids[popCounter], pl.popClosestQueued()) + popCounter++ // popCounter = 3 + require.Equal(t, waiting, pl.closest.next.next.status) + require.Equal(t, ids[popCounter], pl.popClosestQueued()) + popCounter++ // popCounter = 4 + + parsed, err := peer.Decode("1D3oooUnknownPeer") + // key: 32eeb6aa672800824eb16d8ba5814cd9acc20203f656e78431154a17ce17e5ad + require.NoError(t, err) + pid := peerid.NewPeerID(parsed) + + // add unknown peer, this peer doesn't have any address (and the peerlist + // has an endpoint configured). 
Hence, it can never be queried; when + "popped" it should be set as unreachable + pl.addToPeerlist([]address.NodeID{pid}) + require.Equal(t, nPeers-popCounter+1, pl.queuedCount) // 4 peers have been popped so far + require.Equal(t, pid, pl.closestQueued.id) + + // popClosestQueued should set pid as unreachable and return ids[4] + require.Equal(t, ids[popCounter], pl.popClosestQueued()) + popCounter++ // popCounter = 5 + // element between ids[3] and ids[4], that is pid + require.Equal(t, unreachable, pl.closest.next.next.next.next.status) + require.Equal(t, waiting, pl.closest.next.next.next.next.next.status) // ids[4] require.Nil(t, pl.closestQueued) - require.Equal(t, 0, pl.queuedCount) - - pl.updatePeerStatusInPeerlist(peerids[1], queued) - require.Equal(t, peerids[1], pl.closestQueued.id) - require.Equal(t, 1, pl.queuedCount) - pl.updatePeerStatusInPeerlist(peerids[2], queued) - require.Equal(t, peerids[1], pl.closestQueued.id) - require.Equal(t, 2, pl.queuedCount) + + parsed, err = peer.Decode("1DoooUnknownPeer2") + // key: c28defd90aea579236cc5553fcedf59c7fa8a1daa2e7b350c28fbabf94867ddc + require.NoError(t, err) + pid2 := peerid.NewPeerID(parsed) + + // add unknown peer 2, same as before, but there is no successor to pid2 + pl.addToPeerlist([]address.NodeID{pid2}) + require.Equal(t, nPeers-popCounter+1, pl.queuedCount) + require.Equal(t, pid2, pl.closestQueued.id) + // 5 peers have been popped so far, pid is not counted because it is unreachable + require.Nil(t, pl.popClosestQueued()) + // pid is set as unreachable, even though it was not popped + require.Equal(t, unreachable, pl.closest.next.next.next.next.next.next.status) } diff --git a/query/simplequery/query.go b/query/simplequery/query.go index 7699a55..4a7a74e 100644 --- a/query/simplequery/query.go +++ b/query/simplequery/query.go @@ -78,7 +78,7 @@ func NewSimpleQuery(ctx context.Context, req message.MinKadRequestMessage, return nil, err } - pl := newPeerList(req.Target()) + pl := newPeerList(req.Target(), cfg.Endpoint) pl.addToPeerlist(closestPeers) q := &SimpleQuery{ @@ -163,11 +163,7 @@ func (q *SimpleQuery) newRequest(ctx context.Context) { id := q.peerlist.popClosestQueued() if id == nil { - // TODO: should never happen - q.done = true - span.AddEvent("all peers queried") q.inflightRequests-- - q.notifyFailureFn(ctx) return } span.AddEvent("peer selected: " + id.String()) @@ -220,11 +216,16 @@ func (q *SimpleQuery) handleResponse(ctx context.Context, id address.NodeID, res q.rt.AddPeer(ctx, id) } - q.peerlist.updatePeerStatusInPeerlist(id, queried) + q.peerlist.queriedPeer(id) for i, id := range closerPeers { - c, err := id.Key().Compare(q.rt.Self()) - if err == nil && c == 0 { + c, err := id.NodeID().Key().Compare(q.rt.Self()) + if err != nil { + // malformed kad key, skip this peer + span.AddEvent("remote peer provided wrong format of kad key") + continue + } + if c == 0 { // don't add self to queries or routing table span.AddEvent("remote peer provided self as closer peer") closerPeers = append(closerPeers[:i], closerPeers[i+1:]...) 
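// Aside: the hunk below, like options.go and query_test.go elsewhere in this
// change set, converts the NodeAddr values carried by a response into the
// NodeIDs tracked by the query state machine. The recurring loop could be
// captured by a helper like the following (hypothetical, shown only to make
// the pattern explicit; the diff inlines it at each call site):
package simplequery

import "github.com/plprobelab/go-kademlia/network/address"

// nodeIDsOf keeps the identity of each NodeAddr and drops its addresses.
func nodeIDsOf(addrs []address.NodeAddr) []address.NodeID {
	ids := make([]address.NodeID, len(addrs))
	for i, na := range addrs {
		ids[i] = na.NodeID()
	}
	return ids
}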
@@ -234,7 +235,7 @@ func (q *SimpleQuery) handleResponse(ctx context.Context, id address.NodeID, res q.msgEndpoint.MaybeAddToPeerstore(ctx, id, q.peerstoreTTL) } - stop, usefulNodeID := q.handleResultFn(ctx, id, resp) + stop, usefulNodeIDs := q.handleResultFn(ctx, id, resp) if stop { // query is done, don't send any more requests span.AddEvent("query over") @@ -242,7 +243,19 @@ func (q *SimpleQuery) handleResponse(ctx context.Context, id address.NodeID, res return } - q.peerlist.addToPeerlist(usefulNodeID) + // remove q.self from usefulNodeIDs + writeIndex := 0 + // iterate over the slice in place, keeping only the IDs that are valid and aren't self + for _, id := range usefulNodeIDs { + if c, err := q.rt.Self().Compare(id.Key()); err == nil && c != 0 { + // id is valid and isn't self + usefulNodeIDs[writeIndex] = id + writeIndex++ + } + } + usefulNodeIDs = usefulNodeIDs[:writeIndex] + + q.peerlist.addToPeerlist(usefulNodeIDs) q.enqueueNewRequests(ctx) } @@ -265,7 +278,7 @@ func (q *SimpleQuery) requestError(ctx context.Context, id address.NodeID, err e return } - q.peerlist.updatePeerStatusInPeerlist(id, unreachable) + q.peerlist.unreachablePeer(id) q.enqueueNewRequests(ctx) } diff --git a/query/simplequery/query_test.go b/query/simplequery/query_test.go index 9de4c65..4a22c92 100644 --- a/query/simplequery/query_test.go +++ b/query/simplequery/query_test.go @@ -183,7 +183,7 @@ func TestElementaryQuery(t *testing.T) { router := fe.NewFakeRouter() nPeers := 32 - ids := make([]address.NodeID, nPeers) + ids := make([]address.NodeAddr, nPeers) scheds := make([]scheduler.AwareScheduler, nPeers) fendpoints := make([]endpoint.SimEndpoint, nPeers) rts := make([]routingtable.RoutingTable, nPeers) @@ -191,8 +191,8 @@ for i := 0; i < nPeers; i++ { scheds[i] = ss.NewSimpleScheduler(clk) ids[i] = kadid.NewKadID([]byte{byte(i * 8)}) - fendpoints[i] = fe.NewFakeEndpoint(ids[i], scheds[i], router) - rts[i] = simplert.NewSimpleRT(ids[i].Key(), bucketSize) + fendpoints[i] = fe.NewFakeEndpoint(ids[i].NodeID(), scheds[i], router) + rts[i] = simplert.NewSimpleRT(ids[i].NodeID().Key(), bucketSize) servers[i] = basicserver.NewBasicServer(rts[i], fendpoints[i], basicserver.WithNumberUsefulCloserPeers(bucketSize)) fendpoints[i].AddRequestHandler(protoID, &sm.SimMessage{}, servers[i].HandleRequest) @@ -211,14 +211,14 @@ func TestElementaryQuery(t *testing.T) { require.NoError(t, err) // we don't require that the peer is added to the routing table, // because the bucket might be full already and that is fine - _, err = rts[i].AddPeer(ctx, ids[j]) + _, err = rts[i].AddPeer(ctx, ids[j].NodeID()) require.NoError(t, err) } } // smallest peer is looking for the biggest peer (which is the furthest away // in hop numbers, given the routing table configuration) - req := sm.NewSimRequest(ids[len(ids)-1].Key()) + req := sm.NewSimRequest(ids[len(ids)-1].NodeID().Key()) // generic query options to be used by all peers defaultQueryOpts := []Option{ @@ -245,7 +245,11 @@ func TestElementaryQuery(t *testing.T) { handleResultsFnInfinity := func(ctx context.Context, id address.NodeID, resp message.MinKadResponseMessage) (bool, []address.NodeID) { // TODO: test that the responses are the expected ones - return false, resp.CloserNodes() + ids := make([]address.NodeID, len(resp.CloserNodes())) + for i, n := range resp.CloserNodes() { + ids[i] = n.NodeID() + } + return false, ids } // the request will eventually fail because handleResultsFnInfinity always diff --git a/server/basicserver/basicserver.go 
b/server/basicserver/basicserver.go index da9d153..9dd2efc 100644 --- a/server/basicserver/basicserver.go +++ b/server/basicserver/basicserver.go @@ -87,13 +87,18 @@ func (s *BasicServer) HandleFindNodeRequest(ctx context.Context, return nil, ErrUnknownMessageFormat } - s.endpoint.MaybeAddToPeerstore(ctx, rpeer, s.peerstoreTTL) - _, span := util.StartSpan(ctx, "SimServer.HandleFindNodeRequest", trace.WithAttributes( attribute.Stringer("Requester", rpeer), attribute.Stringer("Target", target))) defer span.End() + rPeerAddr, err := s.endpoint.NetworkAddress(rpeer) + if err != nil { + span.RecordError(err) + return nil, err + } + s.endpoint.MaybeAddToPeerstore(ctx, rPeerAddr, s.peerstoreTTL) + peers, err := s.rt.NearestPeers(ctx, target, s.numberOfCloserPeersToSend) if err != nil { span.RecordError(err) @@ -108,7 +113,15 @@ func (s *BasicServer) HandleFindNodeRequest(ctx context.Context, var resp message.MinKadMessage switch msg.(type) { case *simmessage.SimMessage: - resp = simmessage.NewSimResponse(peers) + peerAddrs := make([]address.NodeAddr, len(peers)) + for i, p := range peers { + peerAddrs[i], err = s.endpoint.NetworkAddress(p) + if err != nil { + span.RecordError(err) + continue + } + } + resp = simmessage.NewSimResponse(peerAddrs) case *ipfsv1.Message: nEndpoint, ok := s.endpoint.(endpoint.NetworkedEndpoint) if !ok { diff --git a/server/basicserver/basicserver_test.go b/server/basicserver/basicserver_test.go index 60f7d56..9216538 100644 --- a/server/basicserver/basicserver_test.go +++ b/server/basicserver/basicserver_test.go @@ -65,6 +65,7 @@ func TestSimMessageHandling(t *testing.T) { WithNumberUsefulCloserPeers(numberOfCloserPeersToSend)) requester := kadid.KadID{KadKey: []byte{0b00000001}} // 0000 0001 + fakeEndpoint.MaybeAddToPeerstore(ctx, requester, peerstoreTTL) req0 := simmessage.NewSimRequest([]byte{0b00000000}) msg, err := s0.HandleRequest(ctx, requester, req0) @@ -222,6 +223,10 @@ func TestIPFSv1Handling(t *testing.T) { requesterPid, err := peer.Decode("1WoooREQUESTER") require.NoError(t, err) requester := peerid.NewPeerID(requesterPid) + fakeEndpoint.MaybeAddToPeerstore(ctx, addrinfo.NewAddrInfo(peer.AddrInfo{ + ID: requesterPid, + Addrs: nil, + }), peerstoreTTL) req0 := ipfsv1.FindPeerRequest(self) msg, err := s0.HandleRequest(ctx, requester, req0) @@ -246,7 +251,7 @@ type invalidEndpoint struct{} var _ endpoint.Endpoint = (*invalidEndpoint)(nil) -func (e *invalidEndpoint) MaybeAddToPeerstore(context.Context, address.NodeID, +func (e *invalidEndpoint) MaybeAddToPeerstore(context.Context, address.NodeAddr, time.Duration) error { return nil } @@ -261,7 +266,7 @@ func (e *invalidEndpoint) KadKey() key.KadKey { return make([]byte, 0) } -func (e *invalidEndpoint) NetworkAddress(address.NodeID) (address.NodeID, error) { +func (e *invalidEndpoint) NetworkAddress(address.NodeID) (address.NodeAddr, error) { return nil, nil }
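A note on the fakeendpoint change above: SendRequestHandleResponse no longer reports dial or lookup failures through its return value; the error is enqueued on the scheduler and delivered to the response handler, which is why the updated tests pair each request with sched.RunOne. A minimal sketch of the new calling convention, assuming the packages already used by the tests in this diff (the protocol ID and peer names are illustrative):

package fakeendpoint

import (
	"context"
	"testing"

	"github.com/benbjohnson/clock"
	"github.com/plprobelab/go-kademlia/events/scheduler/simplescheduler"
	"github.com/plprobelab/go-kademlia/network/address"
	si "github.com/plprobelab/go-kademlia/network/address/stringid"
	"github.com/plprobelab/go-kademlia/network/endpoint"
	"github.com/plprobelab/go-kademlia/network/message"
	"github.com/stretchr/testify/require"
)

func TestAsyncFailureDelivery(t *testing.T) {
	ctx := context.Background()
	sched := simplescheduler.NewSimpleScheduler(clock.NewMock())
	ep := NewFakeEndpoint(si.StringID("self"), sched, NewFakeRouter())

	proto := address.ProtocolID("/test/1.0.0") // illustrative protocol ID

	var handlerRan bool
	handler := func(ctx context.Context, msg message.MinKadResponseMessage, err error) {
		// the failure surfaces here, not in the return value below
		require.Equal(t, endpoint.ErrUnknownPeer, err)
		handlerRan = true
	}

	// "unknown" was never added to the peerstore, yet the call returns nil
	err := ep.SendRequestHandleResponse(ctx, proto, si.StringID("unknown"), nil, nil, 0, handler)
	require.NoError(t, err)

	// the error only reaches the handler once the enqueued action runs
	require.True(t, sched.RunOne(ctx))
	require.True(t, handlerRan)
}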