From 88262ca97fdb01e3d6285c28ae665c2bba39d4a7 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Wed, 5 Jul 2023 17:18:09 +0200 Subject: [PATCH] added more tests to simplequery --- events/scheduler/scheduler.go | 5 +- .../simplescheduler/simplescheduler.go | 4 +- .../simplescheduler/simplescheduler_test.go | 2 +- examples/connect/findpeer.go | 7 +- network/endpoint/fakeendpoint/fakeendpoint.go | 7 +- .../endpoint/libp2pendpoint/libp2pendpoint.go | 23 +- .../libp2pendpoint/libp2pendpoint_test.go | 37 +- query/simplequery/query.go | 144 ++++---- query/simplequery/query_test.go | 321 ++++++++++++++++-- 9 files changed, 436 insertions(+), 114 deletions(-) diff --git a/events/scheduler/scheduler.go b/events/scheduler/scheduler.go index a57ae91..d80e7bf 100644 --- a/events/scheduler/scheduler.go +++ b/events/scheduler/scheduler.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/benbjohnson/clock" "github.com/plprobelab/go-kademlia/events/action" "github.com/plprobelab/go-kademlia/events/planner" ) @@ -12,7 +13,7 @@ import ( // or at a specific time type Scheduler interface { // Now returns the time of the scheduler's clock - Now() time.Time + Clock() clock.Clock // EnqueueAction enqueues an action to run as soon as possible EnqueueAction(context.Context, action.Action) @@ -33,7 +34,7 @@ func ScheduleActionIn(ctx context.Context, s Scheduler, d time.Duration, a actio s.EnqueueAction(ctx, a) return nil } else { - return s.ScheduleAction(ctx, s.Now().Add(d), a) + return s.ScheduleAction(ctx, s.Clock().Now().Add(d), a) } } diff --git a/events/scheduler/simplescheduler/simplescheduler.go b/events/scheduler/simplescheduler/simplescheduler.go index 66502f1..c50e2eb 100644 --- a/events/scheduler/simplescheduler/simplescheduler.go +++ b/events/scheduler/simplescheduler/simplescheduler.go @@ -37,8 +37,8 @@ func NewSimpleScheduler(clk clock.Clock) *SimpleScheduler { } // Now returns the scheduler's current time. -func (s *SimpleScheduler) Now() time.Time { - return s.clk.Now() +func (s *SimpleScheduler) Clock() clock.Clock { + return s.clk } // EnqueueAction enqueues an action to be run as soon as possible. diff --git a/events/scheduler/simplescheduler/simplescheduler_test.go b/events/scheduler/simplescheduler/simplescheduler_test.go index f1a9cf8..e185a65 100644 --- a/events/scheduler/simplescheduler/simplescheduler_test.go +++ b/events/scheduler/simplescheduler/simplescheduler_test.go @@ -18,7 +18,7 @@ func TestSimpleScheduler(t *testing.T) { sched := NewSimpleScheduler(clk) - require.Equal(t, clk.Now(), sched.Now()) + require.Equal(t, clk.Now(), sched.Clock().Now()) nActions := 10 actions := make([]*ta.FuncAction, nActions) diff --git a/examples/connect/findpeer.go b/examples/connect/findpeer.go index 8604462..5b62b91 100644 --- a/examples/connect/findpeer.go +++ b/examples/connect/findpeer.go @@ -127,13 +127,16 @@ func FindPeer(ctx context.Context) { queryOpts := []simplequery.Option{ simplequery.WithProtocolID(protocolID), simplequery.WithConcurrency(1), - simplequery.WithRequestTimeout(5 * time.Second), + simplequery.WithRequestTimeout(2 * time.Second), simplequery.WithHandleResultsFunc(handleResultsFn), simplequery.WithRoutingTable(rt), simplequery.WithEndpoint(msgEndpoint), simplequery.WithScheduler(sched), } - simplequery.NewSimpleQuery(ctx, req, queryOpts...) + _, err = simplequery.NewSimpleQuery(ctx, req, queryOpts...) + if err != nil { + panic(err) + } span.AddEvent("start request execution") diff --git a/network/endpoint/fakeendpoint/fakeendpoint.go b/network/endpoint/fakeendpoint/fakeendpoint.go index b93499e..76677fc 100644 --- a/network/endpoint/fakeendpoint/fakeendpoint.go +++ b/network/endpoint/fakeendpoint/fakeendpoint.go @@ -193,7 +193,12 @@ func (e *FakeEndpoint) HandleMessage(ctx context.Context, id address.NodeID, resp, ok := msg.(message.MinKadResponseMessage) var err error - if !ok { + if ok { + for _, p := range resp.CloserNodes() { + e.peerstore[p.NodeID().String()] = p + e.connStatus[p.NodeID().String()] = network.CanConnect + } + } else { err = ErrInvalidResponseType } if followup != nil { diff --git a/network/endpoint/libp2pendpoint/libp2pendpoint.go b/network/endpoint/libp2pendpoint/libp2pendpoint.go index 0afe2e5..b48dd94 100644 --- a/network/endpoint/libp2pendpoint/libp2pendpoint.go +++ b/network/endpoint/libp2pendpoint/libp2pendpoint.go @@ -188,7 +188,12 @@ func (e *Libp2pEndpoint) SendRequestHandleResponse(ctx context.Context, attribute.String("PeerID", n.String()), )) defer span.End() - ctx, cancel := context.WithCancel(ctx) + var cancel context.CancelFunc + if timeout > 0 { + ctx, cancel = e.sched.Clock().WithTimeout(ctx, timeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } defer cancel() var err error @@ -197,7 +202,9 @@ func (e *Libp2pEndpoint) SendRequestHandleResponse(ctx context.Context, s, err = e.host.NewStream(ctx, p.ID, protocol.ID(protoID)) if err != nil { span.RecordError(err, trace.WithAttributes(attribute.String("where", "stream creation"))) - responseHandlerFn(ctx, nil, err) + e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) { + responseHandlerFn(ctx, nil, err) + })) return } defer s.Close() @@ -205,7 +212,9 @@ func (e *Libp2pEndpoint) SendRequestHandleResponse(ctx context.Context, err = WriteMsg(s, protoReq) if err != nil { span.RecordError(err, trace.WithAttributes(attribute.String("where", "write message"))) - responseHandlerFn(ctx, nil, err) + e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) { + responseHandlerFn(ctx, nil, err) + })) return } @@ -231,12 +240,16 @@ func (e *Libp2pEndpoint) SendRequestHandleResponse(ctx context.Context, } if err != nil { span.RecordError(err, trace.WithAttributes(attribute.String("where", "read message"))) - responseHandlerFn(ctx, protoResp, err) + e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) { + responseHandlerFn(ctx, protoResp, err) + })) return } span.AddEvent("response received") - responseHandlerFn(ctx, protoResp, err) + e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) { + responseHandlerFn(ctx, protoResp, err) + })) }() return nil } diff --git a/network/endpoint/libp2pendpoint/libp2pendpoint_test.go b/network/endpoint/libp2pendpoint/libp2pendpoint_test.go index ce99571..f02627a 100644 --- a/network/endpoint/libp2pendpoint/libp2pendpoint_test.go +++ b/network/endpoint/libp2pendpoint/libp2pendpoint_test.go @@ -200,6 +200,15 @@ func TestLibp2pEndpoint(t *testing.T) { time.Sleep(time.Millisecond) } require.False(t, scheds[1].RunOne(ctx)) // only 1 action should run on server + wg.Add(1) + go func() { + // timeout is queued in the scheduler 0 + for !scheds[0].RunOne(ctx) { + time.Sleep(time.Millisecond) + } + require.False(t, scheds[0].RunOne(ctx)) + wg.Done() + }() wg.Wait() // invalid response format (not protobuf) @@ -238,6 +247,15 @@ func TestLibp2pEndpoint(t *testing.T) { err = endpoints[0].SendRequestHandleResponse(ctx, protoID, ids[2], req, resp, time.Second, invalidRespHandlerBuilder(swarm.ErrNoAddresses)) require.NoError(t, err) + wg.Add(1) + go func() { + // timeout is queued in the scheduler 0 + for !scheds[0].RunOne(ctx) { + time.Sleep(time.Millisecond) + } + require.False(t, scheds[0].RunOne(ctx)) + wg.Done() + }() wg.Wait() // test timeout @@ -277,10 +295,10 @@ func TestLibp2pEndpoint(t *testing.T) { err = endpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1], req, resp, 0, responseHandler) require.NoError(t, err) - wg.Add(1) for !scheds[1].RunOne(ctx) { time.Sleep(time.Millisecond) } + wg.Wait() // wg count shoud be 0 require.False(t, scheds[1].RunOne(ctx)) // no response to be handled by 0 require.False(t, scheds[0].RunOne(ctx)) @@ -292,14 +310,27 @@ func TestLibp2pEndpoint(t *testing.T) { return &simmessage.SimMessage{}, nil }) require.NoError(t, err) - err = endpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1], req, + require.False(t, scheds[0].RunOne(ctx)) + wg.Add(1) + noResponseCtx, cancel := context.WithCancel(ctx) + err = endpoints[0].SendRequestHandleResponse(noResponseCtx, protoID, ids[1], req, resp, 0, responseHandler) require.NoError(t, err) - wg.Add(1) for !scheds[1].RunOne(ctx) { time.Sleep(time.Millisecond) } require.False(t, scheds[1].RunOne(ctx)) + cancel() + wg.Add(1) + go func() { + // timeout is queued in the scheduler 0 + for !scheds[0].RunOne(ctx) { + time.Sleep(time.Millisecond) + } + require.False(t, scheds[0].RunOne(ctx)) + wg.Done() + }() + wg.Wait() // no response to be handled by 0 require.False(t, scheds[0].RunOne(ctx)) diff --git a/query/simplequery/query.go b/query/simplequery/query.go index 4a7a74e..58f887e 100644 --- a/query/simplequery/query.go +++ b/query/simplequery/query.go @@ -59,7 +59,6 @@ type SimpleQuery struct { // key, and the peers that have been queried so far. func NewSimpleQuery(ctx context.Context, req message.MinKadRequestMessage, opts ...Option) (*SimpleQuery, error) { - ctx, span := util.StartSpan(ctx, "SimpleQuery.NewSimpleQuery", trace.WithAttributes(attribute.String("Target", req.Target().Hex()))) defer span.End() @@ -71,14 +70,22 @@ func NewSimpleQuery(ctx context.Context, req message.MinKadRequestMessage, return nil, err } + // get the closest peers to the target from the routing table closestPeers, err := cfg.RoutingTable.NearestPeers(ctx, req.Target(), cfg.NumberUsefulCloserPeers) if err != nil { span.RecordError(err) return nil, err } + if len(closestPeers) == 0 { + err := errors.New("no peers in routing table") + span.RecordError(err) + return nil, err + } + // create new empty peerlist pl := newPeerList(req.Target(), cfg.Endpoint) + // add the closest peers to peerlist pl.addToPeerlist(closestPeers) q := &SimpleQuery{ @@ -96,11 +103,28 @@ func NewSimpleQuery(ctx context.Context, req message.MinKadRequestMessage, peerlist: pl, } + // add concurrency number of requests to eventqueue q.enqueueNewRequests(ctx) return q, nil } +// checkIfDone cheks if the query is done, and return an error if it is done. +func (q *SimpleQuery) checkIfDone() error { + if q.done { + // query is done, don't send any more requests + return errors.New("query done") + } + if q.ctx.Err() != nil { + q.done = true + return q.ctx.Err() + } + return nil +} + +// enqueueNewRequests adds the maximal number of requests to the scheduler's +// event queue. The maximal number of conccurent requests is limited by the +// concurrency factor and by the number of queued peers in the peerlist. func (q *SimpleQuery) enqueueNewRequests(ctx context.Context) { ctx, span := util.StartSpan(ctx, "SimpleQuery.enqueueNewRequests") defer span.End() @@ -117,6 +141,7 @@ func (q *SimpleQuery) enqueueNewRequests(ctx context.Context) { q.done = true span.AddEvent("all peers queried") q.notifyFailureFn(ctx) + return } span.AddEvent("newRequestsToSend: " + strconv.Itoa(newRequestsToSend) + @@ -127,114 +152,95 @@ func (q *SimpleQuery) enqueueNewRequests(ctx context.Context) { q.sched.EnqueueAction(ctx, ba.BasicAction(q.newRequest)) } + // increase number of inflight requests. Note that it counts both queued + // requests and requests in flight q.inflightRequests += newRequestsToSend span.AddEvent("Enqueued " + strconv.Itoa(newRequestsToSend) + " SimpleQuery.newRequest") } -func (q *SimpleQuery) checkIfDone() error { - if q.done { - // query is done, don't send any more requests - return errors.New("query done") - } - - select { - case <-q.ctx.Done(): - // query is cancelled, mark it as done - q.done = true - return errors.New("query cancelled") - default: - } - return nil -} - +// newRequest sends a request to the closest peer that hasn't been queried yet. func (q *SimpleQuery) newRequest(ctx context.Context) { - ctx, cancel := context.WithTimeout(ctx, q.timeout) - defer cancel() - ctx, span := util.StartSpan(ctx, "SimpleQuery.newRequest") defer span.End() if err := q.checkIfDone(); err != nil { span.RecordError(err) + // decrease counter, because there is one less request queued q.inflightRequests-- return } + // get the closest peer from target that hasn't been queried yet id := q.peerlist.popClosestQueued() if id == nil { + // the peer list is empty, we don't have any more peers to query. This + // shouldn't happen because enqueueNewRequests doesn't enqueue more + // requests than there are queued peers in the peerlist q.inflightRequests-- return } - span.AddEvent("peer selected: " + id.String()) - - // function to be executed when a response is received - handleResp := func(ctx context.Context, resp message.MinKadResponseMessage, err error) { - ctx, span := util.StartSpan(ctx, "SimpleQuery.handleResp") - defer span.End() + span.AddEvent("Peer selected: " + id.String()) + // this function will be queued when a response is received, an error + // occures or the request times out (with appropriate parameters) + handleResp := func(ctx context.Context, resp message.MinKadResponseMessage, + err error) { if err != nil { - span.AddEvent("got error") - q.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) { - q.requestError(ctx, id, err) - })) + q.requestError(ctx, id, err) } else { - span.AddEvent("got response") - q.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) { - q.handleResponse(ctx, id, resp) - })) - span.AddEvent("Enqueued SimpleQuery.handleResponse") + q.handleResponse(ctx, id, resp) } } // send request - q.msgEndpoint.SendRequestHandleResponse(ctx, q.protoID, id, q.req, q.req.EmptyResponse(), q.timeout, handleResp) + err := q.msgEndpoint.SendRequestHandleResponse(ctx, q.protoID, id, q.req, + q.req.EmptyResponse(), q.timeout, handleResp) + if err != nil { + // there was an error before the request was sent, handle it + span.RecordError(err) + q.requestError(ctx, id, err) + } } -func (q *SimpleQuery) handleResponse(ctx context.Context, id address.NodeID, resp message.MinKadResponseMessage) { +// handleResponse handles a response to a past query request +func (q *SimpleQuery) handleResponse(ctx context.Context, id address.NodeID, + resp message.MinKadResponseMessage) { ctx, span := util.StartSpan(ctx, "SimpleQuery.handleResponse", - trace.WithAttributes(attribute.String("Target", q.req.Target().Hex()), attribute.String("From Peer", id.String()))) + trace.WithAttributes(attribute.String("Target", q.req.Target().Hex()), + attribute.String("From Peer", id.String()))) defer span.End() if err := q.checkIfDone(); err != nil { + // request completed or was cancelled was the message was in flight, + // don't handle the message span.RecordError(err) return } if resp == nil { - span.AddEvent("response is nil") - q.requestError(ctx, id, errors.New("nil response")) + err := errors.New("nil response") + span.RecordError(err) + q.requestError(ctx, id, err) return } - q.inflightRequests-- - closerPeers := resp.CloserNodes() if len(closerPeers) > 0 { // consider that remote peer is behaving correctly if it returns - // at least 1 peer + // at least 1 peer. We add it to our routing table only if it behaves + // as expected (we don't want to add unresponsive nodes to the rt) q.rt.AddPeer(ctx, id) } - q.peerlist.queriedPeer(id) - - for i, id := range closerPeers { - c, err := id.NodeID().Key().Compare(q.rt.Self()) - if err != nil { - // wrong format of kad key, skip - span.AddEvent("remote peer provided wrong format of kad key") - continue - } - if c == 0 { - // don't add self to queries or routing table - span.AddEvent("remote peer provided self as closer peer") - closerPeers = append(closerPeers[:i], closerPeers[i+1:]...) - continue - } + q.inflightRequests-- - q.msgEndpoint.MaybeAddToPeerstore(ctx, id, q.peerstoreTTL) - } + // set peer as queried in the peerlist + q.peerlist.queriedPeer(id) + // handle the response using the function provided by the caller, this + // function decides whether the query should terminate and returns the list + // of useful nodes that should be queried next stop, usefulNodeIDs := q.handleResultFn(ctx, id, resp) if stop { // query is done, don't send any more requests @@ -243,33 +249,43 @@ func (q *SimpleQuery) handleResponse(ctx context.Context, id address.NodeID, res return } - // remove q.self from usefulNodeIDs + // remove all occurneces of q.self from usefulNodeIDs writeIndex := 0 - // Iterate over the slice and copy elements that do not match the value for _, id := range usefulNodeIDs { if c, err := q.rt.Self().Compare(id.Key()); err == nil && c != 0 { // id is valid and isn't self usefulNodeIDs[writeIndex] = id writeIndex++ + } else if err != nil { + span.AddEvent("wrong KadKey lenght") + } else { + span.AddEvent("never add self to query peerlist") } } usefulNodeIDs = usefulNodeIDs[:writeIndex] + // add usefulNodeIDs to peerlist q.peerlist.addToPeerlist(usefulNodeIDs) + // enqueue new query requests to the event loop (usually 1) q.enqueueNewRequests(ctx) } +// requestError handle an error that occured while sending a request or +// receiving a response. func (q *SimpleQuery) requestError(ctx context.Context, id address.NodeID, err error) { ctx, span := util.StartSpan(ctx, "SimpleQuery.requestError", trace.WithAttributes(attribute.String("PeerID", id.String()), attribute.String("Error", err.Error()))) defer span.End() + // the request isn't in flight anymore since it failed q.inflightRequests-- if q.ctx.Err() == nil { - // remove peer from routing table unless context was cancelled + // remove peer from routing table unless context was cancelled. We don't + // want to keep peers that timed out or peers that returned nil/invalid + // responses. q.rt.RemoveKey(ctx, id.Key()) } @@ -278,7 +294,9 @@ func (q *SimpleQuery) requestError(ctx context.Context, id address.NodeID, err e return } + // set peer as unreachable in the peerlist q.peerlist.unreachablePeer(id) + // enqueue new query requests to the event loop (usually 1) q.enqueueNewRequests(ctx) } diff --git a/query/simplequery/query_test.go b/query/simplequery/query_test.go index 4a22c92..a20cd73 100644 --- a/query/simplequery/query_test.go +++ b/query/simplequery/query_test.go @@ -2,6 +2,8 @@ package simplequery import ( "context" + "errors" + "fmt" "testing" "time" @@ -10,6 +12,7 @@ import ( ss "github.com/plprobelab/go-kademlia/events/scheduler/simplescheduler" "github.com/plprobelab/go-kademlia/events/simulator" "github.com/plprobelab/go-kademlia/events/simulator/litesimulator" + "github.com/plprobelab/go-kademlia/key" "github.com/plprobelab/go-kademlia/network/address" "github.com/plprobelab/go-kademlia/network/address/kadid" si "github.com/plprobelab/go-kademlia/network/address/stringid" @@ -170,6 +173,100 @@ func TestInvalidQueryOptions(t *testing.T) { _, err = NewSimpleQuery(ctx, req, invalidOpts...) require.Error(t, err) + // fails because req.Target() isn't the same size as node.Key() + invalidOpts = []Option{ + WithRoutingTable(rt), + WithEndpoint(fendpoint), + WithScheduler(sched), + WithConcurrency(1), + } + _, err = NewSimpleQuery(ctx, req, invalidOpts...) + require.Error(t, err) +} + +func simulationSetup(t *testing.T, ctx context.Context, n, bucketSize int, + clk clock.Clock, protoID address.ProtocolID, peerstoreTTL time.Duration, + defaultQueryOpts []Option) ( + []address.NodeAddr, []scheduler.AwareScheduler, []endpoint.SimEndpoint, + []routingtable.RoutingTable, []server.Server, [][]Option) { + + router := fe.NewFakeRouter() + + ids := make([]address.NodeAddr, n) + scheds := make([]scheduler.AwareScheduler, n) + fendpoints := make([]endpoint.SimEndpoint, n) + rts := make([]routingtable.RoutingTable, n) + servers := make([]server.Server, n) + + spacing := 256 / n + + for i := 0; i < n; i++ { + scheds[i] = ss.NewSimpleScheduler(clk) + ids[i] = kadid.NewKadID([]byte{byte(i * spacing)}) + fendpoints[i] = fe.NewFakeEndpoint(ids[i].NodeID(), scheds[i], router) + rts[i] = simplert.NewSimpleRT(ids[i].NodeID().Key(), bucketSize) + servers[i] = basicserver.NewBasicServer(rts[i], fendpoints[i], + basicserver.WithNumberUsefulCloserPeers(bucketSize)) + fendpoints[i].AddRequestHandler(protoID, &sm.SimMessage{}, servers[i].HandleRequest) + } + + // peer ids (KadIDs) are i*8 for i in [0, 32), the keyspace is 1 byte [0, 255] + // so peers cover the whole keyspace, and they have the format XXXX X000. + + // connect peers, and add them to the routing tables + // note: when the bucket is complete, it contains the peers with the + // smallest identifier (KadID). + for i := 0; i < n; i++ { + for j := i + 1; j < n; j++ { + // add peer to peerstore + err := fendpoints[i].MaybeAddToPeerstore(ctx, ids[j], peerstoreTTL) + require.NoError(t, err) + // we don't require the the peer is added to the routing table, + // because the bucket might be full already and it is fine + _, err = rts[i].AddPeer(ctx, ids[j].NodeID()) + require.NoError(t, err) + } + } + + // query options for each peer + queryOpts := make([][]Option, n) + for i := 0; i < n; i++ { + queryOpts[i] = append(defaultQueryOpts, + WithRoutingTable(rts[i]), + WithEndpoint(fendpoints[i]), + WithScheduler(scheds[i]), + ) + } + + return ids, scheds, fendpoints, rts, servers, queryOpts +} + +func getHandleResults(t *testing.T, req message.MinKadRequestMessage, + expectedPeers []key.KadKey, expectedResponses [][]key.KadKey) func( + ctx context.Context, id address.NodeID, resp message.MinKadResponseMessage) ( + bool, []address.NodeID) { + + var responseCount int + return func(ctx context.Context, id address.NodeID, + resp message.MinKadResponseMessage) (bool, []address.NodeID) { + // check that the request was sent to the correct peer + require.Equal(t, expectedPeers[responseCount], id.Key(), "responseCount: ", responseCount) + + ids := make([]address.NodeID, len(resp.CloserNodes())) + var found bool + for i, n := range resp.CloserNodes() { + ids[i] = n.NodeID() + if match, err := ids[i].Key().Equal(req.Target()); err == nil && match { + // the target was found, stop the query + found = true + } + // check that the response contains the expected peers + require.Contains(t, expectedResponses[responseCount], n.NodeID().Key()) + } + responseCount++ + fmt.Println(ids) + return found, ids + } } func TestElementaryQuery(t *testing.T) { @@ -178,19 +275,105 @@ func TestElementaryQuery(t *testing.T) { protoID := address.ProtocolID("/test/1.0.0") bucketSize := 4 + nPeers := 32 + peerstoreTTL := time.Minute + + // generic query options to be used by all peers + defaultQueryOpts := []Option{ + WithProtocolID(protoID), + WithConcurrency(1), + WithNumberUsefulCloserPeers(bucketSize), + WithRequestTimeout(time.Second), + WithPeerstoreTTL(peerstoreTTL), + } + + ids, scheds, _, rts, _, queryOpts := simulationSetup(t, ctx, nPeers, + bucketSize, clk, protoID, peerstoreTTL, defaultQueryOpts) + + // smallest peer is looking for biggest peer (which is the most far away + // in hop numbers, given the routing table configuration) + req := sm.NewSimRequest(ids[len(ids)-1].NodeID().Key()) + + // peers that are expected to be queried, in order + expectedPeers := []key.KadKey{} + // peer that are expected to be included in responses, in order + expectedResponses := [][]key.KadKey{} + + currID := 0 + // while currID != target.Key() + for c, _ := ids[currID].NodeID().Key().Equal(req.Target()); !c; { + // get closest peer to target from the sollicited peer + closest, err := rts[currID].NearestPeers(ctx, req.Target(), 1) + require.NoError(t, err) + require.Len(t, closest, 1, fmt.Sprint(ids[currID].NodeID().Key().Equal(req.Target()))) + expectedPeers = append(expectedPeers, closest[0].Key()) + + // the next current id is the closest peer to the target + currID = int(closest[0].Key()[0] / 8) + + // the peers included in the response are the closest to the target + // from the sollicited peer + responseClosest, err := rts[currID].NearestPeers(ctx, req.Target(), bucketSize) + require.NoError(t, err) + closestKey := make([]key.KadKey, len(responseClosest)) + for i, n := range responseClosest { + closestKey[i] = n.Key() + } + expectedResponses = append(expectedResponses, closestKey) + + // test if the current ID is the target + c, _ = ids[currID].NodeID().Key().Equal(req.Target()) + } + + // handleResults is called when a peer receives a response from a peer. If + // the response contains the target, it returns true, and the query stops. + // Otherwise, it returns false, and the query continues. This function also + // checks that the response come from the expected peer and contains the + // expected peers addresses. + handleResults := getHandleResults(t, req, expectedPeers, expectedResponses) + + // the request will not fail + notifyFailure := func(context.Context) { + require.Fail(t, "notify failure shouldn't be called") + } + + _, err := NewSimpleQuery(ctx, req, append(queryOpts[0], + WithHandleResultsFunc(handleResults), + WithNotifyFailureFunc(notifyFailure))...) + require.NoError(t, err) + + // create simulator + sim := litesimulator.NewLiteSimulator(clk) + simulator.AddPeers(sim, scheds...) + // run simulation + sim.Run(ctx) +} + +func TestFailedQuery(t *testing.T) { + // the key doesn't exist and cannot be found, test with no exit condition + // all peers of the peerlist should have been queried by the end +} + +func TestConcurrentQuery(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + protoID := address.ProtocolID("/test/1.0.0") + bucketSize := 4 + nPeers := 8 peerstoreTTL := time.Minute router := fe.NewFakeRouter() - nPeers := 32 ids := make([]address.NodeAddr, nPeers) scheds := make([]scheduler.AwareScheduler, nPeers) fendpoints := make([]endpoint.SimEndpoint, nPeers) rts := make([]routingtable.RoutingTable, nPeers) servers := make([]server.Server, nPeers) + for i := 0; i < nPeers; i++ { scheds[i] = ss.NewSimpleScheduler(clk) - ids[i] = kadid.NewKadID([]byte{byte(i * 8)}) + ids[i] = kadid.NewKadID([]byte{byte(i * 32)}) fendpoints[i] = fe.NewFakeEndpoint(ids[i].NodeID(), scheds[i], router) rts[i] = simplert.NewSimpleRT(ids[i].NodeID().Key(), bucketSize) servers[i] = basicserver.NewBasicServer(rts[i], fendpoints[i], @@ -198,32 +381,34 @@ func TestElementaryQuery(t *testing.T) { fendpoints[i].AddRequestHandler(protoID, &sm.SimMessage{}, servers[i].HandleRequest) } - // peer ids (KadIDs) are i*8 for i in [0, 32), the keyspace is 1 byte [0, 255] - // so peers cover the whole keyspace, and they have the format XXXX X000. + // 0 is looking for 7 + // 0 knows 1, 2, 3: it will query 3, 2 at first + // 3 knows 4, 0 + // 2 knows 6, 0 + // 4 knows 5, 3 + // 6 knows 7, 2, 5 + // 5 knows 4, 6 + // sequence of outgoing requests from 0 to find 7: 3, 2, 4, 6 - // connect peers, and add them to the routing tables - // note: when the bucket is complete, it contains the peers with the - // smallest identifier (KadID). - for i := 0; i < nPeers; i++ { - for j := i + 1; j < nPeers; j++ { + connections := [...][2]int{{0, 1}, {0, 2}, {0, 3}, {3, 4}, {2, 6}, + {4, 5}, {6, 7}, {6, 5}} + + for _, c := range connections { + for i := range c { // add peer to peerstore - err := fendpoints[i].MaybeAddToPeerstore(ctx, ids[j], peerstoreTTL) + err := fendpoints[c[i]].MaybeAddToPeerstore(ctx, ids[c[1-i]], peerstoreTTL) require.NoError(t, err) // we don't require the the peer is added to the routing table, // because the bucket might be full already and it is fine - _, err = rts[i].AddPeer(ctx, ids[j].NodeID()) + _, err = rts[c[i]].AddPeer(ctx, ids[c[1-i]].NodeID()) require.NoError(t, err) } } - // smallest peer is looking for biggest peer (which is the most far away - // in hop numbers, given the routing table configuration) - req := sm.NewSimRequest(ids[len(ids)-1].NodeID().Key()) - // generic query options to be used by all peers defaultQueryOpts := []Option{ WithProtocolID(protoID), - WithConcurrency(1), + WithConcurrency(2), WithNumberUsefulCloserPeers(bucketSize), WithRequestTimeout(time.Second), WithPeerstoreTTL(peerstoreTTL), @@ -239,29 +424,32 @@ func TestElementaryQuery(t *testing.T) { ) } - // handleResultsFnInfinity is called when a peer receives a response from a - // peer, it returns false to indicate that the query should not be stopped - // before all peers have been queried. - handleResultsFnInfinity := func(ctx context.Context, id address.NodeID, - resp message.MinKadResponseMessage) (bool, []address.NodeID) { - // TODO: test that the responses are the expected ones - ids := make([]address.NodeID, len(resp.CloserNodes())) - for i, n := range resp.CloserNodes() { - ids[i] = n.NodeID() - } - return false, ids + // smallest peer is looking for biggest peer (which is the most far away + // in hop numbers, given the routing table configuration) + req := sm.NewSimRequest(ids[len(ids)-1].NodeID().Key()) + fmt.Println("req.Target():", req.Target()) + + // peers that are expected to be queried, in order + expectedPeers := []key.KadKey{ids[3].NodeID().Key(), ids[2].NodeID().Key(), + ids[4].NodeID().Key(), ids[6].NodeID().Key()} + // peer that are expected to be included in responses, in order + expectedResponses := [][]key.KadKey{ + {ids[4].NodeID().Key(), ids[0].NodeID().Key()}, + {ids[6].NodeID().Key(), ids[0].NodeID().Key()}, + {ids[5].NodeID().Key(), ids[3].NodeID().Key()}, + {ids[7].NodeID().Key(), ids[2].NodeID().Key(), ids[5].NodeID().Key()}, } - // the request will eventually fail because handleResultsFnInfinity always - // return false, so the query will never stop. - var failedQuery bool - notifyFailureFnInfinity := func(context.Context) { - failedQuery = true + handleResults := getHandleResults(t, req, expectedPeers, expectedResponses) + + // the request will not fail + notifyFailure := func(context.Context) { + require.Fail(t, "notify failure shouldn't be called") } _, err := NewSimpleQuery(ctx, req, append(queryOpts[0], - WithHandleResultsFunc(handleResultsFnInfinity), - WithNotifyFailureFunc(notifyFailureFnInfinity))...) + WithHandleResultsFunc(handleResults), + WithNotifyFailureFunc(notifyFailure))...) require.NoError(t, err) // create simulator @@ -269,6 +457,69 @@ func TestElementaryQuery(t *testing.T) { simulator.AddPeers(sim, scheds...) // run simulation sim.Run(ctx) +} + +func TestUnresponsivePeer(t *testing.T) { + ctx := context.Background() + clk := clock.NewMock() + + protoID := address.ProtocolID("/test/1.0.0") + peerstoreTTL := time.Minute + bucketSize := 1 + + router := fe.NewFakeRouter() + id0 := kadid.NewKadID([]byte{0}) + id1 := kadid.NewKadID([]byte{1}) + sched0 := ss.NewSimpleScheduler(clk) + sched1 := ss.NewSimpleScheduler(clk) + fendpoint0 := fe.NewFakeEndpoint(id0.NodeID(), sched0, router) + fendpoint1 := fe.NewFakeEndpoint(id1.NodeID(), sched1, router) + rt0 := simplert.NewSimpleRT(id0.NodeID().Key(), bucketSize) + + serverRequestHandler := func(context.Context, address.NodeID, + message.MinKadMessage) (message.MinKadMessage, error) { + return nil, errors.New("") + } + fendpoint1.AddRequestHandler(protoID, &sm.SimMessage{}, serverRequestHandler) + + req := sm.NewSimRequest([]byte{0xff}) + + responseHandler := func(ctx context.Context, sender address.NodeID, + msg message.MinKadResponseMessage) (bool, []address.NodeID) { + require.Fail(t, "response handler shouldn't be called") + return false, nil + } + queryOpts := []Option{ + WithProtocolID(protoID), + WithConcurrency(2), + WithNumberUsefulCloserPeers(bucketSize), + WithRequestTimeout(time.Millisecond), + WithEndpoint(fendpoint0), + WithRoutingTable(rt0), + WithScheduler(sched0), + WithHandleResultsFunc(responseHandler), + } + + // query creation fails because the routing table of 0 is empty + _, err := NewSimpleQuery(ctx, req, queryOpts...) + require.Error(t, err) + + // connect 0 and 1 + err = fendpoint0.MaybeAddToPeerstore(ctx, id1, peerstoreTTL) + require.NoError(t, err) + success, err := rt0.AddPeer(ctx, id1.NodeID()) + require.NoError(t, err) + require.True(t, success) + + q, err := NewSimpleQuery(ctx, req, queryOpts...) + require.NoError(t, err) + + // create simulator + sim := litesimulator.NewLiteSimulator(clk) + simulator.AddPeers(sim, sched0, sched1) + // run simulation + sim.Run(ctx) - require.True(t, failedQuery) + // make sure the peer is marked as unreachable + require.Equal(t, unreachable, q.peerlist.closest.status) }