Skip to content

Commit

Permalink
added more tests to simplequery
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel committed Jul 5, 2023
1 parent 5f0fdd3 commit 88262ca
Show file tree
Hide file tree
Showing 9 changed files with 436 additions and 114 deletions.
5 changes: 3 additions & 2 deletions events/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/benbjohnson/clock"
"github.com/plprobelab/go-kademlia/events/action"
"github.com/plprobelab/go-kademlia/events/planner"
)
Expand All @@ -12,7 +13,7 @@ import (
// or at a specific time
type Scheduler interface {
// Now returns the time of the scheduler's clock
Now() time.Time
Clock() clock.Clock

// EnqueueAction enqueues an action to run as soon as possible
EnqueueAction(context.Context, action.Action)
Expand All @@ -33,7 +34,7 @@ func ScheduleActionIn(ctx context.Context, s Scheduler, d time.Duration, a actio
s.EnqueueAction(ctx, a)
return nil
} else {
return s.ScheduleAction(ctx, s.Now().Add(d), a)
return s.ScheduleAction(ctx, s.Clock().Now().Add(d), a)
}
}

Expand Down
4 changes: 2 additions & 2 deletions events/scheduler/simplescheduler/simplescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewSimpleScheduler(clk clock.Clock) *SimpleScheduler {
}

// Now returns the scheduler's current time.
func (s *SimpleScheduler) Now() time.Time {
return s.clk.Now()
func (s *SimpleScheduler) Clock() clock.Clock {
return s.clk
}

// EnqueueAction enqueues an action to be run as soon as possible.
Expand Down
2 changes: 1 addition & 1 deletion events/scheduler/simplescheduler/simplescheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestSimpleScheduler(t *testing.T) {

sched := NewSimpleScheduler(clk)

require.Equal(t, clk.Now(), sched.Now())
require.Equal(t, clk.Now(), sched.Clock().Now())

nActions := 10
actions := make([]*ta.FuncAction, nActions)
Expand Down
7 changes: 5 additions & 2 deletions examples/connect/findpeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,16 @@ func FindPeer(ctx context.Context) {
queryOpts := []simplequery.Option{
simplequery.WithProtocolID(protocolID),
simplequery.WithConcurrency(1),
simplequery.WithRequestTimeout(5 * time.Second),
simplequery.WithRequestTimeout(2 * time.Second),
simplequery.WithHandleResultsFunc(handleResultsFn),
simplequery.WithRoutingTable(rt),
simplequery.WithEndpoint(msgEndpoint),
simplequery.WithScheduler(sched),
}
simplequery.NewSimpleQuery(ctx, req, queryOpts...)
_, err = simplequery.NewSimpleQuery(ctx, req, queryOpts...)
if err != nil {
panic(err)
}

span.AddEvent("start request execution")

Expand Down
7 changes: 6 additions & 1 deletion network/endpoint/fakeendpoint/fakeendpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,12 @@ func (e *FakeEndpoint) HandleMessage(ctx context.Context, id address.NodeID,

resp, ok := msg.(message.MinKadResponseMessage)
var err error
if !ok {
if ok {
for _, p := range resp.CloserNodes() {
e.peerstore[p.NodeID().String()] = p
e.connStatus[p.NodeID().String()] = network.CanConnect
}
} else {
err = ErrInvalidResponseType
}
if followup != nil {
Expand Down
23 changes: 18 additions & 5 deletions network/endpoint/libp2pendpoint/libp2pendpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,12 @@ func (e *Libp2pEndpoint) SendRequestHandleResponse(ctx context.Context,
attribute.String("PeerID", n.String()),
))
defer span.End()
ctx, cancel := context.WithCancel(ctx)
var cancel context.CancelFunc
if timeout > 0 {
ctx, cancel = e.sched.Clock().WithTimeout(ctx, timeout)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()

var err error
Expand All @@ -197,15 +202,19 @@ func (e *Libp2pEndpoint) SendRequestHandleResponse(ctx context.Context,
s, err = e.host.NewStream(ctx, p.ID, protocol.ID(protoID))
if err != nil {
span.RecordError(err, trace.WithAttributes(attribute.String("where", "stream creation")))
responseHandlerFn(ctx, nil, err)
e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) {
responseHandlerFn(ctx, nil, err)
}))
return
}
defer s.Close()

err = WriteMsg(s, protoReq)
if err != nil {
span.RecordError(err, trace.WithAttributes(attribute.String("where", "write message")))
responseHandlerFn(ctx, nil, err)
e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) {
responseHandlerFn(ctx, nil, err)
}))
return
}

Expand All @@ -231,12 +240,16 @@ func (e *Libp2pEndpoint) SendRequestHandleResponse(ctx context.Context,
}
if err != nil {
span.RecordError(err, trace.WithAttributes(attribute.String("where", "read message")))
responseHandlerFn(ctx, protoResp, err)
e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) {
responseHandlerFn(ctx, protoResp, err)
}))
return
}

span.AddEvent("response received")
responseHandlerFn(ctx, protoResp, err)
e.sched.EnqueueAction(ctx, ba.BasicAction(func(ctx context.Context) {
responseHandlerFn(ctx, protoResp, err)
}))
}()
return nil
}
Expand Down
37 changes: 34 additions & 3 deletions network/endpoint/libp2pendpoint/libp2pendpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,15 @@ func TestLibp2pEndpoint(t *testing.T) {
time.Sleep(time.Millisecond)
}
require.False(t, scheds[1].RunOne(ctx)) // only 1 action should run on server
wg.Add(1)
go func() {
// timeout is queued in the scheduler 0
for !scheds[0].RunOne(ctx) {
time.Sleep(time.Millisecond)
}
require.False(t, scheds[0].RunOne(ctx))
wg.Done()
}()
wg.Wait()

// invalid response format (not protobuf)
Expand Down Expand Up @@ -238,6 +247,15 @@ func TestLibp2pEndpoint(t *testing.T) {
err = endpoints[0].SendRequestHandleResponse(ctx, protoID, ids[2], req,
resp, time.Second, invalidRespHandlerBuilder(swarm.ErrNoAddresses))
require.NoError(t, err)
wg.Add(1)
go func() {
// timeout is queued in the scheduler 0
for !scheds[0].RunOne(ctx) {
time.Sleep(time.Millisecond)
}
require.False(t, scheds[0].RunOne(ctx))
wg.Done()
}()
wg.Wait()

// test timeout
Expand Down Expand Up @@ -277,10 +295,10 @@ func TestLibp2pEndpoint(t *testing.T) {
err = endpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1], req,
resp, 0, responseHandler)
require.NoError(t, err)
wg.Add(1)
for !scheds[1].RunOne(ctx) {
time.Sleep(time.Millisecond)
}
wg.Wait() // wg count shoud be 0
require.False(t, scheds[1].RunOne(ctx))
// no response to be handled by 0
require.False(t, scheds[0].RunOne(ctx))
Expand All @@ -292,14 +310,27 @@ func TestLibp2pEndpoint(t *testing.T) {
return &simmessage.SimMessage{}, nil
})
require.NoError(t, err)
err = endpoints[0].SendRequestHandleResponse(ctx, protoID, ids[1], req,
require.False(t, scheds[0].RunOne(ctx))
wg.Add(1)
noResponseCtx, cancel := context.WithCancel(ctx)
err = endpoints[0].SendRequestHandleResponse(noResponseCtx, protoID, ids[1], req,
resp, 0, responseHandler)
require.NoError(t, err)
wg.Add(1)
for !scheds[1].RunOne(ctx) {
time.Sleep(time.Millisecond)
}
require.False(t, scheds[1].RunOne(ctx))
cancel()
wg.Add(1)
go func() {
// timeout is queued in the scheduler 0
for !scheds[0].RunOne(ctx) {
time.Sleep(time.Millisecond)
}
require.False(t, scheds[0].RunOne(ctx))
wg.Done()
}()
wg.Wait()
// no response to be handled by 0
require.False(t, scheds[0].RunOne(ctx))

Expand Down
Loading

0 comments on commit 88262ca

Please sign in to comment.